1 /**
2  * Threads and thread-pool.
3  *
4  * Copyright: Copyright Sean Kelly 2005 - 2012.
5  * Copyright: Copyright (c) 2009-2011, David Simcha.
6  * Copyright: Copyright Auburn Sounds 2016.
7  * License: Distributed under the
8  *      $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
9  *    (See accompanying file LICENSE)
10  * Authors:   Sean Kelly, Walter Bright, Alex Rønne Petersen, Martin Nowak, David Simcha, Guillaume Piolat
11  */
12 module dplug.core.thread;
13 
14 import dplug.core.nogc;
15 import dplug.core.lockedqueue;
16 import dplug.core.sync;
17 
18 version(Posix)
19     import core.sys.posix.pthread;
20 else version(Windows)
21 {
22     import core.stdc.stdint : uintptr_t;
23     import core.sys.windows.windef;
24     import core.sys.windows.winbase;
25     import core.thread;
26 
27     extern (Windows) alias btex_fptr = uint function(void*) ;
28     extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*) nothrow @nogc;
29 }
30 else
31     static assert(false, "Platform not supported");
32 
33 version(OSX)
34 {
35     extern(C) nothrow @nogc
36     int sysctlbyname(const(char)*, void *, size_t *, void *, size_t);
37 }
38 
39 
/// Callback type run by a `Thread`.
/// It must be `nothrow @nogc`: it is invoked from the `nothrow @nogc` thread entry
/// points handed to `pthread_create` / `_beginthreadex`.
alias ThreadDelegate = void delegate() nothrow @nogc;
41 
42 
/// Builds a `Thread` running `callback`, without starting it.
/// Call `start()` on the result to actually launch the OS thread.
///
/// Params:
///     callback = Delegate run by the new thread.
///     stackSize = Thread stack size in bytes, 0 for the default size.
/// Returns: The not-yet-started thread.
Thread makeThread(ThreadDelegate callback, size_t stackSize = 0) nothrow @nogc
{
    Thread created = Thread(callback, stackSize);
    return created;
}
47 
/// Optimistic thread, failure not supported: any OS error while creating or
/// joining the thread ends in `assert(false)`.
struct Thread
{
nothrow:
@nogc:
public:

    /// Create a suspended thread.
    ///
    /// Params:
    ///     callback = The delegate that will be called by the thread.
    ///     stackSize = The thread stack size in bytes. 0 for default size.
    ///
    /// Warning: It is STRONGLY ADVISED to pass a class member delegate (not a struct
    ///          member delegate) to have context.
    ///          Passing struct method delegates are currently UNSUPPORTED.
    ///
    this(ThreadDelegate callback, size_t stackSize = 0)
    {
        _stackSize = stackSize;
        _callback = callback;
    }

    /// Destroys a thread. The thread is supposed to be finished at this point.
    ///
    /// If the thread was started but never joined, its OS resources are released
    /// here without waiting for it (POSIX: detached, Windows: handle closed).
    /// If it was joined, `join()` already released them and nothing is done.
    /// This avoids detaching an already-joined pthread or closing a handle twice.
    ~this()
    {
        if (!_started || _joined)
            return;

        version(Posix)
        {
            pthread_detach(_id);
        }
        else version(Windows)
        {
            CloseHandle(_id);
        }
    }

    @disable this(this);

    /// Starts the thread. Threads are created suspended. This function can
    /// only be called once.
    ///
    /// Note: the new thread receives a pointer to this struct's `_callback` field.
    /// The `Thread` should therefore stay at the same address at least until the
    /// new thread has read its callback.
    void start()
    {
        assert(!_started);
        version(Posix)
        {
            pthread_attr_t attr;

            int err = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_init(pattr);
                })(&attr);

            if (err != 0)
                assert(false);

            if(_stackSize != 0)
            {
                int err2 = assumeNothrowNoGC(
                    (pthread_attr_t* pattr, size_t stackSize)
                    {
                        return pthread_attr_setstacksize(pattr, stackSize);
                    })(&attr, _stackSize);
                if (err2 != 0)
                    assert(false);
            }

            int err3 = pthread_create(&_id, &attr, &posixThreadEntryPoint, &_callback);
            if (err3 != 0)
                assert(false);

            int err4 = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_destroy(pattr);
                })(&attr);
            if (err4 != 0)
                assert(false);
        }

        version(Windows)
        {
            uint dummy;
            _id = cast(HANDLE) _beginthreadex(null,
                                              cast(uint)_stackSize,
                                              &windowsThreadEntryPoint,
                                              &_callback,
                                              CREATE_SUSPENDED,
                                              &dummy);
            if (cast(size_t)_id == 0)
                assert(false);
            if (ResumeThread(_id) == -1)
                assert(false);
        }

        // The OS thread now exists: either join() or the destructor must release it.
        _started = true;
    }

    /// Wait for that thread termination, then release its OS resources.
    /// Must be called at most once, and only after `start()`.
    void join()
    {
        assert(_started && !_joined);
        version(Posix)
        {
            void* returnValue;
            if (0 != pthread_join(_id, &returnValue))
                assert(false);
        }
        else version(Windows)
        {
            if(WaitForSingleObject(_id, INFINITE) != WAIT_OBJECT_0)
                assert(false);
            CloseHandle(_id);
        }

        // Resources are released: the destructor must not touch `_id` anymore.
        _joined = true;
    }

private:
    version(Posix) pthread_t _id;
    version(Windows) HANDLE _id;
    ThreadDelegate _callback;
    size_t _stackSize;
    bool _started = false; // true once start() created the OS thread
    bool _joined = false;  // true once join() waited for the thread and released it
}
172 
version(Posix)
{
    /// Start routine given to `pthread_create` by `Thread.start`.
    /// `threadContext` points to the `ThreadDelegate` stored inside the `Thread`.
    /// The delegate is copied locally, then invoked. Always returns `null`.
    extern(C) void* posixThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        auto callbackPtr = cast(ThreadDelegate*) threadContext;
        ThreadDelegate callback = *callbackPtr;
        callback(); // hopefully called with the right context pointer
        return null;
    }
}
182 
version(Windows)
{
    /// Start routine given to `_beginthreadex` by `Thread.start`.
    /// `threadContext` points to the `ThreadDelegate` stored inside the `Thread`.
    /// The delegate is copied locally, then invoked. Always returns 0.
    extern (Windows) uint windowsThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        auto callbackPtr = cast(ThreadDelegate*) threadContext;
        ThreadDelegate callback = *callbackPtr;
        callback();
        return 0;
    }
}
192 
unittest
{
    int outerValue = 0;

    // The thread callback is a class member delegate, so the thread gets a valid
    // `this` context and can also reach the enclosing unittest frame.
    class Worker
    {
    nothrow @nogc:
        this()
        {
            t = makeThread(&run);
            t.start();
        }

        void join()
        {
            t.join();
        }

        void run()
        {
            outerValue = 1;
            innerValue = 2;

            // The delegate context must point to this very object.
            assert(checkValue0 == 0x11223344);
            assert(checkValue1 == 0x55667788);
        }

        int checkValue0 = 0x11223344;
        int checkValue1 = 0x55667788;
        int innerValue = 0;
        Thread t;
    }

    auto worker = new Worker;
    worker.join();
    assert(worker.innerValue == 2);
    worker.destroy();
    assert(outerValue == 1);
}
233 
/// Launch a function in a newly created thread.
/// Return the started thread so that you can call `.join()` on it.
///
/// NOTE(review): `Thread.start` hands the new thread a pointer to the `_callback`
/// field of `t`. This appears to rely on `t` being constructed directly in the
/// caller's return slot (NRVO), so that the returned `Thread` does not move while
/// the new thread reads its callback. Moving the result right after the call
/// (e.g. assigning it into an already existing variable) may leave the thread
/// reading a stale location. TODO confirm, and consider heap-allocating the callback.
Thread launchInAThread(ThreadDelegate dg) nothrow @nogc
{
    Thread t = makeThread(dg);
    t.start();
    return t;
}
242 
243 
/// Returns: current thread identifier, as an opaque pointer-sized value.
/// On Windows this is `GetCurrentThreadId()`, on POSIX `pthread_self()`.
void* currentThreadId() nothrow @nogc
{
    version(Windows)
    {
        return cast(void*) GetCurrentThreadId();
    }
    else version(Posix)
    {
        auto queryId = assumeNothrowNoGC(() { return cast(void*)(pthread_self()); });
        return queryId();
    }
}
264 
265 
266 //
267 // Thread-pool
268 //
269 
/// Returns: Number of CPUs, always at least 1.
///
/// The value comes from `GetSystemInfo` on Windows, `sysconf(_SC_NPROCESSORS_ONLN)`
/// on Linux and the `machdep.cpu.core_count` sysctl on macOS.
/// If the OS query fails or reports a non-positive count, 1 is returned.
/// Callers such as `ThreadPool` therefore never end up with zero worker threads.
int getTotalNumberOfCPUs() nothrow @nogc
{
    int procs = 0;

    version(Windows)
    {
        import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        procs = cast(int) si.dwNumberOfProcessors;
    }
    else version(linux)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        procs = cast(int) sysconf(_SC_NPROCESSORS_ONLN); // -1 on error
    }
    else version(OSX)
    {
        auto nameStr = "machdep.cpu.core_count\0".ptr;
        uint ans = 0;
        size_t len = uint.sizeof;
        // Only trust `ans` when the call succeeded.
        if (sysctlbyname(nameStr, &ans, &len, null, 0) == 0)
            procs = cast(int) ans;
    }
    else
        static assert(false, "OS unsupported");

    if (procs < 1)
        procs = 1;
    return procs;
}
299 
/// Work-item callback for `ThreadPool.parallelFor` / `parallelForAsync`.
/// It is called once per index `workItem` in `0 .. count`. The calls may run
/// concurrently on several worker threads. When `count == 1`, `parallelFor`
/// instead makes the single call directly on the calling thread.
alias ThreadPoolDelegate = void delegate(int workItem) nothrow @nogc;
301 
302 
/// Rewrite of the ThreadPool using condition variables.
/// FUTURE: this could be sped up by using futures. Description of the task
///         and associated condition+mutex would go in an external struct.
/// Note: the interface of the thread-pool itself is not thread-safe, you cannot give orders from
///       multiple threads at once.
class ThreadPool
{
public:
nothrow:
@nogc:

    /// Creates a thread-pool.
    ///
    /// Params:
    ///     numThreads = Number of worker threads. 0 (or any non-positive value)
    ///                  means `getTotalNumberOfCPUs()` threads. At least one worker
    ///                  is always created, otherwise `waitForCompletion` could
    ///                  wait forever.
    ///     stackSize = Stack size in bytes of each worker thread. 0 for default size.
    this(int numThreads = 0, size_t stackSize = 0)
    {
        // Create sync first
        _workMutex = makeMutex();
        _workCondition = makeConditionVariable();

        _finishMutex = makeMutex();
        _finishCondition = makeConditionVariable();

        // Create threads
        if (numThreads <= 0)
            numThreads = getTotalNumberOfCPUs();
        if (numThreads < 1)
            numThreads = 1; // the CPU count query may fail and report 0 or less
        _threads = mallocSlice!Thread(numThreads);
        foreach(ref thread; _threads)
        {
            thread = makeThread(&workerThreadFunc, stackSize);
            thread.start();
        }
    }

    /// Destroys a thread-pool.
    /// A still-pending `parallelForAsync` is waited for first. Then every worker
    /// is asked to stop, joined and destroyed.
    ~this()
    {
        if (_threads is null)
            return;

        // Let a pending parallelForAsync finish (no-op when nothing is in progress)
        waitForCompletion();
        assert(_state == State.initial);

        // Put the thread-pool in stop state
        _workMutex.lock();
            _stopFlag = true;
        _workMutex.unlock();

        // Notify all workers
        _workCondition.notifyAll();

        // Wait for each thread termination
        foreach(ref thread; _threads)
            thread.join();

        // Destroy each thread
        foreach(ref thread; _threads)
            thread.destroy();
        freeSlice(_threads);
        _threads = null;
    }

    /// Calls the delegate in parallel, with 0..count as index.
    /// Immediate waiting for completion.
    void parallelFor(int count, scope ThreadPoolDelegate dg)
    {
        assert(_state == State.initial);

        // Do not launch worker threads for one work-item, not worth it.
        // (but it is worth it in async).
        if (count == 1)
        {
            dg(0);
            return;
        }

        // Unleash parallel threads.
        parallelForAsync(count, dg);

        // Wait for completion immediately.
        waitForCompletion();
    }

    /// Same, but does not wait for completion.
    /// You cannot have 2 concurrent parallelFor for the same thread-pool.
    /// A non-positive `count` does nothing.
    void parallelForAsync(int count, scope ThreadPoolDelegate dg)
    {
        assert(_state == State.initial);

        if (count <= 0) // no tasks, exit immediately
            return;

        // At this point we assume all worker threads are waiting for messages

        // Reset the completion counter under the mutex that protects it.
        // No worker has work at this point, so none is touching it.
        _finishMutex.lock();
        _taskCompleted = 0;
        _finishMutex.unlock();

        // Sets the current task
        _workMutex.lock();

        _taskDelegate = dg;       // immutable during this parallelFor
        _taskNumWorkItem = count; // immutable during this parallelFor
        _taskCurrentWorkItem = 0;

        _workMutex.unlock();

        // wake up all threads
        // FUTURE: if number of tasks < number of threads only wake up the necessary amount of threads
        _workCondition.notifyAll();

        _state = State.parallelForInProgress;
    }

    /// Wait for completion of the previous parallelFor, if any.
    // It's always safe to call this function before doing another parallelFor.
    void waitForCompletion()
    {
        if (_state == State.initial)
            return; // that means that parallel threads were not launched

        assert(_state == State.parallelForInProgress);

        _finishMutex.lock();
        scope(exit) _finishMutex.unlock();

        // FUTURE: order thread will be woken up multiple times
        //         (one for every completed task)
        //         maybe that can be optimized
        while (_taskCompleted < _taskNumWorkItem)
        {
            _finishCondition.wait(&_finishMutex);
        }

        _state = State.initial;
    }

private:
    Thread[] _threads = null;

    // Used to signal more work
    UncheckedMutex _workMutex;
    ConditionVariable _workCondition;

    // Used to signal completion
    UncheckedMutex _finishMutex;
    ConditionVariable _finishCondition;

    // These fields represent the current task group (ie. a parallelFor)
    ThreadPoolDelegate _taskDelegate;
    int _taskNumWorkItem;     // total number of tasks in this task group
    int _taskCurrentWorkItem; // next task index to hand out (protected by _workMutex)
    int _taskCompleted;       // number of tasks already completed (protected by _finishMutex)

    bool _stopFlag;

    bool hasWork()
    {
        return _taskCurrentWorkItem < _taskNumWorkItem;
    }

    // Represent the thread-pool state from the user POV
    enum State
    {
        initial,               // tasks can be launched
        parallelForInProgress, // tasks were launched, but not yet waited on
    }
    State _state = State.initial;

    // What worker threads do
    // MAYDO: threads come here with bad context with struct delegates
    void workerThreadFunc()
    {
        while (true)
        {
            int workItem = -1;
            {
                _workMutex.lock();
                scope(exit) _workMutex.unlock();

                // Wait for notification
                while (!_stopFlag && !hasWork())
                    _workCondition.wait(&_workMutex);

                // Remaining work is drained before honoring the stop request
                if (_stopFlag && !hasWork())
                    return;

                assert(hasWork());

                // Pick a task and increment counter
                workItem = _taskCurrentWorkItem;
                _taskCurrentWorkItem++;
            }

            assert(workItem != -1);

            // Do the actual task
            _taskDelegate(workItem);

            // signal completion of one more task
            {
                _finishMutex.lock();
                _taskCompleted++;
                _finishMutex.unlock();

                _finishCondition.notifyOne(); // wake-up
            }
        }
    }
}
507 
508 
unittest
{
    import core.atomic;
    import dplug.core.nogc;

    // Owns a pool and counts how many work items were executed.
    struct A
    {
        ThreadPool _pool;

        this(int dummy)
        {
            _pool = mallocNew!ThreadPool();
        }

        ~this()
        {
            _pool.destroy();
        }

        void launch(int count, bool async) nothrow @nogc
        {
            if (!async)
            {
                _pool.parallelFor(count, &loopBody);
                return;
            }
            _pool.parallelForAsync(count, &loopBody);
            _pool.waitForCompletion();
        }

        void loopBody(int workItem) nothrow @nogc
        {
            atomicOp!"+="(counter, 1);
        }

        shared(int) counter = 0;
    }

    auto a = A(4);

    // Each step: number of work items, async or not, expected running total.
    static struct Step
    {
        int count;
        bool async;
        int expectedTotal;
    }

    static immutable Step[] steps = [
        Step(10, false, 10),
        Step(500, true, 510),
        Step(1, false, 511),
        Step(1, true, 512),
        Step(0, true, 512),
        Step(0, false, 512),
    ];

    foreach (step; steps)
    {
        a.launch(step.count, step.async);
        assert(a.counter == step.expectedTotal);
    }
}