/**
 * Multiple writers, multiple readers interlocked queue.
 *
 * Copyright: Copyright Auburn Sounds 2015-2016
 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
 * Authors: Guillaume Piolat
 */
module dplug.core.lockedqueue;

import dplug.core.ringbuf;
import dplug.core.sync;
import dplug.core.nogc;


deprecated("Use makeLockedQueue instead") alias lockedQueue = makeLockedQueue;

/// Creates a `LockedQueue!T` with the given capacity.
/// Params:
///     capacity = Number of items the queue is created with room for.
/// Returns: A newly constructed `LockedQueue!T`.
auto makeLockedQueue(T)(size_t capacity) nothrow @nogc
{
    return LockedQueue!T(capacity);
}

/**
    Locked queue for inter-thread communication.
    Support multiple writers, multiple readers.
    Blocks threads either when empty or full.
    @nogc once in use.

    Two semaphores do the blocking: the reader semaphore is notified once per
    pushed item, the writer semaphore once per popped item. A mutex guards
    every access to the underlying ring buffer.

    See_also: $(LINK2 #Queue, Queue)
*/
struct LockedQueue(T)
{
    public
    {
        /// Creates a locked queue with an initial capacity.
        /// Params:
        ///     capacity = Number of items the queue can hold. Must fit in a
        ///                `uint`, since it is used as the initial count of
        ///                the writer semaphore.
        this(size_t capacity) nothrow @nogc
        {
            // The semaphore count is a uint: a larger capacity would be
            // silently truncated by the cast below (possibly down to 0, which
            // would make every push block forever).
            assert(capacity <= uint.max, "LockedQueue capacity must fit in a uint");

            _queue = ringBufferNoGC!T(capacity);
            _rwMutex = makeMutex();
            _readerSemaphore = makeSemaphore(0);
            _writerSemaphore = makeSemaphore(cast(uint)capacity);
            _initialized = true;
        }

        /// Removes any remaining items, if the queue was constructed.
        ~this() nothrow @nogc
        {
            // A default-initialized instance never created its mutex and
            // semaphores, so it must not touch them here.
            if (_initialized)
            {
                clear();
                _initialized = false;
            }
        }

        @disable this(this);

        /// Returns: Capacity of the locked queue.
        size_t capacity() const nothrow @nogc
        {
            // no lock-required as capacity does not change
            return _queue.capacity;
        }

        /// Push an item to the back, block if queue is full.
        void pushBack(T x) nothrow @nogc
        {
            // Acquire a free slot first, then touch the buffer under the mutex.
            _writerSemaphore.wait();
            {
                _rwMutex.lock();
                _queue.pushBack(x);
                _rwMutex.unlock();
            }
            // Signal readers that one more item is available.
            _readerSemaphore.notify();
        }

        /// Push an item to the front, block if queue is full.
        void pushFront(T x) nothrow @nogc
        {
            // Acquire a free slot first, then touch the buffer under the mutex.
            _writerSemaphore.wait();
            {
                _rwMutex.lock();
                _queue.pushFront(x);
                _rwMutex.unlock();
            }
            // Signal readers that one more item is available.
            _readerSemaphore.notify();
        }

        /// Pop an item from the front, block if queue is empty.
        /// Returns: The item that was at the front of the queue.
        T popFront() nothrow @nogc
        {
            // Acquire an available item first, then touch the buffer under the mutex.
            _readerSemaphore.wait();
            _rwMutex.lock();
            T res = _queue.popFront();
            _rwMutex.unlock();
            // Signal writers that one more slot is free.
            _writerSemaphore.notify();
            return res;
        }

        /// Pop an item from the back, block if queue is empty.
        /// Returns: The item that was at the back of the queue.
        T popBack() nothrow @nogc
        {
            // Acquire an available item first, then touch the buffer under the mutex.
            _readerSemaphore.wait();
            _rwMutex.lock();
            T res = _queue.popBack();
            _rwMutex.unlock();
            // Signal writers that one more slot is free.
            _writerSemaphore.notify();
            return res;
        }

        /// Tries to pop an item from the front immediately.
        /// Params:
        ///     result = Receives the popped item on success.
        /// Returns: true if an item was returned, false if the queue is empty.
        bool tryPopFront(out T result) nothrow @nogc
        {
            if (_readerSemaphore.tryWait())
            {
                _rwMutex.lock();
                result = _queue.popFront();
                _rwMutex.unlock();
                _writerSemaphore.notify();
                return true;
            }
            else
                return false;
        }

        /// Tries to pop an item from the back immediately.
        /// Params:
        ///     result = Receives the popped item on success.
        /// Returns: true if an item was returned, false if the queue is empty.
        bool tryPopBack(out T result) nothrow @nogc
        {
            if (_readerSemaphore.tryWait())
            {
                _rwMutex.lock();
                result = _queue.popBack();
                _rwMutex.unlock();
                _writerSemaphore.notify();
                return true;
            }
            else
                return false;
        }

        /// Removes all locked queue items.
        void clear() nothrow @nogc
        {
            // Pop one item per successful tryWait, so both semaphore counts
            // stay in step with the buffer content.
            while (_readerSemaphore.tryWait())
            {
                _rwMutex.lock();
                _queue.popBack();
                _rwMutex.unlock();
                _writerSemaphore.notify();
            }
        }
    }

    private
    {
        RingBufferNoGC!T _queue;           // item storage, accessed only under _rwMutex
        UncheckedMutex _rwMutex;           // guards every access to _queue
        UncheckedSemaphore _readerSemaphore, _writerSemaphore; // notified on push / on pop
        bool _initialized;                 // true once the constructor has run
    }
}


unittest
{
    import dplug.core.nogc;
    auto lq = mallocNew!(LockedQueue!int)(3);
    scope(exit) lq.destroyFree();
    lq.clear();
    lq.pushFront(2);
    lq.pushBack(3);
    lq.pushFront(1);

    // should contain [1 2 3] here
    assert(lq.popBack() == 3);
    assert(lq.popFront() == 1);

    // One item is left, so the non-blocking pop must succeed. Asserting the
    // result keeps the test from passing silently if it does not.
    int res;
    bool popped = lq.tryPopFront(res);
    assert(popped);
    assert(res == 2);

    // The queue is now empty: a further non-blocking pop must fail.
    bool poppedAgain = lq.tryPopFront(res);
    assert(!poppedAgain);
}