/**
* Threads and thread-pool.
*
* Copyright: Copyright Sean Kelly 2005 - 2012.
* Copyright: Copyright (c) 2009-2011, David Simcha.
* Copyright: Copyright Auburn Sounds 2016.
* License: Distributed under the
*      $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
*    (See accompanying file LICENSE)
* Authors:   Sean Kelly, Walter Bright, Alex Rønne Petersen, Martin Nowak, David Simcha, Guillaume Piolat
*/
module dplug.core.thread;

import dplug.core.nogc;
import dplug.core.lockedqueue;
import dplug.core.sync;

version(Posix)
    import core.sys.posix.pthread;
else version(Windows)
{
    import core.stdc.stdint : uintptr_t;
    import core.sys.windows.windef;
    import core.sys.windows.winbase;
    import core.thread;

    extern (Windows) alias btex_fptr = uint function(void*) ;
    extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*) nothrow @nogc;
}
else
    static assert(false, "Platform not supported");

version(OSX)
{
    extern(C) nothrow @nogc
    int sysctlbyname(const(char)*, void *, size_t *, void *, size_t);
}


alias ThreadDelegate = void delegate() nothrow @nogc;


/// Creates a suspended (not yet started) thread; call `start()` on the result to run it.
///
/// Params:
///     callback = The delegate that will be called by the thread.
///     stackSize = The thread stack size in bytes. 0 for default size.
Thread makeThread(ThreadDelegate callback, size_t stackSize = 0) nothrow @nogc
{
    return Thread(callback, stackSize);
}

/// Optimistic thread, failure not supported: any OS error while starting or
/// joining the thread ends in `assert(false)`.
struct Thread
{
nothrow:
@nogc:
public:

    /// Create a suspended thread.
    ///
    /// Params:
    ///     callback = The delegate that will be called by the thread.
    ///     stackSize = The thread stack size in bytes. 0 for default size.
    ///
    /// Warning: It is STRONGLY ADVISED to pass a class member delegate (not a struct
    ///          member delegate) to have context.
    ///          Passing struct method delegates are currently UNSUPPORTED.
    ///
    this(ThreadDelegate callback, size_t stackSize = 0)
    {
        _stackSize = stackSize;
        _callback = callback;
    }

    /// Destroys a thread.
    ///
    /// If the thread was started but never joined, its OS resources are released
    /// without waiting for it (detached on Posix, handle closed on Windows).
    /// If it was never started, or was already joined, there is nothing to release.
    ~this()
    {
        // join() has already released the id/handle; releasing it again would be
        // a double CloseHandle on Windows or a detach of a dead id on Posix.
        if (!_started || _joined)
            return;

        version(Posix)
        {
            pthread_detach(_id);
        }
        else version(Windows)
        {
            CloseHandle(_id);
        }
    }

    @disable this(this);

    /// Starts the thread. Threads are created suspended. This function can
    /// only be called once.
    ///
    /// Note: the new thread reads its callback through a pointer to this struct's
    /// `_callback` field, so this `Thread` must not be moved in memory until the
    /// new thread has begun running.
    void start()
    {
        assert(!_started);

        version(Posix)
        {
            pthread_attr_t attr;

            int err = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_init(pattr);
                })(&attr);

            if (err != 0)
                assert(false);

            if(_stackSize != 0)
            {
                int err2 = assumeNothrowNoGC(
                    (pthread_attr_t* pattr, size_t stackSize)
                    {
                        return pthread_attr_setstacksize(pattr, stackSize);
                    })(&attr, _stackSize);
                if (err2 != 0)
                    assert(false);
            }

            int err3 = pthread_create(&_id, &attr, &posixThreadEntryPoint, &_callback);
            if (err3 != 0)
                assert(false);

            int err4 = assumeNothrowNoGC(
                (pthread_attr_t* pattr)
                {
                    return pthread_attr_destroy(pattr);
                })(&attr);
            if (err4 != 0)
                assert(false);
        }
        else version(Windows)
        {
            uint dummy;
            _id = cast(HANDLE) _beginthreadex(null,
                                              cast(uint)_stackSize,
                                              &windowsThreadEntryPoint,
                                              &_callback,
                                              CREATE_SUSPENDED,
                                              &dummy);
            if (cast(size_t)_id == 0)
                assert(false);
            if (ResumeThread(_id) == -1)
                assert(false);
        }

        // Record that an OS thread now exists: this arms the double-start assert
        // above and lets ~this() release the thread if it is never joined.
        _started = true;
    }

    /// Wait for that thread termination.
    ///
    /// Must be called after `start()`, and at most once. The OS thread resources
    /// are released by this call.
    void join()
    {
        assert(_started && !_joined);

        version(Posix)
        {
            void* returnValue;
            if (0 != pthread_join(_id, &returnValue))
                assert(false);
        }
        else version(Windows)
        {
            if(WaitForSingleObject(_id, INFINITE) != WAIT_OBJECT_0)
                assert(false);
            CloseHandle(_id);
        }

        // The id/handle is no longer valid; ~this() must not release it again.
        _joined = true;
    }

private:
    version(Posix) pthread_t _id;
    version(Windows) HANDLE _id;
    ThreadDelegate _callback;
    size_t _stackSize;
    bool _started = false; // set by start()
    bool _joined = false;  // set by join()
}

version(Posix)
{
    /// Posix entry point: `threadContext` points to the `ThreadDelegate` stored in a `Thread`.
    extern(C) void* posixThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        ThreadDelegate dg = *cast(ThreadDelegate*)(threadContext);
        dg(); // hopefully called with the right context pointer
        return null;
    }
}

version(Windows)
{
    /// Windows entry point: `threadContext` points to the `ThreadDelegate` stored in a `Thread`.
    extern (Windows) uint windowsThreadEntryPoint(void* threadContext) nothrow @nogc
    {
        ThreadDelegate dg = *cast(ThreadDelegate*)(threadContext);
        dg();
        return 0;
    }
}

unittest
{
    int outerInt = 0;

    class A
    {
    nothrow @nogc:
        this()
        {
            t = makeThread(&f);
            t.start();
        }

        void join()
        {
            t.join();
        }

        void f()
        {
            outerInt = 1;
            innerInt = 2;

            // verify this
            assert(checkValue0 == 0x11223344);
            assert(checkValue1 == 0x55667788);
        }

        int checkValue0 = 0x11223344;
        int checkValue1 = 0x55667788;
        int innerInt = 0;
        Thread t;
    }

    auto a = new A;
    a.t.join();
    assert(a.innerInt == 2);
    a.destroy();
    assert(outerInt == 1);
}

/// Launch a function in a newly created thread, which is destroyed afterwards.
/// Return the thread so that you can call `.join()` on it.
/// If it is never joined, the returned `Thread` releases the OS thread when destroyed.
Thread launchInAThread(ThreadDelegate dg) nothrow @nogc
{
    Thread t = makeThread(dg);
    t.start();
    return t;
}


version(Windows)
{
    /// Returns: current thread identifier.
    void* currentThreadId() nothrow @nogc
    {
        return cast(void*)GetCurrentThreadId();
    }
}
else version(Posix)
{
    /// Returns: current thread identifier.
    void* currentThreadId() nothrow @nogc
    {
        return assumeNothrowNoGC(
                ()
                {
                    return cast(void*)(pthread_self());
                })();
    }
}


//
// Thread-pool
//

/// Returns: Number of CPUs.
int getTotalNumberOfCPUs() nothrow @nogc
{
    // The result is used to size thread-pools, so it is never less than 1,
    // even when the OS query fails: a pool with zero workers would make any
    // parallelFor of more than one item wait forever.
    version(Windows)
    {
        import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo;
        SYSTEM_INFO si;
        GetSystemInfo(&si);
        int procs = cast(int) si.dwNumberOfProcessors;
        if (procs < 1)
            procs = 1;
        return procs;
    }
    else version(linux)
    {
        import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
        // sysconf returns -1 on error.
        auto procs = sysconf(_SC_NPROCESSORS_ONLN);
        if (procs < 1)
            return 1;
        return cast(int) procs;
    }
    else version(OSX)
    {
        auto nameStr = "machdep.cpu.core_count\0".ptr;
        uint ans = 0;
        size_t len = uint.sizeof;
        // sysctlbyname returns 0 on success; on failure `ans` is not meaningful.
        if (sysctlbyname(nameStr, &ans, &len, null, 0) != 0 || ans == 0)
            return 1;
        return cast(int)ans;
    }
    else
        static assert(false, "OS unsupported");
}

alias ThreadPoolDelegate = void delegate(int workItem) nothrow @nogc;


/// Rewrite of the ThreadPool using condition variables.
/// FUTURE: this could be speed-up by using futures. Description of the task
/// and associated condition+mutex would go in an external struct.
/// Note: the interface of the thread-pool itself is not thread-safe, you cannot give orders from
/// multiple threads at once.
class ThreadPool
{
public:
nothrow:
@nogc:

    /// Creates a thread-pool.
    ///
    /// Params:
    ///     numThreads = Number of worker threads. 0 (or any value below 1) means
    ///                  one worker per CPU, as returned by `getTotalNumberOfCPUs()`.
    ///     stackSize  = Stack size in bytes of each worker thread. 0 for default size.
    this(int numThreads = 0, size_t stackSize = 0)
    {
        // Create sync first
        _workMutex = makeMutex();
        _workCondition = makeConditionVariable();

        _finishMutex = makeMutex();
        _finishCondition = makeConditionVariable();

        // Create threads.
        // A negative count must not reach mallocSlice, where it would be
        // converted to an enormous size_t length.
        if (numThreads <= 0)
            numThreads = getTotalNumberOfCPUs();
        _threads = mallocSlice!Thread(numThreads);
        foreach(ref thread; _threads)
        {
            thread = makeThread(&workerThreadFunc, stackSize);
            thread.start();
        }
    }

    /// Destroys a thread-pool.
    /// Every pending work has to be waited for (see `waitForCompletion`) before
    /// destruction. All worker threads are then stopped and joined.
    ~this()
    {
        if (_threads !is null)
        {
            assert(_state == State.initial);

            // Put the thread-pool in stop state
            _workMutex.lock();
            _stopFlag = true;
            _workMutex.unlock();

            // Notify all workers
            _workCondition.notifyAll();

            // Wait for each thread termination
            foreach(ref thread; _threads)
                thread.join();

            // Destroys each thread
            foreach(ref thread; _threads)
                thread.destroy();
            freeSlice(_threads);
            _threads = null;
        }
    }

    /// Calls the delegate in parallel, with 0..count as index.
    /// Immediate waiting for completion.
    /// A `count` of 0 or less does nothing.
    void parallelFor(int count, scope ThreadPoolDelegate dg)
    {
        assert(_state == State.initial);

        // Do not launch worker threads for one work-item, not worth it.
        // (but it is worth it in async).
        if (count == 1)
        {
            dg(0);
            return;
        }

        // Unleash parallel threads.
        parallelForAsync(count, dg);

        // Wait for completion immediately.
        waitForCompletion();
    }

    /// Same, but does not wait for completion.
    /// You cannot have 2 concurrent parallelFor for the same thread-pool.
    /// `dg` is stored and called from worker threads, so it (and its context)
    /// must stay valid until `waitForCompletion()` returns.
    /// A `count` of 0 or less does nothing.
    void parallelForAsync(int count, scope ThreadPoolDelegate dg)
    {
        assert(_state == State.initial);

        // No tasks, exit immediately. A negative count is treated like 0
        // instead of waking up every worker for nothing.
        if (count <= 0)
            return;

        // At this point we assume all worker threads are waiting for messages

        // Sets the current task
        _workMutex.lock();

        _taskDelegate = dg;       // immutable during this parallelFor
        _taskNumWorkItem = count; // immutable during this parallelFor
        _taskCurrentWorkItem = 0;
        // Safe to reset without _finishMutex: the previous task group has been
        // fully waited for, and workers only touch this after taking _workMutex.
        _taskCompleted = 0;

        _workMutex.unlock();

        // wake up all threads
        // FUTURE: if number of tasks < number of threads only wake up the necessary amount of threads
        _workCondition.notifyAll();

        _state = State.parallelForInProgress;
    }

    /// Wait for completion of the previous parallelFor, if any.
    // It's always safe to call this function before doing another parallelFor.
    void waitForCompletion()
    {
        if (_state == State.initial)
            return; // that means that parallel threads were not launched

        assert(_state == State.parallelForInProgress);

        _finishMutex.lock();
        scope(exit) _finishMutex.unlock();

        // Workers only signal once the last work item is completed;
        // the loop also protects against spurious wake-ups.
        while (_taskCompleted < _taskNumWorkItem)
        {
            _finishCondition.wait(&_finishMutex);
        }

        _state = State.initial;
    }

private:
    Thread[] _threads = null;

    // Used to signal more work
    UncheckedMutex _workMutex;
    ConditionVariable _workCondition;

    // Used to signal completion
    UncheckedMutex _finishMutex;
    ConditionVariable _finishCondition;

    // These fields represent the current task group (ie. a parallelFor)
    ThreadPoolDelegate _taskDelegate;
    int _taskNumWorkItem;     // total number of tasks in this task group
    int _taskCurrentWorkItem; // next work item to hand out (protected by _workMutex)
    int _taskCompleted;       // number of work items completed so far, in any order (protected by _finishMutex)

    bool _stopFlag;

    // Whether some work item is still waiting to be picked. Only called with _workMutex held.
    bool hasWork()
    {
        return _taskCurrentWorkItem < _taskNumWorkItem;
    }

    // Represent the thread-pool state from the user POV
    enum State
    {
        initial,               // tasks can be launched
        parallelForInProgress, // tasks were launched, but not waited on
    }
    State _state = State.initial;

    // What worker threads do
    // MAYDO: threads come here with bad context with struct delegates
    void workerThreadFunc()
    {
        while (true)
        {
            int workItem = -1;
            {
                _workMutex.lock();
                scope(exit) _workMutex.unlock();

                // Wait for notification
                while (!_stopFlag && !hasWork())
                    _workCondition.wait(&_workMutex);

                // Remaining work is still done before stopping.
                if (_stopFlag && !hasWork())
                    return;

                assert(hasWork());

                // Pick a task and increment counter
                workItem = _taskCurrentWorkItem;
                _taskCurrentWorkItem++;
            }

            assert(workItem != -1);

            // Do the actual task
            _taskDelegate(workItem);

            // Signal completion of one more task.
            // Only the worker completing the last item wakes the order thread up;
            // waking it for every item would just send it back to sleep.
            // The check is done under _finishMutex, so no wake-up can be lost.
            bool allCompleted;
            {
                _finishMutex.lock();
                _taskCompleted++;
                allCompleted = (_taskCompleted == _taskNumWorkItem);
                _finishMutex.unlock();
            }
            if (allCompleted)
                _finishCondition.notifyOne(); // wake-up
        }
    }
}


unittest
{
    import core.atomic;
    import dplug.core.nogc;

    struct A
    {
        ThreadPool _pool;

        this(int dummy)
        {
            _pool = mallocNew!ThreadPool();
        }

        ~this()
        {
            _pool.destroy();
        }

        void launch(int count, bool async) nothrow @nogc
        {
            if (async)
            {
                _pool.parallelForAsync(count, &loopBody);
                _pool.waitForCompletion();
            }
            else
                _pool.parallelFor(count, &loopBody);
        }

        void loopBody(int workItem) nothrow @nogc
        {
            atomicOp!"+="(counter, 1);
        }

        shared(int) counter = 0;
    }

    auto a = A(4);
    a.launch(10, false);
    assert(a.counter == 10);

    a.launch(500, true);
    assert(a.counter == 510);

    a.launch(1, false);
    assert(a.counter == 511);

    a.launch(1, true);
    assert(a.counter == 512);

    a.launch(0, true);
    assert(a.counter == 512);
    a.launch(0, false);
    assert(a.counter == 512);

    // Negative counts do nothing.
    a.launch(-3, true);
    assert(a.counter == 512);
    a.launch(-3, false);
    assert(a.counter == 512);
}