LCOV - code coverage report
Current view: top level - lib/pthreadpool - pthreadpool.c (source / functions) Hit Total Coverage
Test: coverage report for master 2f515e9b Lines: 275 380 72.4 %
Date: 2024-04-21 15:09:00 Functions: 17 20 85.0 %

          Line data    Source code
       1             : /*
       2             :  * Unix SMB/CIFS implementation.
       3             :  * thread pool implementation
       4             :  * Copyright (C) Volker Lendecke 2009
       5             :  *
       6             :  * This program is free software; you can redistribute it and/or modify
       7             :  * it under the terms of the GNU General Public License as published by
       8             :  * the Free Software Foundation; either version 3 of the License, or
       9             :  * (at your option) any later version.
      10             :  *
      11             :  * This program is distributed in the hope that it will be useful,
      12             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :  * GNU General Public License for more details.
      15             :  *
      16             :  * You should have received a copy of the GNU General Public License
      17             :  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             :  */
      19             : 
      20             : #include "replace.h"
      21             : #include "system/time.h"
      22             : #include "system/wait.h"
      23             : #include "system/threads.h"
      24             : #include "system/filesys.h"
      25             : #include "pthreadpool.h"
      26             : #include "lib/util/dlinklist.h"
      27             : 
      28             : #ifdef NDEBUG
      29             : #undef NDEBUG
      30             : #endif
      31             : 
      32             : #include <assert.h>
      33             : 
      34             : struct pthreadpool_job {
      35             :         int id;
      36             :         void (*fn)(void *private_data);
      37             :         void *private_data;
      38             : };
      39             : 
      40             : struct pthreadpool {
      41             :         /*
      42             :          * List pthreadpools for fork safety
      43             :          */
      44             :         struct pthreadpool *prev, *next;
      45             : 
      46             :         /*
      47             :          * Control access to this struct
      48             :          */
      49             :         pthread_mutex_t mutex;
      50             : 
      51             :         /*
      52             :          * Threads waiting for work do so here
      53             :          */
      54             :         pthread_cond_t condvar;
      55             : 
      56             :         /*
      57             :          * Array of jobs
      58             :          */
      59             :         size_t jobs_array_len;
      60             :         struct pthreadpool_job *jobs;
      61             : 
      62             :         size_t head;
      63             :         size_t num_jobs;
      64             : 
      65             :         /*
      66             :          * Indicate job completion
      67             :          */
      68             :         int (*signal_fn)(int jobid,
      69             :                          void (*job_fn)(void *private_data),
      70             :                          void *job_fn_private_data,
      71             :                          void *private_data);
      72             :         void *signal_fn_private_data;
      73             : 
      74             :         /*
      75             :          * indicator to worker threads to stop processing further jobs
      76             :          * and exit.
      77             :          */
      78             :         bool stopped;
      79             : 
      80             :         /*
      81             :          * indicator to the last worker thread to free the pool
      82             :          * resources.
      83             :          */
      84             :         bool destroyed;
      85             : 
      86             :         /*
      87             :          * maximum number of threads
      88             :          * 0 means no real thread, only strict sync processing.
      89             :          */
      90             :         unsigned max_threads;
      91             : 
      92             :         /*
      93             :          * Number of threads
      94             :          */
      95             :         unsigned num_threads;
      96             : 
      97             :         /*
      98             :          * Number of idle threads
      99             :          */
     100             :         unsigned num_idle;
     101             : 
     102             :         /*
     103             :          * Condition variable indicating that helper threads should
     104             :          * quickly go away making way for fork() without anybody
     105             :          * waiting on pool->condvar.
     106             :          */
     107             :         pthread_cond_t *prefork_cond;
     108             : 
     109             :         /*
     110             :          * Waiting position for helper threads while fork is
     111             :          * running. The forking thread will have locked it, and all
     112             :          * idle helper threads will sit here until after the fork,
     113             :          * where the forking thread will unlock it again.
     114             :          */
     115             :         pthread_mutex_t fork_mutex;
     116             : };
     117             : 
     118             : static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
     119             : static struct pthreadpool *pthreadpools = NULL;
     120             : static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
     121             : 
     122             : static void pthreadpool_prep_atfork(void);
     123             : 
     124             : /*
     125             :  * Initialize a thread pool
     126             :  */
     127             : 
     128       91470 : int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
     129             :                      int (*signal_fn)(int jobid,
     130             :                                       void (*job_fn)(void *private_data),
     131             :                                       void *job_fn_private_data,
     132             :                                       void *private_data),
     133             :                      void *signal_fn_private_data)
     134             : {
     135        1986 :         struct pthreadpool *pool;
     136        1986 :         int ret;
     137             : 
     138       91470 :         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
     139       91470 :         if (pool == NULL) {
     140           0 :                 return ENOMEM;
     141             :         }
     142       91470 :         pool->signal_fn = signal_fn;
     143       91470 :         pool->signal_fn_private_data = signal_fn_private_data;
     144             : 
     145       91470 :         pool->jobs_array_len = 4;
     146       91470 :         pool->jobs = calloc(
     147             :                 pool->jobs_array_len, sizeof(struct pthreadpool_job));
     148             : 
     149       91470 :         if (pool->jobs == NULL) {
     150           0 :                 free(pool);
     151           0 :                 return ENOMEM;
     152             :         }
     153             : 
     154       91470 :         pool->head = pool->num_jobs = 0;
     155             : 
     156       91470 :         ret = pthread_mutex_init(&pool->mutex, NULL);
     157       91470 :         if (ret != 0) {
     158           0 :                 free(pool->jobs);
     159           0 :                 free(pool);
     160           0 :                 return ret;
     161             :         }
     162             : 
     163       91470 :         ret = pthread_cond_init(&pool->condvar, NULL);
     164       91470 :         if (ret != 0) {
     165           0 :                 pthread_mutex_destroy(&pool->mutex);
     166           0 :                 free(pool->jobs);
     167           0 :                 free(pool);
     168           0 :                 return ret;
     169             :         }
     170             : 
     171       91470 :         ret = pthread_mutex_init(&pool->fork_mutex, NULL);
     172       91470 :         if (ret != 0) {
     173           0 :                 pthread_cond_destroy(&pool->condvar);
     174           0 :                 pthread_mutex_destroy(&pool->mutex);
     175           0 :                 free(pool->jobs);
     176           0 :                 free(pool);
     177           0 :                 return ret;
     178             :         }
     179             : 
     180       91470 :         pool->stopped = false;
     181       91470 :         pool->destroyed = false;
     182       91470 :         pool->num_threads = 0;
     183       91470 :         pool->max_threads = max_threads;
     184       91470 :         pool->num_idle = 0;
     185       91470 :         pool->prefork_cond = NULL;
     186             : 
     187       91470 :         ret = pthread_mutex_lock(&pthreadpools_mutex);
     188       91470 :         if (ret != 0) {
     189           0 :                 pthread_mutex_destroy(&pool->fork_mutex);
     190           0 :                 pthread_cond_destroy(&pool->condvar);
     191           0 :                 pthread_mutex_destroy(&pool->mutex);
     192           0 :                 free(pool->jobs);
     193           0 :                 free(pool);
     194           0 :                 return ret;
     195             :         }
     196       91470 :         DLIST_ADD(pthreadpools, pool);
     197             : 
     198       91470 :         ret = pthread_mutex_unlock(&pthreadpools_mutex);
     199       91470 :         assert(ret == 0);
     200             : 
     201       91470 :         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
     202             : 
     203       91470 :         *presult = pool;
     204             : 
     205       91470 :         return 0;
     206             : }
     207             : 
     208       10114 : size_t pthreadpool_max_threads(struct pthreadpool *pool)
     209             : {
     210       10114 :         if (pool->stopped) {
     211           0 :                 return 0;
     212             :         }
     213             : 
     214       10114 :         return pool->max_threads;
     215             : }
     216             : 
     217       20143 : size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
     218             : {
     219           0 :         int res;
     220           0 :         int unlock_res;
     221           0 :         size_t ret;
     222             : 
     223       20143 :         if (pool->stopped) {
     224           0 :                 return 0;
     225             :         }
     226             : 
     227       20143 :         res = pthread_mutex_lock(&pool->mutex);
     228       20143 :         if (res != 0) {
     229           0 :                 return res;
     230             :         }
     231             : 
     232       20143 :         if (pool->stopped) {
     233           0 :                 unlock_res = pthread_mutex_unlock(&pool->mutex);
     234           0 :                 assert(unlock_res == 0);
     235           0 :                 return 0;
     236             :         }
     237             : 
     238       20143 :         ret = pool->num_jobs;
     239             : 
     240       20143 :         unlock_res = pthread_mutex_unlock(&pool->mutex);
     241       20143 :         assert(unlock_res == 0);
     242       20143 :         return ret;
     243             : }
     244             : 
     245       64610 : static void pthreadpool_prepare_pool(struct pthreadpool *pool)
     246             : {
     247         607 :         int ret;
     248             : 
     249       64610 :         ret = pthread_mutex_lock(&pool->fork_mutex);
     250       64610 :         assert(ret == 0);
     251             : 
     252       64610 :         ret = pthread_mutex_lock(&pool->mutex);
     253       64610 :         assert(ret == 0);
     254             : 
     255       64667 :         while (pool->num_idle != 0) {
     256          57 :                 unsigned num_idle = pool->num_idle;
     257           4 :                 pthread_cond_t prefork_cond;
     258             : 
     259          57 :                 ret = pthread_cond_init(&prefork_cond, NULL);
     260          57 :                 assert(ret == 0);
     261             : 
     262             :                 /*
     263             :                  * Push all idle threads off pool->condvar. In the
     264             :                  * child we can destroy the pool, which would result
     265             :                  * in undefined behaviour in the
     266             :                  * pthread_cond_destroy(pool->condvar). glibc just
     267             :                  * blocks here.
     268             :                  */
     269          57 :                 pool->prefork_cond = &prefork_cond;
     270             : 
     271          57 :                 ret = pthread_cond_signal(&pool->condvar);
     272          57 :                 assert(ret == 0);
     273             : 
     274         114 :                 while (pool->num_idle == num_idle) {
     275          57 :                         ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
     276          57 :                         assert(ret == 0);
     277             :                 }
     278             : 
     279          57 :                 pool->prefork_cond = NULL;
     280             : 
     281          57 :                 ret = pthread_cond_destroy(&prefork_cond);
     282          57 :                 assert(ret == 0);
     283             :         }
     284             : 
     285             :         /*
     286             :          * Probably it's well-defined somewhere: What happens to
     287             :          * condvars after a fork? The rationale of pthread_atfork only
     288             :          * writes about mutexes. So better be safe than sorry and
     289             :          * destroy/reinit pool->condvar across a fork.
     290             :          */
     291             : 
     292       64610 :         ret = pthread_cond_destroy(&pool->condvar);
     293       64610 :         assert(ret == 0);
     294       64610 : }
     295             : 
     296       66727 : static void pthreadpool_prepare(void)
     297             : {
     298         607 :         int ret;
     299         607 :         struct pthreadpool *pool;
     300             : 
     301       66727 :         ret = pthread_mutex_lock(&pthreadpools_mutex);
     302       66727 :         assert(ret == 0);
     303             : 
     304       66727 :         pool = pthreadpools;
     305             : 
     306      131337 :         while (pool != NULL) {
     307       64610 :                 pthreadpool_prepare_pool(pool);
     308       64610 :                 pool = pool->next;
     309             :         }
     310       66727 : }
     311             : 
     312       66727 : static void pthreadpool_parent(void)
     313             : {
     314         607 :         int ret;
     315         607 :         struct pthreadpool *pool;
     316             : 
     317       66727 :         for (pool = DLIST_TAIL(pthreadpools);
     318      130731 :              pool != NULL;
     319       64610 :              pool = DLIST_PREV(pool)) {
     320       64610 :                 ret = pthread_cond_init(&pool->condvar, NULL);
     321       64610 :                 assert(ret == 0);
     322       64610 :                 ret = pthread_mutex_unlock(&pool->mutex);
     323       64610 :                 assert(ret == 0);
     324       64610 :                 ret = pthread_mutex_unlock(&pool->fork_mutex);
     325       64610 :                 assert(ret == 0);
     326             :         }
     327             : 
     328       66727 :         ret = pthread_mutex_unlock(&pthreadpools_mutex);
     329       66727 :         assert(ret == 0);
     330       66727 : }
     331             : 
     332           0 : static void pthreadpool_child(void)
     333             : {
     334           0 :         int ret;
     335           0 :         struct pthreadpool *pool;
     336             : 
     337           0 :         for (pool = DLIST_TAIL(pthreadpools);
     338           0 :              pool != NULL;
     339           0 :              pool = DLIST_PREV(pool)) {
     340             : 
     341           0 :                 pool->num_threads = 0;
     342           0 :                 pool->num_idle = 0;
     343           0 :                 pool->head = 0;
     344           0 :                 pool->num_jobs = 0;
     345           0 :                 pool->stopped = true;
     346             : 
     347           0 :                 ret = pthread_cond_init(&pool->condvar, NULL);
     348           0 :                 assert(ret == 0);
     349             : 
     350           0 :                 ret = pthread_mutex_unlock(&pool->mutex);
     351           0 :                 assert(ret == 0);
     352             : 
     353           0 :                 ret = pthread_mutex_unlock(&pool->fork_mutex);
     354           0 :                 assert(ret == 0);
     355             :         }
     356             : 
     357           0 :         ret = pthread_mutex_unlock(&pthreadpools_mutex);
     358           0 :         assert(ret == 0);
     359           0 : }
     360             : 
     361        6748 : static void pthreadpool_prep_atfork(void)
     362             : {
     363        6748 :         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
     364             :                        pthreadpool_child);
     365        6748 : }
     366             : 
     367      131559 : static int pthreadpool_free(struct pthreadpool *pool)
     368             : {
     369        2031 :         int ret, ret1, ret2;
     370             : 
     371      131559 :         ret = pthread_mutex_lock(&pthreadpools_mutex);
     372      131559 :         if (ret != 0) {
     373           0 :                 return ret;
     374             :         }
     375      131559 :         DLIST_REMOVE(pthreadpools, pool);
     376      131559 :         ret = pthread_mutex_unlock(&pthreadpools_mutex);
     377      131559 :         assert(ret == 0);
     378             : 
     379      131559 :         ret = pthread_mutex_lock(&pool->mutex);
     380      131559 :         assert(ret == 0);
     381      131559 :         ret = pthread_mutex_unlock(&pool->mutex);
     382      131559 :         assert(ret == 0);
     383             : 
     384      131559 :         ret = pthread_mutex_destroy(&pool->mutex);
     385      131559 :         ret1 = pthread_cond_destroy(&pool->condvar);
     386      131559 :         ret2 = pthread_mutex_destroy(&pool->fork_mutex);
     387             : 
     388      131559 :         if (ret != 0) {
     389           0 :                 return ret;
     390             :         }
     391      131559 :         if (ret1 != 0) {
     392           0 :                 return ret1;
     393             :         }
     394      131559 :         if (ret2 != 0) {
     395           0 :                 return ret2;
     396             :         }
     397             : 
     398      131559 :         free(pool->jobs);
     399      131559 :         free(pool);
     400             : 
     401      131559 :         return 0;
     402             : }
     403             : 
     404             : /*
     405             :  * Stop a thread pool. Wake up all idle threads for exit.
     406             :  */
     407             : 
     408       83186 : static int pthreadpool_stop_locked(struct pthreadpool *pool)
     409             : {
     410        1118 :         int ret;
     411             : 
     412       83186 :         pool->stopped = true;
     413             : 
     414       83186 :         if (pool->num_threads == 0) {
     415       79311 :                 return 0;
     416             :         }
     417             : 
     418             :         /*
     419             :          * We have active threads, tell them to finish.
     420             :          */
     421             : 
     422        2792 :         ret = pthread_cond_broadcast(&pool->condvar);
     423             : 
     424        2792 :         return ret;
     425             : }
     426             : 
     427             : /*
     428             :  * Stop a thread pool. Wake up all idle threads for exit.
     429             :  */
     430             : 
     431      131556 : int pthreadpool_stop(struct pthreadpool *pool)
     432             : {
     433        2028 :         int ret, ret1;
     434             : 
     435      131556 :         ret = pthread_mutex_lock(&pool->mutex);
     436      131556 :         if (ret != 0) {
     437           0 :                 return ret;
     438             :         }
     439             : 
     440      131556 :         if (!pool->stopped) {
     441       83182 :                 ret = pthreadpool_stop_locked(pool);
     442             :         }
     443             : 
     444      131556 :         ret1 = pthread_mutex_unlock(&pool->mutex);
     445      131556 :         assert(ret1 == 0);
     446             : 
     447      129528 :         return ret;
     448             : }
     449             : 
     450             : /*
     451             :  * Destroy a thread pool. Wake up all idle threads for exit. The last
     452             :  * one will free the pool.
     453             :  */
     454             : 
     455      131561 : int pthreadpool_destroy(struct pthreadpool *pool)
     456             : {
     457        2033 :         int ret, ret1;
     458        2033 :         bool free_it;
     459             : 
     460      131561 :         assert(!pool->destroyed);
     461             : 
     462      131561 :         ret = pthread_mutex_lock(&pool->mutex);
     463      131561 :         if (ret != 0) {
     464           0 :                 return ret;
     465             :         }
     466             : 
     467      131561 :         pool->destroyed = true;
     468             : 
     469      131561 :         if (!pool->stopped) {
     470           4 :                 ret = pthreadpool_stop_locked(pool);
     471             :         }
     472             : 
     473      131561 :         free_it = (pool->num_threads == 0);
     474             : 
     475      131561 :         ret1 = pthread_mutex_unlock(&pool->mutex);
     476      131561 :         assert(ret1 == 0);
     477             : 
     478      131561 :         if (free_it) {
     479      128875 :                 pthreadpool_free(pool);
     480             :         }
     481             : 
     482      129528 :         return ret;
     483             : }
     484             : /*
     485             :  * Prepare for pthread_exit(), pool->mutex must be locked and will be
     486             :  * unlocked here. This is a bit of a layering violation, but here we
     487             :  * also take care of removing the pool if we're the last thread.
     488             :  */
     489        3896 : static void pthreadpool_server_exit(struct pthreadpool *pool)
     490             : {
     491         239 :         int ret;
     492         239 :         bool free_it;
     493             : 
     494        3896 :         pool->num_threads -= 1;
     495             : 
     496        3896 :         free_it = (pool->destroyed && (pool->num_threads == 0));
     497             : 
     498        3896 :         ret = pthread_mutex_unlock(&pool->mutex);
     499        3896 :         assert(ret == 0);
     500             : 
     501        3896 :         if (free_it) {
     502        2684 :                 pthreadpool_free(pool);
     503             :         }
     504        3896 : }
     505             : 
     506      249129 : static bool pthreadpool_get_job(struct pthreadpool *p,
     507             :                                 struct pthreadpool_job *job)
     508             : {
     509      249129 :         if (p->stopped) {
     510        3101 :                 return false;
     511             :         }
     512             : 
     513      245790 :         if (p->num_jobs == 0) {
     514           0 :                 return false;
     515             :         }
     516      245790 :         *job = p->jobs[p->head];
     517      245790 :         p->head = (p->head+1) % p->jobs_array_len;
     518      245790 :         p->num_jobs -= 1;
     519      245790 :         return true;
     520             : }
     521             : 
     522      245792 : static bool pthreadpool_put_job(struct pthreadpool *p,
     523             :                                 int id,
     524             :                                 void (*fn)(void *private_data),
     525             :                                 void *private_data)
     526             : {
     527       78366 :         struct pthreadpool_job *job;
     528             : 
     529      245792 :         if (p->num_jobs == p->jobs_array_len) {
     530          12 :                 struct pthreadpool_job *tmp;
     531          45 :                 size_t new_len = p->jobs_array_len * 2;
     532             : 
     533          45 :                 tmp = realloc(
     534          45 :                         p->jobs, sizeof(struct pthreadpool_job) * new_len);
     535          45 :                 if (tmp == NULL) {
     536           0 :                         return false;
     537             :                 }
     538          45 :                 p->jobs = tmp;
     539             : 
     540             :                 /*
     541             :                  * We just doubled the jobs array. The array implements a FIFO
     542             :                  * queue with a modulo-based wraparound, so we have to memcpy
     543             :                  * the jobs that are logically at the queue end but physically
     544             :                  * before the queue head into the reallocated area. The new
     545             :                  * space starts at the current jobs_array_len, and we have to
     546             :                  * copy everything before the current head job into the new
     547             :                  * area.
     548             :                  */
     549          45 :                 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
     550          45 :                        sizeof(struct pthreadpool_job) * p->head);
     551             : 
     552          45 :                 p->jobs_array_len = new_len;
     553             :         }
     554             : 
     555      245792 :         job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
     556      245792 :         job->id = id;
     557      245792 :         job->fn = fn;
     558      245792 :         job->private_data = private_data;
     559             : 
     560      245792 :         p->num_jobs += 1;
     561             : 
     562      245792 :         return true;
     563             : }
     564             : 
     565           2 : static void pthreadpool_undo_put_job(struct pthreadpool *p)
     566             : {
     567           2 :         p->num_jobs -= 1;
     568           0 : }
     569             : 
     570        3942 : static void *pthreadpool_server(void *arg)
     571             : {
     572        3942 :         struct pthreadpool *pool = (struct pthreadpool *)arg;
     573         243 :         int res;
     574             : 
     575        3942 :         res = pthread_mutex_lock(&pool->mutex);
     576        3942 :         if (res != 0) {
     577           0 :                 return NULL;
     578             :         }
     579             : 
     580      245767 :         while (1) {
     581       78604 :                 struct timespec ts;
     582       78604 :                 struct pthreadpool_job job;
     583             : 
     584             :                 /*
     585             :                  * idle-wait at most 1 second. If nothing happens in that
     586             :                  * time, exit this thread.
     587             :                  */
     588             : 
     589      249709 :                 clock_gettime(CLOCK_REALTIME, &ts);
     590      249709 :                 ts.tv_sec += 1;
     591             : 
     592      484344 :                 while ((pool->num_jobs == 0) && !pool->stopped) {
     593             : 
     594      235215 :                         pool->num_idle += 1;
     595      235215 :                         res = pthread_cond_timedwait(
     596             :                                 &pool->condvar, &pool->mutex, &ts);
     597      235174 :                         pool->num_idle -= 1;
     598             : 
     599      235174 :                         if (pool->prefork_cond != NULL) {
     600             :                                 /*
     601             :                                  * We must allow fork() to continue
     602             :                                  * without anybody waiting on
     603             :                                  * &pool->condvar. Tell
     604             :                                  * pthreadpool_prepare_pool that we
     605             :                                  * got that message.
     606             :                                  */
     607             : 
     608          57 :                                 res = pthread_cond_signal(pool->prefork_cond);
     609          57 :                                 assert(res == 0);
     610             : 
     611          57 :                                 res = pthread_mutex_unlock(&pool->mutex);
     612          57 :                                 assert(res == 0);
     613             : 
     614             :                                 /*
     615             :                                  * pthreadpool_prepare_pool has
     616             :                                  * already locked this mutex across
     617             :                                  * the fork. This makes us wait
     618             :                                  * without sitting in a condvar.
     619             :                                  */
     620          57 :                                 res = pthread_mutex_lock(&pool->fork_mutex);
     621          57 :                                 assert(res == 0);
     622          57 :                                 res = pthread_mutex_unlock(&pool->fork_mutex);
     623          57 :                                 assert(res == 0);
     624             : 
     625          57 :                                 res = pthread_mutex_lock(&pool->mutex);
     626          57 :                                 assert(res == 0);
     627             :                         }
     628             : 
     629      235174 :                         if (res == ETIMEDOUT) {
     630             : 
     631         539 :                                 if (pool->num_jobs == 0) {
     632             :                                         /*
     633             :                                          * we timed out and still no work for
     634             :                                          * us. Exit.
     635             :                                          */
     636         539 :                                         pthreadpool_server_exit(pool);
     637        3896 :                                         return NULL;
     638             :                                 }
     639             : 
     640           0 :                                 break;
     641             :                         }
     642      234635 :                         assert(res == 0);
     643             :                 }
     644             : 
     645      249129 :                 if (pthreadpool_get_job(pool, &job)) {
     646       78364 :                         int ret;
     647             : 
     648             :                         /*
     649             :                          * Do the work with the mutex unlocked
     650             :                          */
     651             : 
     652      245790 :                         res = pthread_mutex_unlock(&pool->mutex);
     653      245790 :                         assert(res == 0);
     654             : 
     655      245790 :                         job.fn(job.private_data);
     656             : 
     657      245732 :                         ret = pool->signal_fn(job.id,
     658             :                                               job.fn, job.private_data,
     659             :                                               pool->signal_fn_private_data);
     660             : 
     661      245658 :                         res = pthread_mutex_lock(&pool->mutex);
     662      245785 :                         assert(res == 0);
     663             : 
     664      245785 :                         if (ret != 0) {
     665           0 :                                 pthreadpool_server_exit(pool);
     666           0 :                                 return NULL;
     667             :                         }
     668             :                 }
     669             : 
     670      249124 :                 if (pool->stopped) {
     671             :                         /*
     672             :                          * we're asked to stop processing jobs, so exit
     673             :                          */
     674        3357 :                         pthreadpool_server_exit(pool);
     675        3357 :                         return NULL;
     676             :                 }
     677             :         }
     678             : }
     679             : 
     680        3944 : static int pthreadpool_create_thread(struct pthreadpool *pool)
     681             : {
     682         245 :         pthread_attr_t thread_attr;
     683         245 :         pthread_t thread_id;
     684         245 :         int res;
     685         245 :         sigset_t mask, omask;
     686             : 
     687             :         /*
     688             :          * Create a new worker thread. It should not receive any signals.
     689             :          */
     690             : 
     691        3944 :         sigfillset(&mask);
     692             : 
     693        3944 :         res = pthread_attr_init(&thread_attr);
     694        3944 :         if (res != 0) {
     695           0 :                 return res;
     696             :         }
     697             : 
     698        3944 :         res = pthread_attr_setdetachstate(
     699             :                 &thread_attr, PTHREAD_CREATE_DETACHED);
     700        3944 :         if (res != 0) {
     701           0 :                 pthread_attr_destroy(&thread_attr);
     702           0 :                 return res;
     703             :         }
     704             : 
     705        3944 :         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
     706        3944 :         if (res != 0) {
     707           0 :                 pthread_attr_destroy(&thread_attr);
     708           0 :                 return res;
     709             :         }
     710             : 
     711        3944 :         res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
     712             :                              (void *)pool);
     713             : 
     714        3944 :         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
     715             : 
     716        3944 :         pthread_attr_destroy(&thread_attr);
     717             : 
     718        3944 :         if (res == 0) {
     719        3942 :                 pool->num_threads += 1;
     720             :         }
     721             : 
     722        3699 :         return res;
     723             : }
     724             : 
     725      245793 : int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
     726             :                         void (*fn)(void *private_data), void *private_data)
     727             : {
     728       78367 :         int res;
     729       78367 :         int unlock_res;
     730             : 
     731      245793 :         assert(!pool->destroyed);
     732             : 
     733      245793 :         res = pthread_mutex_lock(&pool->mutex);
     734      245793 :         if (res != 0) {
     735           0 :                 return res;
     736             :         }
     737             : 
     738      245793 :         if (pool->stopped) {
     739             :                 /*
     740             :                  * Protect against the pool being shut down while
     741             :                  * trying to add a job
     742             :                  */
     743           0 :                 unlock_res = pthread_mutex_unlock(&pool->mutex);
     744           0 :                 assert(unlock_res == 0);
     745           0 :                 return EINVAL;
     746             :         }
     747             : 
     748      245793 :         if (pool->max_threads == 0) {
     749           1 :                 unlock_res = pthread_mutex_unlock(&pool->mutex);
     750           1 :                 assert(unlock_res == 0);
     751             : 
     752             :                 /*
     753             :                  * If no threads are allowed we do strict sync processing.
     754             :                  */
     755           1 :                 fn(private_data);
     756           1 :                 res = pool->signal_fn(job_id, fn, private_data,
     757             :                                       pool->signal_fn_private_data);
     758           1 :                 return res;
     759             :         }
     760             : 
     761             :         /*
     762             :          * Add job to the end of the queue
     763             :          */
     764      245792 :         if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
     765           0 :                 unlock_res = pthread_mutex_unlock(&pool->mutex);
     766           0 :                 assert(unlock_res == 0);
     767           0 :                 return ENOMEM;
     768             :         }
     769             : 
     770      245792 :         if (pool->num_idle > 0) {
     771             :                 /*
     772             :                  * We have idle threads, wake one.
     773             :                  */
     774      231858 :                 res = pthread_cond_signal(&pool->condvar);
     775      231858 :                 if (res != 0) {
     776           0 :                         pthreadpool_undo_put_job(pool);
     777             :                 }
     778      231858 :                 unlock_res = pthread_mutex_unlock(&pool->mutex);
     779      231858 :                 assert(unlock_res == 0);
     780      163727 :                 return res;
     781             :         }
     782             : 
     783       13934 :         if (pool->num_threads >= pool->max_threads) {
     784             :                 /*
     785             :                  * No more new threads, we just queue the request
     786             :                  */
     787        9990 :                 unlock_res = pthread_mutex_unlock(&pool->mutex);
     788        9990 :                 assert(unlock_res == 0);
     789           0 :                 return 0;
     790             :         }
     791             : 
     792        3944 :         res = pthreadpool_create_thread(pool);
     793        3944 :         if (res == 0) {
     794        3942 :                 unlock_res = pthread_mutex_unlock(&pool->mutex);
     795        3942 :                 assert(unlock_res == 0);
     796        3699 :                 return 0;
     797             :         }
     798             : 
     799           2 :         if (pool->num_threads != 0) {
     800             :                 /*
     801             :                  * At least one thread is still available, let
     802             :                  * that one run the queued job.
     803             :                  */
     804           0 :                 unlock_res = pthread_mutex_unlock(&pool->mutex);
     805           0 :                 assert(unlock_res == 0);
     806           0 :                 return 0;
     807             :         }
     808             : 
     809           2 :         pthreadpool_undo_put_job(pool);
     810             : 
     811           2 :         unlock_res = pthread_mutex_unlock(&pool->mutex);
     812           2 :         assert(unlock_res == 0);
     813             : 
     814           0 :         return res;
     815             : }
     816             : 
     817           0 : size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
     818             :                               void (*fn)(void *private_data), void *private_data)
     819             : {
     820           0 :         int res;
     821           0 :         size_t i, j;
     822           0 :         size_t num = 0;
     823             : 
     824           0 :         assert(!pool->destroyed);
     825             : 
     826           0 :         res = pthread_mutex_lock(&pool->mutex);
     827           0 :         if (res != 0) {
     828           0 :                 return res;
     829             :         }
     830             : 
     831           0 :         for (i = 0, j = 0; i < pool->num_jobs; i++) {
     832           0 :                 size_t idx = (pool->head + i) % pool->jobs_array_len;
     833           0 :                 size_t new_idx = (pool->head + j) % pool->jobs_array_len;
     834           0 :                 struct pthreadpool_job *job = &pool->jobs[idx];
     835             : 
     836           0 :                 if ((job->private_data == private_data) &&
     837           0 :                     (job->id == job_id) &&
     838           0 :                     (job->fn == fn))
     839             :                 {
     840             :                         /*
     841             :                          * Just skip the entry.
     842             :                          */
     843           0 :                         num++;
     844           0 :                         continue;
     845             :                 }
     846             : 
     847             :                 /*
     848             :                  * If we already removed one or more jobs (so j will be smaller
     849             :                  * than i), we need to fill possible gaps in the logical list.
     850             :                  */
     851           0 :                 if (j < i) {
     852           0 :                         pool->jobs[new_idx] = *job;
     853             :                 }
     854           0 :                 j++;
     855             :         }
     856             : 
     857           0 :         pool->num_jobs -= num;
     858             : 
     859           0 :         res = pthread_mutex_unlock(&pool->mutex);
     860           0 :         assert(res == 0);
     861             : 
     862           0 :         return num;
     863             : }

Generated by: LCOV version 1.14