1 /**
2 Multiple writers, multiple readers interlocked queue.
3 
4 Copyright: Auburn Sounds 2015-2016.
5 License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
6 */
7 module dplug.core.lockedqueue;
8 
9 import dplug.core.ringbuf;
10 import dplug.core.sync;
11 import dplug.core.nogc;
12 
13 
/// Convenience factory for `LockedQueue!T`.
/// Params:
///     capacity = forwarded unchanged to the `LockedQueue!T` constructor.
/// Returns: The newly constructed queue.
auto makeLockedQueue(T)(size_t capacity) nothrow @nogc
{
    alias Queue = LockedQueue!T;
    return Queue(capacity);
}
18 
/**
Queue guarded by a mutex and two semaphores, for inter-thread communication.
Supports multiple writers and multiple readers.
Pushing blocks the calling thread while the queue is full,
popping (non-`try` variants) blocks it while the queue is empty.
@nogc once in use.

See_also: $(LINK2 #Queue, Queue)
*/
struct LockedQueue(T)
{
    public
    {
        /// Creates a locked queue with an initial capacity.
        this(size_t capacity) nothrow @nogc
        {
            _queue = makeRingBufferNoGC!T(capacity);
            _rwMutex = makeMutex();

            // Readers start with nothing to take, writers with `capacity` permits.
            _readerSemaphore = makeSemaphore(0);
            _writerSemaphore = makeSemaphore(cast(uint)capacity);
            _initialized = true;
        }

        /// Empties the queue, but only if the constructor above has run.
        ~this() nothrow @nogc
        {
            if (!_initialized)
                return;

            clear();
            _initialized = false;
        }

        @disable this(this);

        /// Returns: Capacity of the locked queue.
        size_t capacity() const nothrow @nogc
        {
            // Read without taking the mutex: the capacity is not expected to change.
            return _queue.capacity;
        }

        /// Push an item to the back, block if queue is full.
        void pushBack(T x) nothrow @nogc
        {
            pushImpl!false(x);
        }

        /// Push an item to the front, block if queue is full.
        void pushFront(T x) nothrow @nogc
        {
            pushImpl!true(x);
        }

        /// Pop an item from the front, block if queue is empty.
        T popFront() nothrow @nogc
        {
            return popImpl!true();
        }

        /// Pop an item from the back, block if queue is empty.
        T popBack() nothrow @nogc
        {
            return popImpl!false();
        }

        /// Tries to pop an item from the front immediately.
        /// Returns: true if an item was returned, false if the queue is empty.
        bool tryPopFront(out T result) nothrow @nogc
        {
            return tryPopImpl!true(result);
        }

        /// Tries to pop an item from the back immediately.
        /// Returns: true if an item was returned, false if the queue is empty.
        bool tryPopBack(out T result) nothrow @nogc
        {
            return tryPopImpl!false(result);
        }

        /// Removes all locked queue items.
        void clear() nothrow @nogc
        {
            for (;;)
            {
                // Stop as soon as no reader permit can be taken without waiting.
                if (!_readerSemaphore.tryWait())
                    break;

                // Discard one item from the back, then hand its permit to writers.
                _rwMutex.lock();
                _queue.popBack();
                _rwMutex.unlock();
                _writerSemaphore.notify();
            }
        }
    }

    private
    {
        RingBufferNoGC!T _queue;
        UncheckedMutex _rwMutex;
        UncheckedSemaphore _readerSemaphore, _writerSemaphore;
        bool _initialized;

        /// Shared body of `pushFront` / `pushBack`.
        /// Waits for a writer permit, inserts `item` at the end selected by
        /// `atFront` while holding the mutex, then releases a reader permit.
        void pushImpl(bool atFront)(ref T item) nothrow @nogc
        {
            _writerSemaphore.wait();

            _rwMutex.lock();
            static if (atFront)
                _queue.pushFront(item);
            else
                _queue.pushBack(item);
            _rwMutex.unlock();

            _readerSemaphore.notify();
        }

        /// Shared body of `popFront` / `popBack`.
        /// Waits for a reader permit, removes an item from the end selected by
        /// `atFront` while holding the mutex, then releases a writer permit.
        T popImpl(bool atFront)() nothrow @nogc
        {
            _readerSemaphore.wait();

            _rwMutex.lock();
            // `static if` does not open a scope, so `item` stays visible below.
            static if (atFront)
            {
                T item = _queue.popFront();
            }
            else
            {
                T item = _queue.popBack();
            }
            _rwMutex.unlock();

            _writerSemaphore.notify();
            return item;
        }

        /// Shared body of `tryPopFront` / `tryPopBack`.
        /// Same as `popImpl`, except that it gives up at once (returning false)
        /// when no reader permit can be taken without waiting.
        bool tryPopImpl(bool atFront)(ref T result) nothrow @nogc
        {
            if (!_readerSemaphore.tryWait())
                return false;

            _rwMutex.lock();
            static if (atFront)
                result = _queue.popFront();
            else
                result = _queue.popBack();
            _rwMutex.unlock();

            _writerSemaphore.notify();
            return true;
        }
    }
}
158 
159 
// Single-threaded check of LockedQueue: item ordering for front/back
// pushes and pops, the non-blocking pops, and clear().
unittest
{
    import dplug.core.nogc;
    auto lq = mallocNew!(LockedQueue!int)(3);
    scope(exit) lq.destroyFree();

    // Clearing an empty queue must return without blocking.
    lq.clear();

    lq.pushFront(2);
    lq.pushBack(3);
    lq.pushFront(1);

    // should contain [1 2 3] here
    assert(lq.popBack() == 3);
    assert(lq.popFront() == 1);

    // Exactly one item is left, so the non-blocking pop has to succeed.
    // The call is kept outside assert() so it is never compiled away.
    int res;
    bool popped = lq.tryPopFront(res);
    assert(popped);
    assert(res == 2);

    // The queue is empty now: both non-blocking pops must report failure.
    popped = lq.tryPopBack(res);
    assert(!popped);
    popped = lq.tryPopFront(res);
    assert(!popped);

    // clear() on a non-empty queue drops every item...
    lq.pushBack(10);
    lq.pushBack(20);
    lq.clear();
    popped = lq.tryPopFront(res);
    assert(!popped);

    // ...and gives the slots back to writers: three more pushes fit
    // without blocking, and come out in order.
    lq.pushBack(4);
    lq.pushBack(5);
    lq.pushBack(6);
    assert(lq.popFront() == 4);
    assert(lq.popBack() == 6);
    popped = lq.tryPopBack(res);
    assert(popped);
    assert(res == 5);
}
179