/**
Multiple writers, multiple readers interlocked queue.

Copyright: Auburn Sounds 2015-2016.
License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module dplug.core.lockedqueue;

import dplug.core.ringbuf;
import dplug.core.sync;
import dplug.core.nogc;


/// Convenience factory: builds a `LockedQueue!T` able to hold `capacity` items.
/// Params:
///     capacity = Number of items the queue is created with room for.
/// Returns: A newly constructed `LockedQueue!T`.
auto makeLockedQueue(T)(size_t capacity) nothrow @nogc
{
    return LockedQueue!T(capacity);
}

/**
    Locked queue for inter-thread communication.
    Support multiple writers, multiple readers.
    Blocks threads either when empty or full.
    @nogc once in use.

    Two semaphores gate the access: the writer semaphore starts at `capacity`
    and is waited on before each push, the reader semaphore starts at zero and
    is waited on before each pop. The ring buffer itself is only touched while
    the mutex is held.

    See_also: $(LINK2 #Queue, Queue)
*/
struct LockedQueue(T)
{
    public
    {
        /// Creates a locked queue with an initial capacity.
        /// Params:
        ///     capacity = Number of items the queue can hold. It must fit in a
        ///                `uint` because it is used as the initial count of
        ///                the writer semaphore.
        this(size_t capacity) nothrow @nogc
        {
            // A larger value would be silently truncated by the cast below,
            // leaving the writer semaphore out of sync with the ring buffer.
            assert(capacity <= uint.max);

            _queue = makeRingBufferNoGC!T(capacity);
            _rwMutex = makeMutex();
            _readerSemaphore = makeSemaphore(0);
            _writerSemaphore = makeSemaphore(cast(uint)capacity);
            _initialized = true;
        }

        /// Empties the queue if it was constructed through `this(size_t)`.
        ~this() nothrow @nogc
        {
            if (_initialized)
            {
                clear();
                _initialized = false;
            }
        }

        @disable this(this);

        /// Returns: Capacity of the locked queue.
        size_t capacity() const nothrow @nogc
        {
            // no lock-required as capacity does not change
            return _queue.capacity;
        }

        /// Push an item to the back, block if queue is full.
        void pushBack(T x) nothrow @nogc
        {
            _writerSemaphore.wait();
            {
                _rwMutex.lock();
                _queue.pushBack(x);
                _rwMutex.unlock();
            }
            _readerSemaphore.notify();
        }

        /// Push an item to the front, block if queue is full.
        void pushFront(T x) nothrow @nogc
        {
            _writerSemaphore.wait();
            {
                _rwMutex.lock();
                _queue.pushFront(x);
                _rwMutex.unlock();
            }
            _readerSemaphore.notify();
        }

        /// Pop an item from the front, block if queue is empty.
        T popFront() nothrow @nogc
        {
            _readerSemaphore.wait();
            return takeFront();
        }

        /// Pop an item from the back, block if queue is empty.
        T popBack() nothrow @nogc
        {
            _readerSemaphore.wait();
            return takeBack();
        }

        /// Tries to pop an item from the front immediately.
        /// Params:
        ///     result = Receives the popped item on success. Being an `out`
        ///              parameter, it is reset to `T.init` on entry.
        /// Returns: true if an item was returned, false if the queue is empty.
        bool tryPopFront(out T result) nothrow @nogc
        {
            if (!_readerSemaphore.tryWait())
                return false;

            result = takeFront();
            return true;
        }

        /// Tries to pop an item from the back immediately.
        /// Params:
        ///     result = Receives the popped item on success. Being an `out`
        ///              parameter, it is reset to `T.init` on entry.
        /// Returns: true if an item was returned, false if the queue is empty.
        bool tryPopBack(out T result) nothrow @nogc
        {
            if (!_readerSemaphore.tryWait())
                return false;

            result = takeBack();
            return true;
        }

        /// Removes all locked queue items.
        /// Items are popped one at a time, each pop going through the same
        /// semaphore/mutex sequence as `tryPopBack`.
        void clear() nothrow @nogc
        {
            while (_readerSemaphore.tryWait())
                takeBack();
        }
    }

    private
    {
        RingBufferNoGC!T _queue;
        UncheckedMutex _rwMutex;
        UncheckedSemaphore _readerSemaphore, _writerSemaphore;
        bool _initialized;

        /// Removes the front item under the mutex, then notifies the writer
        /// semaphore. The caller must already have acquired the reader
        /// semaphore.
        T takeFront() nothrow @nogc
        {
            _rwMutex.lock();
            T res = _queue.popFront();
            _rwMutex.unlock();
            _writerSemaphore.notify();
            return res;
        }

        /// Removes the back item under the mutex, then notifies the writer
        /// semaphore. The caller must already have acquired the reader
        /// semaphore.
        T takeBack() nothrow @nogc
        {
            _rwMutex.lock();
            T res = _queue.popBack();
            _rwMutex.unlock();
            _writerSemaphore.notify();
            return res;
        }
    }
}


unittest
{
    import dplug.core.nogc;
    auto lq = mallocNew!(LockedQueue!int)(3);
    scope(exit) lq.destroyFree();
    lq.clear();
    lq.pushFront(2);
    lq.pushBack(3);
    lq.pushFront(1);

    // should contain [1 2 3] here
    assert(lq.popBack() == 3);
    assert(lq.popFront() == 1);

    // Exactly one item is left: the non-blocking pop must succeed.
    // The call is kept outside of `assert` so it always executes.
    int res;
    bool popped = lq.tryPopFront(res);
    assert(popped);
    assert(res == 2);

    // The queue is now empty: non-blocking pops must report failure.
    popped = lq.tryPopFront(res);
    assert(!popped);
    popped = lq.tryPopBack(res);
    assert(!popped);

    // tryPopBack returns the most recently pushed-back item.
    lq.pushBack(7);
    lq.pushBack(8);
    popped = lq.tryPopBack(res);
    assert(popped);
    assert(res == 8);

    // clear() drops whatever is left.
    lq.pushFront(9);
    lq.clear();
    popped = lq.tryPopFront(res);
    assert(!popped);
}