/**
 * The thread module provides support for thread creation and management.
 *
 * Copyright: Copyright Sean Kelly 2005 - 2012.
 * Copyright: Copyright (c) 2009-2011, David Simcha.
 * Copyright: Copyright Auburn Sounds 2016
 * License: Distributed under the
 *      $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
 *    (See accompanying file LICENSE)
 * Authors: Sean Kelly, Walter Bright, Alex Rønne Petersen, Martin Nowak, David Simcha, Guillaume Piolat
 */
module dplug.core.thread;

import dplug.core.nogc;
import dplug.core.lockedqueue;
import dplug.core.sync;

version(Posix)
    import core.sys.posix.pthread;
else version(Windows)
{
    import core.stdc.stdint : uintptr_t;
    import core.sys.windows.windef;
    import core.sys.windows.winbase;
    import core.thread;

    extern (Windows) alias btex_fptr = uint function(void*) ;
    extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*) nothrow @nogc;
}
else
    static assert(false, "Platform not supported");

version(OSX)
{
    extern(C) nothrow @nogc
    int sysctlbyname(const(char)*, void *, size_t *, void *, size_t);
}


alias ThreadDelegate = void delegate() nothrow @nogc;


/// Creates a `Thread` that is not started yet; call `start()` on it.
/// See `Thread.this` for the parameters.
Thread makeThread(ThreadDelegate callback, size_t stackSize = 0) nothrow @nogc
{
    return Thread(callback, stackSize);
}

/// Optimistic thread, failure not supported (any OS error triggers `assert(false)`).
struct Thread
{
nothrow:
@nogc:
public:

    /// Prepares a thread. No OS thread exists until `start()` is called.
    /// Params:
    ///     callback = The delegate that will be called by the thread.
    ///     stackSize = The thread stack size in bytes. 0 for default size.
    /// Warning: It is STRONGLY ADVISED to pass a class member delegate to have
    ///          the right delegate context.
    ///          Passing struct method delegates are currently UNSUPPORTED.
    this(ThreadDelegate callback, size_t stackSize = 0)
    {
        _stackSize = stackSize;
        _callback = callback;
    }

    /// Destroys a thread.
    /// If the thread was started but never joined, its OS resources are released
    /// (detached on POSIX, handle closed on Windows); the OS thread itself is not stopped.
    /// Nothing is done for a thread that was never started or was already joined,
    /// since `join()` already released it.
    ~this()
    {
        if (!_started || _joined)
            return;

        version(Posix)
        {
            pthread_detach(_id);
        }
        else version(Windows)
        {
            CloseHandle(_id);
        }
    }

    @disable this(this);

    /// Starts the thread. This function can only be called once.
    void start()
    {
        import core.stdc.stdlib : malloc;

        assert(!_started);

        // The new thread receives its own heap copy of the delegate instead of
        // a pointer to `_callback`: this struct may be moved after `start()`
        // (e.g. when returned by value from `launchInAThread`), which would
        // leave the thread reading a stale address.
        // The thread entry point frees this copy.
        ThreadDelegate* context = cast(ThreadDelegate*) malloc(ThreadDelegate.sizeof);
        if (context is null)
            assert(false);
        *context = _callback;

        version(Posix)
        {
            pthread_attr_t attr;

            int err = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_init(pattr);
                })(&attr);

            if (err != 0)
                assert(false);

            if(_stackSize != 0)
            {
                int err2 = assumeNothrowNoGC(
                    (pthread_attr_t* pattr, size_t stackSize)
                    {
                        return pthread_attr_setstacksize(pattr, stackSize);
                    })(&attr, _stackSize);
                if (err2 != 0)
                    assert(false);
            }

            int err3 = pthread_create(&_id, &attr, &posixThreadEntryPoint, context);
            if (err3 != 0)
                assert(false);

            int err4 = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_destroy(pattr);
                })(&attr);
            if (err4 != 0)
                assert(false);
        }

        version(Windows)
        {
            uint dummy;
            _id = cast(HANDLE) _beginthreadex(null,
                                              cast(uint)_stackSize,
                                              &windowsThreadEntryPoint,
                                              context,
                                              CREATE_SUSPENDED,
                                              &dummy);
            if (cast(size_t)_id == 0)
                assert(false);
            if (ResumeThread(_id) == -1)
                assert(false);
        }

        // From now on the destructor is responsible for releasing the OS thread,
        // unless join() does it first.
        _started = true;
    }

    /// Waits for the thread termination, then releases its OS resources.
    /// Must be called at most once, and only after `start()`.
    void join()
    {
        assert(_started && !_joined);

        version(Posix)
        {
            void* returnValue;
            if (0 != pthread_join(_id, &returnValue))
                assert(false);
        }
        else version(Windows)
        {
            if(WaitForSingleObject(_id, INFINITE) != WAIT_OBJECT_0)
                assert(false);
            CloseHandle(_id);
        }

        // The thread is joined (POSIX) / its handle is closed (Windows):
        // the destructor must not detach or close it again.
        _joined = true;
    }

private:
    version(Posix) pthread_t _id;
    version(Windows) HANDLE _id;
    ThreadDelegate _callback;
    size_t _stackSize;
    bool _started = false; // set by start() once the OS thread exists
    bool _joined = false;  // set by join() once the OS thread has been waited for and released
}

version(Posix)
{
    /// POSIX thread entry point.
    /// `threadContext` is a malloc'd `ThreadDelegate` copy made by `Thread.start`,
    /// freed here before calling it.
    extern(C) void* posixThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        import core.stdc.stdlib : free;

        ThreadDelegate dg = *cast(ThreadDelegate*)(threadContext);
        free(threadContext);
        dg(); // hopefully called with the right context pointer
        return null;
    }
}

version(Windows)
{
    /// Windows thread entry point.
    /// `threadContext` is a malloc'd `ThreadDelegate` copy made by `Thread.start`,
    /// freed here before calling it.
    extern (Windows) uint windowsThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        import core.stdc.stdlib : free;

        ThreadDelegate dg = *cast(ThreadDelegate*)(threadContext);
        free(threadContext);
        dg();
        return 0;
    }
}

unittest
{
    int outerInt = 0;

    class A
    {
    nothrow @nogc:
        this()
        {
            t = makeThread(&f);
            t.start();
        }

        void join()
        {
            t.join();
        }

        void f()
        {
            outerInt = 1;
            innerInt = 2;

            // verify this
            assert(checkValue0 == 0x11223344);
            assert(checkValue1 == 0x55667788);
        }

        int checkValue0 = 0x11223344;
        int checkValue1 = 0x55667788;
        int innerInt = 0;
        Thread t;
    }

    auto a = new A;
    a.t.join();
    assert(a.innerInt == 2);
    a.destroy();
    assert(outerInt == 1);
}

unittest
{
    // A started thread returned by value must still run its delegate.
    static class Counter
    {
    nothrow @nogc:
        int value = 0;
        void increment()
        {
            value++;
        }
    }

    auto c = new Counter;
    Thread t = launchInAThread(&c.increment);
    t.join();
    assert(c.value == 1);
}

/// Launches a function in a newly created thread.
/// Returns: the started thread, so that you can call `.join()` on it.
///          If it is destroyed without being joined, the OS thread is detached.
Thread launchInAThread(ThreadDelegate dg) nothrow @nogc
{
    Thread t = makeThread(dg);
    t.start();
    return t;
}


version(Windows)
{
    /// Returns: current thread identifier.
    void* currentThreadId() nothrow @nogc
    {
        return cast(void*)GetCurrentThreadId();
    }
}
else version(Posix)
{
    /// Returns: current thread identifier.
    void* currentThreadId() nothrow @nogc
    {
        return assumeNothrowNoGC(
                ()
                {
                    return cast(void*)(pthread_self());
                })();
    }
}


//
// Thread-pool
//
/// Returns: Number of CPUs, never less than 1 (1 is also returned if the query fails).
int getTotalNumberOfCPUs() nothrow @nogc
{
    int procs;
    version(Windows)
    {
        import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        procs = cast(int) si.dwNumberOfProcessors;
    }
    else version(linux)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        procs = cast(int) sysconf(_SC_NPROCESSORS_ONLN); // -1 on error
    }
    else version(OSX)
    {
        auto nameStr = "machdep.cpu.core_count\0".ptr;
        uint ans = 0;
        size_t len = uint.sizeof;
        if (sysctlbyname(nameStr, &ans, &len, null, 0) != 0)
            ans = 0; // query failed, handled by the clamp below
        procs = cast(int) ans;
    }
    else
        static assert(false, "OS unsupported");

    // A ThreadPool with no worker would never complete any work item,
    // so never report less than one CPU.
    if (procs < 1)
        procs = 1;
    return procs;
}

alias ThreadPoolDelegate = void delegate(int workItem) nothrow @nogc;


/// Rewrite of the ThreadPool using condition variables.
/// FUTURE: this could be speed-up by using futures. Description of the task
/// and associated condition+mutex would go in an external struct.
/// Note: the interface of the thread-pool itself is not thread-safe, you cannot give orders from
/// multiple threads at once.
class ThreadPool
{
public:
nothrow:
@nogc:

    /// Creates a thread-pool and starts its worker threads.
    /// Params:
    ///     numThreads = Number of worker threads. 0 (or a negative value) means
    ///                  one per CPU, as reported by `getTotalNumberOfCPUs()`.
    ///     stackSize  = Stack size in bytes of each worker thread. 0 for default size.
    this(int numThreads = 0, size_t stackSize = 0)
    {
        // Create sync first: workers lock _workMutex as soon as they run.
        _workMutex = makeMutex();
        _workCondition = makeConditionVariable();

        _finishMutex = makeMutex();
        _finishCondition = makeConditionVariable();

        // Create threads
        if (numThreads <= 0)
            numThreads = getTotalNumberOfCPUs();
        _threads = mallocSlice!Thread(numThreads);
        foreach(ref thread; _threads)
        {
            thread = makeThread(&workerThreadFunc, stackSize);
            thread.start();
        }
    }

    /// Destroys a thread-pool: asks every worker to stop, joins them and frees them.
    /// A `parallelForAsync` must have been followed by `waitForCompletion` first.
    ~this()
    {
        if (_threads !is null)
        {
            assert(_state == State.initial);

            // Put the thread-pool in stop state.
            _workMutex.lock();
            _stopFlag = true;
            _workMutex.unlock();

            // Wake up all workers so that they notice the stop flag.
            _workCondition.notifyAll();

            // Wait for each thread termination.
            foreach(ref thread; _threads)
                thread.join();

            // Destroys each thread.
            foreach(ref thread; _threads)
                thread.destroy();
            freeSlice(_threads);
            _threads = null;
        }
    }

    /// Calls the delegate in parallel, with 0..count as index.
    /// Waits for completion before returning.
    void parallelFor(int count, scope ThreadPoolDelegate dg)
    {
        assert(_state == State.initial);

        // Do not launch worker threads for one work-item, not worth it.
        // (but it is worth it in async).
        if (count == 1)
        {
            dg(0);
            return;
        }

        // Unleash parallel threads.
        parallelForAsync(count, dg);

        // Wait for completion immediately.
        waitForCompletion();
    }

    /// Same as `parallelFor`, but does not wait for completion.
    /// Call `waitForCompletion` before launching another one:
    /// you cannot have 2 concurrent parallelFor for the same thread-pool.
    /// A `count` of 0 or less does nothing.
    /// Warning: `dg` is stored and called by worker threads after this function
    ///          returns (despite `scope`), so what it refers to must stay alive
    ///          until `waitForCompletion` returns.
    void parallelForAsync(int count, scope ThreadPoolDelegate dg)
    {
        assert(_state == State.initial);

        if (count <= 0) // no tasks, exit immediately
            return;

        // At this point we assume all worker threads are waiting for messages.
        // No worker touches _taskCompleted until it picks a new work item,
        // which only becomes possible after _workMutex is released below.
        _finishMutex.lock();
        _taskCompleted = 0;
        _finishMutex.unlock();

        // Sets the current task
        _workMutex.lock();

        _taskDelegate = dg;       // immutable during this parallelFor
        _taskNumWorkItem = count; // immutable during this parallelFor
        _taskCurrentWorkItem = 0;

        _workMutex.unlock();

        // wake up all threads
        // FUTURE: if number of tasks < number of threads only wake up the necessary amount of threads
        _workCondition.notifyAll();

        _state = State.parallelForInProgress;
    }

    /// Wait for completion of the previous parallelFor, if any.
    // It's always safe to call this function before doing another parallelFor.
    void waitForCompletion()
    {
        if (_state == State.initial)
            return; // that means that parallel threads were not launched

        assert(_state == State.parallelForInProgress);

        _finishMutex.lock();
        scope(exit) _finishMutex.unlock();

        // Workers only signal when the last work item completes;
        // the loop also guards against spurious wake-ups.
        while (_taskCompleted < _taskNumWorkItem)
        {
            _finishCondition.wait(&_finishMutex);
        }

        _state = State.initial;
    }

private:
    Thread[] _threads = null;

    // Used to signal more work
    UncheckedMutex _workMutex;
    ConditionVariable _workCondition;

    // Used to signal completion
    UncheckedMutex _finishMutex;
    ConditionVariable _finishCondition;

    // These fields represent the current task group (ie. a parallelFor)
    ThreadPoolDelegate _taskDelegate;
    int _taskNumWorkItem;     // total number of tasks in this task group
    int _taskCurrentWorkItem; // index of the next work item to hand out (protected by _workMutex)
    int _taskCompleted;       // number of work items completed so far (protected by _finishMutex)

    bool _stopFlag;

    // Must be called with _workMutex held.
    bool hasWork()
    {
        return _taskCurrentWorkItem < _taskNumWorkItem;
    }

    // Represent the thread-pool state from the user POV
    enum State
    {
        initial,               // tasks can be launched
        parallelForInProgress, // task were launched, but not waited one
    }
    State _state = State.initial;

    // What worker threads do
    // MAYDO: threads come here with bad context with struct delegates
    void workerThreadFunc()
    {
        while (true)
        {
            int workItem = -1;
            {
                _workMutex.lock();
                scope(exit) _workMutex.unlock();

                // Wait for notification
                while (!_stopFlag && !hasWork())
                    _workCondition.wait(&_workMutex);

                // Only leave once no work item is left, even when asked to stop.
                if (_stopFlag && !hasWork())
                    return;

                assert(hasWork());

                // Pick a task and increment counter
                workItem = _taskCurrentWorkItem;
                _taskCurrentWorkItem++;
            }

            assert(workItem != -1);

            // Do the actual task
            _taskDelegate(workItem);

            // Signal completion of one more task.
            bool allCompleted;
            {
                _finishMutex.lock();
                _taskCompleted++;
                allCompleted = (_taskCompleted >= _taskNumWorkItem);
                _finishMutex.unlock();
            }

            // Only the last completion can satisfy waitForCompletion, so the
            // ordering thread is woken up once instead of once per work item.
            if (allCompleted)
                _finishCondition.notifyOne();
        }
    }
}


unittest
{
    import core.atomic;
    import dplug.core.nogc;

    struct A
    {
        ThreadPool _pool;

        this(int dummy)
        {
            _pool = mallocEmplace!ThreadPool();
        }

        ~this()
        {
            _pool.destroy();
        }

        void launch(int count, bool async) nothrow @nogc
        {
            if (async)
            {
                _pool.parallelForAsync(count, &loopBody);
                _pool.waitForCompletion();
            }
            else
                _pool.parallelFor(count, &loopBody);
        }

        void loopBody(int workItem) nothrow @nogc
        {
            atomicOp!"+="(counter, 1);
        }

        shared(int) counter = 0;
    }

    auto a = A(4);
    a.launch(10, false);
    assert(a.counter == 10);

    a.launch(500, true);
    assert(a.counter == 510);

    a.launch(1, false);
    assert(a.counter == 511);

    a.launch(1, true);
    assert(a.counter == 512);

    a.launch(0, true);
    assert(a.counter == 512);
    a.launch(0, false);
    assert(a.counter == 512);

    // Negative counts are treated as "no work".
    a.launch(-1, true);
    assert(a.counter == 512);
    a.launch(-1, false);
    assert(a.counter == 512);
}