LCOV - code coverage report
Current view: top level - lib/tevent - tevent_threads.c (source / functions) Hit Total Coverage
Test: coverage report for master 2f515e9b Lines: 178 230 77.4 %
Date: 2024-04-21 15:09:00 Functions: 11 12 91.7 %

          Line data    Source code
       1             : /*
       2             :    tevent event library.
       3             : 
       4             :    Copyright (C) Jeremy Allison 2015
       5             : 
       6             :      ** NOTE! The following LGPL license applies to the tevent
       7             :      ** library. This does NOT imply that all of Samba is released
       8             :      ** under the LGPL
       9             : 
      10             :    This library is free software; you can redistribute it and/or
      11             :    modify it under the terms of the GNU Lesser General Public
      12             :    License as published by the Free Software Foundation; either
      13             :    version 3 of the License, or (at your option) any later version.
      14             : 
      15             :    This library is distributed in the hope that it will be useful,
      16             :    but WITHOUT ANY WARRANTY; without even the implied warranty of
      17             :    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      18             :    Lesser General Public License for more details.
      19             : 
      20             :    You should have received a copy of the GNU Lesser General Public
      21             :    License along with this library; if not, see <http://www.gnu.org/licenses/>.
      22             : */
      23             : 
      24             : #include "replace.h"
      25             : #include "system/filesys.h"
      26             : #include "talloc.h"
      27             : #include "tevent.h"
      28             : #include "tevent_internal.h"
      29             : #include "tevent_util.h"
      30             : 
      31             : #ifdef HAVE_PTHREAD
      32             : #include "system/threads.h"
      33             : 
      34             : struct tevent_immediate_list {
      35             :         struct tevent_immediate_list *next, *prev;
      36             :         tevent_immediate_handler_t handler;
      37             :         struct tevent_immediate *im;
      38             :         void *private_ptr;
      39             : };
      40             : 
      41             : struct tevent_thread_proxy {
      42             :         pthread_mutex_t mutex;
      43             :         struct tevent_context *dest_ev_ctx;
      44             :         int read_fd;
      45             :         int write_fd;
      46             :         struct tevent_fd *pipe_read_fde;
      47             :         /* Pending events list. */
      48             :         struct tevent_immediate_list *im_list;
      49             :         /* Completed events list. */
      50             :         struct tevent_immediate_list *tofree_im_list;
      51             :         struct tevent_immediate *free_im;
      52             : };
      53             : 
      54         590 : static void free_im_list(struct tevent_immediate_list **pp_list_head)
      55             : {
      56         590 :         struct tevent_immediate_list *im_entry = NULL;
      57         590 :         struct tevent_immediate_list *im_next = NULL;
      58             : 
      59        1190 :         for (im_entry = *pp_list_head; im_entry; im_entry = im_next) {
      60         600 :                 im_next = im_entry->next;
      61         600 :                 DLIST_REMOVE(*pp_list_head, im_entry);
      62         600 :                 TALLOC_FREE(im_entry);
      63             :         }
      64         590 : }
      65             : 
      66         182 : static void free_list_handler(struct tevent_context *ev,
      67             :                                 struct tevent_immediate *im,
      68             :                                 void *private_ptr)
      69             : {
      70         182 :         struct tevent_thread_proxy *tp =
      71         182 :                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
      72         182 :         int ret;
      73             : 
      74         182 :         ret = pthread_mutex_lock(&tp->mutex);
      75         182 :         if (ret != 0) {
      76           0 :                 abort();
      77             :                 /* Notreached. */
      78             :                 return;
      79             :         }
      80             : 
      81         182 :         free_im_list(&tp->tofree_im_list);
      82             : 
      83         182 :         ret = pthread_mutex_unlock(&tp->mutex);
      84         182 :         if (ret != 0) {
      85           0 :                 abort();
      86             :                 /* Notreached. */
      87             :                 return;
      88             :         }
      89             : }
      90             : 
      91         386 : static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
      92             : {
      93         386 :         struct tevent_immediate_list *im_entry = NULL;
      94         386 :         struct tevent_immediate_list *im_next = NULL;
      95             : 
      96         986 :         for (im_entry = tp->im_list; im_entry; im_entry = im_next) {
      97         600 :                 im_next = im_entry->next;
      98         600 :                 DLIST_REMOVE(tp->im_list, im_entry);
      99             : 
     100         600 :                 tevent_schedule_immediate(im_entry->im,
     101             :                                         tp->dest_ev_ctx,
     102             :                                         im_entry->handler,
     103         600 :                                         im_entry->private_ptr);
     104             : 
     105             :                 /* Move from pending list to free list. */
     106         600 :                 DLIST_ADD(tp->tofree_im_list, im_entry);
     107             :         }
     108         386 :         if (tp->tofree_im_list != NULL) {
     109             :                 /*
     110             :                  * Once the current immediate events
     111             :                  * are processed, we need to reschedule
     112             :                  * ourselves to free them. This works
     113             :                  * as tevent_schedule_immediate()
     114             :                  * always adds events to the *END* of
     115             :                  * the immediate events list.
     116             :                  */
     117         386 :                 tevent_schedule_immediate(tp->free_im,
     118             :                                         tp->dest_ev_ctx,
     119             :                                         free_list_handler,
     120         386 :                                         tp);
     121             :         }
     122         386 : }
     123             : 
     124         386 : static void pipe_read_handler(struct tevent_context *ev,
     125             :                                 struct tevent_fd *fde,
     126             :                                 uint16_t flags,
     127             :                                 void *private_ptr)
     128             : {
     129         386 :         struct tevent_thread_proxy *tp =
     130         386 :                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
     131         386 :         ssize_t len = 64;
     132         386 :         int ret;
     133             : 
     134         386 :         ret = pthread_mutex_lock(&tp->mutex);
     135         386 :         if (ret != 0) {
     136           0 :                 abort();
     137             :                 /* Notreached. */
     138             :                 return;
     139             :         }
     140             : 
     141             :         /*
     142             :          * Clear out all data in the pipe. We
     143             :          * don't really care if this returns -1.
     144             :          */
     145         774 :         while (len == 64) {
     146         388 :                 char buf[64];
     147         388 :                 len = read(tp->read_fd, buf, 64);
     148         386 :         };
     149             : 
     150         386 :         schedule_immediate_functions(tp);
     151             : 
     152         386 :         ret = pthread_mutex_unlock(&tp->mutex);
     153         386 :         if (ret != 0) {
     154           0 :                 abort();
     155             :                 /* Notreached. */
     156             :                 return;
     157             :         }
     158             : }
     159             : 
     160         204 : static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
     161             : {
     162         204 :         int ret;
     163             : 
     164         204 :         ret = pthread_mutex_lock(&tp->mutex);
     165         204 :         if (ret != 0) {
     166           0 :                 abort();
     167             :                 /* Notreached. */
     168             :                 return 0;
     169             :         }
     170             : 
     171         204 :         TALLOC_FREE(tp->pipe_read_fde);
     172             : 
     173         204 :         if (tp->read_fd != -1) {
     174         204 :                 (void)close(tp->read_fd);
     175         204 :                 tp->read_fd = -1;
     176             :         }
     177         204 :         if (tp->write_fd != -1) {
     178         204 :                 (void)close(tp->write_fd);
     179         204 :                 tp->write_fd = -1;
     180             :         }
     181             : 
     182             :         /* Hmmm. It's probably an error if we get here with
     183             :            any non-NULL immediate entries.. */
     184             : 
     185         204 :         free_im_list(&tp->im_list);
     186         204 :         free_im_list(&tp->tofree_im_list);
     187             : 
     188         204 :         TALLOC_FREE(tp->free_im);
     189             : 
     190         204 :         ret = pthread_mutex_unlock(&tp->mutex);
     191         204 :         if (ret != 0) {
     192           0 :                 abort();
     193             :                 /* Notreached. */
     194             :                 return 0;
     195             :         }
     196             : 
     197         204 :         ret = pthread_mutex_destroy(&tp->mutex);
     198         204 :         if (ret != 0) {
     199           0 :                 abort();
     200             :                 /* Notreached. */
     201             :                 return 0;
     202             :         }
     203             : 
     204         204 :         return 0;
     205             : }
     206             : 
     207             : /*
     208             :  * Create a struct that can be passed to other threads
     209             :  * to allow them to signal the struct tevent_context *
     210             :  * passed in.
     211             :  */
     212             : 
     213         202 : struct tevent_thread_proxy *tevent_thread_proxy_create(
     214             :                 struct tevent_context *dest_ev_ctx)
     215             : {
     216         202 :         int ret;
     217         202 :         int pipefds[2];
     218         202 :         struct tevent_thread_proxy *tp;
     219             : 
     220         202 :         if (dest_ev_ctx->wrapper.glue != NULL) {
     221             :                 /*
     222             :                  * stacking of wrappers is not supported
     223             :                  */
     224           0 :                 tevent_debug(dest_ev_ctx->wrapper.glue->main_ev,
     225             :                              TEVENT_DEBUG_FATAL,
     226             :                              "%s() not allowed on a wrapper context\n",
     227             :                              __func__);
     228           0 :                 errno = EINVAL;
     229           0 :                 return NULL;
     230             :         }
     231             : 
     232         202 :         tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
     233         204 :         if (tp == NULL) {
     234           0 :                 return NULL;
     235             :         }
     236             : 
     237         204 :         ret = pthread_mutex_init(&tp->mutex, NULL);
     238         204 :         if (ret != 0) {
     239           0 :                 goto fail;
     240             :         }
     241             : 
     242         204 :         tp->dest_ev_ctx = dest_ev_ctx;
     243         204 :         tp->read_fd = -1;
     244         204 :         tp->write_fd = -1;
     245             : 
     246         204 :         talloc_set_destructor(tp, tevent_thread_proxy_destructor);
     247             : 
     248         204 :         ret = pipe(pipefds);
     249         204 :         if (ret == -1) {
     250           0 :                 goto fail;
     251             :         }
     252             : 
     253         204 :         tp->read_fd = pipefds[0];
     254         204 :         tp->write_fd = pipefds[1];
     255             : 
     256         204 :         ret = ev_set_blocking(pipefds[0], false);
     257         204 :         if (ret != 0) {
     258           0 :                 goto fail;
     259             :         }
     260         204 :         ret = ev_set_blocking(pipefds[1], false);
     261         202 :         if (ret != 0) {
     262           0 :                 goto fail;
     263             :         }
     264         202 :         if (!ev_set_close_on_exec(pipefds[0])) {
     265           0 :                 goto fail;
     266             :         }
     267         204 :         if (!ev_set_close_on_exec(pipefds[1])) {
     268           0 :                 goto fail;
     269             :         }
     270             : 
     271         204 :         tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
     272             :                                 tp,
     273             :                                 tp->read_fd,
     274             :                                 TEVENT_FD_READ,
     275             :                                 pipe_read_handler,
     276             :                                 tp);
     277         204 :         if (tp->pipe_read_fde == NULL) {
     278           0 :                 goto fail;
     279             :         }
     280             : 
     281             :         /*
     282             :          * Create an immediate event to free
     283             :          * completed lists.
     284             :          */
     285         204 :         tp->free_im = tevent_create_immediate(tp);
     286         204 :         if (tp->free_im == NULL) {
     287           0 :                 goto fail;
     288             :         }
     289             : 
     290           0 :         return tp;
     291             : 
     292           0 :   fail:
     293             : 
     294           0 :         TALLOC_FREE(tp);
     295           0 :         return NULL;
     296             : }
     297             : 
     298             : /*
     299             :  * This function schedules an immediate event to be called with argument
     300             :  * *pp_private in the thread context of dest_ev_ctx. Caller doesn't
     301             :  * wait for activation to take place, this is simply fire-and-forget.
     302             :  *
     303             :  * pp_im must be a pointer to an immediate event talloced on
     304             :  * a context owned by the calling thread, or the NULL context.
     305             :  * Ownership of *pp_im will be transferred to the tevent library.
     306             :  *
     307             :  * pp_private can be null, or contents of *pp_private must be
     308             :  * talloc'ed memory on a context owned by the calling thread
     309             :  * or the NULL context. If non-null, ownership of *pp_private will
     310             :  * be transferred to the tevent library.
     311             :  *
     312             :  * If you want to return a message, have the destination use the
     313             :  * same function call to send back to the caller.
     314             :  */
     315             : 
     316             : 
     317         600 : void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
     318             :                                   struct tevent_immediate **pp_im,
     319             :                                   tevent_immediate_handler_t handler,
     320             :                                   void *pp_private_data)
     321             : {
     322         600 :         struct tevent_immediate_list *im_entry;
     323         600 :         int ret;
     324         600 :         char c;
     325         600 :         ssize_t written;
     326             : 
     327         600 :         ret = pthread_mutex_lock(&tp->mutex);
     328         600 :         if (ret != 0) {
     329           0 :                 abort();
     330             :                 /* Notreached. */
     331             :                 return;
     332             :         }
     333             : 
     334         600 :         if (tp->write_fd == -1) {
     335             :                 /* In the process of being destroyed. Ignore. */
     336           0 :                 goto end;
     337             :         }
     338             : 
     339             :         /* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
     340         600 :         im_entry = talloc_zero(NULL, struct tevent_immediate_list);
     341         600 :         if (im_entry == NULL) {
     342           0 :                 goto end;
     343             :         }
     344             : 
     345         600 :         im_entry->handler = handler;
     346         600 :         im_entry->im = talloc_move(im_entry, pp_im);
     347             : 
     348         600 :         if (pp_private_data != NULL) {
     349         600 :                 void **pptr = (void **)pp_private_data;
     350         600 :                 im_entry->private_ptr = talloc_move(im_entry, pptr);
     351             :         }
     352             : 
     353         600 :         DLIST_ADD(tp->im_list, im_entry);
     354             : 
     355             :         /* And notify the dest_ev_ctx to wake up. */
     356         600 :         c = '\0';
     357         600 :         do {
     358         600 :                 written = write(tp->write_fd, &c, 1);
     359         600 :         } while (written == -1 && errno == EINTR);
     360             : 
     361         600 :   end:
     362             : 
     363         600 :         ret = pthread_mutex_unlock(&tp->mutex);
     364         600 :         if (ret != 0) {
     365           0 :                 abort();
     366             :                 /* Notreached. */
     367             :         }
     368             : }
     369             : #else
     370             : /* !HAVE_PTHREAD */
     371             : struct tevent_thread_proxy *tevent_thread_proxy_create(
     372             :                 struct tevent_context *dest_ev_ctx)
     373             : {
     374             :         errno = ENOSYS;
     375             :         return NULL;
     376             : }
     377             : 
     378             : void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
     379             :                                   struct tevent_immediate **pp_im,
     380             :                                   tevent_immediate_handler_t handler,
     381             :                                   void *pp_private_data)
     382             : {
     383             :         ;
     384             : }
     385             : #endif
     386             : 
     387        3203 : static int tevent_threaded_context_destructor(
     388             :         struct tevent_threaded_context *tctx)
     389             : {
     390        3203 :         struct tevent_context *main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
     391          41 :         int ret;
     392             : 
     393        3203 :         if (main_ev != NULL) {
     394        3080 :                 DLIST_REMOVE(main_ev->threaded_contexts, tctx);
     395             :         }
     396             : 
     397             :         /*
     398             :          * We have to coordinate with _tevent_threaded_schedule_immediate's
     399             :          * unlock of the event_ctx_mutex. We're in the main thread here,
     400             :          * and we can be scheduled before the helper thread finalizes its
     401             :          * call _tevent_threaded_schedule_immediate. This means we would
     402             :          * pthreadpool_destroy a locked mutex, which is illegal.
     403             :          */
     404        3203 :         ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
     405        3203 :         if (ret != 0) {
     406           0 :                 abort();
     407             :         }
     408             : 
     409        3203 :         ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
     410        3203 :         if (ret != 0) {
     411           0 :                 abort();
     412             :         }
     413             : 
     414        3203 :         ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
     415        3203 :         if (ret != 0) {
     416           0 :                 abort();
     417             :         }
     418             : 
     419        3203 :         return 0;
     420             : }
     421             : 
     422        3208 : struct tevent_threaded_context *tevent_threaded_context_create(
     423             :         TALLOC_CTX *mem_ctx, struct tevent_context *ev)
     424             : {
     425             : #ifdef HAVE_PTHREAD
     426        3208 :         struct tevent_context *main_ev = tevent_wrapper_main_ev(ev);
     427          41 :         struct tevent_threaded_context *tctx;
     428          41 :         int ret;
     429             : 
     430        3208 :         ret = tevent_common_wakeup_init(main_ev);
     431        3208 :         if (ret != 0) {
     432           0 :                 errno = ret;
     433           0 :                 return NULL;
     434             :         }
     435             : 
     436        3208 :         tctx = talloc(mem_ctx, struct tevent_threaded_context);
     437        3208 :         if (tctx == NULL) {
     438           0 :                 return NULL;
     439             :         }
     440        3208 :         tctx->event_ctx = ev;
     441             : 
     442        3208 :         ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
     443        3208 :         if (ret != 0) {
     444           0 :                 TALLOC_FREE(tctx);
     445           0 :                 return NULL;
     446             :         }
     447             : 
     448        3208 :         DLIST_ADD(main_ev->threaded_contexts, tctx);
     449        3208 :         talloc_set_destructor(tctx, tevent_threaded_context_destructor);
     450             : 
     451        3208 :         return tctx;
     452             : #else
     453             :         errno = ENOSYS;
     454             :         return NULL;
     455             : #endif
     456             : }
     457             : 
     458           0 : static int tevent_threaded_schedule_immediate_destructor(struct tevent_immediate *im)
     459             : {
     460           0 :         if (im->event_ctx != NULL) {
     461           0 :                 abort();
     462             :         }
     463           0 :         return 0;
     464             : }
     465             : 
     466      237381 : void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
     467             :                                          struct tevent_immediate *im,
     468             :                                          tevent_immediate_handler_t handler,
     469             :                                          void *private_data,
     470             :                                          const char *handler_name,
     471             :                                          const char *location)
     472             : {
     473             : #ifdef HAVE_PTHREAD
     474      237381 :         const char *create_location = im->create_location;
     475      237381 :         struct tevent_context *main_ev = NULL;
     476      237381 :         struct tevent_wrapper_glue *glue = NULL;
     477       68566 :         int ret, wakeup_fd;
     478             : 
     479      237381 :         ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
     480      237381 :         if (ret != 0) {
     481           0 :                 abort();
     482             :         }
     483             : 
     484      237381 :         if (tctx->event_ctx == NULL) {
     485             :                 /*
     486             :                  * Our event context is already gone.
     487             :                  */
     488           0 :                 ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
     489           0 :                 if (ret != 0) {
     490           0 :                         abort();
     491             :                 }
     492           0 :                 return;
     493             :         }
     494             : 
     495      237381 :         glue = tctx->event_ctx->wrapper.glue;
     496             : 
     497      237381 :         if ((im->event_ctx != NULL) || (handler == NULL)) {
     498           0 :                 abort();
     499             :         }
     500      237381 :         if (im->destroyed) {
     501           0 :                 abort();
     502             :         }
     503      237381 :         if (im->busy) {
     504           0 :                 abort();
     505             :         }
     506             : 
     507      237381 :         main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
     508             : 
     509      237381 :         *im = (struct tevent_immediate) {
     510      237381 :                 .event_ctx              = tctx->event_ctx,
     511             :                 .wrapper                = glue,
     512             :                 .handler                = handler,
     513             :                 .private_data           = private_data,
     514             :                 .handler_name           = handler_name,
     515             :                 .create_location        = create_location,
     516             :                 .schedule_location      = location,
     517             :         };
     518             : 
     519             :         /*
     520             :          * Make sure the event won't be destroyed while
     521             :          * it's part of the ev->scheduled_immediates list.
     522             :          * _tevent_schedule_immediate() will reset the destructor
     523             :          * in tevent_common_threaded_activate_immediate().
     524             :          */
     525      237381 :         talloc_set_destructor(im, tevent_threaded_schedule_immediate_destructor);
     526             : 
     527      237381 :         ret = pthread_mutex_lock(&main_ev->scheduled_mutex);
     528      237381 :         if (ret != 0) {
     529           0 :                 abort();
     530             :         }
     531             : 
     532      237381 :         DLIST_ADD_END(main_ev->scheduled_immediates, im);
     533      237381 :         wakeup_fd = main_ev->wakeup_fd;
     534             : 
     535      237381 :         ret = pthread_mutex_unlock(&main_ev->scheduled_mutex);
     536      237381 :         if (ret != 0) {
     537           0 :                 abort();
     538             :         }
     539             : 
     540      237381 :         ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
     541      237381 :         if (ret != 0) {
     542           0 :                 abort();
     543             :         }
     544             : 
     545             :         /*
     546             :          * We might want to wake up the main thread under the lock. We
     547             :          * had a slightly similar situation in pthreadpool, changed
     548             :          * with 1c4284c7395f23. This is not exactly the same, as the
     549             :          * wakeup is only a last-resort thing in case the main thread
     550             :          * is sleeping. Doing the wakeup under the lock can easily
     551             :          * lead to a contended mutex, which is much more expensive
     552             :          * than a noncontended one. So I'd opt for the lower footprint
     553             :          * initially. Maybe we have to change that later.
     554             :          */
     555      237381 :         tevent_common_wakeup_fd(wakeup_fd);
     556             : #else
     557             :         /*
     558             :          * tevent_threaded_context_create() returned NULL with ENOSYS...
     559             :          */
     560             :         abort();
     561             : #endif
     562             : }
     563             : 
     564     1560356 : void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
     565             : {
     566             : #ifdef HAVE_PTHREAD
     567      275430 :         int ret;
     568     1560356 :         ret = pthread_mutex_lock(&ev->scheduled_mutex);
     569     1560356 :         if (ret != 0) {
     570           0 :                 abort();
     571             :         }
     572             : 
     573     1797735 :         while (ev->scheduled_immediates != NULL) {
     574      237379 :                 struct tevent_immediate *im = ev->scheduled_immediates;
     575      237379 :                 struct tevent_immediate copy = *im;
     576             : 
     577      237379 :                 DLIST_REMOVE(ev->scheduled_immediates, im);
     578             : 
     579      237379 :                 TEVENT_DEBUG(ev, TEVENT_DEBUG_TRACE,
     580             :                              "Schedule immediate event \"%s\": %p from thread into main\n",
     581             :                              im->handler_name, im);
     582      237379 :                 im->handler_name = NULL;
     583      237379 :                 _tevent_schedule_immediate(im,
     584             :                                            ev,
     585             :                                            copy.handler,
     586             :                                            copy.private_data,
     587             :                                            copy.handler_name,
     588             :                                            copy.schedule_location);
     589             :         }
     590             : 
     591     1560356 :         ret = pthread_mutex_unlock(&ev->scheduled_mutex);
     592     1560356 :         if (ret != 0) {
     593           0 :                 abort();
     594             :         }
     595             : #else
     596             :         /*
     597             :          * tevent_threaded_context_create() returned NULL with ENOSYS...
     598             :          */
     599             :         abort();
     600             : #endif
     601     1560356 : }

Generated by: LCOV version 1.14