1 /** 2 * Copyright: Copyright Auburn Sounds 2015-2016 3 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 4 * Authors: Guillaume Piolat 5 */ 6 module dplug.core.ringbuf; 7 8 import core.atomic; 9 10 import dplug.core.sync; 11 import dplug.core.nogc; 12 import dplug.core.alignedbuffer; 13 14 15 RingBufferNoGC!T ringBufferNoGC(T)(size_t initialCapacity) nothrow @nogc 16 { 17 return RingBufferNoGC!T(initialCapacity); 18 } 19 20 /// @nogc ring-buffer 21 struct RingBufferNoGC(T) 22 { 23 public 24 { 25 /// Create a RingBuffer with specified initial capacity. 26 this(size_t initialCapacity) nothrow @nogc 27 { 28 _data.reallocBuffer(initialCapacity); 29 clear(); 30 } 31 32 @disable this(this); 33 34 ~this() nothrow @nogc 35 { 36 _data.reallocBuffer(0); 37 } 38 39 bool isFull() pure const nothrow 40 { 41 return _count == _data.length; 42 } 43 44 /// Adds an item on the back of the queue. 45 void pushBack(T x) nothrow @nogc 46 { 47 checkOverflow!popFront(); 48 _data.ptr[(_first + _count) % _data.length] = x; 49 ++_count; 50 } 51 52 /// Adds an item on the front of the queue. 53 void pushFront(T x) nothrow @nogc 54 { 55 checkOverflow!popBack(); 56 ++_count; 57 _first = (_first - 1 + _data.length) % _data.length; 58 _data.ptr[_first] = x; 59 } 60 61 /// Removes an item from the front of the queue. 62 /// Returns: the removed item. 63 T popFront() nothrow @nogc 64 { 65 T res = _data.ptr[_first]; 66 _first = (_first + 1) % _data.length; 67 --_count; 68 return res; 69 } 70 71 /// Removes an item from the back of the queue. 72 /// Returns: the removed item. 73 T popBack() nothrow @nogc 74 { 75 --_count; 76 return _data.ptr[(_first + _count) % _data.length]; 77 } 78 79 /// Removes all items from the queue. 80 void clear() nothrow @nogc 81 { 82 _first = 0; 83 _count = 0; 84 } 85 86 /// Returns: number of items in the queue. 87 size_t length() pure const nothrow @nogc 88 { 89 return _count; 90 } 91 92 /// Returns: maximum number of items in the queue. 93 size_t capacity() pure const nothrow @nogc 94 { 95 return _data.length; 96 } 97 98 /// Returns: item at the front of the queue. 99 T front() pure nothrow @nogc 100 { 101 return _data.ptr[_first]; 102 } 103 104 /// Returns: item on the back of the queue. 105 T back() pure nothrow @nogc 106 { 107 return _data.ptr[(_first + _count + _data.length - 1) % _data.length]; 108 } 109 110 /// Returns: item index from the queue. 111 T opIndex(size_t index) nothrow @nogc 112 { 113 // crash if index out-of-bounds (not recoverable) 114 if (index > _count) 115 assert(0); 116 117 return _data.ptr[(_first + index) % _data.length]; 118 } 119 } 120 121 private 122 { 123 // element lie from _first to _first + _count - 1 index, modulo the allocated size 124 T[] _data; 125 size_t _first; 126 size_t _count; 127 128 void checkOverflow(alias popMethod)() nothrow 129 { 130 if (isFull()) 131 popMethod(); 132 } 133 } 134 } 135 136 137 unittest 138 { 139 RingBufferNoGC!float a; 140 } 141 142 /// Reusable mechanism to provide the UI with continuously available non-critical data from the audio thread. 143 /// eg: for waveforms, analyzers, displays, etc... 144 /// In the case where the FIFO is empty, it may be that there is nothing to draw or audio processing has stopped. 145 /// And because audio buffers may be long, we can't just use atomics and avoid updating the UI when the buffer has already been processed. 146 /// It would cause slowness with small buffers. 147 struct TimedFIFO(T) 148 { 149 private: 150 151 T[] _data; 152 int _count; 153 int _readIndex; 154 int _inputTimestamp; 155 156 // Note: information about sample-rate is passed through atomics, out-of-band 157 shared(float) _sampleRate = 44100.0f; 158 159 int _indexMask; 160 int _dividerMask; 161 float _invDivider; 162 163 // protects: _readIndex, _count, _data 164 UncheckedMutex _dataMutex; 165 166 float _timeDebt; 167 float _integerDebt; // because of rounding 168 169 /// Returns: true of i is a power of 2. 170 static bool isPowerOf2(int i) @nogc nothrow 171 { 172 assert(i >= 0); 173 return (i != 0) && ((i & (i - 1)) == 0); 174 } 175 176 public: 177 178 /// Params: 179 /// size = size of the buffer 180 /// divider = only one in divider sample(s) is actually pushed in the FIFO. 181 void initialize(int size, int divider = 1) 182 { 183 assert(isPowerOf2(size)); 184 assert(isPowerOf2(divider)); 185 186 _count = 0; // no data at start 187 _readIndex = 0; 188 _indexMask = size - 1; 189 _inputTimestamp = 0; 190 _dividerMask = divider - 1; 191 _invDivider = 1.0f / divider; 192 193 _data.reallocBuffer(size); 194 _dataMutex = makeMutex(); 195 196 _timeDebt = 0; 197 _integerDebt = 0; 198 199 } 200 201 ~this() 202 { 203 _dataMutex.destroy(); 204 _data.reallocBuffer(0); 205 } 206 207 void pushData(T[] input, float sampleRate) nothrow @nogc 208 { 209 // Here we are in the audio thread, so blocking is not welcome 210 atomicStore(_sampleRate, sampleRate); 211 212 // Push incoming data, but it's not that bad if we miss some 213 if (_dataMutex.tryLock()) 214 { 215 foreach (i; 0..input.length) 216 { 217 _inputTimestamp++; 218 if ( (_inputTimestamp & _dividerMask) == 0 ) // should depend on samplerate? 219 { 220 _data[ (_readIndex + _count) & _indexMask ] = input[i]; 221 if (_count >= _data.length) 222 ++_readIndex; // overflow, drop older data 223 else 224 ++_count; // grow buffer 225 } 226 } 227 _dataMutex.unlock(); 228 } 229 } 230 231 /// Same but with 1 element. 232 void pushData(T input, float sampleRate) nothrow @nogc 233 { 234 pushData( (&input)[0..1], sampleRate); 235 } 236 237 // Get some amount of oldest samples in the FIFO 238 // The drop some amount of samples that correspond to time passing of dt 239 // Returns: the number of sample data returned. Also return no data if tryLock failed to take the lock. 240 // Note that there is a disconnect between the data that is dropped, and the data that is returned. 241 // The same data may well be returned multiple time given a large buffer, or zero time. 242 int readOldestDataAndDropSome(T[] output, double dt, int keepAtLeast = 0) nothrow @nogc 243 { 244 assert(dt >= 0); 245 _timeDebt += dt * 1.01; // add 1% because it's better to be a bit short in buffer than too large. 246 if (_dataMutex.tryLock()) 247 { 248 scope(exit) _dataMutex.unlock(); 249 250 int pointsNeeded = cast(int)(output.length); 251 252 int pointsAvailable = ( (_count < pointsNeeded) ? _count : pointsNeeded); 253 254 bool noData = (pointsAvailable == 0); 255 256 if (noData) 257 return 0; 258 259 foreach (i ; 0..pointsAvailable) 260 { 261 output[i] = _data[ (_readIndex + i) & _indexMask ]; 262 } 263 264 // drop samples 265 float sampleRate = atomicLoad(_sampleRate); 266 267 float samplesToDrop = _timeDebt * sampleRate * _invDivider + _integerDebt; 268 int maxDroppable = _count - keepAtLeast; 269 if (samplesToDrop > maxDroppable) 270 samplesToDrop = maxDroppable; 271 if (samplesToDrop < 0) 272 samplesToDrop = 0; 273 274 int numSamplesToDrop = cast(int)(samplesToDrop); 275 _timeDebt = 0; 276 _integerDebt = (samplesToDrop - numSamplesToDrop); 277 278 _count -= numSamplesToDrop; 279 _readIndex += numSamplesToDrop; 280 return pointsAvailable; 281 } 282 else 283 return 0; 284 } 285 286 // Same but with one element 287 bool readOldestDataAndDropSome(T* output, double dt) nothrow @nogc 288 { 289 return readOldestDataAndDropSome(output[0..1], dt) != 0; 290 } 291 } 292 293 294 unittest 295 { 296 TimedFIFO!float a; 297 }