1 /** 2 * Threads and thread-pool. 3 * 4 * Copyright: Copyright Sean Kelly 2005 - 2012. 5 * Copyright: Copyright (c) 2009-2011, David Simcha. 6 * Copyright: Copyright Guillaume Piolat 2016. 7 * License: Distributed under the 8 * $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0). 9 * (See accompanying file LICENSE) 10 * Authors: Sean Kelly, Walter Bright, Alex Rønne Petersen, Martin Nowak, David Simcha, Guillaume Piolat 11 */ 12 module dplug.core.thread; 13 14 import core.stdc.stdlib; 15 import core.stdc.stdio; 16 17 import dplug.core.nogc; 18 import dplug.core.lockedqueue; 19 import dplug.core.sync; 20 21 version(Posix) 22 import core.sys.posix.pthread; 23 else version(Windows) 24 { 25 import core.stdc.stdint : uintptr_t; 26 import core.sys.windows.windef; 27 import core.sys.windows.winbase; 28 import core.thread; 29 30 extern (Windows) alias btex_fptr = uint function(void*) ; 31 extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*) nothrow @nogc; 32 } 33 else 34 static assert(false, "Platform not supported"); 35 36 version (OSX) 37 version = Darwin; 38 else version (iOS) 39 version = Darwin; 40 else version (TVOS) 41 version = Darwin; 42 else version (WatchOS) 43 version = Darwin; 44 45 version(Darwin) 46 { 47 extern(C) nothrow @nogc 48 int sysctlbyname(const(char)*, void *, size_t *, void *, size_t); 49 } 50 51 //debug = threadPoolIsActuallySynchronous; 52 53 54 /// Legacy thread function 55 alias ThreadDelegate = void delegate() nothrow @nogc; 56 57 /// Thread function with user data, used eg. in thread pool. 
alias ThreadDelegateUser = void delegate(void* userData) nothrow @nogc;


/// Builds a `Thread` that will run `callback`. The OS thread itself is only
/// created when `start` is called on the returned value.
///
/// Params:
///     callback  = The delegate that will be called by the thread.
///     stackSize = The thread stack size in bytes. 0 for default size.
Thread makeThread(ThreadDelegate callback, size_t stackSize = 0) nothrow @nogc
{
    return Thread(callback, stackSize);
}

/// Same as above, for a delegate that receives a user pointer.
///
/// Params:
///     callback  = The delegate that will be called by the thread.
///     stackSize = The thread stack size in bytes. 0 for default size.
///     userData  = A pointer passed unchanged to `callback`.
Thread makeThread(ThreadDelegateUser callback, size_t stackSize = 0, void* userData = null) nothrow @nogc
{
    return Thread(callback, stackSize, userData);
}

/// Optimistic thread, failure not supported
struct Thread
{
nothrow:
@nogc:
public:

    /// Create a thread with user data. Thread is not created until `start` has been called.
    ///
    /// Params:
    ///     callback = The delegate that will be called by the thread.
    ///     stackSize = The thread stack size in bytes. 0 for default size.
    ///     userData = a pointer to be passed to thread delegate
    ///
    /// Warning: It is STRONGLY ADVISED to pass a class member delegate (not a struct
    ///          member delegate) to have additional context.
    ///          Passing struct method delegates are currently UNSUPPORTED.
    ///
    this(ThreadDelegate callback, size_t stackSize = 0)
    {
        _stackSize = stackSize;
        _context = cast(CreateContext*) malloc(CreateContext.sizeof);

        // Fail fast on allocation failure, like every OS failure in this struct,
        // instead of writing through a null pointer.
        if (_context is null)
            assert(false);

        _context.callback = callback;
        _context.callbackUser = null;
        _context.userData = null; // malloc'd memory: do not leave this field indeterminate
    }

    ///ditto
    this(ThreadDelegateUser callback, size_t stackSize = 0, void* userData = null)
    {
        _stackSize = stackSize;
        _context = cast(CreateContext*) malloc(CreateContext.sizeof);

        // Same fail-fast policy as the other constructor.
        if (_context is null)
            assert(false);

        _context.callback = null;
        _context.callbackUser = callback;
        _context.userData = userData;
    }

    /// Releases the heap-allocated creation context, if any.
    ~this()
    {
        if (_context !is null)
        {
            free(_context);
            _context = null;
        }
    }

    // Copying is disabled: a copy would free the same context a second time.
    @disable this(this);

    /// Starts the thread. Threads are created suspended. This function can
    /// only be called once.
void start()
{
    version(Posix)
    {
        // The pthread attribute calls go through `assumeNothrowNoGC`
        // (presumably because their bindings are not declared `nothrow @nogc`).
        auto initAttributes = assumeNothrowNoGC(
            (pthread_attr_t* pattr)
            {
                return pthread_attr_init(pattr);
            });
        auto setStackSize = assumeNothrowNoGC(
            (pthread_attr_t* pattr, size_t stackSize)
            {
                return pthread_attr_setstacksize(pattr, stackSize);
            });
        auto destroyAttributes = assumeNothrowNoGC(
            (pthread_attr_t* pattr)
            {
                return pthread_attr_destroy(pattr);
            });

        pthread_attr_t attributes;

        if (initAttributes(&attributes) != 0)
            assert(false);

        // A stack size of 0 means "keep the platform default".
        if (_stackSize != 0)
        {
            if (setStackSize(&attributes, _stackSize) != 0)
                assert(false);
        }

        // The new thread receives the heap-allocated context, not `this`.
        if (pthread_create(&_id, &attributes, &posixThreadEntryPoint, _context) != 0)
            assert(false);

        if (destroyAttributes(&attributes) != 0)
            assert(false);
    }
    else version(Windows)
    {
        uint unusedThreadAddress;

        // Created suspended, then resumed right below.
        _id = cast(HANDLE) _beginthreadex(null,
                                          cast(uint)_stackSize,
                                          &windowsThreadEntryPoint,
                                          _context,
                                          CREATE_SUSPENDED,
                                          &unusedThreadAddress);
        if (cast(size_t)_id == 0)
            assert(false);

        if (ResumeThread(_id) == -1)
            assert(false);
    }
    else
        static assert(false);
}

/// Wait for that thread termination
/// Again, this function can be called only once.
/// This actually releases the thread resource.
void join()
{
    version(Posix)
    {
        void* returnValue;
        if (0 != pthread_join(_id, &returnValue))
            assert(false);
    }
    else version(Windows)
    {
        if(WaitForSingleObject(_id, INFINITE) != WAIT_OBJECT_0)
            assert(false);
        CloseHandle(_id);
    }
}

/// Returns: the OS thread identifier/handle stored by `start`, cast to `void*`.
void* getThreadID()
{
    version(Posix) return cast(void*)_id;
    else version(Windows) return cast(void*)_id;
    else assert(false);
}

private:
version(Posix)
{
    pthread_t _id;
}
else version(Windows)
{
    HANDLE _id;
}
else
    static assert(false);

// Thread context given to OS thread creation function need to have a constant adress
// since there are no guarantees the `Thread` struct will be at the same adress.
static struct CreateContext
{
nothrow:
@nogc:
    ThreadDelegate callback;
    ThreadDelegateUser callbackUser;
    void* userData;

    /// Invokes `callback` when it is set, otherwise `callbackUser(userData)`.
    void call()
    {
        if (callback !is null)
            callback();
        else
            callbackUser(userData);
    }
}
CreateContext* _context;

size_t _stackSize;
} // end of struct Thread

version(Posix)
{
    /// pthread entry point: runs the `Thread.CreateContext` passed as argument.
    extern(C) void* posixThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        Thread.CreateContext* context = cast(Thread.CreateContext*)(threadContext);
        context.call();
        return null;
    }
}

version(Windows)
{
    /// `_beginthreadex` entry point: runs the `Thread.CreateContext` passed as argument.
    extern (Windows) uint windowsThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        Thread.CreateContext* context = cast(Thread.CreateContext*)(threadContext);
        context.call();
        return 0;
    }
}

unittest
{
    int outerInt = 0;

    class A
    {
    nothrow @nogc:
        this()
        {
            t = makeThread(&f);
            t.start();
        }

        void join()
        {
            t.join();
        }

        void f()
        {
            outerInt = 1;
            innerInt = 2;

            // verify this
            assert(checkValue0 == 0x11223344);
            assert(checkValue1 == 0x55667788);
        }

        int checkValue0 = 0x11223344;
        int checkValue1 = 0x55667788;
        int innerInt = 0;
        Thread t;
    }

    auto a = new A;
    a.t.join();
    assert(a.innerInt == 2);
    a.destroy();
    assert(outerInt == 1);
}

/// Launch a function in a newly created thread, which is destroyed afterwards.
/// Return the thread so that you can call `.join()` on it.
Thread launchInAThread(ThreadDelegate dg, size_t stackSize = 0) nothrow @nogc
{
    Thread t = makeThread(dg, stackSize);
    t.start();
    return t;
}

//
// Thread-pool
//

/// Returns: Number of CPUs, always at least 1.
///
/// If the OS query fails or reports a non-positive count, 1 is returned, so
/// that callers can size a thread pool from the result without further checks.
int getTotalNumberOfCPUs() nothrow @nogc
{
    int procs = 0;

    version(Windows)
    {
        // import core.sys.windows.windef;// : SYSTEM_INFO, GetSystemInfo;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        procs = cast(int) si.dwNumberOfProcessors;
    }
    else version(Darwin)
    {
        auto nameStr = "machdep.cpu.core_count\0".ptr;
        uint ans;
        size_t len = uint.sizeof;

        // Only trust `ans` when the query succeeded.
        if (sysctlbyname(nameStr, &ans, &len, null, 0) == 0)
            procs = cast(int) ans;
    }
    else version(Posix)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;

        // sysconf returns -1 on error; the clamp below covers that case.
        procs = cast(int) sysconf(_SC_NPROCESSORS_ONLN);
    }
    else
        static assert(false, "OS unsupported");

    return (procs < 1) ? 1 : procs;
}

/// Work-item callback used by the thread pool: receives the work item index
/// and the index of the thread running it.
alias ThreadPoolDelegate = void delegate(int workItem, int threadIndex) nothrow @nogc;


debug(threadPoolIsActuallySynchronous)
{
    /// Fake synchronous version of the thread pool
    /// For measurement purpose, makes it easier to measure actual CPU time spent.
class ThreadPool
{
public:
nothrow:
@nogc:

    enum constantThreadId = 0;

    /// Accepts the same arguments as the real pool; none of them is used.
    this(int numThreads = 0, int maxThreads = 0, size_t stackSize = 0)
    {
    }

    ~this()
    {
    }

    /// Calls `dg` for each work item in `0..count`, on the calling thread.
    void parallelFor(int count, scope ThreadPoolDelegate dg)
    {
        runOnCallingThread(count, dg);
    }

    /// Identical to `parallelFor` here: the work is done before returning.
    void parallelForAsync(int count, scope ThreadPoolDelegate dg)
    {
        runOnCallingThread(count, dg);
    }

    /// Wait for completion of the previous parallelFor, if any.
    // It's always safe to call this function before doing another parallelFor.
    void waitForCompletion()
    {
    }

    /// Returns: always 1.
    int numThreads() pure const
    {
        return 1;
    }

private:

    // Sequential loop shared by both parallelFor flavours.
    void runOnCallingThread(int count, scope ThreadPoolDelegate dg)
    {
        for (int workItem = 0; workItem < count; ++workItem)
            dg(workItem, constantThreadId);
    }
}
} // end of debug(threadPoolIsActuallySynchronous)
else
{

/// Rewrite of the ThreadPool using condition variables.
/// FUTURE: this could be speed-up by using futures. Description of the task
///         and associated condition+mutex would go in an external struct.
/// Note: the interface of the thread-pool itself is not thread-safe, you cannot give orders from
///       multiple threads at once.
class ThreadPool
{
public:
nothrow:
@nogc:

    /// Creates a thread-pool.
    /// Params:
    ///     numThreads = Number of threads to create (0 = auto).
    ///     maxThreads = A maximum number of threads to create (0 = none).
    ///     stackSize = Stack size to create threads with (0 = auto).
this(int numThreads = 0, int maxThreads = 0, size_t stackSize = 0)
{
    // Synchronization objects are created before any worker thread exists.
    _workMutex = makeMutex();
    _workCondition = makeConditionVariable();

    _finishMutex = makeMutex();
    _finishCondition = makeConditionVariable();

    // 0 means "one thread per CPU".
    if (numThreads == 0)
        numThreads = getTotalNumberOfCPUs();

    // Optional cap (this is done to give other software some leeway
    // in a soft real-time OS).
    if (maxThreads != 0 && numThreads > maxThreads)
        numThreads = maxThreads;

    assert(numThreads >= 1);

    _threads = mallocSlice!Thread(numThreads);
    foreach(size_t workerIndex, ref worker; _threads)
    {
        // The worker index travels through the user-data pointer, so that tasks
        // can later be given the index of the thread that runs them.
        worker = makeThread(&workerThreadFunc, stackSize, cast(void*)(workerIndex));
    }

    // Workers are started only once all of them have been created.
    foreach(ref worker; _threads)
        worker.start();
}

/// Destroys a thread-pool.
~this()
{
    if (_threads !is null)
    {
        assert(_state == State.initial);

        // Raise the stop flag under the work mutex...
        {
            _workMutex.lock();
            scope(exit) _workMutex.unlock();
            _stopFlag = true;
        }

        // ...then wake every worker so that it can observe the flag.
        _workCondition.notifyAll();

        // First wait for all workers to terminate,
        foreach(ref worker; _threads)
            worker.join();

        // then destroy them and release the slice.
        foreach(ref worker; _threads)
            worker.destroy();
        freeSlice(_threads);
        _threads = null;
        destroy(_workMutex);
    }
}

/// Calls the delegate in parallel, with 0..count as index.
/// Immediate waiting for completion.
/// If there is only one task, it is run directly on this thread.
/// IMPORTANT to be reentrant there! widget drawn alone can then launch same threadpool.
void parallelFor(int count, scope ThreadPoolDelegate dg)
{
    assert(_state == State.initial);

    if (count != 1)
    {
        // Unleash the workers, then block until they are done.
        parallelForAsync(count, dg);
        waitForCompletion();
        return;
    }

    // A single work item is not worth waking workers for (it is in async):
    // run it right here. Any valid thread index will do.
    enum int callerThreadIndex = 0;
    dg(0, callerThreadIndex);
}

/// Same, but does not wait for completion.
/// You cannot have 2 concurrent parallelFor for the same thread-pool.
void parallelForAsync(int count, scope ThreadPoolDelegate dg)
{
    assert(_state == State.initial);

    // Nothing to do: the pool stays in its initial state.
    if (count == 0)
        return;

    // At this point we assume all worker threads are waiting for messages

    // Publish the new task group under the work mutex.
    {
        _workMutex.lock();
        scope(exit) _workMutex.unlock();

        _taskDelegate = dg;         // immutable during this parallelFor
        _taskNumWorkItem = count;   // immutable during this parallelFor
        _taskCurrentWorkItem = 0;
        _taskCompleted = 0;
    }

    immutable bool wakeEveryWorker = (count >= _threads.length);

    if (wakeEveryWorker)
    {
        _workCondition.notifyAll();
    }
    else
    {
        // Less tasks than threads in the pool: one wake-up per work item.
        for (int remaining = count; remaining > 0; --remaining)
            _workCondition.notifyOne();
    }

    _state = State.parallelForInProgress;
}

/// Wait for completion of the previous parallelFor, if any.
// It's always safe to call this function before doing another parallelFor.
void waitForCompletion()
{
    // Nothing was launched (or it has already been waited for).
    if (_state == State.initial)
        return;

    assert(_state == State.parallelForInProgress);

    _finishMutex.lock();

    // FUTURE: this thread is woken up once per completed task,
    //         maybe that can be optimized
    while (_taskCompleted < _taskNumWorkItem)
        _finishCondition.wait(&_finishMutex);

    _state = State.initial;

    _finishMutex.unlock();
}

/// Returns: the number of worker threads owned by the pool.
int numThreads() pure const
{
    return cast(int)_threads.length;
}

private:
Thread[] _threads = null;

// A map to find back thread index from thread system ID
// (not referenced anywhere in the visible code)
void*[] _threadID = null;

// Used to signal more work
UncheckedMutex _workMutex;
ConditionVariable _workCondition;

// Used to signal completion
UncheckedMutex _finishMutex;
ConditionVariable _finishCondition;

// These fields represent the current task group (ie. a parallelFor)
ThreadPoolDelegate _taskDelegate;
int _taskNumWorkItem;     // total number of tasks in this task group
int _taskCurrentWorkItem; // current task still left to do (protected by _workMutex)
int _taskCompleted;       // every task < taskCompleted has already been completed (protected by _finishMutex)

bool _stopFlag;

// True while some work item of the current group has not been picked yet.
bool hasWork()
{
    return _taskCurrentWorkItem < _taskNumWorkItem;
}

// Represent the thread-pool state from the user POV
enum State
{
    initial,               // tasks can be launched
    parallelForInProgress, // tasks were launched, but not waited for yet
}
State _state = State.initial;

// What worker threads do
// MAYDO: threads come here with bad context with struct delegates
void workerThreadFunc(void* userData)
{
    // The worker index was stored in the user-data pointer by the pool constructor.
    immutable int threadIndex = cast(int)( cast(size_t)(userData) );

    for (;;)
    {
        int workItem = -1;
        {
            _workMutex.lock();
            scope(exit) _workMutex.unlock();

            // Sleep until there is work to pick, or the pool is stopping.
            while (!_stopFlag && !hasWork())
                _workCondition.wait(&_workMutex);

            // Stopping with nothing left to pick: the worker terminates.
            if (_stopFlag && !hasWork())
                return;

            assert(hasWork());

            // Claim the next work item.
            workItem = _taskCurrentWorkItem++;
        }

        // Run the task outside of the work mutex.
        _taskDelegate(workItem, threadIndex);

        // Report one more completed task, then wake the waiter.
        _finishMutex.lock();
        _taskCompleted++;
        _finishMutex.unlock();

        _finishCondition.notifyOne();
    }
}
} // end of class ThreadPool
} // end of else branch of debug(threadPoolIsActuallySynchronous)

/// Get the current thread OS handle.
/// The returned ID is just used for display. You can't get a `Thread` out of it.
public static size_t getCurrentThreadId() nothrow @nogc
{
    version(Windows)
    {
        return cast(size_t) GetCurrentThreadId();
    }
    else version(Posix)
    {
        return cast(size_t)cast(void*)pthread_self();
    }
    else
        static assert(false);
}

unittest
{
    import core.atomic;
    import dplug.core.nogc;

    struct A
    {
        ThreadPool _pool;
        int _numThreads;

        this(int numThreads, int maxThreads = 0, int stackSize = 0)
        {
            _pool = mallocNew!ThreadPool(numThreads, maxThreads, stackSize);
            _numThreads = _pool.numThreads();
        }

        ~this()
        {
            // The pool was allocated with `mallocNew`: `destroy` alone would run
            // the destructor but never release the memory, leaking one pool per
            // loop iteration below. `destroyFree` does both.
            destroyFree(_pool);
        }

        // Runs `count` work items, through either the async or the blocking API.
        void launch(int count, bool async) nothrow @nogc
        {
            if (async)
            {
                _pool.parallelForAsync(count, &loopBody);
                _pool.waitForCompletion();
            }
            else
                _pool.parallelFor(count, &loopBody);
        }

        // Checks the thread index is in range and counts the call.
        void loopBody(int workItem, int threadIndex) nothrow @nogc
        {
            bool goodIndex = (threadIndex >= 0) && (threadIndex < _numThreads);
            assert(goodIndex);
            atomicOp!"+="(counter, 1);
        }

        shared(int) counter = 0;
    }

    foreach (numThreads; [0, 1, 2, 4, 8, 16, 32])
    {
        auto a = A(numThreads);
        a.launch(10, false);
        assert(a.counter == 10);

        a.launch(500, true);
        assert(a.counter == 510);

        a.launch(1, false);
        assert(a.counter == 511);

        a.launch(1, true);
        assert(a.counter == 512);

        a.launch(0, true);
        assert(a.counter == 512);
        a.launch(0, false);
        assert(a.counter == 512);
    }
}

// Bonus: Capacity to get the macOS version

version(Darwin)
{

    // Note: .init value is a large future version (100.0.0), so that failure to detect version
    //       lead to newer behaviour.
    struct MacOSVersion
    {
        int major = 100; // eg: major = 10 minor = 7 for 10.7
        int minor = 0;
        int patch = 0;
    }

    /// Get the macOS version we are running on.
738 /// Note: it only makes sense for macOS, not iOS. 739 /// Note: patch always return zero for now. 740 MacOSVersion getMacOSVersion() nothrow @nogc 741 { 742 char[256] str; 743 size_t size = 256; 744 int ret = sysctlbyname("kern.osrelease", str.ptr, &size, null, 0); 745 MacOSVersion result; 746 if (ret != 0) 747 return result; 748 int darwinMajor, darwinMinor, darwinPatch; 749 if (3 == sscanf(str.ptr, "%d.%d.%d", &darwinMajor, &darwinMinor, &darwinPatch)) 750 { 751 result.patch = 0; 752 753 switch(darwinMajor) 754 { 755 case 0: .. case 11: 756 result.major = 10; // 10.7 757 result.minor = 7; 758 break; 759 760 case 12: .. case 19: 761 result.major = 10; // 10.7 762 result.minor = darwinMajor - 4; // 10.8 to 10.15 763 break; 764 765 case 20: 766 result.major = 11; // Big Sur 767 result.minor = 0; 768 break; 769 770 case 21: 771 result.major = 12; // Monterey 772 result.minor = 0; 773 break; 774 775 776 default: 777 result.major = 100; 778 result.minor = 0; 779 } 780 } 781 return result; 782 } 783 784 /* unittest 785 { 786 MacOSVersion ver = getMacOSVersion(); 787 printf("Detected macOS %d.%d.%d\n", ver.major, ver.minor, ver.patch); 788 } */ 789 }