1 /**
2  * Threads and thread-pool.
3  *
4  * Copyright: Copyright Sean Kelly 2005 - 2012.
5  * Copyright: Copyright (c) 2009-2011, David Simcha.
6  * Copyright: Copyright Guillaume Piolat 2016.
7  * License: Distributed under the
8  *      $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
9  *    (See accompanying file LICENSE)
10  * Authors:   Sean Kelly, Walter Bright, Alex Rønne Petersen, Martin Nowak, David Simcha, Guillaume Piolat
11  */
12 module dplug.core.thread;
13 
14 import core.stdc.stdlib;
15 import core.stdc.stdio;
16 
17 import dplug.core.nogc;
18 import dplug.core.lockedqueue;
19 import dplug.core.sync;
20 
21 version(Posix)
22     import core.sys.posix.pthread;
23 else version(Windows)
24 {
25     import core.stdc.stdint : uintptr_t;
26     import core.sys.windows.windef;
27     import core.sys.windows.winbase;
28     import core.thread;
29 
30     extern (Windows) alias btex_fptr = uint function(void*) ;
31     extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*) nothrow @nogc;
32 }
33 else
34     static assert(false, "Platform not supported");
35 
36 version (OSX)
37     version = Darwin;
38 else version (iOS)
39     version = Darwin;
40 else version (TVOS)
41     version = Darwin;
42 else version (WatchOS)
43     version = Darwin;
44 
45 version(Darwin)
46 {
47     extern(C) nothrow @nogc
48     int sysctlbyname(const(char)*, void *, size_t *, void *, size_t);
49 }
50 
51 //debug = threadPoolIsActuallySynchronous;
52 
53 
/// Legacy thread function: a `nothrow @nogc` delegate that takes no argument.
alias ThreadDelegate = void delegate() nothrow @nogc;

/// Thread function that receives an opaque `userData` pointer, used e.g. by the thread pool
/// to hand each worker its index.
alias ThreadDelegateUser = void delegate(void* userData) nothrow @nogc;
59 
60 
/// Builds a `Thread` that runs `callback`. The OS thread only exists once `start()` is called.
/// Params:
///     callback  = Delegate executed by the new thread.
///     stackSize = Stack size in bytes, 0 for the default size.
/// Returns: The not-yet-started `Thread`.
Thread makeThread(ThreadDelegate callback, size_t stackSize = 0) nothrow @nogc
{
    auto created = Thread(callback, stackSize);
    return created;
}
65 
/// Builds a `Thread` that runs `callback(userData)`. The OS thread only exists once `start()`
/// is called.
/// Params:
///     callback  = Delegate executed by the new thread.
///     stackSize = Stack size in bytes, 0 for the default size.
///     userData  = Opaque pointer forwarded to `callback`.
/// Returns: The not-yet-started `Thread`.
Thread makeThread(ThreadDelegateUser callback, size_t stackSize = 0, void* userData = null) nothrow @nogc
{
    auto created = Thread(callback, stackSize, userData);
    return created;
}
70 
/// Optimistic thread, failure not supported: any error reported by the OS, or a failed
/// allocation of the thread context, ends in `assert(false)`.
struct Thread
{
nothrow:
@nogc:
public:

    /// Create a thread without user data. Thread is not created until `start` has been called.
    ///
    /// Params:
    ///     callback  = The delegate that will be called by the thread.
    ///     stackSize = The thread stack size in bytes. 0 for default size.
    ///
    /// Warning: It is STRONGLY ADVISED to pass a class member delegate (not a struct
    ///          member delegate) to have additional context.
    ///          Passing struct method delegates are currently UNSUPPORTED.
    ///
    this(ThreadDelegate callback, size_t stackSize = 0)
    {
        _stackSize = stackSize;
        _context = allocateContext();
        _context.callback = callback;
        _context.callbackUser = null;
        _context.userData = null; // malloc'ed memory: leave no field uninitialized
    }

    /// Create a thread with user data. Thread is not created until `start` has been called.
    ///
    /// Params:
    ///     callback  = The delegate that will be called by the thread.
    ///     stackSize = The thread stack size in bytes. 0 for default size.
    ///     userData  = a pointer to be passed to thread delegate
    ///
    /// Warning: It is STRONGLY ADVISED to pass a class member delegate (not a struct
    ///          member delegate) to have additional context.
    ///          Passing struct method delegates are currently UNSUPPORTED.
    ///
    this(ThreadDelegateUser callback, size_t stackSize = 0, void* userData = null)
    {
        _stackSize = stackSize;
        _context = allocateContext();
        _context.callback = null;
        _context.callbackUser = callback;
        _context.userData = userData;
    }

    /// Releases the heap-allocated context. Does not join the thread.
    ~this()
    {
        if (_context !is null)
        {
            free(_context);
            _context = null;
        }
    }

    @disable this(this);

    /// Starts the thread. This function can only be called once.
    /// On Windows the thread is created suspended and resumed right away.
    void start()
    {
        version(Posix)
        {
            pthread_attr_t attr;

            int err = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_init(pattr);
                })(&attr);

            if (err != 0)
                assert(false);

            // Only override the stack size when one was explicitly requested.
            if(_stackSize != 0)
            {
                int err2 = assumeNothrowNoGC(
                    (pthread_attr_t* pattr, size_t stackSize)
                    {
                        return pthread_attr_setstacksize(pattr, stackSize);
                    })(&attr, _stackSize);
                if (err2 != 0)
                    assert(false);
            }

            // The heap-allocated context is passed rather than `&this`, because the
            // `Thread` struct itself may be moved after `start()`.
            int err3 = pthread_create(&_id, &attr, &posixThreadEntryPoint, _context);
            if (err3 != 0)
                assert(false);

            int err4 = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_destroy(pattr);
                })(&attr);
            if (err4 != 0)
                assert(false);
        }
        else version(Windows)
        {

            uint dummy;

            _id = cast(HANDLE) _beginthreadex(null,
                                              cast(uint)_stackSize,
                                              &windowsThreadEntryPoint,
                                              _context,
                                              CREATE_SUSPENDED,
                                              &dummy);
            if (cast(size_t)_id == 0)
                assert(false);
            if (ResumeThread(_id) == -1)
                assert(false);
        }
        else
            static assert(false);
    }

    /// Wait for that thread termination
    /// Again, this function can be called only once.
    /// This actually releases the thread resource.
    void join()
    {
        version(Posix)
        {
            void* returnValue;
            if (0 != pthread_join(_id, &returnValue))
                assert(false);
        }
        else version(Windows)
        {
            if(WaitForSingleObject(_id, INFINITE) != WAIT_OBJECT_0)
                assert(false);
            CloseHandle(_id);
        }
    }

    /// Returns: the OS thread identifier/handle stored by `start()`, cast to `void*`.
    void* getThreadID()
    {
        version(Posix) return cast(void*)_id;
        else version(Windows) return cast(void*)_id;
        else assert(false);
    }

private:
    version(Posix) 
    {
        pthread_t _id;
    }
    else version(Windows) 
    {
        HANDLE _id;
    }
    else 
        static assert(false);

    // Thread context given to OS thread creation function needs to have a constant address
    // since there are no guarantees the `Thread` struct will be at the same address.
    static struct CreateContext
    {
    nothrow:
    @nogc:
        ThreadDelegate callback;
        ThreadDelegateUser callbackUser;
        void* userData;

        // Dispatches to whichever of the two delegates was set by the constructor.
        void call()
        {
            if (callback !is null)
                callback();
            else
                callbackUser(userData);
        }
    }
    CreateContext* _context;

    size_t _stackSize;

    // Allocates the context with malloc. Out-of-memory is treated like every other
    // failure in this struct: it is not supported and halts.
    static CreateContext* allocateContext()
    {
        auto ctx = cast(CreateContext*) malloc(CreateContext.sizeof);
        if (ctx is null)
            assert(false);
        return ctx;
    }
}
237 
version(Posix)
{
    /// Entry point handed to `pthread_create`: `threadContext` is the `Thread.CreateContext*`
    /// passed by `Thread.start`; its delegate is invoked and `null` is returned.
    extern(C) void* posixThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        auto ctx = cast(Thread.CreateContext*) threadContext;
        ctx.call();
        return null;
    }
}
247 
version(Windows)
{
    /// Entry point handed to `_beginthreadex`: `threadContext` is the `Thread.CreateContext*`
    /// passed by `Thread.start`; its delegate is invoked and 0 is returned.
    extern (Windows) uint windowsThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        auto ctx = cast(Thread.CreateContext*) threadContext;
        ctx.call();
        return 0;
    }
}
257 
unittest
{
    int outerInt = 0;

    // A class is used so that the member delegate given to the thread has a stable context.
    class Runner
    {
    nothrow @nogc:
        this()
        {
            worker = makeThread(&run);
            worker.start();
        }

        void join()
        {
            worker.join();
        }

        // Body executed on the spawned thread.
        void run()
        {
            outerInt = 1;
            innerInt = 2;

            // A wrong `this` pointer would make these sentinels read garbage.
            assert(checkValue0 == 0x11223344);
            assert(checkValue1 == 0x55667788);
        }

        int checkValue0 = 0x11223344;
        int checkValue1 = 0x55667788;
        int innerInt = 0;
        Thread worker;
    }

    auto runner = new Runner;
    runner.join();
    assert(runner.innerInt == 2);
    destroy(runner);
    assert(outerInt == 1);
}
298 
/// Creates a thread running `dg` and starts it immediately.
/// Params:
///     dg        = Delegate executed by the new thread.
///     stackSize = Stack size in bytes, 0 for the default size.
/// Returns: The started `Thread`, so that the caller can `.join()` it.
Thread launchInAThread(ThreadDelegate dg, size_t stackSize = 0) nothrow @nogc
{
    auto launched = Thread(dg, stackSize);
    launched.start();
    return launched;
}
307 
308 //
309 // Thread-pool
310 //
311 
/// Queries the operating system for the number of CPUs.
/// Returns: Number of CPUs, always at least 1. If the OS query fails or reports a
///          non-positive value, 1 is returned so that callers sizing a thread pool
///          from this value never get 0 or a negative count.
int getTotalNumberOfCPUs() nothrow @nogc
{
    int procs;

    version(Windows)
    {
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        procs = cast(int) si.dwNumberOfProcessors;
    }
    else version(Darwin)
    {
        auto nameStr = "machdep.cpu.core_count\0".ptr;
        uint ans = 0;
        size_t len = uint.sizeof;
        // On failure `ans` cannot be trusted: fall back to 0, clamped below.
        if (sysctlbyname(nameStr, &ans, &len, null, 0) != 0)
            ans = 0;
        procs = cast(int) ans;
    }
    else version(Posix)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        // sysconf returns -1 on error; clamped below.
        procs = cast(int) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else
        static assert(false, "OS unsupported");

    return (procs < 1) ? 1 : procs;
}
341 
/// Work function given to `ThreadPool.parallelFor` / `parallelForAsync`.
/// `workItem` is the index of the item to process, `threadIndex` identifies the
/// worker running it.
alias ThreadPoolDelegate = void delegate(int workItem, int threadIndex) nothrow @nogc;
343 
344 
debug(threadPoolIsActuallySynchronous)
{
    /// Synchronous stand-in for the thread pool: every work item runs on the calling thread.
    /// Meant for measurements, since it makes actual CPU time spent easier to measure.
    class ThreadPool
    {
    public:
    nothrow:
    @nogc:

        /// Thread index reported to every work item.
        enum constantThreadId = 0;

        /// All parameters are accepted for interface compatibility and ignored.
        this(int numThreads = 0, int maxThreads = 0, size_t stackSize = 0)
        {
        }

        ~this()
        {
        }

        /// Calls `dg` for each index in 0..count, in order, on the calling thread.
        void parallelFor(int count, scope ThreadPoolDelegate dg)
        {
            for (int workItem = 0; workItem < count; ++workItem)
                dg(workItem, constantThreadId);
        }

        /// Same as `parallelFor`: the work is already finished when this returns.
        void parallelForAsync(int count, scope ThreadPoolDelegate dg)
        {
            int workItem = 0;
            while (workItem < count)
            {
                dg(workItem, constantThreadId);
                ++workItem;
            }
        }

        /// Nothing to wait for, since work items run synchronously.
        // It's always safe to call this function before doing another parallelFor.
        void waitForCompletion()
        {
        }

        /// Returns: always 1.
        int numThreads() pure const
        {
            return 1;
        }
    }
}
else
{

    /// Rewrite of the ThreadPool using condition variables.
    /// Worker threads sleep on `_workCondition` until work items are published, and the
    /// ordering thread sleeps on `_finishCondition` until all items are reported completed.
    /// FUTURE: this could be sped up by using futures. Description of the task
    ///         and associated condition+mutex would go in an external struct.
    /// Note: the interface of the thread-pool itself is not thread-safe, you cannot give orders from
    ///       multiple threads at once.
    class ThreadPool
    {
    public:
    nothrow:
    @nogc:

        /// Creates a thread-pool.
        /// Params:
        ///     numThreads = Number of threads to create (0 = auto).
        ///     maxThreads = A maximum number of threads to create (0 = none).
        ///     stackSize = Stack size to create threads with (0 = auto).
        this(int numThreads = 0, int maxThreads = 0, size_t stackSize = 0)
        {
            // Create sync first
            _workMutex = makeMutex();
            _workCondition = makeConditionVariable();

            _finishMutex = makeMutex();
            _finishCondition = makeConditionVariable();

            // Create threads
            if (numThreads == 0)
                numThreads = getTotalNumberOfCPUs();

            // Limit number of threads eventually (this is done to give other software some leeway
            // in a soft real-time OS)
            if (maxThreads != 0)
            {
                if (numThreads > maxThreads)
                    numThreads = maxThreads;
            }

            assert(numThreads >= 1);

            _threads = mallocSlice!Thread(numThreads);
            foreach(size_t threadIndex, ref thread; _threads)
            {
                // Pass the index of the thread through user data, so that it can be passed to the task in
                // case these tasks need thread-local buffers.
                void* userData = cast(void*)(threadIndex);
                thread = makeThread(&workerThreadFunc, stackSize, userData);
            }

            // Threads are started in a second pass, once all of them have been created.
            // NOTE(review): the original comment justified this with a call to currentThreadId,
            // which this constructor does not make as visible here.
            foreach(ref thread; _threads)
            {
                thread.start();
            }
        }

        /// Destroys a thread-pool.
        /// Must not be called while a parallelFor is still in progress (asserted).
        ~this()
        {
            if (_threads !is null)
            {
                assert(_state == State.initial);

                // Put the threadpool in stop state
                _workMutex.lock();
                    _stopFlag = true;
                _workMutex.unlock();

                // Notify all workers
                _workCondition.notifyAll();

                // Wait for each thread termination
                foreach(ref thread; _threads)
                    thread.join();

                // Destroys each thread
                foreach(ref thread; _threads)
                    thread.destroy();
                freeSlice(_threads);
                _threads = null;
                // NOTE(review): only `_workMutex` is destroyed explicitly here; the other
                // sync objects are presumably released by their own destructors - confirm.
                destroy(_workMutex);
            }
        }

        /// Calls the delegate in parallel, with 0..count as index.
        /// Immediate waiting for completion.
        /// If there is only one task, it is run directly on this thread.
        /// IMPORTANT to be reentrant there! widget drawn alone can then launch same threadpool.
        void parallelFor(int count, scope ThreadPoolDelegate dg)
        {
            assert(_state == State.initial);

            // Do not launch worker threads for one work-item, not worth it.
            // (but it is worth it in async).
            if (count == 1)
            {
                int dummythreadID = 0; // it should not matter which is passed as long as it's a valid ID.
                dg(0, dummythreadID);
                return;
            }

            // Unleash parallel threads.
            parallelForAsync(count, dg);

            // Wait for completion immediately.
            waitForCompletion(); 
        }

        /// Same, but does not wait for completion.
        /// You cannot have 2 concurrent parallelFor for the same thread-pool.
        /// With `count == 0` nothing is launched and the state is left unchanged.
        void parallelForAsync(int count, scope ThreadPoolDelegate dg)
        {
            assert(_state == State.initial);

            if (count == 0) // no tasks, exit immediately
                return;

            // At this point we assume all worker threads are waiting for messages

            // Sets the current task
            _workMutex.lock();

            _taskDelegate = dg;       // immutable during this parallelFor
            _taskNumWorkItem = count; // immutable during this parallelFor
            _taskCurrentWorkItem = 0;
            _taskCompleted = 0;

            _workMutex.unlock();

            if (count >= _threads.length)
            {
                // wake up all threads
                // FUTURE: if number of tasks < number of threads only wake up the necessary amount of threads
                _workCondition.notifyAll();
            }
            else
            {
                // Less tasks than threads in the pool: only wake-up some threads.
                for (int t = 0; t < count; ++t)
                    _workCondition.notifyOne();
            }

            _state = State.parallelForInProgress;
        }

        /// Wait for completion of the previous parallelFor, if any.
        // It's always safe to call this function before doing another parallelFor.
        void waitForCompletion()
        {
            if (_state == State.initial)
                return; // that means that parallel threads were not launched

            assert(_state == State.parallelForInProgress);

            _finishMutex.lock();
            scope(exit) _finishMutex.unlock();

            // FUTURE: the ordering thread will be woken up multiple times
            //         (once for every completed task)
            //         maybe that can be optimized
            while (_taskCompleted < _taskNumWorkItem)
            {
                _finishCondition.wait(&_finishMutex);
            }

            _state = State.initial;
        }

        /// Returns: the number of worker threads owned by this pool.
        int numThreads() pure const
        {
            return cast(int)_threads.length;
        }

    private:
        Thread[] _threads = null;

        // A map to find back thread index from thread system ID
        // NOTE(review): not referenced anywhere in this class as visible here.
        void*[] _threadID = null;

        // Used to signal more work
        UncheckedMutex _workMutex;
        ConditionVariable _workCondition;

        // Used to signal completion
        UncheckedMutex _finishMutex;
        ConditionVariable _finishCondition;

        // These fields represent the current task group (ie. a parallelFor)
        ThreadPoolDelegate _taskDelegate;
        int _taskNumWorkItem;     // total number of tasks in this task group
        int _taskCurrentWorkItem; // current task still left to do (protected by _workMutex)
        int _taskCompleted;       // every task < taskCompleted has already been completed (protected by _finishMutex)

        // Set by the destructor (under _workMutex) to make workers exit their loop.
        bool _stopFlag;

        // True while some work items of the current task group have not been picked yet.
        bool hasWork()
        {
            return _taskCurrentWorkItem < _taskNumWorkItem;
        }

        // Represent the thread-pool state from the user POV
        enum State
        {
            initial,               // tasks can be launched
            parallelForInProgress, // tasks were launched, but not waited on
        }
        State _state = State.initial;

        // What worker threads do: pick the next work item under _workMutex, run it,
        // then report its completion under _finishMutex. `userData` carries the thread index.
        // MAYDO: threads come here with bad context with struct delegates
        void workerThreadFunc(void* userData)
        {
            while (true)
            {
                int workItem = -1;
                {
                    _workMutex.lock();
                    scope(exit) _workMutex.unlock();

                    // Wait for notification
                    while (!_stopFlag && !hasWork())
                        _workCondition.wait(&_workMutex);

                    // Exit only once stopping was requested and no work item is left to pick.
                    if (_stopFlag && !hasWork())
                        return;

                    assert(hasWork());

                    // Pick a task and increment counter
                    workItem = _taskCurrentWorkItem;
                    _taskCurrentWorkItem++;
                }

                // Find thread index from user data set by pool
                int threadIndex = cast(int)( cast(size_t)(userData) );

                // Do the actual task (outside of any lock)
                _taskDelegate(workItem, threadIndex);

                // signal completion of one more task
                {
                    _finishMutex.lock();
                    _taskCompleted++;
                    _finishMutex.unlock();

                    _finishCondition.notifyOne(); // wake-up
                }
            }
        }
    }
}
642 
/// Returns an identifier for the calling thread, as reported by the OS
/// (`GetCurrentThreadId` on Windows, `pthread_self` on Posix).
/// The returned ID is just used for display. You can't get a `Thread` out of it.
public static size_t getCurrentThreadId() nothrow @nogc
{
    version(Windows)
        return cast(size_t) GetCurrentThreadId();
    else version(Posix)
    {
        void* self = cast(void*) pthread_self();
        return cast(size_t) self;
    }
    else
        static assert(false);
}
658 
unittest
{
    import core.atomic;
    import dplug.core.nogc;

    // Owns a pool and counts how many work items were executed.
    struct A
    {
        ThreadPool _pool;
        int _numThreads;

        this(int numThreads, int maxThreads = 0, int stackSize = 0)
        {
            _pool = mallocNew!ThreadPool(numThreads, maxThreads, stackSize);
            _numThreads = _pool.numThreads();
        }

        ~this()
        {
            _pool.destroy();
        }

        // Runs `count` work items, through the async API followed by a wait, or through
        // the blocking API.
        void launch(int count, bool async) nothrow @nogc
        {
            if (!async)
            {
                _pool.parallelFor(count, &loopBody);
                return;
            }

            _pool.parallelForAsync(count, &loopBody);
            _pool.waitForCompletion();
        }

        // Work item: checks the reported thread index is in range, then bumps the counter.
        void loopBody(int workItem, int threadIndex) nothrow @nogc
        {
            bool goodIndex = (threadIndex >= 0) && (threadIndex < _numThreads);
            assert(goodIndex);
            atomicOp!"+="(counter, 1);
        }

        shared(int) counter = 0;
    }

    foreach (numThreads;  [0, 1, 2, 4, 8, 16, 32])
    {
        auto a = A(numThreads);
        int expected = 0;

        // Launches `count` items and verifies the running total.
        void check(int count, bool async)
        {
            a.launch(count, async);
            expected += count;
            assert(a.counter == expected);
        }

        check(10, false);  // total 10
        check(500, true);  // total 510
        check(1, false);   // total 511, single item runs on the calling thread
        check(1, true);    // total 512
        check(0, true);    // total 512, nothing launched
        check(0, false);   // total 512, nothing launched
    }
}
722 
723 // Bonus: Capacity to get the macOS version
724 
version(Darwin)
{

    // Note: .init value is a large future version (100.0.0), so that failure to detect version
    // leads to newer behaviour.
    struct MacOSVersion
    {
        int major = 100; // eg: major = 10   minor = 7 for 10.7
        int minor = 0;
        int patch = 0;
    }

    /// Get the macOS version we are running on, derived from the Darwin kernel release
    /// reported by the `kern.osrelease` sysctl.
    /// Note: it only makes sense for macOS, not iOS.
    /// Note: patch always return zero for now, and minor is always zero for macOS 11 and later.
    /// Returns: The detected version, or `MacOSVersion.init` (100.0.0) when the sysctl fails,
    ///          the release string cannot be parsed, or the Darwin release is not a known one.
    MacOSVersion getMacOSVersion() nothrow @nogc
    {
        char[256] str;
        size_t size = 256;
        int ret = sysctlbyname("kern.osrelease", str.ptr, &size, null, 0);
        MacOSVersion result;
        if (ret != 0) 
            return result;

        // Make sure sscanf never reads past the buffer, whatever the sysctl wrote.
        str[$ - 1] = '\0';

        int darwinMajor, darwinMinor, darwinPatch;
        if (3 == sscanf(str.ptr, "%d.%d.%d", &darwinMajor, &darwinMinor, &darwinPatch))
        {
            result.patch = 0;

            switch(darwinMajor)
            {
                case 0: .. case 11:
                    result.major = 10; // 10.7 or older, reported as 10.7
                    result.minor = 7;
                    break;

                case 12: .. case 19:
                    result.major = 10;
                    result.minor = darwinMajor - 4; // 10.8 to 10.15
                    break;

                case 20: .. case 24:
                    // Darwin 20 = macOS 11 (Big Sur), 21 = 12 (Monterey), 22 = 13 (Ventura),
                    // 23 = 14 (Sonoma), 24 = 15 (Sequoia).
                    result.major = darwinMajor - 9;
                    result.minor = 0;
                    break;

                default:
                    // Unknown (negative or newer) release: report a large future version.
                    result.major = 100;
                    result.minor = 0;
                    break;
            }
        }
        return result;
    }

  /*  unittest
    {
        MacOSVersion ver = getMacOSVersion();
        printf("Detected macOS %d.%d.%d\n", ver.major, ver.minor, ver.patch);
    } */
}