1 /**
2  * The thread module provides support for thread creation and management.
3  *
4  * Copyright: Copyright Sean Kelly 2005 - 2012.
5  * Copyright:  Copyright (c) 2009-2011, David Simcha.
6  * Copyright: Copyright Auburn Sounds 2016
7  * License: Distributed under the
8  *      $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
9  *    (See accompanying file LICENSE)
10  * Authors:   Sean Kelly, Walter Bright, Alex Rønne Petersen, Martin Nowak, David Simcha, Guillaume Piolat
11  */
12 module dplug.core.thread;
13 
14 import dplug.core.nogc;
15 import dplug.core.lockedqueue;
16 import dplug.core.sync;
17 
18 version(Posix)
19     import core.sys.posix.pthread;
20 else version(Windows)
21 {
22     import core.stdc.stdint : uintptr_t;
23     import core.sys.windows.windef;
24     import core.sys.windows.winbase;
25     import core.thread;
26 
27     extern (Windows) alias btex_fptr = uint function(void*) ;
28     extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*) nothrow @nogc;
29 }
30 else
31     static assert(false, "Platform not supported");
32 
33 version(OSX)
34 {
35     extern(C) nothrow @nogc
36     int sysctlbyname(const(char)*, void *, size_t *, void *, size_t);
37 }
38 
39 
40 alias ThreadDelegate = void delegate() nothrow @nogc;
41 
42 
/// Builds a `Thread` that will run `callback` once `start()` is called on it.
/// Params:
///     callback = The delegate the new thread will execute.
///     stackSize = The thread stack size in bytes. 0 for default size.
/// Returns: A thread object that has not been started yet.
Thread makeThread(ThreadDelegate callback, size_t stackSize = 0) nothrow @nogc
{
    Thread created = Thread(callback, stackSize);
    return created;
}
47 
/// Optimistic thread, failure not supported: every OS-level error ends in `assert(false)`.
struct Thread
{
nothrow:
@nogc:
public:

    /// Create a suspended thread.
    /// Params:
    ///     callback = The delegate that will be called by the thread.
    ///     stackSize = The thread stack size in bytes. 0 for default size.
    /// Warning: It is STRONGLY ADVISED to pass a class member delegate to have
    ///          the right delegate context.
    ///          Passing struct method delegates are currently UNSUPPORTED.
    this(ThreadDelegate callback, size_t stackSize = 0)
    {
        _stackSize = stackSize;
        _callback = callback;
    }

    /// Destroys a thread. The thread is supposed to be finished at this point.
    /// Releases the OS thread object, unless the thread was never started or
    /// `join()` already reclaimed it.
    ~this()
    {
        // Nothing to release: either no OS thread was ever created, or join()
        // already gave it back (detaching/closing twice would be invalid).
        if (!_started || _joined)
            return;

        version(Posix)
        {
            pthread_detach(_id);
        }
        else version(Windows)
        {
            CloseHandle(_id);
        }
    }

    @disable this(this);

    /// Starts the thread. Threads are created suspended. This function can
    /// only be called once.
    /// Warning: the address of the stored delegate is handed to the new thread,
    ///          so this struct must stay at the same address until the new
    ///          thread has read it.
    void start()
    {
        assert(!_started);
        version(Posix)
        {
            pthread_attr_t attr;

            int err = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_init(pattr);
                })(&attr);

            if (err != 0)
                assert(false);

            if(_stackSize != 0)
            {
                int err2 = assumeNothrowNoGC(
                    (pthread_attr_t* pattr, size_t stackSize)
                    {
                        return pthread_attr_setstacksize(pattr, stackSize);
                    })(&attr, _stackSize);
                if (err2 != 0)
                    assert(false);
            }

            int err3 = pthread_create(&_id, &attr, &posixThreadEntryPoint, &_callback);
            if (err3 != 0)
                assert(false);

            int err4 = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_destroy(pattr);
                })(&attr);
            if (err4 != 0)
                assert(false);
        }

        version(Windows)
        {

            uint dummy;
            _id = cast(HANDLE) _beginthreadex(null,
                                              cast(uint)_stackSize,
                                              &windowsThreadEntryPoint,
                                              &_callback,
                                              CREATE_SUSPENDED,
                                              &dummy);
            if (cast(size_t)_id == 0)
                assert(false);
            if (ResumeThread(_id) == -1)
                assert(false);
        }

        // Record that an OS thread now exists, so that the destructor releases
        // it and a second start() trips the assertion above.
        _started = true;
    }

    /// Wait for that thread termination.
    /// Does nothing if the thread was never started or was already joined.
    void join()
    {
        if (!_started || _joined)
            return;

        version(Posix)
        {
            void* returnValue;
            if (0 != pthread_join(_id, &returnValue))
                assert(false);
        }
        else version(Windows)
        {
            if(WaitForSingleObject(_id, INFINITE) != WAIT_OBJECT_0)
                assert(false);
            CloseHandle(_id);
        }

        // The OS thread object is gone from here on; the destructor must not
        // touch `_id` again.
        _joined = true;
    }

private:
    version(Posix) pthread_t _id;
    version(Windows) HANDLE _id;
    ThreadDelegate _callback;
    size_t _stackSize;
    bool _started = false; // true once start() created the OS thread
    bool _joined = false;  // true once join() reclaimed the OS thread
}
169 
version(Posix)
{
    /// Entry point given to `pthread_create`.
    /// `threadContext` is the address of a `ThreadDelegate`; the delegate is
    /// invoked once and the thread then returns `null`.
    extern(C) void* posixThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        auto callbackPtr = cast(ThreadDelegate*) threadContext;
        ThreadDelegate callback = *callbackPtr;
        callback(); // hopefully called with the right context pointer
        return null;
    }
}
179 
version(Windows)
{
    /// Entry point given to `_beginthreadex`.
    /// `threadContext` is the address of a `ThreadDelegate`; the delegate is
    /// invoked once and the thread then exits with code 0.
    extern (Windows) uint windowsThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        auto callbackPtr = cast(ThreadDelegate*) threadContext;
        ThreadDelegate callback = *callbackPtr;
        callback();
        return 0;
    }
}
189 
// Checks that a thread started from a class member delegate runs with the
// right context: it must see the object's fields and the enclosing frame.
unittest
{
    int outerInt = 0;

    class Owner
    {
    nothrow @nogc:
        int checkValue0 = 0x11223344;
        int checkValue1 = 0x55667788;
        int innerInt = 0;
        Thread worker;

        this()
        {
            worker = makeThread(&run);
            worker.start();
        }

        void join()
        {
            worker.join();
        }

        // Body executed by the spawned thread.
        void run()
        {
            outerInt = 1;
            innerInt = 2;

            // A wrong `this` pointer would not read back these sentinels.
            assert(checkValue0 == 0x11223344);
            assert(checkValue1 == 0x55667788);
        }
    }

    auto owner = new Owner;
    owner.worker.join();
    assert(owner.innerInt == 2);
    owner.destroy();
    assert(outerInt == 1);
}
230 
/// Runs `dg` in a freshly created thread that is started before returning.
/// Params:
///     dg = The delegate to execute in the new thread.
/// Returns: The started thread, so that the caller can call `.join()` on it.
Thread launchInAThread(ThreadDelegate dg) nothrow @nogc
{
    // Default stack size, same as `makeThread(dg)`.
    Thread launched = Thread(dg);
    launched.start();
    return launched;
}
239 
240 
version(Windows)
{
    /// Returns: current thread identifier, as given by `GetCurrentThreadId`
    ///          and converted to a pointer-sized opaque value.
    void* currentThreadId() nothrow @nogc
    {
        auto id = GetCurrentThreadId();
        return cast(void*) id;
    }
}
else version(Posix)
{
    /// Returns: current thread identifier, as given by `pthread_self`
    ///          and converted to a pointer-sized opaque value.
    void* currentThreadId() nothrow @nogc
    {
        // Wrapped so the call is accepted inside a nothrow @nogc function.
        auto querySelf = assumeNothrowNoGC(
                ()
                {
                    return cast(void*)(pthread_self());
                });
        return querySelf();
    }
}
261 
262 
263 //
264 // Thread-pool
265 //
266 
/// Queries the operating system for the number of CPUs.
/// Returns: Number of CPUs, always at least 1. If the OS query fails or
///          reports a non-positive value, 1 is returned so that callers can
///          safely size a thread pool with the result.
int getTotalNumberOfCPUs() nothrow @nogc
{
    int procs = 0;

    version(Windows)
    {
        import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        procs = cast(int) si.dwNumberOfProcessors;
    }
    else version(linux)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        // sysconf returns -1 on error; handled by the clamp below.
        procs = cast(int) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else version(OSX)
    {
        auto nameStr = "machdep.cpu.core_count\0".ptr;
        uint ans;
        size_t len = uint.sizeof;
        // Only trust `ans` when the call reports success (0).
        if (sysctlbyname(nameStr, &ans, &len, null, 0) == 0)
            procs = cast(int) ans;
    }
    else
        static assert(false, "OS unsupported");

    // Same lower bound on every platform.
    if (procs < 1)
        procs = 1;
    return procs;
}
296 
297 alias ThreadPoolDelegate = void delegate(int workItem) nothrow @nogc;
298 
299 
/// Rewrite of the ThreadPool using condition variables.
/// A fixed set of worker threads is created at construction; each
/// `parallelFor` hands them `count` work items identified by their index.
/// FUTURE: this could be sped up by using futures. Description of the task
///         and associated condition+mutex would go in an external struct.
/// Note: the interface of the thread-pool itself is not thread-safe, you cannot give orders from
///       multiple threads at once.
class ThreadPool
{
public:
nothrow:
@nogc:

    /// Creates a thread-pool.
    /// Params:
    ///     numThreads = Number of worker threads. 0 (or a negative value)
    ///                  means one worker per CPU. At least one worker is
    ///                  always created.
    ///     stackSize = Stack size of each worker in bytes. 0 for default size.
    this(int numThreads = 0, size_t stackSize = 0)
    {
        // Create sync first, workers use it as soon as they are started
        _workMutex = makeMutex();
        _workCondition = makeConditionVariable();

        _finishMutex = makeMutex();
        _finishCondition = makeConditionVariable();

        // Create threads
        if (numThreads <= 0)
            numThreads = getTotalNumberOfCPUs();

        // A pool without any worker would make every parallelFor wait forever.
        if (numThreads < 1)
            numThreads = 1;

        _threads = mallocSlice!Thread(numThreads);
        foreach(ref thread; _threads)
        {
            thread = makeThread(&workerThreadFunc, stackSize);
            thread.start();
        }
    }

    /// Destroys a thread-pool.
    /// No `parallelForAsync` may be pending at this point.
    ~this()
    {
        if (_threads !is null)
        {
            assert(_state == State.initial);

            // Put the thread-pool in stop state
            _workMutex.lock();
                _stopFlag = true;
            _workMutex.unlock();

            // Notify all workers
            _workCondition.notifyAll();

            // Wait for each thread termination
            foreach(ref thread; _threads)
                thread.join();

            // Destroys each thread
            foreach(ref thread; _threads)
                thread.destroy();
            freeSlice(_threads);
            _threads = null;
        }
    }

    /// Calls the delegate in parallel, with 0..count as index.
    /// Immediate waiting for completion.
    /// Params:
    ///     count = Number of work items. Nothing is done if `count <= 0`.
    ///     dg = Delegate called once per work item, with the item index.
    void parallelFor(int count, scope ThreadPoolDelegate dg)
    {
        assert(_state == State.initial);

        // Do not launch worker threads for one work-item, not worth it.
        // (but it is worth it in async).
        if (count == 1)
        {
            dg(0);
            return;
        }

        // Unleash parallel threads.
        parallelForAsync(count, dg);

        // Wait for completion immediately.
        waitForCompletion();
    }

    /// Same, but does not wait for completion.
    /// You cannot have 2 concurrent parallelFor for the same thread-pool.
    /// Params:
    ///     count = Number of work items. Nothing is done if `count <= 0`.
    ///     dg = Delegate called once per work item, with the item index.
    void parallelForAsync(int count, scope ThreadPoolDelegate dg)
    {
        assert(_state == State.initial);

        // No tasks (or a nonsensical negative amount): exit immediately,
        // without waking up workers and without leaving the initial state.
        if (count <= 0)
            return;

        // At this point we assume all worker threads are waiting for messages

        // Sets the current task
        _workMutex.lock();

        _taskDelegate = dg;       // immutable during this parallelFor
        _taskNumWorkItem = count; // immutable during this parallelFor
        _taskCurrentWorkItem = 0;
        _taskCompleted = 0;

        _workMutex.unlock();

        // wake up all threads
        // FUTURE: if number of tasks < number of threads only wake up the necessary amount of threads
        _workCondition.notifyAll();

        _state = State.parallelForInProgress;
    }

    /// Wait for completion of the previous parallelFor, if any.
    // It's always safe to call this function before doing another parallelFor.
    void waitForCompletion()
    {
        if (_state == State.initial)
            return; // that means that parallel threads were not launched

        assert(_state == State.parallelForInProgress);

        _finishMutex.lock();
        scope(exit) _finishMutex.unlock();

        // FUTURE: the ordering thread will be woken up multiple times
        //         (once for every completed task)
        //         maybe that can be optimized
        while (_taskCompleted < _taskNumWorkItem)
        {
            _finishCondition.wait(&_finishMutex);
        }

        _state = State.initial;
    }

private:
    Thread[] _threads = null;

    // Used to signal more work
    UncheckedMutex _workMutex;
    ConditionVariable _workCondition;

    // Used to signal completion
    UncheckedMutex _finishMutex;
    ConditionVariable _finishCondition;

    // These fields represent the current task group (ie. a parallelFor)
    ThreadPoolDelegate _taskDelegate;
    int _taskNumWorkItem;     // total number of tasks in this task group
    int _taskCurrentWorkItem; // current task still left to do (protected by _workMutex)
    int _taskCompleted;       // number of tasks already completed (protected by _finishMutex)

    // Set by the destructor (under _workMutex) to ask workers to exit
    bool _stopFlag;

    // True while some work item of the current task group has not been picked.
    // Must be called with _workMutex held.
    bool hasWork()
    {
        return _taskCurrentWorkItem < _taskNumWorkItem;
    }

    // Represent the thread-pool state from the user POV
    enum State
    {
        initial,               // tasks can be launched
        parallelForInProgress, // tasks were launched, but not waited on
    }
    State _state = State.initial;

    // What worker threads do: pick the next work item index under
    // _workMutex, run the task delegate on it outside of any lock, then
    // report completion under _finishMutex. Returns once the stop flag is
    // set and no work item is left.
    // MAYDO: threads come here with bad context with struct delegates
    void workerThreadFunc()
    {
        while (true)
        {
            int workItem = -1;
            {
                _workMutex.lock();
                scope(exit) _workMutex.unlock();

                // Wait for notification
                while (!_stopFlag && !hasWork())
                    _workCondition.wait(&_workMutex);

                if (_stopFlag && !hasWork())
                    return;

                assert(hasWork());

                // Pick a task and increment counter
                workItem = _taskCurrentWorkItem;
                _taskCurrentWorkItem++;
            }

            assert(workItem != -1);

            // Do the actual task
            _taskDelegate(workItem);

            // signal completion of one more task
            {
                _finishMutex.lock();
                _taskCompleted++;
                _finishMutex.unlock();

                _finishCondition.notifyOne(); // wake-up
            }
        }
    }
}
504 
505 
// Exercises ThreadPool with synchronous and asynchronous parallelFor,
// including the single-item and zero-item cases: every work item must
// increment the shared counter exactly once.
unittest
{
    import core.atomic;
    import dplug.core.nogc;

    struct A
    {
        shared(int) counter = 0;
        ThreadPool _pool;

        this(int dummy)
        {
            _pool = mallocEmplace!ThreadPool();
        }

        ~this()
        {
            _pool.destroy();
        }

        // Runs `count` work items, through either entry point of the pool.
        void launch(int count, bool async) nothrow @nogc
        {
            if (!async)
            {
                _pool.parallelFor(count, &loopBody);
                return;
            }

            _pool.parallelForAsync(count, &loopBody);
            _pool.waitForCompletion();
        }

        void loopBody(int workItem) nothrow @nogc
        {
            atomicOp!"+="(counter, 1);
        }
    }

    auto a = A(4);

    // Many items, synchronous then asynchronous
    a.launch(10, false);
    assert(a.counter == 10);

    a.launch(500, true);
    assert(a.counter == 510);

    // Single item
    a.launch(1, false);
    assert(a.counter == 511);

    a.launch(1, true);
    assert(a.counter == 512);

    // No item at all: the counter must not move
    a.launch(0, true);
    assert(a.counter == 512);
    a.launch(0, false);
    assert(a.counter == 512);
}