1 /** 2 * Copyright: Copyright Sean Kelly 2005 - 2009. 3 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 4 * Authors: Sean Kelly 5 */ 6 // This contains part of druntime's core.sys.mutex, core.sys.semaphore core.sys.condition and 7 // Modified to make it @nogc nothrow 8 /** 9 Mutexes, semaphores and condition variables. 10 */ 11 module dplug.core.sync; 12 13 import core.time; 14 15 import dplug.core.vec; 16 import dplug.core.nogc; 17 18 import core.stdc.stdio; 19 20 version( Windows ) 21 { 22 import core.sys.windows.windows; 23 24 extern (Windows) export nothrow @nogc 25 { 26 void InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION * lpCriticalSection, DWORD dwSpinCount); 27 } 28 } 29 else version( OSX ) 30 { 31 import core.sys.posix.pthread; 32 import core.sync.config; 33 import core.stdc.errno; 34 import core.sys.posix.time; 35 import core.sys.osx.mach.semaphore; 36 37 /+ 38 extern (C): 39 nothrow: 40 @nogc: 41 int pthread_mutexattr_setpolicy_np(pthread_mutexattr_t* attr, int); 42 +/ 43 } 44 else version( Posix ) 45 { 46 import core.sync.config; 47 import core.stdc.errno; 48 import core.sys.posix.pthread; 49 import core.sys.posix.semaphore; 50 import core.sys.posix.time; 51 } 52 else 53 { 54 static assert(false, "Platform not supported"); 55 } 56 57 58 // 59 // MUTEX 60 // 61 62 /// Returns: A newly created `UnchekedMutex`. 63 UncheckedMutex makeMutex() nothrow @nogc 64 { 65 return UncheckedMutex(42); 66 } 67 68 private enum PosixMutexAlignment = 64; // Wild guess, no measurements done 69 70 struct UncheckedMutex 71 { 72 private this(int dummyArg) nothrow @nogc 73 { 74 assert(!_created); 75 version( Windows ) 76 { 77 // Cargo-culting the spin-count in WTF::Lock 78 // See: https://webkit.org/blog/6161/locking-in-webkit/ 79 InitializeCriticalSectionAndSpinCount( &m_hndl, 40 ); 80 } 81 else version( Posix ) 82 { 83 _handle = cast(pthread_mutex_t*)( alignedMalloc(pthread_mutex_t.sizeof, PosixMutexAlignment) ); 84 85 assumeNothrowNoGC( 86 (pthread_mutex_t* handle) 87 { 88 pthread_mutexattr_t attr = void; 89 pthread_mutexattr_init( &attr ); 90 pthread_mutexattr_settype( &attr, PTHREAD_MUTEX_RECURSIVE ); 91 92 version (OSX) 93 { 94 // Note: disabled since this breaks thread pool. 95 /+ 96 // OSX mutexes are fair by default, but this has a cost for contended locks 97 // Disable fairness. 98 // https://blog.mozilla.org/nfroyd/2017/03/29/on-mutex-performance-part-1/ 99 enum _PTHREAD_MUTEX_POLICY_FIRSTFIT = 2; 100 pthread_mutexattr_setpolicy_np(& attr, _PTHREAD_MUTEX_POLICY_FIRSTFIT); 101 +/ 102 } 103 104 pthread_mutex_init( handle, &attr ); 105 106 })(handleAddr()); 107 } 108 _created = 1; 109 } 110 111 ~this() nothrow @nogc 112 { 113 if (_created) 114 { 115 version( Windows ) 116 { 117 DeleteCriticalSection( &m_hndl ); 118 } 119 else version( Posix ) 120 { 121 assumeNothrowNoGC( 122 (pthread_mutex_t* handle) 123 { 124 pthread_mutex_destroy(handle); 125 })(handleAddr); 126 alignedFree(_handle, PosixMutexAlignment); 127 } 128 _created = 0; 129 } 130 } 131 132 @disable this(this); 133 134 /// Lock mutex 135 final void lock() nothrow @nogc 136 { 137 version( Windows ) 138 { 139 EnterCriticalSection( &m_hndl ); 140 } 141 else version( Posix ) 142 { 143 assumeNothrowNoGC( 144 (pthread_mutex_t* handle) 145 { 146 int res = pthread_mutex_lock(handle); 147 if (res != 0) 148 assert(false); 149 })(handleAddr()); 150 } 151 } 152 153 // undocumented function for internal use 154 final void unlock() nothrow @nogc 155 { 156 version( Windows ) 157 { 158 LeaveCriticalSection( &m_hndl ); 159 } 160 else version( Posix ) 161 { 162 assumeNothrowNoGC( 163 (pthread_mutex_t* handle) 164 { 165 int res = pthread_mutex_unlock(handle); 166 if (res != 0) 167 assert(false); 168 })(handleAddr()); 169 } 170 } 171 172 bool tryLock() nothrow @nogc 173 { 174 version( Windows ) 175 { 176 return TryEnterCriticalSection( &m_hndl ) != 0; 177 } 178 else version( Posix ) 179 { 180 int result = assumeNothrowNoGC( 181 (pthread_mutex_t* handle) 182 { 183 return pthread_mutex_trylock(handle); 184 })(handleAddr()); 185 return result == 0; 186 } 187 } 188 189 // For debugging purpose 190 void dumpState() nothrow @nogc 191 { 192 version( Posix ) 193 { 194 ubyte* pstate = cast(ubyte*)(handleAddr()); 195 for (size_t i = 0; i < pthread_mutex_t.sizeof; ++i) 196 { 197 printf("%02x", pstate[i]); 198 } 199 printf("\n"); 200 } 201 } 202 203 204 205 private: 206 version( Windows ) 207 { 208 CRITICAL_SECTION m_hndl; 209 } 210 else version( Posix ) 211 { 212 pthread_mutex_t* _handle = null; 213 } 214 215 // Work-around for Issue 16636 216 // https://issues.dlang.org/show_bug.cgi?id=16636 217 // Still crash with LDC somehow 218 long _created; 219 220 package: 221 version( Posix ) 222 { 223 pthread_mutex_t* handleAddr() nothrow @nogc 224 { 225 return _handle; 226 } 227 } 228 } 229 230 unittest 231 { 232 UncheckedMutex mutex = makeMutex(); 233 foreach(i; 0..100) 234 { 235 mutex.lock(); 236 mutex.unlock(); 237 238 if (mutex.tryLock) 239 mutex.unlock(); 240 } 241 mutex.destroy(); 242 } 243 244 245 246 // 247 // SEMAPHORE 248 // 249 250 /// Returns: A newly created `UncheckedSemaphore` 251 UncheckedSemaphore makeSemaphore(uint count) nothrow @nogc 252 { 253 return UncheckedSemaphore(count); 254 } 255 256 struct UncheckedSemaphore 257 { 258 private this( uint count ) nothrow @nogc 259 { 260 version( Windows ) 261 { 262 m_hndl = CreateSemaphoreA( null, count, int.max, null ); 263 if( m_hndl == m_hndl.init ) 264 assert(false); 265 } 266 else version( OSX ) 267 { 268 mach_port_t task = assumeNothrowNoGC( 269 () 270 { 271 return mach_task_self(); 272 })(); 273 274 kern_return_t rc = assumeNothrowNoGC( 275 (mach_port_t t, semaphore_t* handle, uint count) 276 { 277 return semaphore_create(t, handle, SYNC_POLICY_FIFO, count ); 278 })(task, &m_hndl, count); 279 280 if( rc ) 281 assert(false); 282 } 283 else version( Posix ) 284 { 285 int rc = sem_init( &m_hndl, 0, count ); 286 if( rc ) 287 assert(false); 288 } 289 _created = 1; 290 } 291 292 ~this() nothrow @nogc 293 { 294 if (_created) 295 { 296 version( Windows ) 297 { 298 BOOL rc = CloseHandle( m_hndl ); 299 assert( rc, "Unable to destroy semaphore" ); 300 } 301 else version( OSX ) 302 { 303 mach_port_t task = assumeNothrowNoGC( 304 () 305 { 306 return mach_task_self(); 307 })(); 308 309 kern_return_t rc = assumeNothrowNoGC( 310 (mach_port_t t, semaphore_t handle) 311 { 312 return semaphore_destroy( t, handle ); 313 })(task, m_hndl); 314 315 assert( !rc, "Unable to destroy semaphore" ); 316 } 317 else version( Posix ) 318 { 319 int rc = sem_destroy( &m_hndl ); 320 assert( !rc, "Unable to destroy semaphore" ); 321 } 322 _created = 0; 323 } 324 } 325 326 @disable this(this); 327 328 void wait() nothrow @nogc 329 { 330 version( Windows ) 331 { 332 DWORD rc = WaitForSingleObject( m_hndl, INFINITE ); 333 assert( rc == WAIT_OBJECT_0 ); 334 } 335 else version( OSX ) 336 { 337 while( true ) 338 { 339 auto rc = assumeNothrowNoGC( 340 (semaphore_t handle) 341 { 342 return semaphore_wait(handle); 343 })(m_hndl); 344 if( !rc ) 345 return; 346 if( rc == KERN_ABORTED && errno == EINTR ) 347 continue; 348 assert(false); 349 } 350 } 351 else version( Posix ) 352 { 353 while( true ) 354 { 355 if (!assumeNothrowNoGC( 356 (sem_t* handle) 357 { 358 return sem_wait(handle); 359 })(&m_hndl)) 360 return; 361 if( errno != EINTR ) 362 assert(false); 363 } 364 } 365 } 366 367 bool wait( Duration period ) nothrow @nogc 368 in 369 { 370 assert( !period.isNegative ); 371 } 372 body 373 { 374 version( Windows ) 375 { 376 auto maxWaitMillis = dur!("msecs")( uint.max - 1 ); 377 378 while( period > maxWaitMillis ) 379 { 380 auto rc = WaitForSingleObject( m_hndl, cast(uint) 381 maxWaitMillis.total!"msecs" ); 382 switch( rc ) 383 { 384 case WAIT_OBJECT_0: 385 return true; 386 case WAIT_TIMEOUT: 387 period -= maxWaitMillis; 388 continue; 389 default: 390 assert(false); 391 } 392 } 393 switch( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) ) 394 { 395 case WAIT_OBJECT_0: 396 return true; 397 case WAIT_TIMEOUT: 398 return false; 399 default: 400 assert(false); 401 } 402 } 403 else version( OSX ) 404 { 405 mach_timespec_t t = void; 406 (cast(byte*) &t)[0 .. t.sizeof] = 0; 407 408 if( period.total!"seconds" > t.tv_sec.max ) 409 { 410 t.tv_sec = t.tv_sec.max; 411 t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs; 412 } 413 else 414 period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec); 415 while( true ) 416 { 417 auto rc = assumeNothrowNoGC( 418 (semaphore_t handle, mach_timespec_t t) 419 { 420 return semaphore_timedwait(handle, t); 421 })(m_hndl, t); 422 if( !rc ) 423 return true; 424 if( rc == KERN_OPERATION_TIMED_OUT ) 425 return false; 426 if( rc != KERN_ABORTED || errno != EINTR ) 427 assert(false); 428 } 429 } 430 else version( Posix ) 431 { 432 timespec t; 433 434 assumeNothrowNoGC( 435 (ref timespec t, Duration period) 436 { 437 mktspec( t, period ); 438 })(t, period); 439 440 while( true ) 441 { 442 if (! ((sem_t* handle, timespec* t) 443 { 444 return sem_timedwait(handle, t); 445 })(&m_hndl, &t)) 446 return true; 447 if( errno == ETIMEDOUT ) 448 return false; 449 if( errno != EINTR ) 450 assert(false); 451 } 452 } 453 } 454 455 void notify() nothrow @nogc 456 { 457 version( Windows ) 458 { 459 if( !ReleaseSemaphore( m_hndl, 1, null ) ) 460 assert(false); 461 } 462 else version( OSX ) 463 { 464 auto rc = assumeNothrowNoGC( 465 (semaphore_t handle) 466 { 467 return semaphore_signal(handle); 468 })(m_hndl); 469 if( rc ) 470 assert(false); 471 } 472 else version( Posix ) 473 { 474 int rc = sem_post( &m_hndl ); 475 if( rc ) 476 assert(false); 477 } 478 } 479 480 bool tryWait() nothrow @nogc 481 { 482 version( Windows ) 483 { 484 switch( WaitForSingleObject( m_hndl, 0 ) ) 485 { 486 case WAIT_OBJECT_0: 487 return true; 488 case WAIT_TIMEOUT: 489 return false; 490 default: 491 assert(false); 492 } 493 } 494 else version( OSX ) 495 { 496 return wait( dur!"hnsecs"(0) ); 497 } 498 else version( Posix ) 499 { 500 while( true ) 501 { 502 if( !sem_trywait( &m_hndl ) ) 503 return true; 504 if( errno == EAGAIN ) 505 return false; 506 if( errno != EINTR ) 507 assert(false); 508 } 509 } 510 } 511 512 513 private: 514 version( Windows ) 515 { 516 HANDLE m_hndl; 517 } 518 else version( OSX ) 519 { 520 semaphore_t m_hndl; 521 } 522 else version( Posix ) 523 { 524 sem_t m_hndl; 525 } 526 ulong _created = 0; 527 } 528 529 530 unittest 531 { 532 foreach(j; 0..4) 533 { 534 UncheckedSemaphore semaphore = makeSemaphore(1); 535 foreach(i; 0..100) 536 { 537 semaphore.wait(); 538 semaphore.notify(); 539 if (semaphore.tryWait()) 540 semaphore.notify(); 541 } 542 } 543 } 544 545 546 547 // 548 // CONDITION VARIABLE 549 // 550 551 552 ConditionVariable makeConditionVariable() nothrow @nogc 553 { 554 return ConditionVariable(42); 555 } 556 557 /** 558 * This struct represents a condition variable as conceived by C.A.R. Hoare. As 559 * per Mesa type monitors however, "signal" has been replaced with "notify" to 560 * indicate that control is not transferred to the waiter when a notification 561 * is sent. 562 */ 563 struct ConditionVariable 564 { 565 public: 566 nothrow: 567 @nogc: 568 569 /// Initializes a condition variable. 570 this(int dummy) 571 { 572 version( Windows ) 573 { 574 m_blockLock = CreateSemaphoreA( null, 1, 1, null ); 575 if( m_blockLock == m_blockLock.init ) 576 assert(false); 577 m_blockQueue = CreateSemaphoreA( null, 0, int.max, null ); 578 if( m_blockQueue == m_blockQueue.init ) 579 assert(false); 580 InitializeCriticalSection( &m_unblockLock ); 581 } 582 else version( Posix ) 583 { 584 _handle = cast(pthread_cond_t*)( alignedMalloc(pthread_cond_t.sizeof, PosixMutexAlignment) ); 585 586 int rc = pthread_cond_init( handleAddr(), null ); 587 if( rc ) 588 assert(false); 589 } 590 } 591 592 593 ~this() 594 { 595 version( Windows ) 596 { 597 CloseHandle( m_blockLock ); 598 CloseHandle( m_blockQueue ); 599 DeleteCriticalSection( &m_unblockLock ); 600 } 601 else version( Posix ) 602 { 603 if (_handle !is null) 604 { 605 int rc = pthread_cond_destroy( handleAddr() ); 606 assert( !rc, "Unable to destroy condition" ); 607 alignedFree(_handle, PosixMutexAlignment); 608 _handle = null; 609 } 610 } 611 } 612 613 /// Wait until notified. 614 /// The associated mutex should always be the same for this condition variable. 615 void wait(UncheckedMutex* assocMutex) 616 { 617 version( Windows ) 618 { 619 timedWait( INFINITE, assocMutex ); 620 } 621 else version( Posix ) 622 { 623 int rc = pthread_cond_wait( handleAddr(), assocMutex.handleAddr() ); 624 if( rc ) 625 assert(false); 626 } 627 } 628 629 /// Notifies one waiter. 630 void notifyOne() 631 { 632 version( Windows ) 633 { 634 notifyImpl( false ); 635 } 636 else version( Posix ) 637 { 638 int rc = pthread_cond_signal( handleAddr() ); 639 if( rc ) 640 assert(false); 641 } 642 } 643 644 645 /// Notifies all waiters. 646 void notifyAll() 647 { 648 version( Windows ) 649 { 650 notifyImpl( true ); 651 } 652 else version( Posix ) 653 { 654 int rc = pthread_cond_broadcast( handleAddr() ); 655 if( rc ) 656 assert(false); 657 } 658 } 659 660 version(Posix) 661 { 662 pthread_cond_t* handleAddr() nothrow @nogc 663 { 664 return _handle; 665 } 666 } 667 668 669 private: 670 version( Windows ) 671 { 672 bool timedWait( DWORD timeout, UncheckedMutex* assocMutex ) 673 { 674 int numSignalsLeft; 675 int numWaitersGone; 676 DWORD rc; 677 678 rc = WaitForSingleObject( m_blockLock, INFINITE ); 679 assert( rc == WAIT_OBJECT_0 ); 680 681 m_numWaitersBlocked++; 682 683 rc = ReleaseSemaphore( m_blockLock, 1, null ); 684 assert( rc ); 685 686 assocMutex.unlock(); 687 688 rc = WaitForSingleObject( m_blockQueue, timeout ); 689 assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT ); 690 bool timedOut = (rc == WAIT_TIMEOUT); 691 692 EnterCriticalSection( &m_unblockLock ); 693 694 if( (numSignalsLeft = m_numWaitersToUnblock) != 0 ) 695 { 696 if ( timedOut ) 697 { 698 // timeout (or canceled) 699 if( m_numWaitersBlocked != 0 ) 700 { 701 m_numWaitersBlocked--; 702 // do not unblock next waiter below (already unblocked) 703 numSignalsLeft = 0; 704 } 705 else 706 { 707 // spurious wakeup pending!! 708 m_numWaitersGone = 1; 709 } 710 } 711 if( --m_numWaitersToUnblock == 0 ) 712 { 713 if( m_numWaitersBlocked != 0 ) 714 { 715 // open the gate 716 rc = ReleaseSemaphore( m_blockLock, 1, null ); 717 assert( rc ); 718 // do not open the gate below again 719 numSignalsLeft = 0; 720 } 721 else if( (numWaitersGone = m_numWaitersGone) != 0 ) 722 { 723 m_numWaitersGone = 0; 724 } 725 } 726 } 727 else if( ++m_numWaitersGone == int.max / 2 ) 728 { 729 // timeout/canceled or spurious event :-) 730 rc = WaitForSingleObject( m_blockLock, INFINITE ); 731 assert( rc == WAIT_OBJECT_0 ); 732 // something is going on here - test of timeouts? 733 m_numWaitersBlocked -= m_numWaitersGone; 734 rc = ReleaseSemaphore( m_blockLock, 1, null ); 735 assert( rc == WAIT_OBJECT_0 ); 736 m_numWaitersGone = 0; 737 } 738 739 LeaveCriticalSection( &m_unblockLock ); 740 741 if( numSignalsLeft == 1 ) 742 { 743 // better now than spurious later (same as ResetEvent) 744 for( ; numWaitersGone > 0; --numWaitersGone ) 745 { 746 rc = WaitForSingleObject( m_blockQueue, INFINITE ); 747 assert( rc == WAIT_OBJECT_0 ); 748 } 749 // open the gate 750 rc = ReleaseSemaphore( m_blockLock, 1, null ); 751 assert( rc ); 752 } 753 else if( numSignalsLeft != 0 ) 754 { 755 // unblock next waiter 756 rc = ReleaseSemaphore( m_blockQueue, 1, null ); 757 assert( rc ); 758 } 759 assocMutex.lock(); 760 return !timedOut; 761 } 762 763 764 void notifyImpl( bool all ) 765 { 766 DWORD rc; 767 768 EnterCriticalSection( &m_unblockLock ); 769 770 if( m_numWaitersToUnblock != 0 ) 771 { 772 if( m_numWaitersBlocked == 0 ) 773 { 774 LeaveCriticalSection( &m_unblockLock ); 775 return; 776 } 777 if( all ) 778 { 779 m_numWaitersToUnblock += m_numWaitersBlocked; 780 m_numWaitersBlocked = 0; 781 } 782 else 783 { 784 m_numWaitersToUnblock++; 785 m_numWaitersBlocked--; 786 } 787 LeaveCriticalSection( &m_unblockLock ); 788 } 789 else if( m_numWaitersBlocked > m_numWaitersGone ) 790 { 791 rc = WaitForSingleObject( m_blockLock, INFINITE ); 792 assert( rc == WAIT_OBJECT_0 ); 793 if( 0 != m_numWaitersGone ) 794 { 795 m_numWaitersBlocked -= m_numWaitersGone; 796 m_numWaitersGone = 0; 797 } 798 if( all ) 799 { 800 m_numWaitersToUnblock = m_numWaitersBlocked; 801 m_numWaitersBlocked = 0; 802 } 803 else 804 { 805 m_numWaitersToUnblock = 1; 806 m_numWaitersBlocked--; 807 } 808 LeaveCriticalSection( &m_unblockLock ); 809 rc = ReleaseSemaphore( m_blockQueue, 1, null ); 810 assert( rc ); 811 } 812 else 813 { 814 LeaveCriticalSection( &m_unblockLock ); 815 } 816 } 817 818 819 // NOTE: This implementation uses Algorithm 8c as described here: 820 // http://groups.google.com/group/comp.programming.threads/ 821 // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a 822 HANDLE m_blockLock; // auto-reset event (now semaphore) 823 HANDLE m_blockQueue; // auto-reset event (now semaphore) 824 CRITICAL_SECTION m_unblockLock; // internal mutex/CS 825 int m_numWaitersGone = 0; 826 int m_numWaitersBlocked = 0; 827 int m_numWaitersToUnblock = 0; 828 } 829 else version( Posix ) 830 { 831 pthread_cond_t* _handle; 832 } 833 } 834 835 unittest 836 { 837 import dplug.core.thread; 838 839 auto mutex = makeMutex(); 840 auto condvar = makeConditionVariable(); 841 842 bool finished = false; 843 844 // Launch a thread that wait on this condition 845 Thread t = launchInAThread( 846 () { 847 mutex.lock(); 848 while(!finished) 849 condvar.wait(&mutex); 850 mutex.unlock(); 851 }); 852 853 // Notify termination 854 mutex.lock(); 855 finished = true; 856 mutex.unlock(); 857 condvar.notifyOne(); 858 859 t.join(); 860 }