1 /** 2 * Copyright: Copyright Sean Kelly 2005 - 2009. 3 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 4 * Authors: Sean Kelly 5 */ 6 /// This contains part of druntime's core.sys.mutex, core.sys.semaphore core.sys.condition and 7 /// Modified to make it @nogc nothrow 8 module dplug.core.sync; 9 10 import core.time; 11 12 import dplug.core.alignedbuffer; 13 import dplug.core.nogc; 14 15 import core.stdc.stdio; 16 17 version( Windows ) 18 { 19 import core.sys.windows.windows; 20 21 extern (Windows) export nothrow @nogc 22 { 23 void InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION * lpCriticalSection, DWORD dwSpinCount); 24 } 25 } 26 else version( OSX ) 27 { 28 import core.sys.posix.pthread; 29 import core.sync.config; 30 import core.stdc.errno; 31 import core.sys.posix.time; 32 import core.sys.osx.mach.semaphore; 33 34 extern (C): 35 nothrow: 36 @nogc: 37 int pthread_mutexattr_setpolicy_np(pthread_mutexattr_t* attr, int); 38 } 39 else version( Posix ) 40 { 41 import core.sync.config; 42 import core.stdc.errno; 43 import core.sys.posix.pthread; 44 import core.sys.posix.semaphore; 45 import core.sys.posix.time; 46 } 47 else 48 { 49 static assert(false, "Platform not supported"); 50 } 51 52 53 // 54 // MUTEX 55 // 56 57 /// Returns: A newly created `UnchekedMutex`. 
UncheckedMutex makeMutex() nothrow @nogc
{
    // The argument value is never read; it only selects the private
    // one-argument constructor of UncheckedMutex.
    return UncheckedMutex(42);
}

private enum PosixMutexAlignment = 64; // Wild guess, no measurements done

/// Mutex usable from `nothrow @nogc` code.
///
/// On Windows it wraps a `CRITICAL_SECTION`; on Posix it wraps a heap-allocated
/// `pthread_mutex_t` initialized with `PTHREAD_MUTEX_RECURSIVE`.
/// Instances are obtained through `makeMutex()` and cannot be copied.
/// Error codes from `lock`/`unlock` are turned into `assert(false)`.
struct UncheckedMutex
{
    /// Creates the underlying OS mutex. `dummyArg` is ignored.
    private this(int dummyArg) nothrow @nogc
    {
        assert(!_created);
        version( Windows )
        {
            // Cargo-culting the spin-count in WTF::Lock
            // See: https://webkit.org/blog/6161/locking-in-webkit/
            InitializeCriticalSectionAndSpinCount( &m_hndl, 40 );
        }
        else version( Posix )
        {
            _handle = cast(pthread_mutex_t*)( alignedMalloc(pthread_mutex_t.sizeof, PosixMutexAlignment) );

            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    pthread_mutexattr_t attr = void;
                    pthread_mutexattr_init( &attr );
                    pthread_mutexattr_settype( &attr, PTHREAD_MUTEX_RECURSIVE );

                    version (OSX)
                    {
                        // OSX mutexes are fair by default, but this has a cost for contended locks
                        // Disable fairness.
                        // https://blog.mozilla.org/nfroyd/2017/03/29/on-mutex-performance-part-1/
                        enum _PTHREAD_MUTEX_POLICY_FIRSTFIT = 2;
                        pthread_mutexattr_setpolicy_np(& attr, _PTHREAD_MUTEX_POLICY_FIRSTFIT);
                    }
                    pthread_mutex_init( handle, &attr );

                    // The attribute object is only needed while initializing
                    // the mutex; release it so it does not leak.
                    pthread_mutexattr_destroy( &attr );
                })(handleAddr());
        }
        _created = 1;
    }

    /// Destroys the OS mutex, if one was created. Safe to run more than once.
    ~this() nothrow @nogc
    {
        if (_created)
        {
            version( Windows )
            {
                DeleteCriticalSection( &m_hndl );
            }
            else version( Posix )
            {
                assumeNothrowNoGC(
                    (pthread_mutex_t* handle)
                    {
                        pthread_mutex_destroy(handle);
                    })(handleAddr);
                alignedFree(_handle, PosixMutexAlignment);
                _handle = null; // do not keep a dangling pointer around
            }
            _created = 0;
        }
    }

    @disable this(this);

    /// Lock mutex
    final void lock() nothrow @nogc
    {
        version( Windows )
        {
            EnterCriticalSection( &m_hndl );
        }
        else version( Posix )
        {
            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    int res = pthread_mutex_lock(handle);
                    if (res != 0)
                        assert(false);
                })(handleAddr());
        }
    }

    /// Unlock mutex
    final void unlock() nothrow @nogc
    {
        version( Windows )
        {
            LeaveCriticalSection( &m_hndl );
        }
        else version( Posix )
        {
            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    int res = pthread_mutex_unlock(handle);
                    if (res != 0)
                        assert(false);
                })(handleAddr());
        }
    }

    /// Attempts to lock the mutex without blocking.
    /// Returns: `true` if the lock was acquired, `false` otherwise.
    bool tryLock() nothrow @nogc
    {
        version( Windows )
        {
            return TryEnterCriticalSection( &m_hndl ) != 0;
        }
        else version( Posix )
        {
            int result = assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    return pthread_mutex_trylock(handle);
                })(handleAddr());
            return result == 0;
        }
    }

    // For debugging purpose
    // Prints the raw bytes of the pthread mutex as hex on stdout.
    // Does nothing on non-Posix platforms.
    void dumpState() nothrow @nogc
    {
        version( Posix )
        {
            ubyte* pstate = cast(ubyte*)(handleAddr());
            for (size_t i = 0; i < pthread_mutex_t.sizeof; ++i)
            {
                printf("%02x", pstate[i]);
            }
            printf("\n");
        }
    }



private:
    version( Windows )
    {
        CRITICAL_SECTION    m_hndl;
    }
    else version( Posix )
    {
        pthread_mutex_t*     _handle = null;
    }

    // Work-around for Issue 16636
    // https://issues.dlang.org/show_bug.cgi?id=16636
    // Still crash with LDC somehow
    long _created;

package:
    version( Posix )
    {
        /// Returns: the address of the underlying `pthread_mutex_t`.
        pthread_mutex_t* handleAddr() nothrow @nogc
        {
            return _handle;
        }
    }
}

unittest
{
    UncheckedMutex mutex = makeMutex();
    foreach(i; 0..100)
    {
        mutex.lock();
        mutex.unlock();

        if (mutex.tryLock)
            mutex.unlock();
    }
    mutex.destroy();
}



//
// SEMAPHORE
//

/// Returns: A newly created `UncheckedSemaphore`
UncheckedSemaphore makeSemaphore(uint count) nothrow @nogc
{
    return UncheckedSemaphore(count);
}

/// Counting semaphore usable from `nothrow @nogc` code.
///
/// Wraps a Win32 semaphore handle, a Mach `semaphore_t` (OSX) or a Posix
/// `sem_t`. Unexpected OS errors are turned into `assert(false)`.
/// Instances are obtained through `makeSemaphore()` and cannot be copied.
struct UncheckedSemaphore
{
    /// Creates the OS semaphore.
    /// Params:
    ///     count = initial value handed to the OS semaphore.
    private this( uint count ) nothrow @nogc
    {
        version( Windows )
        {
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if( m_hndl == m_hndl.init )
                assert(false);
        }
        else version( OSX )
        {
            mach_port_t task = assumeNothrowNoGC(
                ()
                {
                    return mach_task_self();
                })();

            kern_return_t rc = assumeNothrowNoGC(
                (mach_port_t t, semaphore_t* handle, uint count)
                {
                    return semaphore_create(t, handle, SYNC_POLICY_FIFO, count );
                })(task, &m_hndl, count);

            if( rc )
                 assert(false);
        }
        else version( Posix )
        {
            int rc = sem_init( &m_hndl, 0, count );
            if( rc )
                assert(false);
        }
        _created = 1;
    }

    /// Destroys the OS semaphore, if one was created.
    ~this() nothrow @nogc
    {
        if (_created)
        {
            version( Windows )
            {
                BOOL rc = CloseHandle( m_hndl );
                assert( rc, "Unable to destroy semaphore" );
            }
            else version( OSX )
            {
                mach_port_t task = assumeNothrowNoGC(
                    ()
                    {
                        return mach_task_self();
                    })();

                kern_return_t rc = assumeNothrowNoGC(
                    (mach_port_t t, semaphore_t handle)
                    {
                        return semaphore_destroy( t, handle );
                    })(task, m_hndl);

                assert( !rc, "Unable to destroy semaphore" );
            }
            else version( Posix )
            {
                int rc = sem_destroy( &m_hndl );
                assert( !rc, "Unable to destroy semaphore" );
            }
            _created = 0;
        }
    }

    @disable this(this);

    /// Waits on the semaphore with no time limit.
    /// On OSX and Posix the wait is restarted when it is interrupted (EINTR).
    void wait() nothrow @nogc
    {
        version( Windows )
        {
            DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
        }
        else version( OSX )
        {
            while( true )
            {
                auto rc = assumeNothrowNoGC(
                    (semaphore_t handle)
                    {
                        return semaphore_wait(handle);
                    })(m_hndl);
                if( !rc )
                    return;
                if( rc == KERN_ABORTED && errno == EINTR )
                    continue;
                assert(false);
            }
        }
        else version( Posix )
        {
            while( true )
            {
                if (!assumeNothrowNoGC(
                    (sem_t* handle)
                    {
                        return sem_wait(handle);
                    })(&m_hndl))
                    return;
                if( errno != EINTR )
                    assert(false);
            }
        }
    }

    /// Waits on the semaphore for at most `period`.
    /// Params:
    ///     period = maximum time to wait; must not be negative.
    /// Returns: `true` if the wait succeeded, `false` if it timed out.
    bool wait( Duration period ) nothrow @nogc
    in
    {
        assert( !period.isNegative );
    }
    body
    {
        version( Windows )
        {
            // WaitForSingleObject is given a 32-bit millisecond count here,
            // so longer periods are consumed in chunks of maxWaitMillis.
            auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

            while( period > maxWaitMillis )
            {
                auto rc = WaitForSingleObject( m_hndl, cast(uint)
                                                       maxWaitMillis.total!"msecs" );
                switch( rc )
                {
                case WAIT_OBJECT_0:
                    return true;
                case WAIT_TIMEOUT:
                    period -= maxWaitMillis;
                    continue;
                default:
                     assert(false);
                }
            }
            switch( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                assert(false);
            }
        }
        else version( OSX )
        {
            mach_timespec_t t = void;
            (cast(byte*) &t)[0 .. t.sizeof] = 0;

            // Clamp the seconds part when the period does not fit in tv_sec.
            if( period.total!"seconds" > t.tv_sec.max )
            {
                t.tv_sec  = t.tv_sec.max;
                t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs;
            }
            else
                period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec);
            while( true )
            {
                auto rc = assumeNothrowNoGC(
                            (semaphore_t handle, mach_timespec_t t)
                            {
                                return semaphore_timedwait(handle, t);
                            })(m_hndl, t);
                if( !rc )
                    return true;
                if( rc == KERN_OPERATION_TIMED_OUT )
                    return false;
                if( rc != KERN_ABORTED || errno != EINTR )
                     assert(false);
            }
        }
        else version( Posix )
        {
            timespec t = void;

            // `t` starts uninitialized, so the wrapper must write into this
            // very variable: hand it over by address. Passing it by value
            // would fill a copy and leave the deadline below as garbage.
            assumeNothrowNoGC(
                (timespec* deadline, Duration period)
                {
                    mktspec( *deadline, period );
                })(&t, period);

            while( true )
            {
                if( !sem_timedwait( &m_hndl, &t ) )
                    return true;
                if( errno == ETIMEDOUT )
                    return false;
                if( errno != EINTR )
                    assert(false);
            }
        }
    }

    /// Signals the semaphore once.
    void notify() nothrow @nogc
    {
        version( Windows )
        {
            if( !ReleaseSemaphore( m_hndl, 1, null ) )
                assert(false);
        }
        else version( OSX )
        {
            auto rc = assumeNothrowNoGC(
                        (semaphore_t handle)
                        {
                            return semaphore_signal(handle);
                        })(m_hndl);
            if( rc )
                assert(false);
        }
        else version( Posix )
        {
            int rc = sem_post( &m_hndl );
            if( rc )
                assert(false);
        }
    }

    /// Attempts to wait on the semaphore without blocking.
    /// Returns: `true` if the wait succeeded, `false` otherwise.
    bool tryWait() nothrow @nogc
    {
        version( Windows )
        {
            switch( WaitForSingleObject( m_hndl, 0 ) )
            {
            case WAIT_OBJECT_0:
                return true;
            case WAIT_TIMEOUT:
                return false;
            default:
                assert(false);
            }
        }
        else version( OSX )
        {
            // Implemented as a timed wait with a zero period.
            return wait( dur!"hnsecs"(0) );
        }
        else version( Posix )
        {
            while( true )
            {
                if( !sem_trywait( &m_hndl ) )
                    return true;
                if( errno == EAGAIN )
                    return false;
                if( errno != EINTR )
                    assert(false);
            }
        }
    }


private:
    version( Windows )
    {
        HANDLE  m_hndl;
    }
    else version( OSX )
    {
        semaphore_t m_hndl;
    }
    else version( Posix )
    {
        sem_t   m_hndl;
    }
    // Non-zero once the constructor has run; checked by the destructor.
    ulong _created = 0;
}


unittest
{
    foreach(j; 0..4)
    {
        UncheckedSemaphore semaphore = makeSemaphore(1);
        foreach(i; 0..100)
        {
            semaphore.wait();
            semaphore.notify();
            if (semaphore.tryWait())
                semaphore.notify();
        }
    }
}



//
// CONDITION VARIABLE
//


/// Returns: A newly created `ConditionVariable`.
ConditionVariable makeConditionVariable() nothrow @nogc
{
    // The argument value is never read; it only selects the constructor.
    return ConditionVariable(42);
}

/**
 * This struct represents a condition variable as conceived by C.A.R. Hoare.
As
 * per Mesa type monitors however, "signal" has been replaced with "notify" to
 * indicate that control is not transferred to the waiter when a notification
 * is sent.
 *
 * On Posix it wraps a heap-allocated `pthread_cond_t`; on Windows it is built
 * from two semaphores and a critical section. Instances are obtained through
 * `makeConditionVariable()` and cannot be copied.
 */
struct ConditionVariable
{
public:
nothrow:
@nogc:

    /// Initializes a condition variable. `dummy` is ignored.
    this(int dummy)
    {
        version( Windows )
        {
            m_blockLock = CreateSemaphoreA( null, 1, 1, null );
            if( m_blockLock == m_blockLock.init )
                assert(false);
            m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
            if( m_blockQueue == m_blockQueue.init )
                assert(false);
            InitializeCriticalSection( &m_unblockLock );
        }
        else version( Posix )
        {
            _handle = cast(pthread_cond_t*)( alignedMalloc(pthread_cond_t.sizeof, PosixMutexAlignment) );

            int rc = pthread_cond_init( handleAddr(), null );
            if( rc )
                assert(false);
        }
    }


    /// Releases the OS resources, if any were created.
    ~this()
    {
        version( Windows )
        {
            // Only tear down what the constructor built, so a value that was
            // never constructed (or was already destroyed) is a no-op.
            if( m_blockLock != m_blockLock.init )
            {
                CloseHandle( m_blockLock );
                CloseHandle( m_blockQueue );
                DeleteCriticalSection( &m_unblockLock );
                m_blockLock = m_blockLock.init;
                m_blockQueue = m_blockQueue.init;
            }
        }
        else version( Posix )
        {
            if (_handle !is null)
            {
                int rc = pthread_cond_destroy( handleAddr() );
                assert( !rc, "Unable to destroy condition" );
                alignedFree(_handle, PosixMutexAlignment);
                _handle = null;
            }
        }
    }

    // A copy would share the same OS handles and destroy them twice.
    @disable this(this);

    /// Wait until notified.
    /// The associated mutex should always be the same for this condition variable.
    /// Params:
    ///     assocMutex = mutex released while waiting and re-acquired before
    ///                  returning.
    void wait(UncheckedMutex* assocMutex)
    {
        version( Windows )
        {
            timedWait( INFINITE, assocMutex );
        }
        else version( Posix )
        {
            int rc = pthread_cond_wait( handleAddr(), assocMutex.handleAddr() );
            if( rc )
                assert(false);
        }
    }

    /// Notifies one waiter.
    void notifyOne()
    {
        version( Windows )
        {
            notifyImpl( false );
        }
        else version( Posix )
        {
            int rc = pthread_cond_signal( handleAddr() );
            if( rc )
                assert(false);
        }
    }


    /// Notifies all waiters.
    void notifyAll()
    {
        version( Windows )
        {
            notifyImpl( true );
        }
        else version( Posix )
        {
            int rc = pthread_cond_broadcast( handleAddr() );
            if( rc )
                assert(false);
        }
    }

    version(Posix)
    {
        /// Returns: the address of the underlying `pthread_cond_t`.
        pthread_cond_t* handleAddr() nothrow @nogc
        {
            return _handle;
        }
    }


private:
    version( Windows )
    {
        // Waits on the condition for at most `timeout` milliseconds.
        // `assocMutex` is unlocked while waiting and locked again before
        // returning. Returns true when woken, false when the wait timed out.
        bool timedWait( DWORD timeout, UncheckedMutex* assocMutex )
        {
            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            m_numWaitersBlocked++;

            rc = ReleaseSemaphore( m_blockLock, 1, null );
            assert( rc );

            assocMutex.unlock();

            rc = WaitForSingleObject( m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            EnterCriticalSection( &m_unblockLock );

            if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
            {
                if ( timedOut )
                {
                    // timeout (or canceled)
                    if( m_numWaitersBlocked != 0 )
                    {
                        m_numWaitersBlocked--;
                        // do not unblock next waiter below (already unblocked)
                        numSignalsLeft = 0;
                    }
                    else
                    {
                        // spurious wakeup pending!!
                        m_numWaitersGone = 1;
                    }
                }
                if( --m_numWaitersToUnblock == 0 )
                {
                    if( m_numWaitersBlocked != 0 )
                    {
                        // open the gate
                        rc = ReleaseSemaphore( m_blockLock, 1, null );
                        assert( rc );
                        // do not open the gate below again
                        numSignalsLeft = 0;
                    }
                    else if( (numWaitersGone = m_numWaitersGone) != 0 )
                    {
                        m_numWaitersGone = 0;
                    }
                }
            }
            else if( ++m_numWaitersGone == int.max / 2 )
            {
                // timeout/canceled or spurious event :-)
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                // something is going on here - test of timeouts?
                m_numWaitersBlocked -= m_numWaitersGone;
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                // ReleaseSemaphore reports success with a non-zero value, so
                // it is checked like every other ReleaseSemaphore call here
                // (comparing against WAIT_OBJECT_0, i.e. 0, asserted failure).
                assert( rc );
                m_numWaitersGone = 0;
            }

            LeaveCriticalSection( &m_unblockLock );

            if( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                assert( rc );
            }
            else if( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            assocMutex.lock();
            return !timedOut;
        }


        // Wakes one blocked waiter, or every blocked waiter when `all` is true.
        // Does nothing when no waiter is blocked.
        void notifyImpl( bool all )
        {
            DWORD rc;

            EnterCriticalSection( &m_unblockLock );

            if( m_numWaitersToUnblock != 0 )
            {
                if( m_numWaitersBlocked == 0 )
                {
                    LeaveCriticalSection( &m_unblockLock );
                    return;
                }
                if( all )
                {
                    m_numWaitersToUnblock += m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock++;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
            }
            else if( m_numWaitersBlocked > m_numWaitersGone )
            {
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                if( 0 != m_numWaitersGone )
                {
                    m_numWaitersBlocked -= m_numWaitersGone;
                    m_numWaitersGone = 0;
                }
                if( all )
                {
                    m_numWaitersToUnblock = m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock = 1;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            else
            {
                LeaveCriticalSection( &m_unblockLock );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
        int                 m_numWaitersGone        = 0;
        int                 m_numWaitersBlocked     = 0;
        int                 m_numWaitersToUnblock   = 0;
    }
    else version( Posix )
    {
        pthread_cond_t*     _handle;
    }
}

unittest
{
    import dplug.core.thread;

    auto mutex = makeMutex();
    auto condvar = makeConditionVariable();

    bool finished = false;

    // Launch a thread that wait on this condition
    Thread t = launchInAThread(
        () {
            mutex.lock();
            while(!finished)
                condvar.wait(&mutex);
            mutex.unlock();
        });

    // Notify termination
    mutex.lock();
    finished = true;
    mutex.unlock();
    condvar.notifyOne();

    t.join();
}