1 /**
2  * Copyright: Copyright Sean Kelly 2005 - 2009.
3  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
4  * Authors:   Sean Kelly
5  */
/// This contains parts of druntime's core.sync.mutex, core.sync.semaphore and core.sync.condition,
/// modified to be usable from @nogc nothrow code.
8 module dplug.core.sync;
9 
10 import core.time;
11 
12 import dplug.core.alignedbuffer;
13 import dplug.core.nogc;
14 
15 import core.stdc.stdio;
16 
17 version( Windows )
18 {
19     import core.sys.windows.windows;
20 
21     extern (Windows) export nothrow @nogc
22     {
23         void InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION * lpCriticalSection, DWORD dwSpinCount);
24     }
25 }
26 else version( OSX )
27 {
28     import core.sys.posix.pthread;
29     import core.sync.config;
30     import core.stdc.errno;
31     import core.sys.posix.time;
32     import core.sys.osx.mach.semaphore;
33 
34     extern (C):
35     nothrow:
36     @nogc:
37     int pthread_mutexattr_setpolicy_np(pthread_mutexattr_t* attr, int);
38 }
39 else version( Posix )
40 {
41     import core.sync.config;
42     import core.stdc.errno;
43     import core.sys.posix.pthread;
44     import core.sys.posix.semaphore;
45     import core.sys.posix.time;
46 }
47 else
48 {
49     static assert(false, "Platform not supported");
50 }
51 
52 
53 //
54 // MUTEX
55 //
56 
/// Returns: A newly created `UncheckedMutex`.
UncheckedMutex makeMutex() nothrow @nogc
{
    // The argument value is never read by the constructor; it only exists to
    // select the private one-argument constructor.
    enum int dummyArg = 42;
    return UncheckedMutex(dummyArg);
}
62 
// Alignment, in bytes, passed to alignedMalloc/alignedFree for the heap-allocated
// pthread handles in this module.
private enum PosixMutexAlignment = 64; // Wild guess, no measurements done
64 
/// Mutex usable from `nothrow @nogc` code.
///
/// On Windows it wraps a `CRITICAL_SECTION` stored inline; on Posix it owns a
/// heap-allocated `pthread_mutex_t` created with `PTHREAD_MUTEX_RECURSIVE`.
/// Copying is disabled; instances are obtained through `makeMutex()`.
struct UncheckedMutex
{
    /// Creates the underlying OS mutex. `dummyArg` is ignored; it only exists
    /// because a struct cannot declare a zero-argument constructor.
    private this(int dummyArg) nothrow @nogc
    {
        assert(!_created);
        version( Windows )
        {
            // Cargo-culting the spin-count in WTF::Lock
            // See: https://webkit.org/blog/6161/locking-in-webkit/
            InitializeCriticalSectionAndSpinCount( &m_hndl, 40 );
        }
        else version( Posix )
        {
            _handle = cast(pthread_mutex_t*)( alignedMalloc(pthread_mutex_t.sizeof, PosixMutexAlignment) );

            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    pthread_mutexattr_t attr = void;
                    pthread_mutexattr_init( &attr );
                    pthread_mutexattr_settype( &attr, PTHREAD_MUTEX_RECURSIVE );

                    version (OSX)
                    {
                        // OSX mutexes are fair by default, but this has a cost for contended locks
                        // Disable fairness.
                        // https://blog.mozilla.org/nfroyd/2017/03/29/on-mutex-performance-part-1/
                        enum _PTHREAD_MUTEX_POLICY_FIRSTFIT = 2;
                        pthread_mutexattr_setpolicy_np(& attr, _PTHREAD_MUTEX_POLICY_FIRSTFIT);
                    }
                    pthread_mutex_init( handle, &attr );

                    // The attribute object is only needed while initializing the mutex;
                    // release it so that whatever pthread_mutexattr_init acquired is not leaked.
                    pthread_mutexattr_destroy( &attr );

                })(handleAddr());
        }
        _created = 1;
    }

    /// Destroys the OS mutex if it was created. Safe to run on a default-initialized
    /// or already-destroyed instance, since `_created` is then zero.
    ~this() nothrow @nogc
    {
        if (_created)
        {
            version( Windows )
            {
                DeleteCriticalSection( &m_hndl );
            }
            else version( Posix )
            {
                assumeNothrowNoGC(
                    (pthread_mutex_t* handle)
                    {
                        pthread_mutex_destroy(handle);
                    })(handleAddr);
                alignedFree(_handle, PosixMutexAlignment);

                // Do not keep a pointer to freed memory around.
                _handle = null;
            }
            _created = 0;
        }
    }

    @disable this(this);

    /// Lock mutex
    /// On Posix, a non-zero return from `pthread_mutex_lock` is treated as fatal.
    final void lock() nothrow @nogc
    {
        version( Windows )
        {
            EnterCriticalSection( &m_hndl );
        }
        else version( Posix )
        {
            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    int res = pthread_mutex_lock(handle);
                    if (res != 0)
                        assert(false);
                })(handleAddr());
        }
    }

    // undocumented function for internal use
    // Releases the mutex. On Posix, a non-zero return from `pthread_mutex_unlock` is fatal.
    final void unlock() nothrow @nogc
    {
        version( Windows )
        {
            LeaveCriticalSection( &m_hndl );
        }
        else version( Posix )
        {
            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    int res = pthread_mutex_unlock(handle);
                    if (res != 0)
                        assert(false);
                })(handleAddr());
        }
    }

    /// Attempts to take the mutex without blocking.
    /// Returns: `true` if the mutex was acquired, `false` otherwise.
    bool tryLock() nothrow @nogc
    {
        version( Windows )
        {
            return TryEnterCriticalSection( &m_hndl ) != 0;
        }
        else version( Posix )
        {
            int result = assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    return pthread_mutex_trylock(handle);
                })(handleAddr());
            return result == 0;
        }
    }

    // For debugging purpose
    // On Posix, prints the raw bytes of the pthread_mutex_t as hexadecimal followed by
    // a newline. Does nothing on other platforms.
    void dumpState() nothrow @nogc
    {
        version( Posix )
        {
            ubyte* pstate = cast(ubyte*)(handleAddr());
            for (size_t i = 0; i < pthread_mutex_t.sizeof; ++i)
            {
                printf("%02x", pstate[i]);
            }
            printf("\n");
        }
    }



private:
    version( Windows )
    {
        CRITICAL_SECTION    m_hndl;
    }
    else version( Posix )
    {
        // Heap-allocated with alignedMalloc in the constructor, released in the destructor.
        pthread_mutex_t* _handle = null;
    }

    // Work-around for Issue 16636
    // https://issues.dlang.org/show_bug.cgi?id=16636
    // Still crash with LDC somehow
    long _created;

package:
    version( Posix )
    {
        /// Returns: the address of the underlying `pthread_mutex_t`
        /// (used by `ConditionVariable.wait`).
        pthread_mutex_t* handleAddr() nothrow @nogc
        {
            return _handle;
        }
    }
}
220 
unittest
{
    // Repeatedly take and release a single mutex, both through the blocking
    // and the non-blocking entry points.
    UncheckedMutex m = makeMutex();
    for (int round = 0; round < 100; ++round)
    {
        m.lock();
        m.unlock();

        immutable bool acquired = m.tryLock();
        if (acquired)
            m.unlock();
    }

    // Explicit destruction before the end of the scope.
    m.destroy();
}
234 
235 
236 
237 //
238 // SEMAPHORE
239 //
240 
/// Params:
///     count = Initial count handed to the platform semaphore.
/// Returns: A newly created `UncheckedSemaphore`
UncheckedSemaphore makeSemaphore(uint count) nothrow @nogc
{
    return UncheckedSemaphore(count);
}
246 
/// Counting semaphore usable from `nothrow @nogc` code.
///
/// Backed by a Win32 semaphore on Windows, a Mach semaphore on OSX and a
/// `sem_t` on other Posix systems. Unexpected OS failures end in `assert(false)`.
/// Copying is disabled; instances are obtained through `makeSemaphore()`.
struct UncheckedSemaphore
{
    /// Creates the platform semaphore with `count` as its initial count.
    private this( uint count ) nothrow @nogc
    {
        version( Windows )
        {
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if( m_hndl == m_hndl.init )
                assert(false);
        }
        else version( OSX )
        {
            mach_port_t task = assumeNothrowNoGC(
                ()
                {
                    return mach_task_self();
                })();

            kern_return_t rc = assumeNothrowNoGC(
                (mach_port_t t, semaphore_t* handle, uint count)
                {
                    return semaphore_create(t, handle, SYNC_POLICY_FIFO, count );
                })(task, &m_hndl, count);

            if( rc )
                 assert(false);
        }
        else version( Posix )
        {
            int rc = sem_init( &m_hndl, 0, count );
            if( rc )
                assert(false);
        }
        _created = 1;
    }

    /// Releases the platform semaphore if it was created.
    ~this() nothrow @nogc
    {
        if (_created)
        {
            version( Windows )
            {
                BOOL rc = CloseHandle( m_hndl );
                assert( rc, "Unable to destroy semaphore" );
            }
            else version( OSX )
            {
                mach_port_t task = assumeNothrowNoGC(
                    ()
                    {
                        return mach_task_self();
                    })();

                kern_return_t rc = assumeNothrowNoGC(
                    (mach_port_t t, semaphore_t handle)
                    {
                        return semaphore_destroy( t, handle );
                    })(task, m_hndl);

                assert( !rc, "Unable to destroy semaphore" );
            }
            else version( Posix )
            {
                int rc = sem_destroy( &m_hndl );
                assert( !rc, "Unable to destroy semaphore" );
            }
            _created = 0;
        }
    }

    @disable this(this);

    /// Blocks until the semaphore can be decremented.
    /// On OSX and Posix the wait is retried when it is interrupted (EINTR).
    void wait() nothrow @nogc
    {
        version( Windows )
        {
            DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
            assert( rc == WAIT_OBJECT_0 );
        }
        else version( OSX )
        {
            while( true )
            {
                auto rc = assumeNothrowNoGC(
                    (semaphore_t handle)
                    {
                        return semaphore_wait(handle);
                    })(m_hndl);
                if( !rc )
                    return;
                if( rc == KERN_ABORTED && errno == EINTR )
                    continue;
                assert(false);
            }
        }
        else version( Posix )
        {
            while( true )
            {
                if (!assumeNothrowNoGC(
                    (sem_t* handle)
                    {
                        return sem_wait(handle);
                    })(&m_hndl))
                    return;
                if( errno != EINTR )
                    assert(false);
            }
        }
    }

    /// Waits for the semaphore for at most `period`.
    /// Params:
    ///     period = Maximum time to wait; must not be negative.
    /// Returns: `true` if the semaphore was acquired, `false` if the wait timed out.
    bool wait( Duration period ) nothrow @nogc
    in
    {
        assert( !period.isNegative );
    }
    body
    {
        version( Windows )
        {
            // WaitForSingleObject takes a 32-bit millisecond count, so longer
            // periods are consumed in chunks of at most maxWaitMillis.
            auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

            while( period > maxWaitMillis )
            {
                auto rc = WaitForSingleObject( m_hndl, cast(uint)
                                               maxWaitMillis.total!"msecs" );
                switch( rc )
                {
                    case WAIT_OBJECT_0:
                        return true;
                    case WAIT_TIMEOUT:
                        period -= maxWaitMillis;
                        continue;
                    default:
                         assert(false);
                }
            }
            switch( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) )
            {
                case WAIT_OBJECT_0:
                    return true;
                case WAIT_TIMEOUT:
                    return false;
                default:
                    assert(false);
            }
        }
        else version( OSX )
        {
            mach_timespec_t t = void;
            (cast(byte*) &t)[0 .. t.sizeof] = 0;

            // Clamp the seconds part when the period does not fit in tv_sec.
            if( period.total!"seconds" > t.tv_sec.max )
            {
                t.tv_sec  = t.tv_sec.max;
                t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs;
            }
            else
                period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec);
            while( true )
            {
                auto rc = assumeNothrowNoGC(
                            (semaphore_t handle, mach_timespec_t t)
                            {
                                return semaphore_timedwait(handle, t);
                            })(m_hndl, t);
                if( !rc )
                    return true;
                if( rc == KERN_OPERATION_TIMED_OUT )
                    return false;
                if( rc != KERN_ABORTED || errno != EINTR )
                     assert(false);
            }
        }
        else version( Posix )
        {
            timespec t = void;

            // mktspec (core.sync.config) writes the deadline into the timespec it is
            // given. Pass the address of the local so that the value later handed to
            // sem_timedwait is the one mktspec filled in, not uninitialized memory.
            assumeNothrowNoGC(
                (timespec* deadline, Duration period)
                {
                    mktspec( *deadline, period );
                })(&t, period);

            while( true )
            {
                if( !sem_timedwait( &m_hndl, &t ) )
                    return true;
                if( errno == ETIMEDOUT )
                    return false;
                // Retry when interrupted; anything else is unexpected.
                if( errno != EINTR )
                    assert(false);
            }
        }
    }

    /// Increments the semaphore by one.
    void notify()  nothrow @nogc
    {
        version( Windows )
        {
            if( !ReleaseSemaphore( m_hndl, 1, null ) )
                assert(false);
        }
        else version( OSX )
        {
           auto rc = assumeNothrowNoGC(
                        (semaphore_t handle)
                        {
                            return semaphore_signal(handle);
                        })(m_hndl);
            if( rc )
                assert(false);
        }
        else version( Posix )
        {
            int rc = sem_post( &m_hndl );
            if( rc )
                assert(false);
        }
    }

    /// Tries to decrement the semaphore without blocking.
    /// Returns: `true` if the semaphore was acquired, `false` otherwise.
    bool tryWait() nothrow @nogc
    {
        version( Windows )
        {
            switch( WaitForSingleObject( m_hndl, 0 ) )
            {
                case WAIT_OBJECT_0:
                    return true;
                case WAIT_TIMEOUT:
                    return false;
                default:
                    assert(false);
            }
        }
        else version( OSX )
        {
            // Implemented as a timed wait with a zero period.
            return wait( dur!"hnsecs"(0) );
        }
        else version( Posix )
        {
            while( true )
            {
                if( !sem_trywait( &m_hndl ) )
                    return true;
                if( errno == EAGAIN )
                    return false;
                if( errno != EINTR )
                    assert(false);
            }
        }
    }


private:
    version( Windows )
    {
        HANDLE  m_hndl;
    }
    else version( OSX )
    {
        semaphore_t m_hndl;
    }
    else version( Posix )
    {
        sem_t   m_hndl;
    }

    // Non-zero once the constructor has run; checked by the destructor.
    ulong _created = 0;
}
519 
520 
unittest
{
    // Create and tear down several semaphores in turn; on each one, repeat
    // wait/notify and tryWait/notify pairs starting from a count of 1.
    for (int round = 0; round < 4; ++round)
    {
        UncheckedSemaphore sem = makeSemaphore(1);
        for (int iter = 0; iter < 100; ++iter)
        {
            sem.wait();
            sem.notify();

            immutable bool taken = sem.tryWait();
            if (taken)
                sem.notify();
        }
    }
}
535 
536 
537 
538 //
539 // CONDITION VARIABLE
540 //
541 
542 
/// Returns: A newly created `ConditionVariable`.
ConditionVariable makeConditionVariable() nothrow @nogc
{
    // The argument value is never read by the constructor; it only exists to
    // select the one-argument constructor.
    enum int dummy = 42;
    return ConditionVariable(dummy);
}
547 
/**
* This struct represents a condition variable as conceived by C.A.R. Hoare.  As
* per Mesa type monitors however, "signal" has been replaced with "notify" to
* indicate that control is not transferred to the waiter when a notification
* is sent.
*
* On Posix it owns a heap-allocated `pthread_cond_t`; on Windows it is built
* from two semaphores, a critical section and three counters.
*/
struct ConditionVariable
{
public:
nothrow:
@nogc:

    /// Initializes a condition variable.
    /// `dummy` is ignored; it only exists because a struct cannot declare a
    /// zero-argument constructor.
    this(int dummy)
    {
        version( Windows )
        {
            m_blockLock = CreateSemaphoreA( null, 1, 1, null );
            if( m_blockLock == m_blockLock.init )
                assert(false);
            m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
            if( m_blockQueue == m_blockQueue.init )
                assert(false);
            InitializeCriticalSection( &m_unblockLock );
        }
        else version( Posix )
        {
            _handle = cast(pthread_cond_t*)( alignedMalloc(pthread_cond_t.sizeof, PosixMutexAlignment) );

            int rc = pthread_cond_init( handleAddr(), null );
            if( rc )
                assert(false);
        }
    }


    /// Releases the OS resources held by the condition variable.
    ~this()
    {
        version( Windows )
        {
            CloseHandle( m_blockLock );
            CloseHandle( m_blockQueue );
            DeleteCriticalSection( &m_unblockLock );
        }
        else version( Posix )
        {
            // A null handle means nothing was created (or it was already destroyed).
            if (_handle !is null)
            {
                int rc = pthread_cond_destroy( handleAddr() );
                assert( !rc, "Unable to destroy condition" );
                alignedFree(_handle, PosixMutexAlignment);
                _handle = null;
            }
        }
    }

    /// Wait until notified.
    /// The associated mutex should always be the same for this condition variable.
    /// Params:
    ///     assocMutex = Mutex that is released for the duration of the wait and
    ///                  re-acquired before returning.
    void wait(UncheckedMutex* assocMutex)
    {
        version( Windows )
        {
            timedWait( INFINITE, assocMutex );
        }
        else version( Posix )
        {
            int rc = pthread_cond_wait( handleAddr(), assocMutex.handleAddr() );
            if( rc )
                assert(false);
        }
    }

    /// Notifies one waiter.
    void notifyOne()
    {
        version( Windows )
        {
            notifyImpl( false );
        }
        else version( Posix )
        {
            int rc = pthread_cond_signal( handleAddr() );
            if( rc )
                assert(false);
        }
    }


    /// Notifies all waiters.
    void notifyAll()
    {
        version( Windows )
        {
            notifyImpl( true );
        }
        else version( Posix )
        {
            int rc = pthread_cond_broadcast( handleAddr() );
            if( rc )
                assert(false);
        }
    }

    version(Posix)
    {
        /// Returns: the address of the underlying `pthread_cond_t`.
        pthread_cond_t* handleAddr() nothrow @nogc
        {
            return _handle;
        }
    }


private:
    version( Windows )
    {
        // Waits on the condition for at most `timeout` milliseconds, releasing
        // `assocMutex` while blocked and re-locking it before returning.
        // Returns true if the wait ended without timing out.
        bool timedWait( DWORD timeout, UncheckedMutex* assocMutex )
        {
            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            // Register as a blocked waiter while holding the gate semaphore.
            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            m_numWaitersBlocked++;

            rc = ReleaseSemaphore( m_blockLock, 1, null );
            assert( rc );

            assocMutex.unlock();

            rc = WaitForSingleObject( m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            EnterCriticalSection( &m_unblockLock );

            if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
            {
                if ( timedOut )
                {
                    // timeout (or canceled)
                    if( m_numWaitersBlocked != 0 )
                    {
                        m_numWaitersBlocked--;
                        // do not unblock next waiter below (already unblocked)
                        numSignalsLeft = 0;
                    }
                    else
                    {
                        // spurious wakeup pending!!
                        m_numWaitersGone = 1;
                    }
                }
                if( --m_numWaitersToUnblock == 0 )
                {
                    if( m_numWaitersBlocked != 0 )
                    {
                        // open the gate
                        rc = ReleaseSemaphore( m_blockLock, 1, null );
                        assert( rc );
                        // do not open the gate below again
                        numSignalsLeft = 0;
                    }
                    else if( (numWaitersGone = m_numWaitersGone) != 0 )
                    {
                        m_numWaitersGone = 0;
                    }
                }
            }
            else if( ++m_numWaitersGone == int.max / 2 )
            {
                // timeout/canceled or spurious event :-)
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                // something is going on here - test of timeouts?
                m_numWaitersBlocked -= m_numWaitersGone;
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                // ReleaseSemaphore reports success with a non-zero value, so it is
                // checked like the other ReleaseSemaphore calls in this function
                // (comparing against WAIT_OBJECT_0, i.e. zero, would assert on success).
                assert( rc );
                m_numWaitersGone = 0;
            }

            LeaveCriticalSection( &m_unblockLock );

            if( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                assert( rc );
            }
            else if( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            assocMutex.lock();
            return !timedOut;
        }


        // Shared implementation of notifyOne (all == false) and notifyAll (all == true).
        void notifyImpl( bool all )
        {
            DWORD rc;

            EnterCriticalSection( &m_unblockLock );

            if( m_numWaitersToUnblock != 0 )
            {
                // An unblocking round is already in progress: add to it.
                if( m_numWaitersBlocked == 0 )
                {
                    LeaveCriticalSection( &m_unblockLock );
                    return;
                }
                if( all )
                {
                    m_numWaitersToUnblock += m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock++;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
            }
            else if( m_numWaitersBlocked > m_numWaitersGone )
            {
                // Close the gate (it is reopened from timedWait), account for waiters
                // that already left, then release the first waiter from the queue.
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                if( 0 != m_numWaitersGone )
                {
                    m_numWaitersBlocked -= m_numWaitersGone;
                    m_numWaitersGone = 0;
                }
                if( all )
                {
                    m_numWaitersToUnblock = m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock = 1;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            else
            {
                // Nobody to wake up.
                LeaveCriticalSection( &m_unblockLock );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
        int                 m_numWaitersGone        = 0;
        int                 m_numWaitersBlocked     = 0;
        int                 m_numWaitersToUnblock   = 0;
    }
    else version( Posix )
    {
        // Heap-allocated with alignedMalloc in the constructor, released in the destructor.
        pthread_cond_t*     _handle;
    }
}
825 
unittest
{
    import dplug.core.thread;

    auto lock = makeMutex();
    auto cv = makeConditionVariable();

    // Flag protected by `lock`; the waiter thread exits once it reads true.
    bool done = false;

    // Start a thread that blocks on the condition until the flag is raised.
    Thread waiter = launchInAThread(
        () {
            lock.lock();
            for (;;)
            {
                if (done)
                    break;
                cv.wait(&lock);
            }
            lock.unlock();
        });

    // Raise the flag under the mutex, then wake the waiter.
    lock.lock();
    done = true;
    lock.unlock();
    cv.notifyOne();

    waiter.join();
}