Line data Source code
1 : /*
2 : * Unix SMB/CIFS implementation.
3 : * thread pool implementation
4 : * Copyright (C) Volker Lendecke 2009
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : */
19 :
20 : #include "replace.h"
21 : #include "system/time.h"
22 : #include "system/wait.h"
23 : #include "system/threads.h"
24 : #include "system/filesys.h"
25 : #include "pthreadpool.h"
26 : #include "lib/util/dlinklist.h"
27 :
28 : #ifdef NDEBUG
29 : #undef NDEBUG
30 : #endif
31 :
32 : #include <assert.h>
33 :
34 : struct pthreadpool_job {
35 : int id;
36 : void (*fn)(void *private_data);
37 : void *private_data;
38 : };
39 :
40 : struct pthreadpool {
41 : /*
42 : * List pthreadpools for fork safety
43 : */
44 : struct pthreadpool *prev, *next;
45 :
46 : /*
47 : * Control access to this struct
48 : */
49 : pthread_mutex_t mutex;
50 :
51 : /*
52 : * Threads waiting for work do so here
53 : */
54 : pthread_cond_t condvar;
55 :
56 : /*
57 : * Array of jobs
58 : */
59 : size_t jobs_array_len;
60 : struct pthreadpool_job *jobs;
61 :
62 : size_t head;
63 : size_t num_jobs;
64 :
65 : /*
66 : * Indicate job completion
67 : */
68 : int (*signal_fn)(int jobid,
69 : void (*job_fn)(void *private_data),
70 : void *job_fn_private_data,
71 : void *private_data);
72 : void *signal_fn_private_data;
73 :
74 : /*
75 : * indicator to worker threads to stop processing further jobs
76 : * and exit.
77 : */
78 : bool stopped;
79 :
80 : /*
81 : * indicator to the last worker thread to free the pool
82 : * resources.
83 : */
84 : bool destroyed;
85 :
86 : /*
87 : * maximum number of threads
88 :           * 0 means no real threads, only strict sync processing.
89 : */
90 : unsigned max_threads;
91 :
92 : /*
93 : * Number of threads
94 : */
95 : unsigned num_threads;
96 :
97 : /*
98 : * Number of idle threads
99 : */
100 : unsigned num_idle;
101 :
102 : /*
103 :           * Condition variable indicating that helper threads should
104 :           * quickly get out of the way of fork(), so that nobody is
105 :           * left waiting on pool->condvar.
106 : */
107 : pthread_cond_t *prefork_cond;
108 :
109 : /*
110 :           * Waiting position for helper threads while fork() is
111 :           * running. The forking thread will have locked it, and all
112 :           * idle helper threads will sit here until after the fork,
113 :           * when the forking thread unlocks it again.
114 : */
115 : pthread_mutex_t fork_mutex;
116 : };
117 :
118 : static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
119 : static struct pthreadpool *pthreadpools = NULL;
120 : static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
121 :
122 : static void pthreadpool_prep_atfork(void);
123 :
124 : /*
125 : * Initialize a thread pool
126 : */
127 :
128 91470 : int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
129 : int (*signal_fn)(int jobid,
130 : void (*job_fn)(void *private_data),
131 : void *job_fn_private_data,
132 : void *private_data),
133 : void *signal_fn_private_data)
134 : {
135 1986 : struct pthreadpool *pool;
136 1986 : int ret;
137 :
138 91470 : pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
139 91470 : if (pool == NULL) {
140 0 : return ENOMEM;
141 : }
142 91470 : pool->signal_fn = signal_fn;
143 91470 : pool->signal_fn_private_data = signal_fn_private_data;
144 :
145 91470 : pool->jobs_array_len = 4;
146 91470 : pool->jobs = calloc(
147 : pool->jobs_array_len, sizeof(struct pthreadpool_job));
148 :
149 91470 : if (pool->jobs == NULL) {
150 0 : free(pool);
151 0 : return ENOMEM;
152 : }
153 :
154 91470 : pool->head = pool->num_jobs = 0;
155 :
156 91470 : ret = pthread_mutex_init(&pool->mutex, NULL);
157 91470 : if (ret != 0) {
158 0 : free(pool->jobs);
159 0 : free(pool);
160 0 : return ret;
161 : }
162 :
163 91470 : ret = pthread_cond_init(&pool->condvar, NULL);
164 91470 : if (ret != 0) {
165 0 : pthread_mutex_destroy(&pool->mutex);
166 0 : free(pool->jobs);
167 0 : free(pool);
168 0 : return ret;
169 : }
170 :
171 91470 : ret = pthread_mutex_init(&pool->fork_mutex, NULL);
172 91470 : if (ret != 0) {
173 0 : pthread_cond_destroy(&pool->condvar);
174 0 : pthread_mutex_destroy(&pool->mutex);
175 0 : free(pool->jobs);
176 0 : free(pool);
177 0 : return ret;
178 : }
179 :
180 91470 : pool->stopped = false;
181 91470 : pool->destroyed = false;
182 91470 : pool->num_threads = 0;
183 91470 : pool->max_threads = max_threads;
184 91470 : pool->num_idle = 0;
185 91470 : pool->prefork_cond = NULL;
186 :
187 91470 : ret = pthread_mutex_lock(&pthreadpools_mutex);
188 91470 : if (ret != 0) {
189 0 : pthread_mutex_destroy(&pool->fork_mutex);
190 0 : pthread_cond_destroy(&pool->condvar);
191 0 : pthread_mutex_destroy(&pool->mutex);
192 0 : free(pool->jobs);
193 0 : free(pool);
194 0 : return ret;
195 : }
196 91470 : DLIST_ADD(pthreadpools, pool);
197 :
198 91470 : ret = pthread_mutex_unlock(&pthreadpools_mutex);
199 91470 : assert(ret == 0);
200 :
201 91470 : pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
202 :
203 91470 : *presult = pool;
204 :
205 91470 : return 0;
206 : }
207 :
208 10114 : size_t pthreadpool_max_threads(struct pthreadpool *pool)
209 : {
210 10114 : if (pool->stopped) {
211 0 : return 0;
212 : }
213 :
214 10114 : return pool->max_threads;
215 : }
216 :
217 20143 : size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
218 : {
219 0 : int res;
220 0 : int unlock_res;
221 0 : size_t ret;
222 :
223 20143 : if (pool->stopped) {
224 0 : return 0;
225 : }
226 :
227 20143 : res = pthread_mutex_lock(&pool->mutex);
228 20143 : if (res != 0) {
229 0 : return res;
230 : }
231 :
232 20143 : if (pool->stopped) {
233 0 : unlock_res = pthread_mutex_unlock(&pool->mutex);
234 0 : assert(unlock_res == 0);
235 0 : return 0;
236 : }
237 :
238 20143 : ret = pool->num_jobs;
239 :
240 20143 : unlock_res = pthread_mutex_unlock(&pool->mutex);
241 20143 : assert(unlock_res == 0);
242 20143 : return ret;
243 : }
244 :
245 64610 : static void pthreadpool_prepare_pool(struct pthreadpool *pool)
246 : {
247 607 : int ret;
248 :
249 64610 : ret = pthread_mutex_lock(&pool->fork_mutex);
250 64610 : assert(ret == 0);
251 :
252 64610 : ret = pthread_mutex_lock(&pool->mutex);
253 64610 : assert(ret == 0);
254 :
255 64667 : while (pool->num_idle != 0) {
256 57 : unsigned num_idle = pool->num_idle;
257 4 : pthread_cond_t prefork_cond;
258 :
259 57 : ret = pthread_cond_init(&prefork_cond, NULL);
260 57 : assert(ret == 0);
261 :
262 : /*
263 :                  * Push all idle threads off pool->condvar. The child
264 :                  * may destroy the pool, and calling
265 :                  * pthread_cond_destroy(&pool->condvar) while threads
266 :                  * are still waiting on it is undefined behaviour;
267 :                  * glibc just blocks in that case.
268 : */
269 57 : pool->prefork_cond = &prefork_cond;
270 :
271 57 : ret = pthread_cond_signal(&pool->condvar);
272 57 : assert(ret == 0);
273 :
274 114 : while (pool->num_idle == num_idle) {
275 57 : ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
276 57 : assert(ret == 0);
277 : }
278 :
279 57 : pool->prefork_cond = NULL;
280 :
281 57 : ret = pthread_cond_destroy(&prefork_cond);
282 57 : assert(ret == 0);
283 : }
284 :
285 : /*
286 :          * What happens to condvars across a fork is probably
287 :          * specified somewhere, but the pthread_atfork rationale only
288 :          * talks about mutexes. So better safe than sorry:
289 :          * destroy/reinit pool->condvar across a fork.
290 : */
291 :
292 64610 : ret = pthread_cond_destroy(&pool->condvar);
293 64610 : assert(ret == 0);
294 64610 : }
295 :
296 66727 : static void pthreadpool_prepare(void)
297 : {
298 607 : int ret;
299 607 : struct pthreadpool *pool;
300 :
301 66727 : ret = pthread_mutex_lock(&pthreadpools_mutex);
302 66727 : assert(ret == 0);
303 :
304 66727 : pool = pthreadpools;
305 :
306 131337 : while (pool != NULL) {
307 64610 : pthreadpool_prepare_pool(pool);
308 64610 : pool = pool->next;
309 : }
310 66727 : }
311 :
312 66727 : static void pthreadpool_parent(void)
313 : {
314 607 : int ret;
315 607 : struct pthreadpool *pool;
316 :
317 66727 : for (pool = DLIST_TAIL(pthreadpools);
318 130731 : pool != NULL;
319 64610 : pool = DLIST_PREV(pool)) {
320 64610 : ret = pthread_cond_init(&pool->condvar, NULL);
321 64610 : assert(ret == 0);
322 64610 : ret = pthread_mutex_unlock(&pool->mutex);
323 64610 : assert(ret == 0);
324 64610 : ret = pthread_mutex_unlock(&pool->fork_mutex);
325 64610 : assert(ret == 0);
326 : }
327 :
328 66727 : ret = pthread_mutex_unlock(&pthreadpools_mutex);
329 66727 : assert(ret == 0);
330 66727 : }
331 :
332 0 : static void pthreadpool_child(void)
333 : {
334 0 : int ret;
335 0 : struct pthreadpool *pool;
336 :
337 0 : for (pool = DLIST_TAIL(pthreadpools);
338 0 : pool != NULL;
339 0 : pool = DLIST_PREV(pool)) {
340 :
341 0 : pool->num_threads = 0;
342 0 : pool->num_idle = 0;
343 0 : pool->head = 0;
344 0 : pool->num_jobs = 0;
345 0 : pool->stopped = true;
346 :
347 0 : ret = pthread_cond_init(&pool->condvar, NULL);
348 0 : assert(ret == 0);
349 :
350 0 : ret = pthread_mutex_unlock(&pool->mutex);
351 0 : assert(ret == 0);
352 :
353 0 : ret = pthread_mutex_unlock(&pool->fork_mutex);
354 0 : assert(ret == 0);
355 : }
356 :
357 0 : ret = pthread_mutex_unlock(&pthreadpools_mutex);
358 0 : assert(ret == 0);
359 0 : }
360 :
361 6748 : static void pthreadpool_prep_atfork(void)
362 : {
363 6748 : pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
364 : pthreadpool_child);
365 6748 : }
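
/*
 * A minimal, self-contained sketch of the pthread_atfork() pattern that
 * pthreadpool_prepare(), pthreadpool_parent() and pthreadpool_child()
 * implement above: the prepare handler takes the lock before fork(),
 * and the parent and child handlers release it again on both sides.
 * The demo_* names are illustrative only and not part of this file.
 */

#include <pthread.h>

static pthread_mutex_t demo_mutex = PTHREAD_MUTEX_INITIALIZER;

static void demo_prepare(void)
{
	/* Runs in the forking thread just before fork() */
	pthread_mutex_lock(&demo_mutex);
}

static void demo_parent(void)
{
	/* Runs in the parent just after fork() */
	pthread_mutex_unlock(&demo_mutex);
}

static void demo_child(void)
{
	/*
	 * Runs in the child just after fork(); the child also resets
	 * any state that only made sense with the parent's threads.
	 */
	pthread_mutex_unlock(&demo_mutex);
}

static void demo_register_handlers(void)
{
	/* Register once, e.g. via pthread_once() as done above */
	pthread_atfork(demo_prepare, demo_parent, demo_child);
}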
366 :
367 131559 : static int pthreadpool_free(struct pthreadpool *pool)
368 : {
369 2031 : int ret, ret1, ret2;
370 :
371 131559 : ret = pthread_mutex_lock(&pthreadpools_mutex);
372 131559 : if (ret != 0) {
373 0 : return ret;
374 : }
375 131559 : DLIST_REMOVE(pthreadpools, pool);
376 131559 : ret = pthread_mutex_unlock(&pthreadpools_mutex);
377 131559 : assert(ret == 0);
378 :
379 131559 : ret = pthread_mutex_lock(&pool->mutex);
380 131559 : assert(ret == 0);
381 131559 : ret = pthread_mutex_unlock(&pool->mutex);
382 131559 : assert(ret == 0);
383 :
384 131559 : ret = pthread_mutex_destroy(&pool->mutex);
385 131559 : ret1 = pthread_cond_destroy(&pool->condvar);
386 131559 : ret2 = pthread_mutex_destroy(&pool->fork_mutex);
387 :
388 131559 : if (ret != 0) {
389 0 : return ret;
390 : }
391 131559 : if (ret1 != 0) {
392 0 : return ret1;
393 : }
394 131559 : if (ret2 != 0) {
395 0 : return ret2;
396 : }
397 :
398 131559 : free(pool->jobs);
399 131559 : free(pool);
400 :
401 131559 : return 0;
402 : }
403 :
404 : /*
405 : * Stop a thread pool. Wake up all idle threads for exit.
406 : */
407 :
408 83186 : static int pthreadpool_stop_locked(struct pthreadpool *pool)
409 : {
410 1118 : int ret;
411 :
412 83186 : pool->stopped = true;
413 :
414 83186 : if (pool->num_threads == 0) {
415 79311 : return 0;
416 : }
417 :
418 : /*
419 : * We have active threads, tell them to finish.
420 : */
421 :
422 2792 : ret = pthread_cond_broadcast(&pool->condvar);
423 :
424 2792 : return ret;
425 : }
426 :
427 : /*
428 : * Stop a thread pool. Wake up all idle threads for exit.
429 : */
430 :
431 131556 : int pthreadpool_stop(struct pthreadpool *pool)
432 : {
433 2028 : int ret, ret1;
434 :
435 131556 : ret = pthread_mutex_lock(&pool->mutex);
436 131556 : if (ret != 0) {
437 0 : return ret;
438 : }
439 :
440 131556 : if (!pool->stopped) {
441 83182 : ret = pthreadpool_stop_locked(pool);
442 : }
443 :
444 131556 : ret1 = pthread_mutex_unlock(&pool->mutex);
445 131556 : assert(ret1 == 0);
446 :
447 129528 : return ret;
448 : }
449 :
450 : /*
451 : * Destroy a thread pool. Wake up all idle threads for exit. The last
452 : * one will free the pool.
453 : */
454 :
455 131561 : int pthreadpool_destroy(struct pthreadpool *pool)
456 : {
457 2033 : int ret, ret1;
458 2033 : bool free_it;
459 :
460 131561 : assert(!pool->destroyed);
461 :
462 131561 : ret = pthread_mutex_lock(&pool->mutex);
463 131561 : if (ret != 0) {
464 0 : return ret;
465 : }
466 :
467 131561 : pool->destroyed = true;
468 :
469 131561 : if (!pool->stopped) {
470 4 : ret = pthreadpool_stop_locked(pool);
471 : }
472 :
473 131561 : free_it = (pool->num_threads == 0);
474 :
475 131561 : ret1 = pthread_mutex_unlock(&pool->mutex);
476 131561 : assert(ret1 == 0);
477 :
478 131561 : if (free_it) {
479 128875 : pthreadpool_free(pool);
480 : }
481 :
482 129528 : return ret;
483 : }
484 : /*
485 :  * Prepare for pthread_exit(); pool->mutex must be locked and will be
486 : * unlocked here. This is a bit of a layering violation, but here we
487 : * also take care of removing the pool if we're the last thread.
488 : */
489 3896 : static void pthreadpool_server_exit(struct pthreadpool *pool)
490 : {
491 239 : int ret;
492 239 : bool free_it;
493 :
494 3896 : pool->num_threads -= 1;
495 :
496 3896 : free_it = (pool->destroyed && (pool->num_threads == 0));
497 :
498 3896 : ret = pthread_mutex_unlock(&pool->mutex);
499 3896 : assert(ret == 0);
500 :
501 3896 : if (free_it) {
502 2684 : pthreadpool_free(pool);
503 : }
504 3896 : }
505 :
506 249129 : static bool pthreadpool_get_job(struct pthreadpool *p,
507 : struct pthreadpool_job *job)
508 : {
509 249129 : if (p->stopped) {
510 3101 : return false;
511 : }
512 :
513 245790 : if (p->num_jobs == 0) {
514 0 : return false;
515 : }
516 245790 : *job = p->jobs[p->head];
517 245790 : p->head = (p->head+1) % p->jobs_array_len;
518 245790 : p->num_jobs -= 1;
519 245790 : return true;
520 : }
521 :
522 245792 : static bool pthreadpool_put_job(struct pthreadpool *p,
523 : int id,
524 : void (*fn)(void *private_data),
525 : void *private_data)
526 : {
527 78366 : struct pthreadpool_job *job;
528 :
529 245792 : if (p->num_jobs == p->jobs_array_len) {
530 12 : struct pthreadpool_job *tmp;
531 45 : size_t new_len = p->jobs_array_len * 2;
532 :
533 45 : tmp = realloc(
534 45 : p->jobs, sizeof(struct pthreadpool_job) * new_len);
535 45 : if (tmp == NULL) {
536 0 : return false;
537 : }
538 45 : p->jobs = tmp;
539 :
540 : /*
541 : * We just doubled the jobs array. The array implements a FIFO
542 : * queue with a modulo-based wraparound, so we have to memcpy
543 : * the jobs that are logically at the queue end but physically
544 : * before the queue head into the reallocated area. The new
545 : * space starts at the current jobs_array_len, and we have to
546 : * copy everything before the current head job into the new
547 : * area.
548 : */
549 45 : memcpy(&p->jobs[p->jobs_array_len], p->jobs,
550 45 : sizeof(struct pthreadpool_job) * p->head);
551 :
552 45 : p->jobs_array_len = new_len;
553 : }
554 :
555 245792 : job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
556 245792 : job->id = id;
557 245792 : job->fn = fn;
558 245792 : job->private_data = private_data;
559 :
560 245792 : p->num_jobs += 1;
561 :
562 245792 : return true;
563 : }
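
/*
 * A standalone sketch of the resize step in pthreadpool_put_job()
 * above, assuming a hypothetical ring buffer of ints: when the buffer
 * is full, the slots physically located before "head" hold the logical
 * tail of the queue, so after doubling the array they are copied to
 * the start of the newly allocated second half. The int_ring/ring_push
 * names are illustrative only and not part of this file.
 */

#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

struct int_ring {
	size_t len;	/* physical array length, always > 0 */
	size_t head;	/* index of the oldest queued element */
	size_t num;	/* number of queued elements */
	int *slots;
};

static bool ring_push(struct int_ring *r, int value)
{
	if (r->num == r->len) {
		size_t new_len = r->len * 2;
		int *tmp;

		tmp = realloc(r->slots, new_len * sizeof(int));
		if (tmp == NULL) {
			return false;
		}
		r->slots = tmp;

		/*
		 * The wrapped-around part at [0, head) is logically the
		 * newest part of the queue; move it behind the old end
		 * so that (head + i) % new_len addresses element i again.
		 */
		memcpy(&r->slots[r->len], r->slots,
		       r->head * sizeof(int));
		r->len = new_len;
	}

	r->slots[(r->head + r->num) % r->len] = value;
	r->num += 1;
	return true;
}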
564 :
565 2 : static void pthreadpool_undo_put_job(struct pthreadpool *p)
566 : {
567 2 : p->num_jobs -= 1;
568 0 : }
569 :
570 3942 : static void *pthreadpool_server(void *arg)
571 : {
572 3942 : struct pthreadpool *pool = (struct pthreadpool *)arg;
573 243 : int res;
574 :
575 3942 : res = pthread_mutex_lock(&pool->mutex);
576 3942 : if (res != 0) {
577 0 : return NULL;
578 : }
579 :
580 245767 : while (1) {
581 78604 : struct timespec ts;
582 78604 : struct pthreadpool_job job;
583 :
584 : /*
585 : * idle-wait at most 1 second. If nothing happens in that
586 : * time, exit this thread.
587 : */
588 :
589 249709 : clock_gettime(CLOCK_REALTIME, &ts);
590 249709 : ts.tv_sec += 1;
591 :
592 484344 : while ((pool->num_jobs == 0) && !pool->stopped) {
593 :
594 235215 : pool->num_idle += 1;
595 235215 : res = pthread_cond_timedwait(
596 : &pool->condvar, &pool->mutex, &ts);
597 235174 : pool->num_idle -= 1;
598 :
599 235174 : if (pool->prefork_cond != NULL) {
600 : /*
601 :                                  * We must allow fork() to continue
602 : * without anybody waiting on
603 : * &pool->condvar. Tell
604 : * pthreadpool_prepare_pool that we
605 : * got that message.
606 : */
607 :
608 57 : res = pthread_cond_signal(pool->prefork_cond);
609 57 : assert(res == 0);
610 :
611 57 : res = pthread_mutex_unlock(&pool->mutex);
612 57 : assert(res == 0);
613 :
614 : /*
615 : * pthreadpool_prepare_pool has
616 : * already locked this mutex across
617 : * the fork. This makes us wait
618 : * without sitting in a condvar.
619 : */
620 57 : res = pthread_mutex_lock(&pool->fork_mutex);
621 57 : assert(res == 0);
622 57 : res = pthread_mutex_unlock(&pool->fork_mutex);
623 57 : assert(res == 0);
624 :
625 57 : res = pthread_mutex_lock(&pool->mutex);
626 57 : assert(res == 0);
627 : }
628 :
629 235174 : if (res == ETIMEDOUT) {
630 :
631 539 : if (pool->num_jobs == 0) {
632 : /*
633 : * we timed out and still no work for
634 : * us. Exit.
635 : */
636 539 : pthreadpool_server_exit(pool);
637 3896 : return NULL;
638 : }
639 :
640 0 : break;
641 : }
642 234635 : assert(res == 0);
643 : }
644 :
645 249129 : if (pthreadpool_get_job(pool, &job)) {
646 78364 : int ret;
647 :
648 : /*
649 : * Do the work with the mutex unlocked
650 : */
651 :
652 245790 : res = pthread_mutex_unlock(&pool->mutex);
653 245790 : assert(res == 0);
654 :
655 245790 : job.fn(job.private_data);
656 :
657 245732 : ret = pool->signal_fn(job.id,
658 : job.fn, job.private_data,
659 : pool->signal_fn_private_data);
660 :
661 245658 : res = pthread_mutex_lock(&pool->mutex);
662 245785 : assert(res == 0);
663 :
664 245785 : if (ret != 0) {
665 0 : pthreadpool_server_exit(pool);
666 0 : return NULL;
667 : }
668 : }
669 :
670 249124 : if (pool->stopped) {
671 : /*
672 : * we're asked to stop processing jobs, so exit
673 : */
674 3357 : pthreadpool_server_exit(pool);
675 3357 : return NULL;
676 : }
677 : }
678 : }
679 :
680 3944 : static int pthreadpool_create_thread(struct pthreadpool *pool)
681 : {
682 245 : pthread_attr_t thread_attr;
683 245 : pthread_t thread_id;
684 245 : int res;
685 245 : sigset_t mask, omask;
686 :
687 : /*
688 : * Create a new worker thread. It should not receive any signals.
689 : */
690 :
691 3944 : sigfillset(&mask);
692 :
693 3944 : res = pthread_attr_init(&thread_attr);
694 3944 : if (res != 0) {
695 0 : return res;
696 : }
697 :
698 3944 : res = pthread_attr_setdetachstate(
699 : &thread_attr, PTHREAD_CREATE_DETACHED);
700 3944 : if (res != 0) {
701 0 : pthread_attr_destroy(&thread_attr);
702 0 : return res;
703 : }
704 :
705 3944 : res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
706 3944 : if (res != 0) {
707 0 : pthread_attr_destroy(&thread_attr);
708 0 : return res;
709 : }
710 :
711 3944 : res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
712 : (void *)pool);
713 :
714 3944 : assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
715 :
716 3944 : pthread_attr_destroy(&thread_attr);
717 :
718 3944 : if (res == 0) {
719 3942 : pool->num_threads += 1;
720 : }
721 :
722 3699 : return res;
723 : }
724 :
725 245793 : int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
726 : void (*fn)(void *private_data), void *private_data)
727 : {
728 78367 : int res;
729 78367 : int unlock_res;
730 :
731 245793 : assert(!pool->destroyed);
732 :
733 245793 : res = pthread_mutex_lock(&pool->mutex);
734 245793 : if (res != 0) {
735 0 : return res;
736 : }
737 :
738 245793 : if (pool->stopped) {
739 : /*
740 : * Protect against the pool being shut down while
741 : * trying to add a job
742 : */
743 0 : unlock_res = pthread_mutex_unlock(&pool->mutex);
744 0 : assert(unlock_res == 0);
745 0 : return EINVAL;
746 : }
747 :
748 245793 : if (pool->max_threads == 0) {
749 1 : unlock_res = pthread_mutex_unlock(&pool->mutex);
750 1 : assert(unlock_res == 0);
751 :
752 : /*
753 :                  * If no threads are allowed we do strict sync processing.
754 : */
755 1 : fn(private_data);
756 1 : res = pool->signal_fn(job_id, fn, private_data,
757 : pool->signal_fn_private_data);
758 1 : return res;
759 : }
760 :
761 : /*
762 : * Add job to the end of the queue
763 : */
764 245792 : if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
765 0 : unlock_res = pthread_mutex_unlock(&pool->mutex);
766 0 : assert(unlock_res == 0);
767 0 : return ENOMEM;
768 : }
769 :
770 245792 : if (pool->num_idle > 0) {
771 : /*
772 : * We have idle threads, wake one.
773 : */
774 231858 : res = pthread_cond_signal(&pool->condvar);
775 231858 : if (res != 0) {
776 0 : pthreadpool_undo_put_job(pool);
777 : }
778 231858 : unlock_res = pthread_mutex_unlock(&pool->mutex);
779 231858 : assert(unlock_res == 0);
780 163727 : return res;
781 : }
782 :
783 13934 : if (pool->num_threads >= pool->max_threads) {
784 : /*
785 : * No more new threads, we just queue the request
786 : */
787 9990 : unlock_res = pthread_mutex_unlock(&pool->mutex);
788 9990 : assert(unlock_res == 0);
789 0 : return 0;
790 : }
791 :
792 3944 : res = pthreadpool_create_thread(pool);
793 3944 : if (res == 0) {
794 3942 : unlock_res = pthread_mutex_unlock(&pool->mutex);
795 3942 : assert(unlock_res == 0);
796 3699 : return 0;
797 : }
798 :
799 2 : if (pool->num_threads != 0) {
800 : /*
801 : * At least one thread is still available, let
802 : * that one run the queued job.
803 : */
804 0 : unlock_res = pthread_mutex_unlock(&pool->mutex);
805 0 : assert(unlock_res == 0);
806 0 : return 0;
807 : }
808 :
809 2 : pthreadpool_undo_put_job(pool);
810 :
811 2 : unlock_res = pthread_mutex_unlock(&pool->mutex);
812 2 : assert(unlock_res == 0);
813 :
814 0 : return res;
815 : }
816 :
817 0 : size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
818 : void (*fn)(void *private_data), void *private_data)
819 : {
820 0 : int res;
821 0 : size_t i, j;
822 0 : size_t num = 0;
823 :
824 0 : assert(!pool->destroyed);
825 :
826 0 : res = pthread_mutex_lock(&pool->mutex);
827 0 : if (res != 0) {
828 0 : return res;
829 : }
830 :
831 0 : for (i = 0, j = 0; i < pool->num_jobs; i++) {
832 0 : size_t idx = (pool->head + i) % pool->jobs_array_len;
833 0 : size_t new_idx = (pool->head + j) % pool->jobs_array_len;
834 0 : struct pthreadpool_job *job = &pool->jobs[idx];
835 :
836 0 : if ((job->private_data == private_data) &&
837 0 : (job->id == job_id) &&
838 0 : (job->fn == fn))
839 : {
840 : /*
841 : * Just skip the entry.
842 : */
843 0 : num++;
844 0 : continue;
845 : }
846 :
847 : /*
848 : * If we already removed one or more jobs (so j will be smaller
849 :                  * than i), we need to fill possible gaps in the logical list.
850 : */
851 0 : if (j < i) {
852 0 : pool->jobs[new_idx] = *job;
853 : }
854 0 : j++;
855 : }
856 :
857 0 : pool->num_jobs -= num;
858 :
859 0 : res = pthread_mutex_unlock(&pool->mutex);
860 0 : assert(res == 0);
861 :
862 0 : return num;
863 : }
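
/*
 * A minimal usage sketch of the API implemented above, assuming only
 * the declarations from pthreadpool.h. The signal_fn writes each
 * finished job id into a pipe so that the main thread can collect
 * results; the pipe scheme and all demo_* names are illustrative only
 * (in Samba this is normally wrapped by pthreadpool_pipe or
 * pthreadpool_tevent rather than used directly).
 */

#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include "pthreadpool.h"

static int demo_pipe[2];

/* Runs in a worker thread */
static void demo_job(void *private_data)
{
	int *x = private_data;
	*x = *x * *x;		/* square the input in place */
}

/* Runs in the worker right after demo_job() returns */
static int demo_signal(int jobid,
		       void (*job_fn)(void *private_data),
		       void *job_fn_private_data,
		       void *signal_fn_private_data)
{
	ssize_t written;

	(void)job_fn;
	(void)job_fn_private_data;
	(void)signal_fn_private_data;

	written = write(demo_pipe[1], &jobid, sizeof(jobid));
	return (written == sizeof(jobid)) ? 0 : errno;
}

int main(void)
{
	struct pthreadpool *pool = NULL;
	int values[3] = { 2, 3, 4 };
	int i, ret, jobid;

	if (pipe(demo_pipe) != 0) {
		return 1;
	}

	ret = pthreadpool_init(4, &pool, demo_signal, NULL);
	if (ret != 0) {
		return 1;
	}

	for (i = 0; i < 3; i++) {
		ret = pthreadpool_add_job(pool, i, demo_job, &values[i]);
		if (ret != 0) {
			return 1;
		}
	}

	/* Wait for the three completions signalled via the pipe */
	for (i = 0; i < 3; i++) {
		if (read(demo_pipe[0], &jobid,
			 sizeof(jobid)) != sizeof(jobid)) {
			return 1;
		}
		printf("job %d done, result %d\n", jobid, values[jobid]);
	}

	return pthreadpool_destroy(pool);
}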