1 /**
2 * Multiple writers, multiple readers interlocked queue.
3 *
4 * Copyright: Copyright Auburn Sounds 2015-2016
5 * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
6 * Authors:   Guillaume Piolat
7 */
8 module dplug.core.lockedqueue;
9 
10 import dplug.core.ringbuf;
11 import dplug.core.sync;
12 import dplug.core.nogc;
13 
14 
deprecated("Use makeLockedQueue instead") alias lockedQueue = makeLockedQueue;

/// Convenience factory for `LockedQueue`.
/// Params:
///     capacity = Value forwarded to the `LockedQueue!T` constructor.
/// Returns: A `LockedQueue!T` constructed with `capacity`.
LockedQueue!T makeLockedQueue(T)(size_t capacity) nothrow @nogc
{
    return LockedQueue!T(capacity);
}
20 
/**
Bounded queue for inter-thread communication.

Supports multiple writers and multiple readers.
A mutex guards the underlying ring buffer, and two semaphores do the
blocking: the reader semaphore starts at 0 and is notified after each push,
the writer semaphore starts at `capacity` and is notified after each pop.
As a result the blocking push operations wait while the queue is full and
the blocking pop operations wait while it is empty.
Every operation is `nothrow @nogc`.

See_also: $(LINK2 #Queue, Queue)
*/
struct LockedQueue(T)
{
public:

    /// Builds a locked queue with the given initial capacity.
    this(size_t capacity) nothrow @nogc
    {
        _queue = ringBufferNoGC!T(capacity);
        _rwMutex = makeMutex();

        // Nothing to read yet; `capacity` slots are available for writing.
        _readerSemaphore = makeSemaphore(0);
        _writerSemaphore = makeSemaphore(cast(uint)capacity);

        _initialized = true;
    }

    /// Discards any remaining items, but only if the constructor has run.
    ~this() nothrow @nogc
    {
        if (!_initialized)
            return;

        clear();
        _initialized = false;
    }

    // Copying is forbidden.
    @disable this(this);

    /// Returns: The capacity of the locked queue.
    size_t capacity() const nothrow @nogc
    {
        // The capacity never changes, hence no locking is needed here.
        return _queue.capacity;
    }

    /// Appends an item at the back; blocks while the queue is full.
    void pushBack(T x) nothrow @nogc
    {
        _writerSemaphore.wait();
        {
            _rwMutex.lock();
            scope(exit) _rwMutex.unlock();
            _queue.pushBack(x);
        }
        _readerSemaphore.notify();
    }

    /// Inserts an item at the front; blocks while the queue is full.
    void pushFront(T x) nothrow @nogc
    {
        _writerSemaphore.wait();
        {
            _rwMutex.lock();
            scope(exit) _rwMutex.unlock();
            _queue.pushFront(x);
        }
        _readerSemaphore.notify();
    }

    /// Removes and returns the front item; blocks while the queue is empty.
    T popFront() nothrow @nogc
    {
        _readerSemaphore.wait();

        // Scope guards run in reverse order once the return value has been
        // computed: unlock first, then notify the writers.
        scope(exit) _writerSemaphore.notify();
        _rwMutex.lock();
        scope(exit) _rwMutex.unlock();
        return _queue.popFront();
    }

    /// Removes and returns the back item; blocks while the queue is empty.
    T popBack() nothrow @nogc
    {
        _readerSemaphore.wait();

        // Scope guards run in reverse order once the return value has been
        // computed: unlock first, then notify the writers.
        scope(exit) _writerSemaphore.notify();
        _rwMutex.lock();
        scope(exit) _rwMutex.unlock();
        return _queue.popBack();
    }

    /// Non-blocking variant of `popFront`.
    /// Params:
    ///     result = Receives the popped item on success.
    /// Returns: true if an item was returned, false if the queue is empty.
    bool tryPopFront(out T result) nothrow @nogc
    {
        if (!_readerSemaphore.tryWait())
            return false;

        scope(exit) _writerSemaphore.notify();
        _rwMutex.lock();
        scope(exit) _rwMutex.unlock();
        result = _queue.popFront();
        return true;
    }

    /// Non-blocking variant of `popBack`.
    /// Params:
    ///     result = Receives the popped item on success.
    /// Returns: true if an item was returned, false if the queue is empty.
    bool tryPopBack(out T result) nothrow @nogc
    {
        if (!_readerSemaphore.tryWait())
            return false;

        scope(exit) _writerSemaphore.notify();
        _rwMutex.lock();
        scope(exit) _rwMutex.unlock();
        result = _queue.popBack();
        return true;
    }

    /// Removes all locked queue items.
    void clear() nothrow @nogc
    {
        // Each successful tryWait() claims one stored item, which is then
        // dropped, and its slot is handed back to the writers.
        while (_readerSemaphore.tryWait())
        {
            {
                _rwMutex.lock();
                scope(exit) _rwMutex.unlock();
                _queue.popBack();
            }
            _writerSemaphore.notify();
        }
    }

private:

    RingBufferNoGC!T _queue;               // item storage, accessed under _rwMutex
    UncheckedMutex _rwMutex;               // serializes every access to _queue contents
    UncheckedSemaphore _readerSemaphore;   // waited on by pops, notified by pushes
    UncheckedSemaphore _writerSemaphore;   // waited on by pushes, notified by pops
    bool _initialized;                     // set by the constructor, reset by the destructor
}
160 
161 
// Single-threaded check of item ordering, of the non-blocking pops and of
// clear(). Nothing here can block: pushes never exceed the capacity of 3 and
// blocking pops are only issued while items are known to be present.
unittest
{
    import dplug.core.nogc;
    auto lq = mallocNew!(LockedQueue!int)(3);
    scope(exit) lq.destroyFree();

    // Clearing an empty queue must be harmless.
    lq.clear();

    lq.pushFront(2);
    lq.pushBack(3);
    lq.pushFront(1);

    // should contain [1 2 3] here
    assert(lq.popBack() == 3);
    assert(lq.popFront() == 1);

    // Exactly one item is left, so the non-blocking pop has to succeed.
    // (A plain `if` around the check would let a failing pop go unnoticed.)
    int res;
    bool popped = lq.tryPopFront(res);
    assert(popped);
    assert(res == 2);

    // The queue is empty now: non-blocking pops must report failure.
    popped = lq.tryPopFront(res);
    assert(!popped);
    popped = lq.tryPopBack(res);
    assert(!popped);

    // clear() must discard every stored item.
    lq.pushBack(4);
    lq.pushBack(5);
    lq.clear();
    popped = lq.tryPopBack(res);
    assert(!popped);

    // After clear() the queue is usable again, back end included.
    lq.pushBack(6);
    popped = lq.tryPopBack(res);
    assert(popped);
    assert(res == 6);
}
181