Line data Source code
1 : /*
2 : tevent event library.
3 :
4 : Copyright (C) Jeremy Allison 2015
5 :
6 : ** NOTE! The following LGPL license applies to the tevent
7 : ** library. This does NOT imply that all of Samba is released
8 : ** under the LGPL
9 :
10 : This library is free software; you can redistribute it and/or
11 : modify it under the terms of the GNU Lesser General Public
12 : License as published by the Free Software Foundation; either
13 : version 3 of the License, or (at your option) any later version.
14 :
15 : This library is distributed in the hope that it will be useful,
16 : but WITHOUT ANY WARRANTY; without even the implied warranty of
17 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 : Lesser General Public License for more details.
19 :
20 : You should have received a copy of the GNU Lesser General Public
21 : License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 : */
23 :
24 : #include "replace.h"
25 : #include "system/filesys.h"
26 : #include "talloc.h"
27 : #include "tevent.h"
28 : #include "tevent_internal.h"
29 : #include "tevent_util.h"
30 :
31 : #ifdef HAVE_PTHREAD
32 : #include "system/threads.h"
33 :
/*
 * One cross-thread request queued on a tevent_thread_proxy:
 * the immediate event to schedule in the destination context,
 * plus the handler and private data to pass along with it.
 */
struct tevent_immediate_list {
	struct tevent_immediate_list *next, *prev;	/* DLIST links */
	tevent_immediate_handler_t handler;	/* handler to run in dest context */
	struct tevent_immediate *im;		/* owned by this entry (talloc child) */
	void *private_ptr;			/* owned by this entry if non-NULL */
};
40 :
/*
 * Proxy that lets other threads push immediate events into
 * dest_ev_ctx. Senders append to im_list and write a byte to
 * write_fd; the destination thread's pipe_read_fde fires, drains
 * the pipe and converts the entries into real immediates.
 * All mutable state is protected by mutex.
 */
struct tevent_thread_proxy {
	pthread_mutex_t mutex;
	struct tevent_context *dest_ev_ctx;
	int read_fd;
	int write_fd;
	struct tevent_fd *pipe_read_fde;
	/* Pending events list. */
	struct tevent_immediate_list *im_list;
	/* Completed events list. */
	struct tevent_immediate_list *tofree_im_list;
	struct tevent_immediate *free_im;	/* reusable "free completed list" event */
};
53 :
/*
 * Drain a tevent_immediate_list, unlinking and talloc-freeing every
 * entry. On return *pp_list_head is NULL. Freeing an entry also
 * frees the immediate and private data owned by it.
 */
static void free_im_list(struct tevent_immediate_list **pp_list_head)
{
	while (*pp_list_head != NULL) {
		struct tevent_immediate_list *entry = *pp_list_head;

		DLIST_REMOVE(*pp_list_head, entry);
		TALLOC_FREE(entry);
	}
}
65 :
66 182 : static void free_list_handler(struct tevent_context *ev,
67 : struct tevent_immediate *im,
68 : void *private_ptr)
69 : {
70 182 : struct tevent_thread_proxy *tp =
71 182 : talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
72 182 : int ret;
73 :
74 182 : ret = pthread_mutex_lock(&tp->mutex);
75 182 : if (ret != 0) {
76 0 : abort();
77 : /* Notreached. */
78 : return;
79 : }
80 :
81 182 : free_im_list(&tp->tofree_im_list);
82 :
83 182 : ret = pthread_mutex_unlock(&tp->mutex);
84 182 : if (ret != 0) {
85 0 : abort();
86 : /* Notreached. */
87 : return;
88 : }
89 : }
90 :
/*
 * Convert every pending entry on tp->im_list into a real immediate
 * event on the destination context, moving each entry onto
 * tp->tofree_im_list. If anything was (or is still) on the free
 * list, schedule free_list_handler to release those entries after
 * the dispatched immediates have run.
 *
 * Caller must hold tp->mutex.
 */
static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
{
	struct tevent_immediate_list *im_entry = NULL;
	struct tevent_immediate_list *im_next = NULL;

	for (im_entry = tp->im_list; im_entry; im_entry = im_next) {
		im_next = im_entry->next;
		DLIST_REMOVE(tp->im_list, im_entry);

		tevent_schedule_immediate(im_entry->im,
					  tp->dest_ev_ctx,
					  im_entry->handler,
					  im_entry->private_ptr);

		/* Move from pending list to free list. */
		DLIST_ADD(tp->tofree_im_list, im_entry);
	}
	if (tp->tofree_im_list != NULL) {
		/*
		 * Once the current immediate events
		 * are processed, we need to reschedule
		 * ourselves to free them. This works
		 * as tevent_schedule_immediate()
		 * always adds events to the *END* of
		 * the immediate events list.
		 */
		tevent_schedule_immediate(tp->free_im,
					  tp->dest_ev_ctx,
					  free_list_handler,
					  tp);
	}
}
123 :
124 386 : static void pipe_read_handler(struct tevent_context *ev,
125 : struct tevent_fd *fde,
126 : uint16_t flags,
127 : void *private_ptr)
128 : {
129 386 : struct tevent_thread_proxy *tp =
130 386 : talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
131 386 : ssize_t len = 64;
132 386 : int ret;
133 :
134 386 : ret = pthread_mutex_lock(&tp->mutex);
135 386 : if (ret != 0) {
136 0 : abort();
137 : /* Notreached. */
138 : return;
139 : }
140 :
141 : /*
142 : * Clear out all data in the pipe. We
143 : * don't really care if this returns -1.
144 : */
145 774 : while (len == 64) {
146 388 : char buf[64];
147 388 : len = read(tp->read_fd, buf, 64);
148 386 : };
149 :
150 386 : schedule_immediate_functions(tp);
151 :
152 386 : ret = pthread_mutex_unlock(&tp->mutex);
153 386 : if (ret != 0) {
154 0 : abort();
155 : /* Notreached. */
156 : return;
157 : }
158 : }
159 :
/*
 * Talloc destructor for the proxy, run in the destination thread.
 * Tears down the read fde, closes both pipe fds (setting write_fd
 * to -1, which makes tevent_thread_proxy_schedule() in other
 * threads ignore further requests), frees any entries still queued
 * on either list, and finally destroys the mutex.
 */
static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
{
	int ret;

	ret = pthread_mutex_lock(&tp->mutex);
	if (ret != 0) {
		abort();
		/* Notreached. */
		return 0;
	}

	TALLOC_FREE(tp->pipe_read_fde);

	if (tp->read_fd != -1) {
		(void)close(tp->read_fd);
		tp->read_fd = -1;
	}
	if (tp->write_fd != -1) {
		(void)close(tp->write_fd);
		tp->write_fd = -1;
	}

	/* Hmmm. It's probably an error if we get here with
	   any non-NULL immediate entries.. */

	free_im_list(&tp->im_list);
	free_im_list(&tp->tofree_im_list);

	TALLOC_FREE(tp->free_im);

	/* Unlock before destroy: destroying a locked mutex is illegal. */
	ret = pthread_mutex_unlock(&tp->mutex);
	if (ret != 0) {
		abort();
		/* Notreached. */
		return 0;
	}

	ret = pthread_mutex_destroy(&tp->mutex);
	if (ret != 0) {
		abort();
		/* Notreached. */
		return 0;
	}

	return 0;
}
206 :
207 : /*
208 : * Create a struct that can be passed to other threads
209 : * to allow them to signal the struct tevent_context *
210 : * passed in.
211 : */
212 :
/*
 * Allocate a tevent_thread_proxy as a talloc child of dest_ev_ctx.
 * Initializes the mutex, creates a non-blocking close-on-exec
 * notification pipe, registers a read fde in dest_ev_ctx and
 * creates the reusable "free completed entries" immediate.
 *
 * Returns NULL on failure (errno set to EINVAL for wrapper
 * contexts). Partial state is cleaned up by the talloc destructor
 * once it has been installed.
 */
struct tevent_thread_proxy *tevent_thread_proxy_create(
		struct tevent_context *dest_ev_ctx)
{
	int ret;
	int pipefds[2];
	struct tevent_thread_proxy *tp;

	if (dest_ev_ctx->wrapper.glue != NULL) {
		/*
		 * stacking of wrappers is not supported
		 */
		tevent_debug(dest_ev_ctx->wrapper.glue->main_ev,
			     TEVENT_DEBUG_FATAL,
			     "%s() not allowed on a wrapper context\n",
			     __func__);
		errno = EINVAL;
		return NULL;
	}

	tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
	if (tp == NULL) {
		return NULL;
	}

	ret = pthread_mutex_init(&tp->mutex, NULL);
	if (ret != 0) {
		goto fail;
	}

	tp->dest_ev_ctx = dest_ev_ctx;
	/* Mark fds invalid so the destructor knows what to close. */
	tp->read_fd = -1;
	tp->write_fd = -1;

	/* From here on, TALLOC_FREE(tp) cleans up everything below. */
	talloc_set_destructor(tp, tevent_thread_proxy_destructor);

	ret = pipe(pipefds);
	if (ret == -1) {
		goto fail;
	}

	tp->read_fd = pipefds[0];
	tp->write_fd = pipefds[1];

	/* Both pipe ends non-blocking and close-on-exec. */
	ret = ev_set_blocking(pipefds[0], false);
	if (ret != 0) {
		goto fail;
	}
	ret = ev_set_blocking(pipefds[1], false);
	if (ret != 0) {
		goto fail;
	}
	if (!ev_set_close_on_exec(pipefds[0])) {
		goto fail;
	}
	if (!ev_set_close_on_exec(pipefds[1])) {
		goto fail;
	}

	tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
					  tp,
					  tp->read_fd,
					  TEVENT_FD_READ,
					  pipe_read_handler,
					  tp);
	if (tp->pipe_read_fde == NULL) {
		goto fail;
	}

	/*
	 * Create an immediate event to free
	 * completed lists.
	 */
	tp->free_im = tevent_create_immediate(tp);
	if (tp->free_im == NULL) {
		goto fail;
	}

	return tp;

  fail:

	TALLOC_FREE(tp);
	return NULL;
}
297 :
298 : /*
299 : * This function schedules an immediate event to be called with argument
300 : * *pp_private in the thread context of dest_ev_ctx. Caller doesn't
301 : * wait for activation to take place, this is simply fire-and-forget.
302 : *
303 : * pp_im must be a pointer to an immediate event talloced on
304 : * a context owned by the calling thread, or the NULL context.
305 : * Ownership of *pp_im will be transferred to the tevent library.
306 : *
307 : * pp_private can be null, or contents of *pp_private must be
308 : * talloc'ed memory on a context owned by the calling thread
309 : * or the NULL context. If non-null, ownership of *pp_private will
310 : * be transferred to the tevent library.
311 : *
312 : * If you want to return a message, have the destination use the
313 : * same function call to send back to the caller.
314 : */
315 :
316 :
void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
				  struct tevent_immediate **pp_im,
				  tevent_immediate_handler_t handler,
				  void *pp_private_data)
{
	struct tevent_immediate_list *im_entry;
	int ret;
	char c;
	ssize_t written;

	ret = pthread_mutex_lock(&tp->mutex);
	if (ret != 0) {
		abort();
		/* Notreached. */
		return;
	}

	if (tp->write_fd == -1) {
		/* In the process of being destroyed. Ignore. */
		goto end;
	}

	/* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
	im_entry = talloc_zero(NULL, struct tevent_immediate_list);
	if (im_entry == NULL) {
		/* NOTE(review): on OOM the request is silently dropped and
		 * *pp_im is NOT consumed — caller keeps ownership here. */
		goto end;
	}

	im_entry->handler = handler;
	/* Take ownership of the immediate from the calling thread. */
	im_entry->im = talloc_move(im_entry, pp_im);

	if (pp_private_data != NULL) {
		/* pp_private_data is really a void ** — take ownership
		 * of *pp_private_data as well. */
		void **pptr = (void **)pp_private_data;
		im_entry->private_ptr = talloc_move(im_entry, pptr);
	}

	DLIST_ADD(tp->im_list, im_entry);

	/* And notify the dest_ev_ctx to wake up. */
	c = '\0';
	do {
		written = write(tp->write_fd, &c, 1);
	} while (written == -1 && errno == EINTR);

  end:

	ret = pthread_mutex_unlock(&tp->mutex);
	if (ret != 0) {
		abort();
		/* Notreached. */
	}
}
369 : #else
370 : /* !HAVE_PTHREAD */
/* Stub for builds without pthread support: always fails with ENOSYS. */
struct tevent_thread_proxy *tevent_thread_proxy_create(
		struct tevent_context *dest_ev_ctx)
{
	errno = ENOSYS;
	return NULL;
}
377 :
/* Stub for builds without pthread support: deliberately a no-op,
 * since no proxy can ever have been created. */
void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
				  struct tevent_immediate **pp_im,
				  tevent_immediate_handler_t handler,
				  void *pp_private_data)
{
	;
}
385 : #endif
386 :
/*
 * Talloc destructor for a threaded context: unlink it from the main
 * event context's threaded_contexts list, then synchronize with any
 * helper thread still inside _tevent_threaded_schedule_immediate()
 * before destroying the mutex.
 */
static int tevent_threaded_context_destructor(
	struct tevent_threaded_context *tctx)
{
	struct tevent_context *main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
	int ret;

	if (main_ev != NULL) {
		DLIST_REMOVE(main_ev->threaded_contexts, tctx);
	}

	/*
	 * We have to coordinate with _tevent_threaded_schedule_immediate's
	 * unlock of the event_ctx_mutex. We're in the main thread here,
	 * and we can be scheduled before the helper thread finalizes its
	 * call _tevent_threaded_schedule_immediate. This means we would
	 * pthreadpool_destroy a locked mutex, which is illegal.
	 */
	ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
	if (ret != 0) {
		abort();
	}

	ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
	if (ret != 0) {
		abort();
	}

	ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
	if (ret != 0) {
		abort();
	}

	return 0;
}
421 :
/*
 * Create a handle that helper threads can use to schedule immediate
 * events on ev (via _tevent_threaded_schedule_immediate). Ensures
 * the main event context's wakeup machinery is initialized and
 * links the new context into main_ev->threaded_contexts.
 *
 * Returns NULL on failure (errno set; ENOSYS without pthreads).
 */
struct tevent_threaded_context *tevent_threaded_context_create(
	TALLOC_CTX *mem_ctx, struct tevent_context *ev)
{
#ifdef HAVE_PTHREAD
	struct tevent_context *main_ev = tevent_wrapper_main_ev(ev);
	struct tevent_threaded_context *tctx;
	int ret;

	ret = tevent_common_wakeup_init(main_ev);
	if (ret != 0) {
		errno = ret;
		return NULL;
	}

	tctx = talloc(mem_ctx, struct tevent_threaded_context);
	if (tctx == NULL) {
		return NULL;
	}
	tctx->event_ctx = ev;

	ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
	if (ret != 0) {
		TALLOC_FREE(tctx);
		return NULL;
	}

	DLIST_ADD(main_ev->threaded_contexts, tctx);
	talloc_set_destructor(tctx, tevent_threaded_context_destructor);

	return tctx;
#else
	errno = ENOSYS;
	return NULL;
#endif
}
457 :
458 0 : static int tevent_threaded_schedule_immediate_destructor(struct tevent_immediate *im)
459 : {
460 0 : if (im->event_ctx != NULL) {
461 0 : abort();
462 : }
463 0 : return 0;
464 : }
465 :
/*
 * Called from a helper thread: hand the immediate event im over to
 * the main thread of tctx's event context. Takes
 * tctx->event_ctx_mutex to guard against the destination context
 * going away, appends im to main_ev->scheduled_immediates under
 * scheduled_mutex, then wakes the main thread via its wakeup fd.
 * Ownership of im passes to the scheduled list until activation.
 */
void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
					 struct tevent_immediate *im,
					 tevent_immediate_handler_t handler,
					 void *private_data,
					 const char *handler_name,
					 const char *location)
{
#ifdef HAVE_PTHREAD
	const char *create_location = im->create_location;
	struct tevent_context *main_ev = NULL;
	struct tevent_wrapper_glue *glue = NULL;
	int ret, wakeup_fd;

	ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
	if (ret != 0) {
		abort();
	}

	if (tctx->event_ctx == NULL) {
		/*
		 * Our event context is already gone.
		 */
		ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
		if (ret != 0) {
			abort();
		}
		return;
	}

	glue = tctx->event_ctx->wrapper.glue;

	/* Scheduling an already-scheduled, busy or destroyed
	 * immediate (or a NULL handler) is a caller bug. */
	if ((im->event_ctx != NULL) || (handler == NULL)) {
		abort();
	}
	if (im->destroyed) {
		abort();
	}
	if (im->busy) {
		abort();
	}

	main_ev = tevent_wrapper_main_ev(tctx->event_ctx);

	/* Reinitialize im wholesale, preserving only create_location. */
	*im = (struct tevent_immediate) {
		.event_ctx		= tctx->event_ctx,
		.wrapper		= glue,
		.handler		= handler,
		.private_data		= private_data,
		.handler_name		= handler_name,
		.create_location	= create_location,
		.schedule_location	= location,
	};

	/*
	 * Make sure the event won't be destroyed while
	 * it's part of the ev->scheduled_immediates list.
	 * _tevent_schedule_immediate() will reset the destructor
	 * in tevent_common_threaded_activate_immediate().
	 */
	talloc_set_destructor(im, tevent_threaded_schedule_immediate_destructor);

	ret = pthread_mutex_lock(&main_ev->scheduled_mutex);
	if (ret != 0) {
		abort();
	}

	DLIST_ADD_END(main_ev->scheduled_immediates, im);
	/* Snapshot the fd while locked; used after the unlocks below. */
	wakeup_fd = main_ev->wakeup_fd;

	ret = pthread_mutex_unlock(&main_ev->scheduled_mutex);
	if (ret != 0) {
		abort();
	}

	ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
	if (ret != 0) {
		abort();
	}

	/*
	 * We might want to wake up the main thread under the lock. We
	 * had a slightly similar situation in pthreadpool, changed
	 * with 1c4284c7395f23. This is not exactly the same, as the
	 * wakeup is only a last-resort thing in case the main thread
	 * is sleeping. Doing the wakeup under the lock can easily
	 * lead to a contended mutex, which is much more expensive
	 * than a noncontended one. So I'd opt for the lower footprint
	 * initially. Maybe we have to change that later.
	 */
	tevent_common_wakeup_fd(wakeup_fd);
#else
	/*
	 * tevent_threaded_context_create() returned NULL with ENOSYS...
	 */
	abort();
#endif
}
563 :
/*
 * Runs in the main thread: drain ev->scheduled_immediates (filled
 * by helper threads via _tevent_threaded_schedule_immediate) and
 * requeue each event onto the regular immediate list with
 * _tevent_schedule_immediate(). Takes ev->scheduled_mutex itself.
 */
void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
{
#ifdef HAVE_PTHREAD
	int ret;
	ret = pthread_mutex_lock(&ev->scheduled_mutex);
	if (ret != 0) {
		abort();
	}

	while (ev->scheduled_immediates != NULL) {
		struct tevent_immediate *im = ev->scheduled_immediates;
		/* Copy first: the handler details are re-applied via
		 * _tevent_schedule_immediate() below, which rewrites *im. */
		struct tevent_immediate copy = *im;

		DLIST_REMOVE(ev->scheduled_immediates, im);

		TEVENT_DEBUG(ev, TEVENT_DEBUG_TRACE,
			     "Schedule immediate event \"%s\": %p from thread into main\n",
			     im->handler_name, im);
		/* Cleared so the copy's handler_name is the single source. */
		im->handler_name = NULL;
		_tevent_schedule_immediate(im,
					   ev,
					   copy.handler,
					   copy.private_data,
					   copy.handler_name,
					   copy.schedule_location);
	}

	ret = pthread_mutex_unlock(&ev->scheduled_mutex);
	if (ret != 0) {
		abort();
	}
#else
	/*
	 * tevent_threaded_context_create() returned NULL with ENOSYS...
	 */
	abort();
#endif
}
|