LCOV - code coverage report
Current view: top level - lib/pthreadpool - pthreadpool_tevent.c (source / functions) Hit Total Coverage
Test: coverage report for master 2f515e9b Lines: 120 146 82.2 %
Date: 2024-04-21 15:09:00 Functions: 13 13 100.0 %

          Line data    Source code
       1             : /*
       2             :  * Unix SMB/CIFS implementation.
       3             :  * threadpool implementation based on pthreads
       4             :  * Copyright (C) Volker Lendecke 2009,2011
       5             :  *
       6             :  * This program is free software; you can redistribute it and/or modify
       7             :  * it under the terms of the GNU General Public License as published by
       8             :  * the Free Software Foundation; either version 3 of the License, or
       9             :  * (at your option) any later version.
      10             :  *
      11             :  * This program is distributed in the hope that it will be useful,
      12             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :  * GNU General Public License for more details.
      15             :  *
      16             :  * You should have received a copy of the GNU General Public License
      17             :  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             :  */
      19             : 
      20             : #include "replace.h"
      21             : #include "system/filesys.h"
      22             : #include "pthreadpool_tevent.h"
      23             : #include "pthreadpool.h"
      24             : #include "lib/util/tevent_unix.h"
      25             : #include "lib/util/dlinklist.h"
      26             : 
      27             : struct pthreadpool_tevent_job_state;
      28             : 
      29             : /*
      30             :  * We need one pthreadpool_tevent_glue object per unique combination of tevent
      31             :  * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
      32             :  * contexts in a pthreadpool_tevent.
      33             :  */
      34             : struct pthreadpool_tevent_glue {
      35             :         struct pthreadpool_tevent_glue *prev, *next;
      36             :         struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
      37             :         /* Tuple we are keeping track of in this list. */
      38             :         struct tevent_context *ev;
      39             :         struct tevent_threaded_context *tctx;
      40             :         /* Pointer to link object owned by *ev. */
      41             :         struct pthreadpool_tevent_glue_ev_link *ev_link;
      42             : };
      43             : 
      44             : /*
      45             :  * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
      46             :  * tevent context from our list of active event contexts if the event context
      47             :  * is destroyed.
      48             :  * This structure is talloc()'ed from the struct tevent_context *, and is a
      49             :  * back-pointer allowing the related struct pthreadpool_tevent_glue object
      50             :  * to be removed from the struct pthreadpool_tevent glue list if the owning
      51             :  * tevent_context is talloc_free()'ed.
      52             :  */
      53             : struct pthreadpool_tevent_glue_ev_link {
      54             :         struct pthreadpool_tevent_glue *glue;
      55             : };
      56             : 
      57             : struct pthreadpool_tevent {
      58             :         struct pthreadpool *pool;
      59             :         struct pthreadpool_tevent_glue *glue_list;
      60             : 
      61             :         struct pthreadpool_tevent_job_state *jobs;
      62             : };
      63             : 
      64             : struct pthreadpool_tevent_job_state {
      65             :         struct pthreadpool_tevent_job_state *prev, *next;
      66             :         struct pthreadpool_tevent *pool;
      67             :         struct tevent_context *ev;
      68             :         struct tevent_immediate *im;
      69             :         struct tevent_req *req;
      70             : 
      71             :         void (*fn)(void *private_data);
      72             :         void *private_data;
      73             : };
      74             : 
      75             : static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
      76             : 
      77             : static int pthreadpool_tevent_job_signal(int jobid,
      78             :                                          void (*job_fn)(void *private_data),
      79             :                                          void *job_private_data,
      80             :                                          void *private_data);
      81             : 
      82       91464 : int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
      83             :                             struct pthreadpool_tevent **presult)
      84             : {
      85        1980 :         struct pthreadpool_tevent *pool;
      86        1980 :         int ret;
      87             : 
      88       91464 :         pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
      89       91464 :         if (pool == NULL) {
      90           0 :                 return ENOMEM;
      91             :         }
      92             : 
      93       91464 :         ret = pthreadpool_init(max_threads, &pool->pool,
      94             :                                pthreadpool_tevent_job_signal, pool);
      95       91464 :         if (ret != 0) {
      96           0 :                 TALLOC_FREE(pool);
      97           0 :                 return ret;
      98             :         }
      99             : 
     100       91464 :         talloc_set_destructor(pool, pthreadpool_tevent_destructor);
     101             : 
     102       91464 :         *presult = pool;
     103       91464 :         return 0;
     104             : }
     105             : 
     106       10114 : size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
     107             : {
     108       10114 :         if (pool->pool == NULL) {
     109           0 :                 return 0;
     110             :         }
     111             : 
     112       10114 :         return pthreadpool_max_threads(pool->pool);
     113             : }
     114             : 
     115       20143 : size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
     116             : {
     117       20143 :         if (pool->pool == NULL) {
     118           0 :                 return 0;
     119             :         }
     120             : 
     121       20143 :         return pthreadpool_queued_jobs(pool->pool);
     122             : }
     123             : 
     124      131556 : static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
     125             : {
     126        2028 :         struct pthreadpool_tevent_job_state *state, *next;
     127      131556 :         struct pthreadpool_tevent_glue *glue = NULL;
     128        2028 :         int ret;
     129             : 
     130      131556 :         ret = pthreadpool_stop(pool->pool);
     131      131556 :         if (ret != 0) {
     132           0 :                 return ret;
     133             :         }
     134             : 
     135      131559 :         for (state = pool->jobs; state != NULL; state = next) {
     136           3 :                 next = state->next;
     137           3 :                 DLIST_REMOVE(pool->jobs, state);
     138           3 :                 state->pool = NULL;
     139             :         }
     140             : 
     141             :         /*
     142             :          * Delete all the registered
     143             :          * tevent_context/tevent_threaded_context
     144             :          * pairs.
     145             :          */
     146      134678 :         for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
     147             :                 /* The glue destructor removes it from the list */
     148        3122 :                 TALLOC_FREE(glue);
     149             :         }
     150      131556 :         pool->glue_list = NULL;
     151             : 
     152      131556 :         ret = pthreadpool_destroy(pool->pool);
     153      131556 :         if (ret != 0) {
     154           0 :                 return ret;
     155             :         }
     156      131556 :         pool->pool = NULL;
     157             : 
     158      131556 :         return 0;
     159             : }
     160             : 
     161        3197 : static int pthreadpool_tevent_glue_destructor(
     162             :         struct pthreadpool_tevent_glue *glue)
     163             : {
     164        3197 :         if (glue->pool->glue_list != NULL) {
     165        3197 :                 DLIST_REMOVE(glue->pool->glue_list, glue);
     166             :         }
     167             : 
     168             :         /* Ensure the ev_link destructor knows we're gone */
     169        3197 :         glue->ev_link->glue = NULL;
     170             : 
     171        3197 :         TALLOC_FREE(glue->ev_link);
     172        3197 :         TALLOC_FREE(glue->tctx);
     173             : 
     174        3197 :         return 0;
     175             : }
     176             : 
     177             : /*
     178             :  * Destructor called either explicitly from
     179             :  * pthreadpool_tevent_glue_destructor(), or indirectly
     180             :  * when owning tevent_context is destroyed.
     181             :  *
     182             :  * When called from pthreadpool_tevent_glue_destructor()
     183             :  * ev_link->glue is already NULL, so this does nothing.
     184             :  *
     185             :  * When called from talloc_free() of the owning
     186             :  * tevent_context we must ensure we also remove the
     187             :  * linked glue object from the list inside
     188             :  * struct pthreadpool_tevent.
     189             :  */
     190        3197 : static int pthreadpool_tevent_glue_link_destructor(
     191             :         struct pthreadpool_tevent_glue_ev_link *ev_link)
     192             : {
     193        3197 :         TALLOC_FREE(ev_link->glue);
     194        3197 :         return 0;
     195             : }
     196             : 
     197      235789 : static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
     198             :                                           struct tevent_context *ev)
     199             : {
     200      235789 :         struct pthreadpool_tevent_glue *glue = NULL;
     201      235789 :         struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
     202             : 
     203             :         /*
     204             :          * See if this tevent_context was already registered by
     205             :          * searching the glue object list. If so we have nothing
     206             :          * to do here - we already have a tevent_context/tevent_threaded_context
     207             :          * pair.
     208             :          */
     209      235796 :         for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
     210      232642 :                 if (glue->ev == ev) {
     211      164307 :                         return 0;
     212             :                 }
     213             :         }
     214             : 
     215             :         /*
     216             :          * Event context not yet registered - create a new glue
     217             :          * object containing a tevent_context/tevent_threaded_context
     218             :          * pair and put it on the list to remember this registration.
     219             :          * We also need a link object to ensure the event context
     220             :          * can't go away without us knowing about it.
     221             :          */
     222        3154 :         glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
     223        3154 :         if (glue == NULL) {
     224           0 :                 return ENOMEM;
     225             :         }
     226        3154 :         *glue = (struct pthreadpool_tevent_glue) {
     227             :                 .pool = pool,
     228             :                 .ev = ev,
     229             :         };
     230        3154 :         talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
     231             : 
     232             :         /*
     233             :          * Now allocate the link object to the event context. Note this
     234             :          * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
     235             :          * context is freed we are able to cleanup the glue object
     236             :          * in the link object destructor.
     237             :          */
     238             : 
     239        3154 :         ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
     240        3154 :         if (ev_link == NULL) {
     241           0 :                 TALLOC_FREE(glue);
     242           0 :                 return ENOMEM;
     243             :         }
     244        3154 :         ev_link->glue = glue;
     245        3154 :         talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);
     246             : 
     247        3154 :         glue->ev_link = ev_link;
     248             : 
     249             : #ifdef HAVE_PTHREAD
     250        3154 :         glue->tctx = tevent_threaded_context_create(glue, ev);
     251        3154 :         if (glue->tctx == NULL) {
     252           0 :                 TALLOC_FREE(ev_link);
     253           0 :                 TALLOC_FREE(glue);
     254           0 :                 return ENOMEM;
     255             :         }
     256             : #endif
     257             : 
     258        3154 :         DLIST_ADD(pool->glue_list, glue);
     259        3119 :         return 0;
     260             : }
     261             : 
     262             : static void pthreadpool_tevent_job_fn(void *private_data);
     263             : static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
     264             :                                         struct tevent_immediate *im,
     265             :                                         void *private_data);
     266             : 
     267      235784 : static int pthreadpool_tevent_job_state_destructor(
     268             :         struct pthreadpool_tevent_job_state *state)
     269             : {
     270      235784 :         if (state->pool == NULL) {
     271      167423 :                 return 0;
     272             :         }
     273             : 
     274             :         /*
     275             :          * We should never be called with state->req == NULL,
     276             :          * state->pool must be cleared before the 2nd talloc_free().
     277             :          */
     278           3 :         if (state->req == NULL) {
     279           0 :                 abort();
     280             :         }
     281             : 
     282             :         /*
     283             :          * We need to reparent to a long term context.
     284             :          */
     285           3 :         (void)talloc_reparent(state->req, NULL, state);
     286           3 :         state->req = NULL;
     287           3 :         return -1;
     288             : }
     289             : 
     290      235789 : struct tevent_req *pthreadpool_tevent_job_send(
     291             :         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
     292             :         struct pthreadpool_tevent *pool,
     293             :         void (*fn)(void *private_data), void *private_data)
     294             : {
     295       68363 :         struct tevent_req *req;
     296       68363 :         struct pthreadpool_tevent_job_state *state;
     297       68363 :         int ret;
     298             : 
     299      235789 :         req = tevent_req_create(mem_ctx, &state,
     300             :                                 struct pthreadpool_tevent_job_state);
     301      235789 :         if (req == NULL) {
     302           0 :                 return NULL;
     303             :         }
     304      235789 :         state->pool = pool;
     305      235789 :         state->ev = ev;
     306      235789 :         state->req = req;
     307      235789 :         state->fn = fn;
     308      235789 :         state->private_data = private_data;
     309             : 
     310      235789 :         if (pool == NULL) {
     311           0 :                 tevent_req_error(req, EINVAL);
     312           0 :                 return tevent_req_post(req, ev);
     313             :         }
     314      235789 :         if (pool->pool == NULL) {
     315           0 :                 tevent_req_error(req, EINVAL);
     316           0 :                 return tevent_req_post(req, ev);
     317             :         }
     318             : 
     319      235789 :         state->im = tevent_create_immediate(state);
     320      235789 :         if (tevent_req_nomem(state->im, req)) {
     321           0 :                 return tevent_req_post(req, ev);
     322             :         }
     323             : 
     324      235789 :         ret = pthreadpool_tevent_register_ev(pool, ev);
     325      235789 :         if (tevent_req_error(req, ret)) {
     326           0 :                 return tevent_req_post(req, ev);
     327             :         }
     328             : 
     329      235789 :         ret = pthreadpool_add_job(pool->pool, 0,
     330             :                                   pthreadpool_tevent_job_fn,
     331             :                                   state);
     332      235789 :         if (tevent_req_error(req, ret)) {
     333           2 :                 return tevent_req_post(req, ev);
     334             :         }
     335             : 
     336             :         /*
     337             :          * Once the job is scheduled, we need to protect
     338             :          * our memory.
     339             :          */
     340      235787 :         talloc_set_destructor(state, pthreadpool_tevent_job_state_destructor);
     341             : 
     342      235787 :         DLIST_ADD_END(pool->jobs, state);
     343             : 
     344      167426 :         return req;
     345             : }
     346             : 
     347      235787 : static void pthreadpool_tevent_job_fn(void *private_data)
     348             : {
     349      235787 :         struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
     350             :                 private_data, struct pthreadpool_tevent_job_state);
     351      235787 :         state->fn(state->private_data);
     352      235784 : }
     353             : 
     354      235784 : static int pthreadpool_tevent_job_signal(int jobid,
     355             :                                          void (*job_fn)(void *private_data),
     356             :                                          void *job_private_data,
     357             :                                          void *private_data)
     358             : {
     359      235784 :         struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
     360             :                 job_private_data, struct pthreadpool_tevent_job_state);
     361      235784 :         struct tevent_threaded_context *tctx = NULL;
     362      235784 :         struct pthreadpool_tevent_glue *g = NULL;
     363             : 
     364      235784 :         if (state->pool == NULL) {
     365             :                 /* The pthreadpool_tevent is already gone */
     366           0 :                 return 0;
     367             :         }
     368             : 
     369             : #ifdef HAVE_PTHREAD
     370      235783 :         for (g = state->pool->glue_list; g != NULL; g = g->next) {
     371      235783 :                 if (g->ev == state->ev) {
     372      235783 :                         tctx = g->tctx;
     373      235783 :                         break;
     374             :                 }
     375             :         }
     376             : 
     377      235783 :         if (tctx == NULL) {
     378           0 :                 abort();
     379             :         }
     380             : #endif
     381             : 
     382      235783 :         if (tctx != NULL) {
     383             :                 /* with HAVE_PTHREAD */
     384      235783 :                 tevent_threaded_schedule_immediate(tctx, state->im,
     385             :                                                    pthreadpool_tevent_job_done,
     386       68360 :                                                    state);
     387             :         } else {
     388             :                 /* without HAVE_PTHREAD */
     389           0 :                 tevent_schedule_immediate(state->im, state->ev,
     390             :                                           pthreadpool_tevent_job_done,
     391       68360 :                                           state);
     392             :         }
     393             : 
     394      235782 :         return 0;
     395             : }
     396             : 
     397      235781 : static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
     398             :                                         struct tevent_immediate *im,
     399             :                                         void *private_data)
     400             : {
     401      235781 :         struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
     402             :                 private_data, struct pthreadpool_tevent_job_state);
     403             : 
     404      235781 :         if (state->pool != NULL) {
     405      235781 :                 DLIST_REMOVE(state->pool->jobs, state);
     406      235781 :                 state->pool = NULL;
     407             :         }
     408             : 
     409      235781 :         if (state->req == NULL) {
     410             :                 /*
     411             :                  * There was a talloc_free() state->req
     412             :                  * while the job was pending,
     413             :                  * which mean we're reparented on a longterm
     414             :                  * talloc context.
     415             :                  *
     416             :                  * We just cleanup here...
     417             :                  */
     418           0 :                 talloc_free(state);
     419           0 :                 return;
     420             :         }
     421             : 
     422      235781 :         tevent_req_done(state->req);
     423             : }
     424             : 
     425      235782 : int pthreadpool_tevent_job_recv(struct tevent_req *req)
     426             : {
     427      235782 :         return tevent_req_simple_recv_unix(req);
     428             : }

Generated by: LCOV version 1.14