1 /** 2 Mutexes, semaphores and condition variables. 3 4 Copyright: Sean Kelly 2005 - 2009. 5 Copyright: Guillaume Piolat 2016 - 2018. 6 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 7 */ 8 // This contains part of druntime's core.sys.mutex, core.sys.semaphore core.sys.condition and 9 // Modified to make it @nogc nothrow 10 module dplug.core.sync; 11 12 import core.time; 13 import core.atomic; 14 15 import dplug.core.vec; 16 import dplug.core.nogc; 17 18 import core.stdc.stdio; 19 import core.stdc.stdlib: malloc, free; 20 21 // emulate conditions variable on Windows 22 // (this was originally in Phobos for XP compatibility) 23 version = emulateCondVarWin32; 24 25 version (OSX) 26 version = Darwin; 27 else version (iOS) 28 version = Darwin; 29 else version (TVOS) 30 version = Darwin; 31 else version (WatchOS) 32 version = Darwin; 33 34 version( Windows ) 35 { 36 import core.sys.windows.windef; 37 import core.sys.windows.winbase; 38 39 extern (Windows) nothrow @nogc 40 { 41 void InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION * lpCriticalSection, DWORD dwSpinCount); 42 } 43 } 44 else version( Darwin ) 45 { 46 import core.sys.posix.pthread; 47 import core.sync.config; 48 import core.stdc.errno; 49 import core.sys.posix.time; 50 static if (__VERSION__ < 2084) 51 import core.sys.osx.mach.semaphore; // was removed with DMDFE 2.084 52 else 53 import core.sys.darwin.mach.semaphore; 54 } 55 else version( Posix ) 56 { 57 import core.sync.config; 58 import core.stdc.errno; 59 import core.sys.posix.pthread; 60 import core.sys.posix.semaphore; 61 import core.sys.posix.time; 62 } 63 else 64 { 65 static assert(false, "Platform not supported"); 66 } 67 68 69 // 70 // MUTEX 71 // 72 73 /// Returns: A newly created `UnchekedMutex`. 74 UncheckedMutex makeMutex() nothrow @nogc 75 { 76 return UncheckedMutex(42); 77 } 78 79 private enum PosixMutexAlignment = 64; // Wild guess, no measurements done 80 81 // Cargo-culting the spin-count in WTF::Lock 82 // See: https://webkit.org/blog/6161/locking-in-webkit/ 83 private enum CriticalSectionSpinCount = 40; 84 85 struct UncheckedMutex 86 { 87 nothrow @nogc: 88 89 private this(int dummyArg) 90 { 91 assert(!_created); 92 lazyThreadSafeInitialization(); 93 } 94 95 ~this() 96 { 97 // RACE CONDITION HERE 98 // TODO: not easy to make the mutex destruction thread-safe, because one of the 99 // thread must wait. Ignore that for now. 100 101 void* mutexHandle = atomicLoad(_mutex); // Can be the mutex handle, or null. 102 103 if (mutexHandle !is null) 104 { 105 // Now, the mutex could have been destroyed by another thread already. 106 // Make a cas to ensure we are first to attempt it. 107 108 if (cas(&_mutex, &mutexHandle, null)) 109 { 110 destroyMutex(mutexHandle); 111 } 112 } 113 114 _created = 0; 115 } 116 117 @disable this(this); 118 119 /// Lock mutex, and create it in a thread-safe way if it doens't exist yet. 120 /// This is useful when globals access must be protected by a mutex. 121 void lockLazy() @trusted 122 { 123 lazyThreadSafeInitialization(); 124 lock(); 125 } 126 127 /// Lock mutex 128 void lock() 129 { 130 // No sync needed, else would have used lockLazy() 131 version( Windows ) 132 { 133 EnterCriticalSection( cast(CRITICAL_SECTION*) _mutex ); 134 } 135 else version( Posix ) 136 { 137 assumeNothrowNoGC( 138 (pthread_mutex_t* handle) 139 { 140 int res = pthread_mutex_lock(handle); 141 if (res != 0) 142 assert(false); 143 })(handleAddr()); 144 } 145 } 146 147 // undocumented function for internal use 148 void unlock() 149 { 150 version( Windows ) 151 { 152 LeaveCriticalSection( cast(CRITICAL_SECTION*) _mutex ); 153 } 154 else version( Posix ) 155 { 156 assumeNothrowNoGC( 157 (pthread_mutex_t* handle) 158 { 159 int res = pthread_mutex_unlock(handle); 160 if (res != 0) 161 assert(false); 162 })(handleAddr()); 163 } 164 } 165 166 bool tryLock() 167 { 168 version( Windows ) 169 { 170 return TryEnterCriticalSection( cast(CRITICAL_SECTION*) _mutex ) != 0; 171 } 172 else version( Posix ) 173 { 174 int result = assumeNothrowNoGC( 175 (pthread_mutex_t* handle) 176 { 177 return pthread_mutex_trylock(handle); 178 })(handleAddr()); 179 return result == 0; 180 } 181 } 182 183 private: 184 // on Windows, this is a CRITICAL_SECTION*. 185 // else, this is a pthread_mutex_t* 186 void* _mutex; 187 188 // Work-around for Issue 16636 189 // https://issues.dlang.org/show_bug.cgi?id=16636 190 // Still crash with LDC somehow 191 long _created = 0; 192 193 static void* createMutex() 194 { 195 version( Windows ) 196 { 197 CRITICAL_SECTION* critSec = cast(CRITICAL_SECTION*) malloc(CRITICAL_SECTION.sizeof); 198 InitializeCriticalSectionAndSpinCount( critSec, CriticalSectionSpinCount ); 199 return critSec; 200 } 201 else version( Posix ) 202 { 203 pthread_mutex_t* pmtx = cast(pthread_mutex_t*)( alignedMalloc(pthread_mutex_t.sizeof, PosixMutexAlignment) ); 204 205 assumeNothrowNoGC( 206 (pthread_mutex_t* handle) 207 { 208 pthread_mutexattr_t attr = void; 209 pthread_mutexattr_init( &attr ); 210 pthread_mutexattr_settype( &attr, PTHREAD_MUTEX_RECURSIVE ); 211 212 version (Darwin) 213 { 214 // Note: disabled since this breaks thread pool. 215 /+ 216 // OSX mutexes are fair by default, but this has a cost for contended locks 217 // Disable fairness. 218 // https://blog.mozilla.org/nfroyd/2017/03/29/on-mutex-performance-part-1/ 219 enum _PTHREAD_MUTEX_POLICY_FIRSTFIT = 2; 220 pthread_mutexattr_setpolicy_np(& attr, _PTHREAD_MUTEX_POLICY_FIRSTFIT); 221 +/ 222 } 223 224 pthread_mutex_init( handle, &attr ); 225 226 })(pmtx); 227 return pmtx; 228 } 229 else 230 static assert(false); 231 } 232 233 static void destroyMutex(void* mutex) 234 { 235 if (mutex is null) 236 return; 237 238 // TODO: this should be thread-safe, for example a cas with local variable name or _created 239 240 version( Windows ) 241 { 242 DeleteCriticalSection( cast(CRITICAL_SECTION*) mutex ); 243 free(mutex); 244 } 245 else version( Posix ) 246 { 247 assumeNothrowNoGC( 248 (pthread_mutex_t* handle) 249 { 250 pthread_mutex_destroy(handle); 251 })( cast(pthread_mutex_t*) mutex ); 252 alignedFree(mutex, PosixMutexAlignment); 253 } 254 else 255 static assert(false); 256 } 257 258 void lazyThreadSafeInitialization() @trusted 259 { 260 // Is there an existing mutex already? 261 if (atomicLoad(_mutex) !is null) 262 return; 263 264 // Create one mutex. 265 void* mtx = createMutex(); 266 void* p = cast(void*)mtx; 267 void** here = &_mutex; 268 void* ifThis = null; 269 270 // Try to set _mutex. 271 if (!cas(here, &ifThis, p)) 272 { 273 // Another thread created _mutex first. Destroy our useless instance. 274 destroyMutex(mtx); 275 } 276 } 277 278 package: 279 version( Posix ) 280 { 281 pthread_mutex_t* handleAddr() nothrow @nogc 282 { 283 return cast(pthread_mutex_t*) _mutex; 284 } 285 } 286 } 287 288 unittest 289 { 290 UncheckedMutex mutex = makeMutex(); 291 foreach(i; 0..100) 292 { 293 mutex.lock(); 294 mutex.unlock(); 295 296 if (mutex.tryLock) 297 mutex.unlock(); 298 } 299 mutex.destroy(); 300 } 301 302 unittest 303 { 304 __gshared UncheckedMutex mutex2; 305 mutex2.lockLazy(); 306 mutex2.unlock(); 307 } 308 309 310 311 // 312 // SEMAPHORE 313 // 314 315 /// Returns: A newly created `UncheckedSemaphore` 316 UncheckedSemaphore makeSemaphore(uint count) nothrow @nogc 317 { 318 return UncheckedSemaphore(count); 319 } 320 321 struct UncheckedSemaphore 322 { 323 private this( uint count ) nothrow @nogc 324 { 325 version( Windows ) 326 { 327 m_hndl = CreateSemaphoreA( null, count, int.max, null ); 328 if( m_hndl == m_hndl.init ) 329 assert(false); 330 } 331 else version( Darwin ) 332 { 333 mach_port_t task = assumeNothrowNoGC( 334 () 335 { 336 return mach_task_self(); 337 })(); 338 339 kern_return_t rc = assumeNothrowNoGC( 340 (mach_port_t t, semaphore_t* handle, uint count) 341 { 342 return semaphore_create(t, handle, SYNC_POLICY_FIFO, count ); 343 })(task, &m_hndl, count); 344 345 if( rc ) 346 assert(false); 347 } 348 else version( Posix ) 349 { 350 int rc = sem_init( &m_hndl, 0, count ); 351 if( rc ) 352 assert(false); 353 } 354 _created = 1; 355 } 356 357 ~this() nothrow @nogc 358 { 359 if (_created) 360 { 361 version( Windows ) 362 { 363 BOOL rc = CloseHandle( m_hndl ); 364 assert( rc, "Unable to destroy semaphore" ); 365 } 366 else version( Darwin ) 367 { 368 mach_port_t task = assumeNothrowNoGC( 369 () 370 { 371 return mach_task_self(); 372 })(); 373 374 kern_return_t rc = assumeNothrowNoGC( 375 (mach_port_t t, semaphore_t handle) 376 { 377 return semaphore_destroy( t, handle ); 378 })(task, m_hndl); 379 380 assert( !rc, "Unable to destroy semaphore" ); 381 } 382 else version( Posix ) 383 { 384 int rc = sem_destroy( &m_hndl ); 385 assert( !rc, "Unable to destroy semaphore" ); 386 } 387 _created = 0; 388 } 389 } 390 391 @disable this(this); 392 393 void wait() nothrow @nogc 394 { 395 version( Windows ) 396 { 397 DWORD rc = WaitForSingleObject( m_hndl, INFINITE ); 398 assert( rc == WAIT_OBJECT_0 ); 399 } 400 else version( Darwin ) 401 { 402 while( true ) 403 { 404 auto rc = assumeNothrowNoGC( 405 (semaphore_t handle) 406 { 407 return semaphore_wait(handle); 408 })(m_hndl); 409 if( !rc ) 410 return; 411 if( rc == KERN_ABORTED && errno == EINTR ) 412 continue; 413 assert(false); 414 } 415 } 416 else version( Posix ) 417 { 418 while( true ) 419 { 420 if (!assumeNothrowNoGC( 421 (sem_t* handle) 422 { 423 return sem_wait(handle); 424 })(&m_hndl)) 425 return; 426 if( errno != EINTR ) 427 assert(false); 428 } 429 } 430 } 431 432 bool wait( Duration period ) nothrow @nogc 433 { 434 assert( !period.isNegative ); 435 436 version( Windows ) 437 { 438 auto maxWaitMillis = dur!("msecs")( uint.max - 1 ); 439 440 while( period > maxWaitMillis ) 441 { 442 auto rc = WaitForSingleObject( m_hndl, cast(uint) 443 maxWaitMillis.total!"msecs" ); 444 switch( rc ) 445 { 446 case WAIT_OBJECT_0: 447 return true; 448 case WAIT_TIMEOUT: 449 period -= maxWaitMillis; 450 continue; 451 default: 452 assert(false); 453 } 454 } 455 switch( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) ) 456 { 457 case WAIT_OBJECT_0: 458 return true; 459 case WAIT_TIMEOUT: 460 return false; 461 default: 462 assert(false); 463 } 464 } 465 else version( Darwin ) 466 { 467 mach_timespec_t t = void; 468 (cast(byte*) &t)[0 .. t.sizeof] = 0; 469 470 if( period.total!"seconds" > t.tv_sec.max ) 471 { 472 t.tv_sec = t.tv_sec.max; 473 t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs; 474 } 475 else 476 period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec); 477 while( true ) 478 { 479 auto rc = assumeNothrowNoGC( 480 (semaphore_t handle, mach_timespec_t t) 481 { 482 return semaphore_timedwait(handle, t); 483 })(m_hndl, t); 484 if( !rc ) 485 return true; 486 if( rc == KERN_OPERATION_TIMED_OUT ) 487 return false; 488 if( rc != KERN_ABORTED || errno != EINTR ) 489 assert(false); 490 } 491 } 492 else version( Posix ) 493 { 494 timespec t; 495 496 assumeNothrowNoGC( 497 (ref timespec t, Duration period) 498 { 499 mktspec( t, period ); 500 })(t, period); 501 502 while( true ) 503 { 504 if (! ((sem_t* handle, timespec* t) 505 { 506 return sem_timedwait(handle, t); 507 })(&m_hndl, &t)) 508 return true; 509 if( errno == ETIMEDOUT ) 510 return false; 511 if( errno != EINTR ) 512 assert(false); 513 } 514 } 515 } 516 517 void notify() nothrow @nogc 518 { 519 version( Windows ) 520 { 521 if( !ReleaseSemaphore( m_hndl, 1, null ) ) 522 assert(false); 523 } 524 else version( Darwin ) 525 { 526 auto rc = assumeNothrowNoGC( 527 (semaphore_t handle) 528 { 529 return semaphore_signal(handle); 530 })(m_hndl); 531 if( rc ) 532 assert(false); 533 } 534 else version( Posix ) 535 { 536 int rc = sem_post( &m_hndl ); 537 if( rc ) 538 assert(false); 539 } 540 } 541 542 bool tryWait() nothrow @nogc 543 { 544 version( Windows ) 545 { 546 switch( WaitForSingleObject( m_hndl, 0 ) ) 547 { 548 case WAIT_OBJECT_0: 549 return true; 550 case WAIT_TIMEOUT: 551 return false; 552 default: 553 assert(false); 554 } 555 } 556 else version( Darwin ) 557 { 558 return wait( dur!"hnsecs"(0) ); 559 } 560 else version( Posix ) 561 { 562 while( true ) 563 { 564 if( !sem_trywait( &m_hndl ) ) 565 return true; 566 if( errno == EAGAIN ) 567 return false; 568 if( errno != EINTR ) 569 assert(false); 570 } 571 } 572 } 573 574 575 private: 576 version( Windows ) 577 { 578 HANDLE m_hndl; 579 } 580 else version( Darwin ) 581 { 582 semaphore_t m_hndl; 583 } 584 else version( Posix ) 585 { 586 sem_t m_hndl; 587 } 588 ulong _created = 0; 589 } 590 591 592 unittest 593 { 594 foreach(j; 0..4) 595 { 596 UncheckedSemaphore semaphore = makeSemaphore(1); 597 foreach(i; 0..100) 598 { 599 semaphore.wait(); 600 semaphore.notify(); 601 if (semaphore.tryWait()) 602 semaphore.notify(); 603 } 604 } 605 } 606 607 608 609 // 610 // CONDITION VARIABLE 611 // 612 613 614 ConditionVariable makeConditionVariable() nothrow @nogc 615 { 616 return ConditionVariable(42); 617 } 618 619 /** 620 * This struct represents a condition variable as conceived by C.A.R. Hoare. As 621 * per Mesa type monitors however, "signal" has been replaced with "notify" to 622 * indicate that control is not transferred to the waiter when a notification 623 * is sent. 624 */ 625 struct ConditionVariable 626 { 627 public: 628 nothrow: 629 @nogc: 630 631 /// Initializes a condition variable. 632 this(int dummy) 633 { 634 version( Windows ) 635 { 636 m_blockLock = CreateSemaphoreA( null, 1, 1, null ); 637 if( m_blockLock == m_blockLock.init ) 638 assert(false); 639 m_blockQueue = CreateSemaphoreA( null, 0, int.max, null ); 640 if( m_blockQueue == m_blockQueue.init ) 641 assert(false); 642 643 m_unblockLock = cast(CRITICAL_SECTION*) malloc(CRITICAL_SECTION.sizeof); 644 InitializeCriticalSectionAndSpinCount( m_unblockLock, CriticalSectionSpinCount ); 645 } 646 else version( Posix ) 647 { 648 _handle = cast(pthread_cond_t*)( alignedMalloc(pthread_cond_t.sizeof, PosixMutexAlignment) ); 649 650 int rc = pthread_cond_init( handleAddr(), null ); 651 if( rc ) 652 assert(false); 653 } 654 } 655 656 657 ~this() 658 { 659 version( Windows ) 660 { 661 CloseHandle( m_blockLock ); 662 CloseHandle( m_blockQueue ); 663 if (m_unblockLock !is null) 664 { 665 DeleteCriticalSection( m_unblockLock ); 666 free(m_unblockLock); 667 m_unblockLock = null; 668 } 669 } 670 else version( Posix ) 671 { 672 if (_handle !is null) 673 { 674 int rc = pthread_cond_destroy( handleAddr() ); 675 assert( !rc, "Unable to destroy condition" ); 676 alignedFree(_handle, PosixMutexAlignment); 677 _handle = null; 678 } 679 } 680 } 681 682 /// Wait until notified. 683 /// The associated mutex should always be the same for this condition variable. 684 void wait(UncheckedMutex* assocMutex) 685 { 686 version( Windows ) 687 { 688 timedWait( INFINITE, assocMutex ); 689 } 690 else version( Posix ) 691 { 692 int rc = pthread_cond_wait( handleAddr(), assocMutex.handleAddr() ); 693 if( rc ) 694 assert(false); 695 } 696 } 697 698 /// Notifies one waiter. 699 void notifyOne() 700 { 701 version( Windows ) 702 { 703 notifyImpl( false ); 704 } 705 else version( Posix ) 706 { 707 int rc = pthread_cond_signal( handleAddr() ); 708 if( rc ) 709 assert(false); 710 } 711 } 712 713 714 /// Notifies all waiters. 715 void notifyAll() 716 { 717 version( Windows ) 718 { 719 notifyImpl( true ); 720 } 721 else version( Posix ) 722 { 723 int rc = pthread_cond_broadcast( handleAddr() ); 724 if( rc ) 725 assert(false); 726 } 727 } 728 729 version(Posix) 730 { 731 pthread_cond_t* handleAddr() nothrow @nogc 732 { 733 return _handle; 734 } 735 } 736 737 738 private: 739 version( Windows ) 740 { 741 bool timedWait( DWORD timeout, UncheckedMutex* assocMutex ) 742 { 743 int numSignalsLeft; 744 int numWaitersGone; 745 DWORD rc; 746 747 rc = WaitForSingleObject( m_blockLock, INFINITE ); 748 assert( rc == WAIT_OBJECT_0 ); 749 750 m_numWaitersBlocked++; 751 752 rc = ReleaseSemaphore( m_blockLock, 1, null ); 753 assert( rc ); 754 755 assocMutex.unlock(); 756 757 rc = WaitForSingleObject( m_blockQueue, timeout ); 758 assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT ); 759 bool timedOut = (rc == WAIT_TIMEOUT); 760 761 EnterCriticalSection( m_unblockLock ); 762 763 if( (numSignalsLeft = m_numWaitersToUnblock) != 0 ) 764 { 765 if ( timedOut ) 766 { 767 // timeout (or canceled) 768 if( m_numWaitersBlocked != 0 ) 769 { 770 m_numWaitersBlocked--; 771 // do not unblock next waiter below (already unblocked) 772 numSignalsLeft = 0; 773 } 774 else 775 { 776 // spurious wakeup pending!! 777 m_numWaitersGone = 1; 778 } 779 } 780 if( --m_numWaitersToUnblock == 0 ) 781 { 782 if( m_numWaitersBlocked != 0 ) 783 { 784 // open the gate 785 rc = ReleaseSemaphore( m_blockLock, 1, null ); 786 assert( rc ); 787 // do not open the gate below again 788 numSignalsLeft = 0; 789 } 790 else if( (numWaitersGone = m_numWaitersGone) != 0 ) 791 { 792 m_numWaitersGone = 0; 793 } 794 } 795 } 796 else if( ++m_numWaitersGone == int.max / 2 ) 797 { 798 // timeout/canceled or spurious event :-) 799 rc = WaitForSingleObject( m_blockLock, INFINITE ); 800 assert( rc == WAIT_OBJECT_0 ); 801 // something is going on here - test of timeouts? 802 m_numWaitersBlocked -= m_numWaitersGone; 803 rc = ReleaseSemaphore( m_blockLock, 1, null ); 804 assert( rc == WAIT_OBJECT_0 ); 805 m_numWaitersGone = 0; 806 } 807 808 LeaveCriticalSection( m_unblockLock ); 809 810 if( numSignalsLeft == 1 ) 811 { 812 // better now than spurious later (same as ResetEvent) 813 for( ; numWaitersGone > 0; --numWaitersGone ) 814 { 815 rc = WaitForSingleObject( m_blockQueue, INFINITE ); 816 assert( rc == WAIT_OBJECT_0 ); 817 } 818 // open the gate 819 rc = ReleaseSemaphore( m_blockLock, 1, null ); 820 assert( rc ); 821 } 822 else if( numSignalsLeft != 0 ) 823 { 824 // unblock next waiter 825 rc = ReleaseSemaphore( m_blockQueue, 1, null ); 826 assert( rc ); 827 } 828 assocMutex.lock(); 829 return !timedOut; 830 } 831 832 833 void notifyImpl( bool all ) 834 { 835 DWORD rc; 836 837 EnterCriticalSection( m_unblockLock ); 838 839 if( m_numWaitersToUnblock != 0 ) 840 { 841 if( m_numWaitersBlocked == 0 ) 842 { 843 LeaveCriticalSection( m_unblockLock ); 844 return; 845 } 846 if( all ) 847 { 848 m_numWaitersToUnblock += m_numWaitersBlocked; 849 m_numWaitersBlocked = 0; 850 } 851 else 852 { 853 m_numWaitersToUnblock++; 854 m_numWaitersBlocked--; 855 } 856 LeaveCriticalSection( m_unblockLock ); 857 } 858 else if( m_numWaitersBlocked > m_numWaitersGone ) 859 { 860 rc = WaitForSingleObject( m_blockLock, INFINITE ); 861 assert( rc == WAIT_OBJECT_0 ); 862 if( 0 != m_numWaitersGone ) 863 { 864 m_numWaitersBlocked -= m_numWaitersGone; 865 m_numWaitersGone = 0; 866 } 867 if( all ) 868 { 869 m_numWaitersToUnblock = m_numWaitersBlocked; 870 m_numWaitersBlocked = 0; 871 } 872 else 873 { 874 m_numWaitersToUnblock = 1; 875 m_numWaitersBlocked--; 876 } 877 LeaveCriticalSection( m_unblockLock ); 878 rc = ReleaseSemaphore( m_blockQueue, 1, null ); 879 assert( rc ); 880 } 881 else 882 { 883 LeaveCriticalSection( m_unblockLock ); 884 } 885 } 886 887 888 // NOTE: This implementation uses Algorithm 8c as described here: 889 // http://groups.google.com/group/comp.programming.threads/ 890 // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a 891 HANDLE m_blockLock; // auto-reset event (now semaphore) 892 HANDLE m_blockQueue; // auto-reset event (now semaphore) 893 CRITICAL_SECTION* m_unblockLock = null; // internal mutex/CS 894 int m_numWaitersGone = 0; 895 int m_numWaitersBlocked = 0; 896 int m_numWaitersToUnblock = 0; 897 } 898 else version( Posix ) 899 { 900 pthread_cond_t* _handle; 901 } 902 } 903 904 unittest 905 { 906 import dplug.core.thread; 907 908 auto mutex = makeMutex(); 909 auto condvar = makeConditionVariable(); 910 911 bool finished = false; 912 913 // Launch a thread that wait on this condition 914 Thread t = launchInAThread( 915 () { 916 mutex.lock(); 917 while(!finished) 918 condvar.wait(&mutex); 919 mutex.unlock(); 920 }); 921 922 // Notify termination 923 mutex.lock(); 924 finished = true; 925 mutex.unlock(); 926 condvar.notifyOne(); 927 928 t.join(); 929 }