1 /**
2 * Home of `RingBufferNoGC` and the mighty `TimedFIFO`.
3 *
4 * Copyright: Copyright Auburn Sounds 2015-2016
5 * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
6 * Authors:   Guillaume Piolat
7 */
8 module dplug.core.ringbuf;
9 
10 import core.atomic;
11 
12 import dplug.core.sync;
13 import dplug.core.nogc;
14 import dplug.core.vec;
15 
16 
/// Convenience factory for `RingBufferNoGC!T`.
/// Params:
///     initialCapacity = capacity forwarded to the `RingBufferNoGC!T` constructor.
/// Returns: the newly constructed ring buffer.
RingBufferNoGC!T ringBufferNoGC(T)(size_t initialCapacity) nothrow @nogc
{
    alias Ring = RingBufferNoGC!T;
    return Ring(initialCapacity);
}
21 
/// @nogc ring-buffer.
///
/// Double-ended queue stored in a circular array whose size is set by the
/// constructor. When the buffer is full, pushing an item on one end first
/// removes the item on the opposite end, so the newest items are kept.
///
/// Note: pushing performs a modulo by the capacity, so the capacity must be
/// non-zero before any push (a default-constructed instance has capacity 0).
/// `popFront`, `popBack`, `front` and `back` do not check for emptiness; calling
/// them on an empty queue is a caller error.
struct RingBufferNoGC(T)
{
    public
    {
        /// Create a RingBuffer with specified initial capacity.
        this(size_t initialCapacity) nothrow @nogc
        {
            _data.reallocBuffer(initialCapacity);
            clear();
        }

        @disable this(this);

        /// Releases the backing storage.
        ~this() nothrow @nogc
        {
            _data.reallocBuffer(0);
        }

        /// Returns: `true` if the number of stored items equals the capacity.
        bool isFull() pure const nothrow @nogc
        {
            return _count == _data.length;
        }

        /// Adds an item on the back of the queue.
        /// If the queue is full, the front item is removed first.
        void pushBack(T x) nothrow @nogc
        {
            checkOverflow!popFront();
            _data.ptr[(_first + _count) % _data.length] = x;
            ++_count;
        }

        /// Adds an item on the front of the queue.
        /// If the queue is full, the back item is removed first.
        void pushFront(T x) nothrow @nogc
        {
            checkOverflow!popBack();
            ++_count;
            // Adding the capacity before the modulo keeps the result in range
            // when `_first` is 0 (unsigned wrap-around cancels out).
            _first = (_first - 1 + _data.length) % _data.length;
            _data.ptr[_first] = x;
        }

        /// Removes an item from the front of the queue.
        /// Returns: the removed item.
        T popFront() nothrow @nogc
        {
            T res = _data.ptr[_first];
            _first = (_first + 1) % _data.length;
            --_count;
            return res;
        }

        /// Removes an item from the back of the queue.
        /// Returns: the removed item.
        T popBack() nothrow @nogc
        {
            --_count;
            // After the decrement, `_first + _count` is the slot of the former last item.
            return _data.ptr[(_first + _count) % _data.length];
        }

        /// Removes all items from the queue. The capacity is left unchanged.
        void clear() nothrow @nogc
        {
            _first = 0;
            _count = 0;
        }

        /// Returns: number of items in the queue.
        size_t length() pure const nothrow @nogc
        {
            return _count;
        }

        /// Returns: maximum number of items in the queue.
        size_t capacity() pure const nothrow @nogc
        {
            return _data.length;
        }

        /// Returns: item at the front of the queue.
        T front() pure nothrow @nogc
        {
            return _data.ptr[_first];
        }

        /// Returns: item on the back of the queue.
        T back() pure nothrow @nogc
        {
            return _data.ptr[(_first + _count + _data.length - 1) % _data.length];
        }

        /// Returns: the item at position `index`, where 0 is the front of the queue.
        /// Valid indices are `0 .. length`; any other index halts the program.
        T opIndex(size_t index) nothrow @nogc
        {
            // crash if index out-of-bounds (not recoverable)
            // `index == _count` is one past the last item, hence `>=`.
            if (index >= _count)
                assert(0);

            return _data.ptr[(_first + index) % _data.length];
        }
    }

    private
    {
        // element lie from _first to _first + _count - 1 index, modulo the allocated size
        T[] _data;
        size_t _first;
        size_t _count;

        /// Makes room for one more item: if the queue is full, calls `popMethod`
        /// to discard an item from the end opposite to the upcoming push.
        void checkOverflow(alias popMethod)() nothrow @nogc
        {
            if (isFull())
                popMethod();
        }
    }
}
137 
138 
unittest
{
    // A default-constructed buffer must instantiate and be destroyed cleanly.
    RingBufferNoGC!float a;

    auto q = ringBufferNoGC!int(4);
    assert(q.capacity() == 4);
    assert(q.length() == 0);
    assert(!q.isFull());

    q.pushBack(1);
    q.pushBack(2);
    q.pushBack(3);
    assert(q.length() == 3);
    assert(q.front() == 1);
    assert(q.back() == 3);
    assert(q[0] == 1);
    assert(q[1] == 2);
    assert(q[2] == 3);

    assert(q.popFront() == 1);  // queue is now [2, 3]
    q.pushFront(0);             // wraps around: [0, 2, 3]
    q.pushBack(4);              // [0, 2, 3, 4]
    assert(q.isFull());

    // Pushing on a full queue drops the item on the opposite end.
    q.pushBack(5);              // [2, 3, 4, 5]
    assert(q.length() == 4);
    assert(q.front() == 2);
    assert(q.back() == 5);

    assert(q.popBack() == 5);
    assert(q.length() == 3);

    q.clear();
    assert(q.length() == 0);
    assert(q.capacity() == 4);
}
143 
/// Reusable mechanism to provide the UI with continuously available non-critical data from the audio thread.
/// eg: for waveforms, analyzers, displays, etc...
/// In the case where the FIFO is empty, it may be that there is nothing to draw or audio processing has stopped.
/// And because audio buffers may be long, we can't just use atomics and avoid updating the UI when the buffer has already been processed.
/// It would cause slowness with small buffers.
///
/// Both the producer (`pushData`) and the consumer (`readOldestDataAndDropSome`)
/// only ever try-lock the internal mutex, so neither side blocks; a call that
/// fails to take the lock simply does nothing that time.
/// `initialize` must be called before pushing or reading.
struct TimedFIFO(T)
{
private:

    T[] _data;
    int _count;          // number of items currently stored
    int _readIndex;      // index of the oldest item, before masking with _indexMask
    int _inputTimestamp; // count of samples seen by pushData, used for decimation

    // Note: information about sample-rate is passed through atomics, out-of-band
    shared(float) _sampleRate = 44100.0f;

    int _indexMask;      // size - 1, valid because size is a power of 2
    int _dividerMask;    // divider - 1, valid because divider is a power of 2
    float _invDivider;   // 1 / divider

    // protects: _readIndex, _count, _data
    UncheckedMutex _dataMutex;

    float _timeDebt;    // elapsed time not yet converted into dropped samples
    float _integerDebt; // because of rounding

    /// Returns: true if i is a power of 2.
    static bool isPowerOf2(int i) @nogc nothrow
    {
        assert(i >= 0);
        return (i != 0) && ((i & (i - 1)) == 0);
    }

public:

    /// Allocates the buffer, creates the mutex and resets all state.
    /// Params:
    ///     size = size of the buffer, must be a power of 2
    ///     divider = only one in divider sample(s) is actually pushed in the FIFO,
    ///               must be a power of 2
    void initialize(int size, int divider = 1)
    {
        assert(isPowerOf2(size));
        assert(isPowerOf2(divider));

        _count = 0; // no data at start
        _readIndex = 0;
        _indexMask = size - 1;
        _inputTimestamp = 0;
        _dividerMask = divider - 1;
        _invDivider = 1.0f / divider;

        _data.reallocBuffer(size);
        _dataMutex = makeMutex();

        _timeDebt = 0;
        _integerDebt = 0;

    }

    /// Destroys the mutex and releases the buffer.
    ~this()
    {
        _dataMutex.destroy();
        _data.reallocBuffer(0);
    }

    /// Pushes samples into the FIFO, keeping one sample out of `divider`.
    /// When the FIFO is full, the oldest sample is overwritten.
    /// If the lock cannot be taken immediately, the whole input is discarded.
    /// Params:
    ///     input = samples to push
    ///     sampleRate = current sample rate, published for the reading side
    void pushData(T[] input, float sampleRate) nothrow @nogc
    {
        // Here we are in the audio thread, so blocking is not welcome
        atomicStore(_sampleRate, sampleRate);

        // Push incoming data, but it's not that bad if we miss some
        if (_dataMutex.tryLock())
        {
            foreach (i; 0..input.length)
            {
                _inputTimestamp++;
                if ( (_inputTimestamp & _dividerMask) == 0 ) // should depend on samplerate?
                {
                    // When full, this slot is the one holding the oldest item.
                    _data[ (_readIndex + _count) & _indexMask ] = input[i];
                    if (_count >= _data.length)
                        ++_readIndex; // overflow, drop older data
                    else
                        ++_count; // grow buffer
                }
            }
            _dataMutex.unlock();
        }
    }

    /// Same but with 1 element.
    void pushData(T input, float sampleRate) nothrow @nogc
    {
        pushData( (&input)[0..1], sampleRate);
    }

    /// Get some amount of oldest samples in the FIFO.
    /// Then drop some amount of samples that correspond to time passing of dt.
    /// Params:
    ///     output = destination; at most `output.length` samples are copied into it
    ///     dt = time elapsed since the previous call, must be >= 0
    ///          (in seconds, assuming the pushed sample rate is in Hz)
    ///     keepAtLeast = minimum number of samples to leave in the FIFO when dropping.
    ///                   Negative values are treated as 0.
    /// Returns: the number of sample data returned. Also return no data if tryLock failed to take the lock.
    /// Note that there is a disconnect between the data that is dropped, and the data that is returned.
    /// The same data may well be returned multiple time given a large buffer, or zero time.
    int readOldestDataAndDropSome(T[] output, double dt, int keepAtLeast = 0) nothrow @nogc
    {
        assert(dt >= 0);
        _timeDebt += dt * 1.01; // add 1% because it's better to be a bit short in buffer than too large.
        if (_dataMutex.tryLock())
        {
            scope(exit) _dataMutex.unlock();

            // A negative value would let more samples be dropped than are stored,
            // driving _count below zero and corrupting the FIFO state.
            if (keepAtLeast < 0)
                keepAtLeast = 0;

            int pointsNeeded = cast(int)(output.length);

            int pointsAvailable = ( (_count < pointsNeeded) ? _count : pointsNeeded);

            // Purpose: always drop data albeit slowly.
            // In case of CPU overrun in Reaper, playback is stopped.
            // To avoid display of the same information, we force keepAtLeast
            // to be always smaller than pointsAvailable.
            if (keepAtLeast >= pointsAvailable && pointsAvailable > 0)
            {
                keepAtLeast = pointsAvailable - 1;
            }

            bool noData = (pointsAvailable == 0);

            if (noData)
                return 0;

            foreach (i ; 0..pointsAvailable)
            {
                output[i] = _data[ (_readIndex + i) & _indexMask ];
            }

            // drop samples
            float sampleRate = atomicLoad(_sampleRate);

            // Elapsed time converted to a number of stored (decimated) samples,
            // plus the fractional part left over from the previous call.
            float samplesToDrop = _timeDebt * sampleRate * _invDivider + _integerDebt;
            int maxDroppable = _count - keepAtLeast;
            if (samplesToDrop > maxDroppable)
                samplesToDrop = maxDroppable;
            if (samplesToDrop < 0)
                samplesToDrop = 0;

            int numSamplesToDrop = cast(int)(samplesToDrop);
            _timeDebt = 0;
            _integerDebt = (samplesToDrop - numSamplesToDrop);

            _count -= numSamplesToDrop;
            _readIndex += numSamplesToDrop;
            return pointsAvailable;
        }
        else
            return 0;
    }

    /// Same but with one element.
    /// Returns: `true` if one sample was written to `*output`.
    bool readOldestDataAndDropSome(T* output, double dt) nothrow @nogc
    {
        return readOldestDataAndDropSome(output[0..1], dt) != 0;
    }
}
303 
304 
unittest
{
    // A default-constructed FIFO must instantiate and be destroyed cleanly.
    TimedFIFO!float a;

    TimedFIFO!float fifo;
    fifo.initialize(8, 1);

    float[4] input = [1.0f, 2.0f, 3.0f, 4.0f];
    fifo.pushData(input[], 44100.0f);

    float[2] output;

    // No time elapsed: the two oldest samples are returned, nothing is dropped.
    assert(fifo.readOldestDataAndDropSome(output[], 0.0) == 2);
    assert(output[0] == 1.0f && output[1] == 2.0f);

    // Reading happens before dropping: same samples again, then two get dropped.
    assert(fifo.readOldestDataAndDropSome(output[], 2.0 / 44100.0) == 2);
    assert(output[0] == 1.0f && output[1] == 2.0f);

    // The oldest samples are now the third and fourth pushed ones.
    assert(fifo.readOldestDataAndDropSome(output[], 0.0) == 2);
    assert(output[0] == 3.0f && output[1] == 4.0f);
}