1 /**
2 * Copyright: Copyright Auburn Sounds 2015-2016
3 * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
4 * Authors:   Guillaume Piolat
5 */
6 module dplug.core.ringbuf;
7 
8 import core.atomic;
9 
10 import dplug.core.sync;
11 import dplug.core.nogc;
12 import dplug.core.alignedbuffer;
13 
14 
/// Convenience factory for `RingBufferNoGC!T`.
/// Params:
///     initialCapacity = capacity forwarded to the `RingBufferNoGC!T` constructor.
/// Returns: a newly constructed ring buffer.
RingBufferNoGC!T ringBufferNoGC(T)(size_t initialCapacity) nothrow @nogc
{
    // The return type is already spelled in the signature; construct it in place.
    return typeof(return)(initialCapacity);
}
19 
/// @nogc ring-buffer, usable as a double-ended queue with a fixed capacity.
///
/// Items live in a single buffer sized once by the constructor through `reallocBuffer`.
/// When the buffer is full, pushing on one end first discards the item at the opposite end.
/// The struct is not copyable (postblit is disabled).
///
/// Note: a default-constructed instance has a capacity of zero. Pushing, popping,
/// `back` and `opIndex` all compute an index modulo the capacity, and `front` reads the
/// storage directly, so all of them require a buffer built with a non-zero capacity.
struct RingBufferNoGC(T)
{
    public
    {
        /// Create a RingBuffer with specified initial capacity.
        /// Params:
        ///     initialCapacity = maximum number of items the queue can hold.
        this(size_t initialCapacity) nothrow @nogc
        {
            _data.reallocBuffer(initialCapacity);
            clear();
        }

        @disable this(this);

        /// Resizes the storage down to zero items (presumably releasing it).
        ~this() nothrow @nogc
        {
            _data.reallocBuffer(0);
        }

        /// Returns: `true` if the queue holds as many items as its capacity.
        bool isFull() pure const nothrow @nogc
        {
            return _count == _data.length;
        }

        /// Adds an item on the back of the queue.
        /// If the queue is full, the front item is dropped first.
        void pushBack(T x) nothrow @nogc
        {
            checkOverflow!popFront();
            _data.ptr[(_first + _count) % _data.length] = x;
            ++_count;
        }

        /// Adds an item on the front of the queue.
        /// If the queue is full, the back item is dropped first.
        void pushFront(T x) nothrow @nogc
        {
            checkOverflow!popBack();
            ++_count;
            // Adding the capacity before the modulo makes the step backwards
            // wrap around correctly when `_first` is 0.
            _first = (_first - 1 + _data.length) % _data.length;
            _data.ptr[_first] = x;
        }

        /// Removes an item from the front of the queue.
        /// The queue must not be empty: this is not checked.
        /// Returns: the removed item.
        T popFront() nothrow @nogc
        {
            T res = _data.ptr[_first];
            _first = (_first + 1) % _data.length;
            --_count;
            return res;
        }

        /// Removes an item from the back of the queue.
        /// The queue must not be empty: this is not checked.
        /// Returns: the removed item.
        T popBack() nothrow @nogc
        {
            --_count;
            return _data.ptr[(_first + _count) % _data.length];
        }

        /// Removes all items from the queue.
        /// The storage itself is left untouched, only the bookkeeping is reset.
        void clear() nothrow @nogc
        {
            _first = 0;
            _count = 0;
        }

        /// Returns: number of items in the queue.
        size_t length() pure const nothrow @nogc
        {
            return _count;
        }

        /// Returns: maximum number of items in the queue.
        size_t capacity() pure const nothrow @nogc
        {
            return _data.length;
        }

        /// Returns: item at the front of the queue.
        /// The queue must not be empty: this is not checked.
        T front() pure nothrow @nogc
        {
            return _data.ptr[_first];
        }

        /// Returns: item on the back of the queue.
        /// The queue must not be empty: this is not checked.
        T back() pure nothrow @nogc
        {
            // Adding the capacity keeps the expression from wrapping below zero
            // when both `_first` and `_count - 1` are small.
            return _data.ptr[(_first + _count + _data.length - 1) % _data.length];
        }

        /// Returns: the item at position `index`, counted from the front of the queue.
        /// Valid indices are `0 .. length`; any other index halts the program.
        T opIndex(size_t index) nothrow @nogc
        {
            // crash if index out-of-bounds (not recoverable)
            // `index == _count` is one past the last stored item, so it must be rejected too.
            if (index >= _count)
                assert(0);

            return _data.ptr[(_first + index) % _data.length];
        }
    }

    private
    {
        // elements lie from _first to _first + _count - 1 index, modulo the allocated size
        T[] _data;
        size_t _first;
        size_t _count;

        // Makes room for one more item: when the queue is full, calls `popMethod`
        // (popFront or popBack) and discards the popped item.
        void checkOverflow(alias popMethod)() nothrow
        {
            if (isFull())
                popMethod();
        }
    }
}
135 
136 
unittest
{
    // A default-constructed ring buffer must be instantiable and destructible.
    RingBufferNoGC!float a;
    assert(a.length == 0);
    assert(a.capacity == 0);

    auto rb = ringBufferNoGC!float(4);
    assert(rb.capacity == 4);
    assert(rb.length == 0);
    assert(!rb.isFull());

    // Push on both ends and check ordering.
    rb.pushBack(1.0f);
    rb.pushBack(2.0f);
    rb.pushFront(0.0f);
    assert(rb.length == 3);
    assert(rb.front == 0.0f);
    assert(rb.back == 2.0f);
    assert(rb[0] == 0.0f);
    assert(rb[1] == 1.0f);
    assert(rb[2] == 2.0f);

    // Fill the buffer, then overflow at the back: the front item is dropped.
    rb.pushBack(3.0f);
    assert(rb.isFull());
    rb.pushBack(4.0f);
    assert(rb.length == 4);
    assert(rb.front == 1.0f);
    assert(rb.back == 4.0f);

    // Overflow at the front: the back item is dropped.
    rb.pushFront(-1.0f);
    assert(rb.length == 4);
    assert(rb.front == -1.0f);
    assert(rb.back == 3.0f);

    // Pop from both ends.
    assert(rb.popFront() == -1.0f);
    assert(rb.popBack() == 3.0f);
    assert(rb.length == 2);

    rb.clear();
    assert(rb.length == 0);
    assert(rb.capacity == 4);
}
141 
/// Reusable mechanism to provide the UI with continuously available, non-critical data
/// coming from the audio thread (e.g. waveforms, analyzers, displays...).
/// An empty FIFO may mean that there is nothing to draw, or that audio processing has stopped.
/// Because audio buffers may be long, simply using atomics and skipping the UI update when a
/// buffer has already been processed is not an option: it would cause slowness with small buffers.
struct TimedFIFO(T)
{
private:

    T[] _data;           // circular storage, indexed through _indexMask
    int _count;          // number of items currently stored
    int _readIndex;      // position of the oldest item, before masking
    int _inputTimestamp; // counts every sample submitted to pushData

    // Note: information about sample-rate is passed through atomics, out-of-band
    shared(float) _sampleRate = 44100.0f;

    int _indexMask;    // size - 1
    int _dividerMask;  // divider - 1
    float _invDivider; // 1 / divider

    // protects: _readIndex, _count, _data
    UncheckedMutex _dataMutex;

    float _timeDebt;    // time accumulated by the reader, not yet converted to dropped samples
    float _integerDebt; // because of rounding

    /// Returns: true if i is a power of 2.
    static bool isPowerOf2(int i) @nogc nothrow
    {
        assert(i >= 0);
        if (i == 0)
            return false;

        // A power of two has a single bit set, which `i - 1` clears.
        return (i & (i - 1)) == 0;
    }

public:

    /// Allocates the storage, creates the mutex and resets all the state.
    /// Params:
    ///     size = size of the buffer, must be a power of 2
    ///     divider = only one in divider sample(s) is actually pushed in the FIFO,
    ///               must be a power of 2
    void initialize(int size, int divider = 1)
    {
        assert(isPowerOf2(size));
        assert(isPowerOf2(divider));

        // Both values are powers of two, so wrapping and decimation are plain bit masks.
        _indexMask = size - 1;
        _dividerMask = divider - 1;
        _invDivider = 1.0f / divider;

        // Start empty.
        _readIndex = 0;
        _count = 0;
        _inputTimestamp = 0;

        _data.reallocBuffer(size);
        _dataMutex = makeMutex();

        _integerDebt = 0;
        _timeDebt = 0;
    }

    ~this()
    {
        _dataMutex.destroy();
        _data.reallocBuffer(0);
    }

    /// Pushes samples into the FIFO, keeping one sample in `divider`.
    /// The sample rate is always published; the samples themselves are silently
    /// discarded if the lock cannot be taken immediately.
    /// Params:
    ///     input = samples to push
    ///     sampleRate = current sample rate, stored atomically for the reader
    void pushData(T[] input, float sampleRate) nothrow @nogc
    {
        // Here we are in the audio thread, so blocking is not welcome
        atomicStore(_sampleRate, sampleRate);

        // Push incoming data, but it's not that bad if we miss some
        if (!_dataMutex.tryLock())
            return;

        foreach (ref sample; input)
        {
            ++_inputTimestamp;

            // should depend on samplerate?
            if ((_inputTimestamp & _dividerMask) != 0)
                continue;

            const int writeIndex = (_readIndex + _count) & _indexMask;
            _data[writeIndex] = sample;

            if (_count < _data.length)
                ++_count;     // grow buffer
            else
                ++_readIndex; // overflow, drop older data
        }

        _dataMutex.unlock();
    }

    /// Same but with 1 element.
    void pushData(T input, float sampleRate) nothrow @nogc
    {
        T[] single = (&input)[0..1];
        pushData(single, sampleRate);
    }

    // Get some amount of the oldest samples in the FIFO,
    // then drop the amount of samples that corresponds to `dt` of time passing.
    // At most `_count - keepAtLeast` samples are dropped.
    // Returns: the number of samples written to `output`. Also returns 0 if tryLock failed to take the lock.
    // Note that there is a disconnect between the data that is dropped, and the data that is returned.
    // The same data may well be returned multiple times given a large buffer, or zero times.
    int readOldestDataAndDropSome(T[] output, double dt, int keepAtLeast = 0) nothrow @nogc
    {
        assert(dt >= 0);

        // add 1% because it's better to be a bit short in buffer than too large.
        _timeDebt += dt * 1.01;

        if (!_dataMutex.tryLock())
            return 0;
        scope(exit) _dataMutex.unlock();

        // Copy as much as the caller asked for, limited by what is stored.
        const int requested = cast(int)(output.length);
        int copied = requested;
        if (_count < requested)
            copied = _count;

        if (copied == 0)
            return 0;

        for (int k = 0; k < copied; ++k)
            output[k] = _data[ (_readIndex + k) & _indexMask ];

        // Convert the accumulated time into a number of stored samples to drop.
        const float rate = atomicLoad(_sampleRate);
        float toDrop = _timeDebt * rate * _invDivider + _integerDebt;

        const int droppableLimit = _count - keepAtLeast;
        if (toDrop > droppableLimit)
            toDrop = droppableLimit;
        if (toDrop < 0)
            toDrop = 0;

        // Only whole samples can be dropped, the fractional part is carried over.
        const int wholeDrop = cast(int)(toDrop);
        _integerDebt = (toDrop - wholeDrop);
        _timeDebt = 0;

        _readIndex += wholeDrop;
        _count -= wholeDrop;
        return copied;
    }

    // Same but with one element
    // Returns: true if one sample was written to `*output`.
    bool readOldestDataAndDropSome(T* output, double dt) nothrow @nogc
    {
        const int got = readOldestDataAndDropSome(output[0..1], dt);
        return got != 0;
    }
}
292 
293 
unittest
{
    // A never-initialized FIFO must be instantiable and destructible.
    TimedFIFO!float a;

    TimedFIFO!float fifo;
    fifo.initialize(8, 1);

    float[3] pushed = [1.0f, 2.0f, 3.0f];
    fifo.pushData(pushed[], 44100.0f);

    float[4] read;

    // No time has passed: the data is returned but nothing is dropped.
    assert(fifo.readOldestDataAndDropSome(read[], 0.0) == 3);
    assert(read[0..3] == pushed[]);

    // One second at 44100 Hz is far more than 3 samples: the same data is
    // returned once more, then everything is dropped.
    assert(fifo.readOldestDataAndDropSome(read[], 1.0) == 3);
    assert(read[0..3] == pushed[]);

    // The FIFO is now empty.
    assert(fifo.readOldestDataAndDropSome(read[], 0.0) == 0);
}