1 /**
2  * Copyright: Copyright Sean Kelly 2005 - 2009.
3  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
4  * Authors:   Sean Kelly
5  */
// This contains parts of druntime's core.sync.mutex, core.sync.semaphore and core.sync.condition,
// modified to make them @nogc nothrow.
8 /**
9     Mutexes, semaphores and condition variables.
10 */
11 module dplug.core.sync;
12 
13 import core.time;
14 
15 import dplug.core.vec;
16 import dplug.core.nogc;
17 
18 import core.stdc.stdio;
19 
20 version( Windows )
21 {
22     import core.sys.windows.windows;
23 
24     extern (Windows) export nothrow @nogc
25     {
26         void InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION * lpCriticalSection, DWORD dwSpinCount);
27     }
28 }
29 else version( OSX )
30 {
31     import core.sys.posix.pthread;
32     import core.sync.config;
33     import core.stdc.errno;
34     import core.sys.posix.time;
35     import core.sys.osx.mach.semaphore;
36 
37     /+
38     extern (C):
39     nothrow:
40     @nogc:
41     int pthread_mutexattr_setpolicy_np(pthread_mutexattr_t* attr, int);
42     +/
43 }
44 else version( Posix )
45 {
46     import core.sync.config;
47     import core.stdc.errno;
48     import core.sys.posix.pthread;
49     import core.sys.posix.semaphore;
50     import core.sys.posix.time;
51 }
52 else
53 {
54     static assert(false, "Platform not supported");
55 }
56 
57 
58 //
59 // MUTEX
60 //
61 
/// Returns: A newly created `UncheckedMutex`.
UncheckedMutex makeMutex() nothrow @nogc
{
    // The constructor never reads its integer parameter, so the value
    // handed over here is arbitrary.
    enum int placeholderArg = 42;
    return typeof(return)(placeholderArg);
}
67 
private enum PosixMutexAlignment = 64; // Wild guess, no measurements done

/**
    Mutex usable from `nothrow @nogc` code.

    On Windows this wraps a `CRITICAL_SECTION` initialized with a spin count.
    On Posix this wraps a heap-allocated `pthread_mutex_t` created with the
    `PTHREAD_MUTEX_RECURSIVE` type.

    Instances are created with `makeMutex()` and cannot be copied.
*/
struct UncheckedMutex
{
    /// Creates the underlying OS mutex.
    /// Params:
    ///     dummyArg = unused; only exists because a struct cannot declare a
    ///                zero-argument constructor.
    private this(int dummyArg) nothrow @nogc
    {
        assert(!_created);
        version( Windows )
        {
            // Cargo-culting the spin-count in WTF::Lock
            // See: https://webkit.org/blog/6161/locking-in-webkit/
            InitializeCriticalSectionAndSpinCount( &m_hndl, 40 );
        }
        else version( Posix )
        {
            _handle = cast(pthread_mutex_t*)( alignedMalloc(pthread_mutex_t.sizeof, PosixMutexAlignment) );

            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    pthread_mutexattr_t attr = void;
                    if (pthread_mutexattr_init( &attr ) != 0)
                        assert(false);

                    if (pthread_mutexattr_settype( &attr, PTHREAD_MUTEX_RECURSIVE ) != 0)
                        assert(false);

                    version (OSX)
                    {
                        // Note: disabled since this breaks thread pool.
                        /+
                            // OSX mutexes are fair by default, but this has a cost for contended locks
                            // Disable fairness.
                            // https://blog.mozilla.org/nfroyd/2017/03/29/on-mutex-performance-part-1/
                            enum _PTHREAD_MUTEX_POLICY_FIRSTFIT = 2;
                            pthread_mutexattr_setpolicy_np(& attr, _PTHREAD_MUTEX_POLICY_FIRSTFIT);
                        +/
                    }

                    int rc = pthread_mutex_init( handle, &attr );

                    // The attribute object is only needed while initializing the
                    // mutex; release it so that it does not leak.
                    pthread_mutexattr_destroy( &attr );

                    if (rc != 0)
                        assert(false);

                })(handleAddr());
        }
        _created = 1;
    }

    /// Destroys the underlying OS mutex. Does nothing if the mutex was never
    /// created or has already been destroyed.
    ~this() nothrow @nogc
    {
        if (_created)
        {
            version( Windows )
            {
                DeleteCriticalSection( &m_hndl );
            }
            else version( Posix )
            {
                assumeNothrowNoGC(
                    (pthread_mutex_t* handle)
                    {
                        pthread_mutex_destroy(handle);
                    })(handleAddr);
                alignedFree(_handle, PosixMutexAlignment);
                _handle = null; // do not keep a dangling pointer around
            }
            _created = 0;
        }
    }

    @disable this(this);

    /// Lock mutex
    final void lock() nothrow @nogc
    {
        version( Windows )
        {
            EnterCriticalSection( &m_hndl );
        }
        else version( Posix )
        {
            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    int res = pthread_mutex_lock(handle);
                    if (res != 0)
                        assert(false);
                })(handleAddr());
        }
    }

    /// Unlock mutex
    final void unlock() nothrow @nogc
    {
        version( Windows )
        {
            LeaveCriticalSection( &m_hndl );
        }
        else version( Posix )
        {
            assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    int res = pthread_mutex_unlock(handle);
                    if (res != 0)
                        assert(false);
                })(handleAddr());
        }
    }

    /// Attempts to lock the mutex without blocking.
    /// Returns: `true` if the lock was acquired, `false` otherwise.
    bool tryLock() nothrow @nogc
    {
        version( Windows )
        {
            return TryEnterCriticalSection( &m_hndl ) != 0;
        }
        else version( Posix )
        {
            int result = assumeNothrowNoGC(
                (pthread_mutex_t* handle)
                {
                    return pthread_mutex_trylock(handle);
                })(handleAddr());
            return result == 0;
        }
    }

    /// For debugging purpose: on Posix, prints the raw bytes of the
    /// `pthread_mutex_t` as hexadecimal on stdout. No-op on other platforms.
    void dumpState() nothrow @nogc
    {
        version( Posix )
        {
            ubyte* pstate = cast(ubyte*)(handleAddr());
            for (size_t i = 0; i < pthread_mutex_t.sizeof; ++i)
            {
                printf("%02x", pstate[i]);
            }
            printf("\n");
        }
    }



private:
    version( Windows )
    {
        CRITICAL_SECTION    m_hndl;
    }
    else version( Posix )
    {
        pthread_mutex_t* _handle = null;
    }

    // Work-around for Issue 16636
    // https://issues.dlang.org/show_bug.cgi?id=16636
    // Still crash with LDC somehow
    long _created;

package:
    version( Posix )
    {
        /// Returns: pointer to the underlying `pthread_mutex_t`.
        pthread_mutex_t* handleAddr() nothrow @nogc
        {
            return _handle;
        }
    }
}
229 
// Smoke test: repeatedly lock/unlock and tryLock/unlock a mutex from a single thread.
unittest
{
    enum int rounds = 100;

    auto mutex = makeMutex();
    for (int round = 0; round < rounds; ++round)
    {
        mutex.lock();
        mutex.unlock();

        const bool acquired = mutex.tryLock();
        if (acquired)
        {
            mutex.unlock();
        }
    }
    mutex.destroy();
}
243 
244 
245 
246 //
247 // SEMAPHORE
248 //
249 
/// Params:
///     count = initial count of the semaphore.
/// Returns: A newly created `UncheckedSemaphore`
UncheckedSemaphore makeSemaphore(uint count) nothrow @nogc
{
    return typeof(return)(count);
}
255 
/**
    Counting semaphore usable from `nothrow @nogc` code.

    Backed by a Win32 semaphore on Windows, a Mach semaphore on OSX and a
    `sem_t` on other Posix systems. Unexpected OS errors end in `assert(false)`.
    Instances are created with `makeSemaphore()` and cannot be copied.
*/
struct UncheckedSemaphore
{
    /// Creates the OS semaphore with `count` as its initial count.
    private this( uint count ) nothrow @nogc
    {
        version( Windows )
        {
            m_hndl = CreateSemaphoreA( null, count, int.max, null );
            if (m_hndl == m_hndl.init)
            {
                assert(false);
            }
        }
        else version( OSX )
        {
            mach_port_t owningTask = assumeNothrowNoGC(
                ()
                {
                    return mach_task_self();
                })();

            kern_return_t status = assumeNothrowNoGC(
                (mach_port_t task, semaphore_t* sem, uint initialCount)
                {
                    return semaphore_create(task, sem, SYNC_POLICY_FIFO, initialCount );
                })(owningTask, &m_hndl, count);

            if (status != 0)
            {
                assert(false);
            }
        }
        else version( Posix )
        {
            const int status = sem_init( &m_hndl, 0, count );
            if (status != 0)
            {
                assert(false);
            }
        }
        _created = 1;
    }

    /// Releases the OS semaphore, unless it was never created or was already released.
    ~this() nothrow @nogc
    {
        if (!_created)
        {
            return;
        }

        version( Windows )
        {
            BOOL closed = CloseHandle( m_hndl );
            assert( closed, "Unable to destroy semaphore" );
        }
        else version( OSX )
        {
            mach_port_t owningTask = assumeNothrowNoGC(
                ()
                {
                    return mach_task_self();
                })();

            kern_return_t status = assumeNothrowNoGC(
                (mach_port_t task, semaphore_t sem)
                {
                    return semaphore_destroy( task, sem );
                })(owningTask, m_hndl);

            assert( !status, "Unable to destroy semaphore" );
        }
        else version( Posix )
        {
            int status = sem_destroy( &m_hndl );
            assert( !status, "Unable to destroy semaphore" );
        }
        _created = 0;
    }

    @disable this(this);

    /// Blocks until the semaphore can be decremented.
    /// Interrupted waits are restarted.
    void wait() nothrow @nogc
    {
        version( Windows )
        {
            DWORD outcome = WaitForSingleObject( m_hndl, INFINITE );
            assert( outcome == WAIT_OBJECT_0 );
        }
        else version( OSX )
        {
            for (;;)
            {
                auto status = assumeNothrowNoGC(
                    (semaphore_t sem)
                    {
                        return semaphore_wait(sem);
                    })(m_hndl);
                if (status == 0)
                {
                    return;
                }
                // Anything but an interrupted wait is treated as fatal.
                if (status != KERN_ABORTED || errno != EINTR)
                {
                    assert(false);
                }
            }
        }
        else version( Posix )
        {
            for (;;)
            {
                const int status = assumeNothrowNoGC(
                    (sem_t* sem)
                    {
                        return sem_wait(sem);
                    })(&m_hndl);
                if (status == 0)
                {
                    return;
                }
                // Retry only when the wait was interrupted.
                if (errno != EINTR)
                {
                    assert(false);
                }
            }
        }
    }

    /// Waits for the semaphore for at most `period`.
    /// Params:
    ///     period = maximum time to wait, must not be negative.
    /// Returns: `true` if the semaphore was acquired, `false` if the wait timed out.
    bool wait( Duration period ) nothrow @nogc
    in
    {
        assert( !period.isNegative );
    }
    body
    {
        version( Windows )
        {
            // WaitForSingleObject takes a 32-bit millisecond count, so longer
            // periods are consumed in slices of this length.
            const longestSlice = dur!("msecs")( uint.max - 1 );
            const uint longestSliceMillis = cast(uint) longestSlice.total!"msecs";

            while (period > longestSlice)
            {
                const outcome = WaitForSingleObject( m_hndl, longestSliceMillis );
                if (outcome == WAIT_OBJECT_0)
                {
                    return true;
                }
                if (outcome != WAIT_TIMEOUT)
                {
                    assert(false);
                }
                period -= longestSlice;
            }

            const lastOutcome = WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" );
            if (lastOutcome == WAIT_OBJECT_0)
            {
                return true;
            }
            if (lastOutcome == WAIT_TIMEOUT)
            {
                return false;
            }
            assert(false);
        }
        else version( OSX )
        {
            mach_timespec_t timeout = void;
            (cast(byte*) &timeout)[0 .. timeout.sizeof] = 0;

            const bool secondsOverflow = period.total!"seconds" > timeout.tv_sec.max;
            if (!secondsOverflow)
            {
                period.split!("seconds", "nsecs")(timeout.tv_sec, timeout.tv_nsec);
            }
            else
            {
                // Clamp the seconds field to the largest representable value.
                timeout.tv_sec  = timeout.tv_sec.max;
                timeout.tv_nsec = cast(typeof(timeout.tv_nsec)) period.split!("seconds", "nsecs")().nsecs;
            }

            for (;;)
            {
                auto status = assumeNothrowNoGC(
                            (semaphore_t sem, mach_timespec_t limit)
                            {
                                return semaphore_timedwait(sem, limit);
                            })(m_hndl, timeout);
                if (status == 0)
                {
                    return true;
                }
                if (status == KERN_OPERATION_TIMED_OUT)
                {
                    return false;
                }
                if (status != KERN_ABORTED || errno != EINTR)
                {
                    assert(false);
                }
            }
        }
        else version( Posix )
        {
            timespec deadline;

            assumeNothrowNoGC(
                (ref timespec target, Duration delta)
                {
                    mktspec( target, delta );
                })(deadline, period);

            for (;;)
            {
                const int status = sem_timedwait( &m_hndl, &deadline );
                if (status == 0)
                {
                    return true;
                }
                if (errno == ETIMEDOUT)
                {
                    return false;
                }
                if (errno != EINTR)
                {
                    assert(false);
                }
            }
        }
    }

    /// Increments the semaphore by one.
    void notify()  nothrow @nogc
    {
        version( Windows )
        {
            const released = ReleaseSemaphore( m_hndl, 1, null );
            if (!released)
            {
                assert(false);
            }
        }
        else version( OSX )
        {
            auto status = assumeNothrowNoGC(
                        (semaphore_t sem)
                        {
                            return semaphore_signal(sem);
                        })(m_hndl);
            if (status != 0)
            {
                assert(false);
            }
        }
        else version( Posix )
        {
            const int status = sem_post( &m_hndl );
            if (status != 0)
            {
                assert(false);
            }
        }
    }

    /// Tries to decrement the semaphore without blocking.
    /// Returns: `true` if the semaphore was acquired, `false` otherwise.
    bool tryWait() nothrow @nogc
    {
        version( Windows )
        {
            const outcome = WaitForSingleObject( m_hndl, 0 );
            if (outcome == WAIT_OBJECT_0)
            {
                return true;
            }
            if (outcome == WAIT_TIMEOUT)
            {
                return false;
            }
            assert(false);
        }
        else version( OSX )
        {
            // Implemented as a timed wait with a zero timeout.
            return wait( dur!"hnsecs"(0) );
        }
        else version( Posix )
        {
            for (;;)
            {
                if (sem_trywait( &m_hndl ) == 0)
                {
                    return true;
                }
                if (errno == EAGAIN)
                {
                    return false;
                }
                if (errno != EINTR)
                {
                    assert(false);
                }
            }
        }
    }


private:
    version( Windows )
    {
        HANDLE  m_hndl;
    }
    else version( OSX )
    {
        semaphore_t m_hndl;
    }
    else version( Posix )
    {
        sem_t   m_hndl;
    }

    // Non-zero once the constructor has run; reset by the destructor.
    ulong _created = 0;
}
528 
529 
// Smoke test: create several semaphores in turn and run wait/notify/tryWait
// cycles on each of them from a single thread.
unittest
{
    enum int semaphoreCount = 4;
    enum int cyclesPerSemaphore = 100;

    for (int s = 0; s < semaphoreCount; ++s)
    {
        auto semaphore = makeSemaphore(1);
        for (int cycle = 0; cycle < cyclesPerSemaphore; ++cycle)
        {
            semaphore.wait();
            semaphore.notify();

            const bool acquired = semaphore.tryWait();
            if (acquired)
            {
                semaphore.notify();
            }
        }
    }
}
544 
545 
546 
547 //
548 // CONDITION VARIABLE
549 //
550 
551 
/// Returns: A newly created `ConditionVariable`.
ConditionVariable makeConditionVariable() nothrow @nogc
{
    // The constructor never reads its integer parameter, so the value
    // handed over here is arbitrary.
    enum int placeholderArg = 42;
    return typeof(return)(placeholderArg);
}
556 
/**
* This struct represents a condition variable as conceived by C.A.R. Hoare.  As
* per Mesa type monitors however, "signal" has been replaced with "notify" to
* indicate that control is not transferred to the waiter when a notification
* is sent.
*
* On Posix it wraps a heap-allocated `pthread_cond_t`; on Windows it is built
* from two semaphores and a critical section. Instances are created with
* `makeConditionVariable()` and cannot be copied.
*/
struct ConditionVariable
{
public:
nothrow:
@nogc:

    /// Initializes a condition variable.
    /// Params:
    ///     dummy = unused; only exists because a struct cannot declare a
    ///             zero-argument constructor.
    this(int dummy)
    {
        version( Windows )
        {
            m_blockLock = CreateSemaphoreA( null, 1, 1, null );
            if( m_blockLock == m_blockLock.init )
                assert(false);
            m_blockQueue = CreateSemaphoreA( null, 0, int.max, null );
            if( m_blockQueue == m_blockQueue.init )
                assert(false);
            InitializeCriticalSection( &m_unblockLock );
        }
        else version( Posix )
        {
            _handle = cast(pthread_cond_t*)( alignedMalloc(pthread_cond_t.sizeof, PosixMutexAlignment) );

            int rc = pthread_cond_init( handleAddr(), null );
            if( rc )
                assert(false);
        }
    }


    /// Releases the OS resources. Does nothing on an instance that was never
    /// initialized or that has already been destroyed.
    ~this()
    {
        version( Windows )
        {
            // The constructor guarantees a non-init `m_blockLock`, so an init
            // value means there is nothing to release (default-initialized or
            // already destroyed instance).
            if( m_blockLock != m_blockLock.init )
            {
                CloseHandle( m_blockLock );
                CloseHandle( m_blockQueue );
                DeleteCriticalSection( &m_unblockLock );
                m_blockLock = m_blockLock.init;
                m_blockQueue = m_blockQueue.init;
            }
        }
        else version( Posix )
        {
            if (_handle !is null)
            {
                int rc = pthread_cond_destroy( handleAddr() );
                assert( !rc, "Unable to destroy condition" );
                alignedFree(_handle, PosixMutexAlignment);
                _handle = null;
            }
        }
    }

    // A copy would share the OS handles with the original and release them twice.
    @disable this(this);

    /// Wait until notified.
    /// The associated mutex should always be the same for this condition variable.
    void wait(UncheckedMutex* assocMutex)
    {
        version( Windows )
        {
            timedWait( INFINITE, assocMutex );
        }
        else version( Posix )
        {
            int rc = pthread_cond_wait( handleAddr(), assocMutex.handleAddr() );
            if( rc )
                assert(false);
        }
    }

    /// Notifies one waiter.
    void notifyOne()
    {
        version( Windows )
        {
            notifyImpl( false );
        }
        else version( Posix )
        {
            int rc = pthread_cond_signal( handleAddr() );
            if( rc )
                assert(false);
        }
    }


    /// Notifies all waiters.
    void notifyAll()
    {
        version( Windows )
        {
            notifyImpl( true );
        }
        else version( Posix )
        {
            int rc = pthread_cond_broadcast( handleAddr() );
            if( rc )
                assert(false);
        }
    }

    version(Posix)
    {
        /// Returns: pointer to the underlying `pthread_cond_t`.
        pthread_cond_t* handleAddr() nothrow @nogc
        {
            return _handle;
        }
    }


private:
    version( Windows )
    {
        /// Releases `assocMutex`, waits on the block queue for at most
        /// `timeout` milliseconds, then re-acquires `assocMutex`.
        /// Returns: `true` if the wait ended without timing out.
        bool timedWait( DWORD timeout, UncheckedMutex* assocMutex )
        {
            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            rc = WaitForSingleObject( m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            m_numWaitersBlocked++;

            rc = ReleaseSemaphore( m_blockLock, 1, null );
            assert( rc );

            assocMutex.unlock();

            rc = WaitForSingleObject( m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            EnterCriticalSection( &m_unblockLock );

            if( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
            {
                if ( timedOut )
                {
                    // timeout (or canceled)
                    if( m_numWaitersBlocked != 0 )
                    {
                        m_numWaitersBlocked--;
                        // do not unblock next waiter below (already unblocked)
                        numSignalsLeft = 0;
                    }
                    else
                    {
                        // spurious wakeup pending!!
                        m_numWaitersGone = 1;
                    }
                }
                if( --m_numWaitersToUnblock == 0 )
                {
                    if( m_numWaitersBlocked != 0 )
                    {
                        // open the gate
                        rc = ReleaseSemaphore( m_blockLock, 1, null );
                        assert( rc );
                        // do not open the gate below again
                        numSignalsLeft = 0;
                    }
                    else if( (numWaitersGone = m_numWaitersGone) != 0 )
                    {
                        m_numWaitersGone = 0;
                    }
                }
            }
            else if( ++m_numWaitersGone == int.max / 2 )
            {
                // timeout/canceled or spurious event :-)
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                // something is going on here - test of timeouts?
                m_numWaitersBlocked -= m_numWaitersGone;
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                // ReleaseSemaphore returns non-zero on success, so it must not
                // be compared against WAIT_OBJECT_0 (which is zero).
                assert( rc );
                m_numWaitersGone = 0;
            }

            LeaveCriticalSection( &m_unblockLock );

            if( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( m_blockLock, 1, null );
                assert( rc );
            }
            else if( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            assocMutex.lock();
            return !timedOut;
        }


        /// Wakes up one blocked waiter, or every blocked waiter when `all` is true.
        void notifyImpl( bool all )
        {
            DWORD rc;

            EnterCriticalSection( &m_unblockLock );

            if( m_numWaitersToUnblock != 0 )
            {
                if( m_numWaitersBlocked == 0 )
                {
                    LeaveCriticalSection( &m_unblockLock );
                    return;
                }
                if( all )
                {
                    m_numWaitersToUnblock += m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock++;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
            }
            else if( m_numWaitersBlocked > m_numWaitersGone )
            {
                rc = WaitForSingleObject( m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                if( 0 != m_numWaitersGone )
                {
                    m_numWaitersBlocked -= m_numWaitersGone;
                    m_numWaitersGone = 0;
                }
                if( all )
                {
                    m_numWaitersToUnblock = m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock = 1;
                    m_numWaitersBlocked--;
                }
                LeaveCriticalSection( &m_unblockLock );
                rc = ReleaseSemaphore( m_blockQueue, 1, null );
                assert( rc );
            }
            else
            {
                LeaveCriticalSection( &m_unblockLock );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
        int                 m_numWaitersGone        = 0;
        int                 m_numWaitersBlocked     = 0;
        int                 m_numWaitersToUnblock   = 0;
    }
    else version( Posix )
    {
        pthread_cond_t*     _handle;
    }
}
834 
// A worker thread waits on a condition variable until the main thread sets a
// flag under the mutex and sends a notification.
unittest
{
    import dplug.core.thread;

    auto mutex = makeMutex();
    auto condvar = makeConditionVariable();

    bool finished = false;

    // The worker re-checks the flag after every wake-up, holding the mutex.
    auto waiterThread = launchInAThread(
        () {
            mutex.lock();
            while (finished == false)
            {
                condvar.wait(&mutex);
            }
            mutex.unlock();
        });

    // Set the flag under the mutex, then notify once the mutex is released.
    {
        mutex.lock();
        scope(exit) mutex.unlock();
        finished = true;
    }
    condvar.notifyOne();

    waiterThread.join();
}