1 /**
2 Home of `RingBufferNoGC` and the mighty `TimedFIFO`.
3 
4 Copyright: Auburn Sounds 2015-2016.
5 License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
6 */
7 module dplug.core.ringbuf;
8 
9 import core.atomic;
10 
11 import dplug.core.sync;
12 import dplug.core.nogc;
13 import dplug.core.vec;
14 
15 RingBufferNoGC!T makeRingBufferNoGC(T)(size_t initialCapacity) nothrow @nogc
16 {
17     return RingBufferNoGC!T(initialCapacity);
18 }
19 
20 /// @nogc ring-buffer
21 struct RingBufferNoGC(T)
22 {
23     public
24     {
25         /// Create a RingBuffer with specified initial capacity.
26         this(size_t initialCapacity) nothrow @nogc
27         {
28             _data.reallocBuffer(initialCapacity);
29             clear();
30         }
31 
32         @disable this(this);
33 
34         ~this() nothrow @nogc
35         {
36             _data.reallocBuffer(0);
37         }
38 
39         bool isFull() pure const nothrow
40         {
41             return _count == _data.length;
42         }
43 
44         /// Adds an item on the back of the queue.
45         void pushBack(T x) nothrow @nogc
46         {
47             checkOverflow!popFront();
48            _data.ptr[(_first + _count) % _data.length] = x;
49             ++_count;
50         }
51 
52         /// Adds an item on the front of the queue.
53         void pushFront(T x) nothrow @nogc
54         {
55             checkOverflow!popBack();
56             ++_count;
57             _first = (_first - 1 + _data.length) % _data.length;
58             _data.ptr[_first] = x;
59         }
60 
61         /// Removes an item from the front of the queue.
62         /// Returns: the removed item.
63         T popFront() nothrow @nogc
64         {
65             T res = _data.ptr[_first];
66             _first = (_first + 1) % _data.length;
67             --_count;
68             return res;
69         }
70 
71         /// Removes an item from the back of the queue.
72         /// Returns: the removed item.
73         T popBack() nothrow @nogc
74         {
75             --_count;
76             return _data.ptr[(_first + _count) % _data.length];
77         }
78 
79         /// Removes all items from the queue.
80         void clear() nothrow @nogc
81         {
82             _first = 0;
83             _count = 0;
84         }
85 
86         /// Returns: number of items in the queue.
87         size_t length() pure const nothrow @nogc
88         {
89             return _count;
90         }
91 
92         /// Returns: maximum number of items in the queue.
93         size_t capacity() pure const nothrow @nogc
94         {
95             return _data.length;
96         }
97 
98         /// Returns: item at the front of the queue.
99         T front() pure nothrow @nogc
100         {
101             return _data.ptr[_first];
102         }
103 
104         /// Returns: item on the back of the queue.
105         T back() pure nothrow @nogc
106         {
107             return _data.ptr[(_first + _count + _data.length - 1) % _data.length];
108         }
109 
110         // Returns: true if there are no items in the queue.
111         bool empty() @safe pure nothrow @nogc
112         {
113             return length() == 0;
114         }
115 
116         /// Implements the assignment operator using .pushBack()
117         ///
118         /// See_Also: pushBack
119         void opOpAssign(string op : "~")(T x)
120         {
121             pushBack(x);
122         }
123 
124         /// Returns: item index from the queue.
125         T opIndex(size_t index) nothrow @nogc
126         {
127             // crash if index out-of-bounds (not recoverable)
128             version(D_NoBoundsChecks) {}
129             else
130             {
131                 if (index >= _count)
132                     assert(0);
133             }
134 
135             return _data.ptr[(_first + index) % _data.length];
136         }
137 
138         ///
139         int opApply(scope int delegate(T) dg)
140         {
141             int result = 0;
142 
143             for (size_t i = 0; i < length(); i++)
144             {
145                 result = dg(this[i]);
146                 if (result)
147                     break;
148             }
149             return result;
150         }
151     }
152 
153     private
154     {
155         // element lie from _first to _first + _count - 1 index, modulo the allocated size
156         T[] _data;
157         size_t _first;
158         size_t _count;
159 
160         void checkOverflow(alias popMethod)() nothrow
161         {
162             if (isFull())
163                 popMethod();
164         }
165     }
166 }
167 
168 
169 unittest
170 {
171     RingBufferNoGC!float a;
172 }
173 
174 unittest
175 {
176     auto rb = makeRingBufferNoGC!int(2);
177 
178     rb ~= 100;
179     assert(rb.length == 1);
180     assert(rb[0] == 100);
181 
182     rb.pushFront(200);
183     assert(rb[0] == 200);
184     assert(rb[1] == 100);
185 
186     rb.pushBack(300);
187     Vec!int vec = makeVec!int();
188     foreach(i; rb)
189     {
190         vec.pushBack(i);
191     }
192     assert(vec.releaseData() == [100, 300]);
193 }
194 
195 /// Reusable mechanism to provide the UI with continuously available non-critical data from the audio thread.
196 /// eg: for waveforms, analyzers, displays, etc...
197 /// In the case where the FIFO is empty, it may be that there is nothing to draw or audio processing has stopped.
198 /// And because audio buffers may be long, we can't just use atomics and avoid updating the UI when the buffer has already been processed.
199 /// It would cause slowness with small buffers.
200 struct TimedFIFO(T)
201 {
202 private:
203 
204     T[] _data;
205     int _count;
206     int _readIndex;
207     int _readIndexAtLastRead;
208     int _inputTimestamp;
209 
210     // Note: information about sample-rate is passed through atomics, out-of-band
211     shared(float) _sampleRate = 44100.0f;
212 
213     int _indexMask;
214     int _dividerMask;
215     float _invDivider;
216 
217     // protects: _readIndex, _count, _data
218     UncheckedMutex _dataMutex;
219 
220     float _timeDebt;
221     float _integerDebt; // because of rounding
222 
223     /// Returns: true of i is a power of 2.
224     static bool isPowerOf2(int i) @nogc nothrow
225     {
226         assert(i >= 0);
227         return (i != 0) && ((i & (i - 1)) == 0);
228     }
229 
230 public:
231 
232     /// Params:
233     ///     size = size of the buffer
234     ///     divider = only one in divider sample(s) is actually pushed in the FIFO.
235     void initialize(int size, int divider = 1)
236     {
237         assert(isPowerOf2(size));
238         assert(isPowerOf2(divider));
239 
240         _count = 0; // no data at start
241         _readIndex = 0;
242         _readIndexAtLastRead = 0;
243         _indexMask = size - 1;
244         _inputTimestamp = 0;
245         _dividerMask = divider - 1;
246         _invDivider = 1.0f / divider;
247 
248         _data.reallocBuffer(size);
249         _dataMutex = makeMutex();
250 
251         _timeDebt = 0;
252         _integerDebt = 0;
253 
254     }
255 
256     ~this()
257     {
258         _dataMutex.destroy();
259         _data.reallocBuffer(0);
260     }
261 
262     void pushData(T[] input, float sampleRate) nothrow @nogc
263     {
264         // Here we are in the audio thread, so blocking is not welcome
265         atomicStore(_sampleRate, sampleRate);
266 
267         // Push incoming data, but it's not that bad if we miss some
268         if (_dataMutex.tryLock())
269         {
270             foreach (i; 0..input.length)
271             {
272                 _inputTimestamp++;
273                 if ( (_inputTimestamp & _dividerMask) == 0 ) // should depend on samplerate?
274                 {
275                     _data[ (_readIndex + _count) & _indexMask ] = input[i];
276                     if (_count >= _data.length)
277                         ++_readIndex; // overflow, drop older data
278                     else
279                         ++_count; // grow buffer
280                 }
281             }
282             _dataMutex.unlock();
283         }
284     }
285 
286     /// Same but with 1 element.
287     void pushData(T input, float sampleRate) nothrow @nogc
288     {
289         pushData( (&input)[0..1], sampleRate);
290     }
291 
292     /// Get some amount of oldest samples in the FIFO
293     /// Then drop some amount of samples that correspond to time passing of dt
294     /// Returns: the number of sample data returned. Also return no data if tryLock failed to take the lock.
295     /// Note that there is a disconnect between the data that is dropped, and the data that is returned.
296     /// The same data may well be returned multiple time given a large buffer, or zero time.
297     int readOldestDataAndDropSome(T[] output, double dt, int keepAtLeast = 0) nothrow @nogc
298     {
299         assert(dt >= 0);
300         _timeDebt += dt * 1.01; // add 1% because it's better to be a bit short in buffer than too large.
301         if (_dataMutex.tryLock())
302         {
303             scope(exit) _dataMutex.unlock();
304 
305             int pointsNeeded = cast(int)(output.length);
306 
307             int pointsAvailable = ( (_count < pointsNeeded) ? _count : pointsNeeded);
308 
309             // Purpose: always drop data albeit slowly.
310             // In case of CPU overrun in Reaper, playback is stopped.
311             // To avoid display of the same information, we force keepAtLeast 
312             // to be always smaller than pointsAvailable.
313             if (keepAtLeast >= pointsAvailable && pointsAvailable > 0)
314             {
315                 keepAtLeast = pointsAvailable - 1;
316             }
317 
318             bool noData = (pointsAvailable == 0);
319 
320             if (noData)
321                 return 0;
322 
323             foreach (i ; 0..pointsAvailable)
324             {
325                 output[i] = _data[ (_readIndex + i) & _indexMask ];
326             }
327             _readIndexAtLastRead = _readIndex;
328 
329             // drop samples
330             float sampleRate = atomicLoad(_sampleRate);
331 
332             float samplesToDrop = _timeDebt * sampleRate * _invDivider + _integerDebt;
333             int maxDroppable = _count - keepAtLeast;
334             if (samplesToDrop > maxDroppable)
335                 samplesToDrop = maxDroppable;
336             if (samplesToDrop < 0)
337                 samplesToDrop = 0;
338 
339             int numSamplesToDrop = cast(int)(samplesToDrop);
340             _timeDebt = 0;
341             _integerDebt = (samplesToDrop - numSamplesToDrop);
342 
343             _count -= numSamplesToDrop;
344             _readIndex += numSamplesToDrop;
345             return pointsAvailable;
346         }
347         else
348             return 0;
349     }
350 
351     // Same but with one element
352     bool readOldestDataAndDropSome(T* output, double dt) nothrow @nogc
353     {
354         return readOldestDataAndDropSome(output[0..1], dt) != 0;
355     }
356 
357     /// Getting the last used read index is useful if you want to further filter the TImedFIFO output,
358     /// and need a modulo reference for to alignment.
359     /// In Couture, this is used to avoid "humps" on gain display.
360     int readIndexAtLastRead()
361     {
362         return _readIndexAtLastRead;
363     }
364 }
365 
366 
367 unittest
368 {
369     TimedFIFO!float a;
370 }