Line data Source code
1 : /*
2 : Unix SMB/CIFS implementation.
3 : Infrastructure for async requests
4 : Copyright (C) Volker Lendecke 2008
5 : Copyright (C) Stefan Metzmacher 2009
6 :
7 : ** NOTE! The following LGPL license applies to the tevent
8 : ** library. This does NOT imply that all of Samba is released
9 : ** under the LGPL
10 :
11 : This library is free software; you can redistribute it and/or
12 : modify it under the terms of the GNU Lesser General Public
13 : License as published by the Free Software Foundation; either
14 : version 3 of the License, or (at your option) any later version.
15 :
16 : This library is distributed in the hope that it will be useful,
17 : but WITHOUT ANY WARRANTY; without even the implied warranty of
18 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 : Lesser General Public License for more details.
20 :
21 : You should have received a copy of the GNU Lesser General Public
22 : License along with this library; if not, see <http://www.gnu.org/licenses/>.
23 : */
24 :
25 : #include "replace.h"
26 : #include "tevent.h"
27 : #include "tevent_internal.h"
28 : #include "tevent_util.h"
29 :
30 : #undef tevent_queue_add
31 : #undef tevent_queue_add_entry
32 : #undef tevent_queue_add_optimize_empty
33 :
34 : struct tevent_queue_entry {
35 : struct tevent_queue_entry *prev, *next;
36 : struct tevent_queue *queue;
37 :
38 : bool triggered;
39 :
40 : struct tevent_req *req;
41 : struct tevent_context *ev;
42 :
43 : tevent_queue_trigger_fn_t trigger;
44 : const char *trigger_name;
45 : void *private_data;
46 : uint64_t tag;
47 : };
48 :
49 : struct tevent_queue {
50 : const char *name;
51 : const char *location;
52 :
53 : bool running;
54 : struct tevent_immediate *immediate;
55 :
56 : size_t length;
57 : struct tevent_queue_entry *list;
58 : };
59 :
60 : static void tevent_queue_immediate_trigger(struct tevent_context *ev,
61 : struct tevent_immediate *im,
62 : void *private_data);
63 :
64 13111846 : static int tevent_queue_entry_destructor(struct tevent_queue_entry *e)
65 : {
66 13111846 : struct tevent_queue *q = e->queue;
67 :
68 13111846 : if (!q) {
69 0 : return 0;
70 : }
71 :
72 13111846 : tevent_trace_queue_callback(q->list->ev, e, TEVENT_EVENT_TRACE_DETACH);
73 13244862 : tevent_thread_call_depth_notify(TEVENT_CALL_FLOW_REQ_QUEUE_LEAVE,
74 12978830 : q->list->req,
75 13111846 : q->list->req->internal.call_depth,
76 : e->trigger_name);
77 13111846 : DLIST_REMOVE(q->list, e);
78 13111846 : q->length--;
79 :
80 13111846 : if (!q->running) {
81 2622 : return 0;
82 : }
83 :
84 13109197 : if (!q->list) {
85 11451297 : return 0;
86 : }
87 :
88 1602428 : if (q->list->triggered) {
89 454 : return 0;
90 : }
91 :
92 1599922 : tevent_schedule_immediate(q->immediate,
93 : q->list->ev,
94 : tevent_queue_immediate_trigger,
95 75465 : q);
96 :
97 1599922 : return 0;
98 : }
99 :
100 2334771 : static int tevent_queue_destructor(struct tevent_queue *q)
101 : {
102 2334771 : q->running = false;
103 :
104 2334773 : while (q->list) {
105 2 : struct tevent_queue_entry *e = q->list;
106 2 : talloc_free(e);
107 : }
108 :
109 2334771 : return 0;
110 : }
111 :
112 2342451 : struct tevent_queue *_tevent_queue_create(TALLOC_CTX *mem_ctx,
113 : const char *name,
114 : const char *location)
115 : {
116 15477 : struct tevent_queue *queue;
117 :
118 2342451 : queue = talloc_zero(mem_ctx, struct tevent_queue);
119 2342451 : if (!queue) {
120 0 : return NULL;
121 : }
122 :
123 2342451 : queue->name = talloc_strdup(queue, name);
124 2342451 : if (!queue->name) {
125 0 : talloc_free(queue);
126 0 : return NULL;
127 : }
128 2342451 : queue->immediate = tevent_create_immediate(queue);
129 2342451 : if (!queue->immediate) {
130 0 : talloc_free(queue);
131 0 : return NULL;
132 : }
133 :
134 2342451 : queue->location = location;
135 :
136 : /* queue is running by default */
137 2342451 : queue->running = true;
138 :
139 2342451 : talloc_set_destructor(queue, tevent_queue_destructor);
140 2342451 : return queue;
141 : }
142 :
143 2585947 : static void tevent_queue_immediate_trigger(struct tevent_context *ev,
144 : struct tevent_immediate *im,
145 : void *private_data)
146 : {
147 83346 : struct tevent_queue *q =
148 2585947 : talloc_get_type_abort(private_data,
149 : struct tevent_queue);
150 :
151 2585947 : if (!q->running) {
152 0 : return;
153 : }
154 :
155 2585947 : if (!q->list) {
156 0 : return;
157 : }
158 :
159 2585947 : tevent_trace_queue_callback(ev, q->list,
160 : TEVENT_EVENT_TRACE_BEFORE_HANDLER);
161 : /* Set the call depth of the request coming from the queue. */
162 2669293 : tevent_thread_call_depth_notify(TEVENT_CALL_FLOW_REQ_QUEUE_TRIGGER,
163 2502601 : q->list->req,
164 2585947 : q->list->req->internal.call_depth,
165 2585947 : q->list->trigger_name);
166 2585947 : q->list->triggered = true;
167 2585947 : q->list->trigger(q->list->req, q->list->private_data);
168 : }
169 :
170 1 : static void tevent_queue_noop_trigger(struct tevent_req *req,
171 : void *_private_data)
172 : {
173 : /* this is doing nothing but blocking the queue */
174 1 : }
175 :
176 13111849 : static struct tevent_queue_entry *tevent_queue_add_internal(
177 : struct tevent_queue *queue,
178 : struct tevent_context *ev,
179 : struct tevent_req *req,
180 : tevent_queue_trigger_fn_t trigger,
181 : const char *trigger_name,
182 : void *private_data,
183 : bool allow_direct)
184 : {
185 133016 : struct tevent_queue_entry *e;
186 :
187 13111849 : e = talloc_zero(req, struct tevent_queue_entry);
188 13111849 : if (e == NULL) {
189 0 : return NULL;
190 : }
191 :
192 : /*
193 : * if there is no trigger, it is just a blocker
194 : */
195 13111849 : if (trigger == NULL) {
196 1 : trigger = tevent_queue_noop_trigger;
197 : }
198 :
199 13111849 : e->queue = queue;
200 13111849 : e->req = req;
201 13111849 : e->ev = ev;
202 13111849 : e->trigger = trigger;
203 13111849 : e->trigger_name = trigger_name;
204 13111849 : e->private_data = private_data;
205 :
206 13111849 : if (queue->length > 0) {
207 : /*
208 : * if there are already entries in the
209 : * queue do not optimize.
210 : */
211 1602444 : allow_direct = false;
212 : }
213 :
214 13111849 : if (req->async.fn != NULL) {
215 : /*
216 : * If the caller wants to optimize for the
217 : * empty queue case, call the trigger only
218 : * if there is no callback defined for the
219 : * request yet.
220 : */
221 0 : allow_direct = false;
222 : }
223 :
224 13111849 : DLIST_ADD_END(queue->list, e);
225 13111849 : queue->length++;
226 13111849 : talloc_set_destructor(e, tevent_queue_entry_destructor);
227 13111849 : tevent_trace_queue_callback(ev, e, TEVENT_EVENT_TRACE_ATTACH);
228 13111849 : tevent_thread_call_depth_notify(TEVENT_CALL_FLOW_REQ_QUEUE_ENTER,
229 : req,
230 : req->internal.call_depth,
231 : e->trigger_name);
232 :
233 13111849 : if (!queue->running) {
234 24 : return e;
235 : }
236 :
237 13111819 : if (queue->list->triggered) {
238 1279916 : return e;
239 : }
240 :
241 : /*
242 : * If allowed we directly call the trigger
243 : * avoiding possible delays caused by
244 : * an immediate event.
245 : */
246 11766372 : if (allow_direct) {
247 10523279 : tevent_trace_queue_callback(ev,
248 : queue->list,
249 : TEVENT_EVENT_TRACE_BEFORE_HANDLER);
250 10523279 : queue->list->triggered = true;
251 10523279 : queue->list->trigger(queue->list->req,
252 10475671 : queue->list->private_data);
253 10523279 : return e;
254 : }
255 :
256 1243093 : tevent_schedule_immediate(queue->immediate,
257 : queue->list->ev,
258 : tevent_queue_immediate_trigger,
259 19871 : queue);
260 :
261 1243093 : return e;
262 : }
263 :
264 0 : bool tevent_queue_add(struct tevent_queue *queue,
265 : struct tevent_context *ev,
266 : struct tevent_req *req,
267 : tevent_queue_trigger_fn_t trigger,
268 : void *private_data)
269 : {
270 0 : return _tevent_queue_add(queue, ev, req, trigger, NULL, private_data);
271 : }
272 :
273 1060046 : bool _tevent_queue_add(struct tevent_queue *queue,
274 : struct tevent_context *ev,
275 : struct tevent_req *req,
276 : tevent_queue_trigger_fn_t trigger,
277 : const char* trigger_name,
278 : void *private_data)
279 : {
280 7908 : struct tevent_queue_entry *e;
281 :
282 1060046 : e = tevent_queue_add_internal(queue, ev, req,
283 : trigger, trigger_name,
284 : private_data, false);
285 1060046 : if (e == NULL) {
286 0 : return false;
287 : }
288 :
289 1052138 : return true;
290 : }
291 :
292 0 : struct tevent_queue_entry *tevent_queue_add_entry(
293 : struct tevent_queue *queue,
294 : struct tevent_context *ev,
295 : struct tevent_req *req,
296 : tevent_queue_trigger_fn_t trigger,
297 : void *private_data)
298 : {
299 0 : return _tevent_queue_add_entry(queue, ev, req,
300 : trigger, NULL,
301 : private_data);
302 : }
303 :
304 95049 : struct tevent_queue_entry *_tevent_queue_add_entry(
305 : struct tevent_queue *queue,
306 : struct tevent_context *ev,
307 : struct tevent_req *req,
308 : tevent_queue_trigger_fn_t trigger,
309 : const char* trigger_name,
310 : void *private_data)
311 : {
312 95049 : return tevent_queue_add_internal(queue, ev, req,
313 : trigger, trigger_name,
314 : private_data, false);
315 : }
316 :
317 0 : struct tevent_queue_entry *tevent_queue_add_optimize_empty(
318 : struct tevent_queue *queue,
319 : struct tevent_context *ev,
320 : struct tevent_req *req,
321 : tevent_queue_trigger_fn_t trigger,
322 : void *private_data)
323 : {
324 0 : return _tevent_queue_add_optimize_empty(queue, ev, req,
325 : trigger, NULL,
326 : private_data);
327 : }
328 :
329 11956754 : struct tevent_queue_entry *_tevent_queue_add_optimize_empty(
330 : struct tevent_queue *queue,
331 : struct tevent_context *ev,
332 : struct tevent_req *req,
333 : tevent_queue_trigger_fn_t trigger,
334 : const char* trigger_name,
335 : void *private_data)
336 : {
337 11956754 : return tevent_queue_add_internal(queue, ev, req,
338 : trigger, trigger_name,
339 : private_data, true);
340 : }
341 :
342 11 : void tevent_queue_entry_untrigger(struct tevent_queue_entry *entry)
343 : {
344 11 : if (entry->queue->running) {
345 0 : abort();
346 : }
347 :
348 11 : if (entry->queue->list != entry) {
349 0 : abort();
350 : }
351 :
352 11 : entry->triggered = false;
353 11 : }
354 :
355 68247 : void tevent_queue_start(struct tevent_queue *queue)
356 : {
357 68247 : if (queue->running) {
358 : /* already started */
359 68236 : return;
360 : }
361 :
362 11 : queue->running = true;
363 :
364 11 : if (!queue->list) {
365 0 : return;
366 : }
367 :
368 11 : if (queue->list->triggered) {
369 0 : return;
370 : }
371 :
372 11 : tevent_schedule_immediate(queue->immediate,
373 : queue->list->ev,
374 : tevent_queue_immediate_trigger,
375 0 : queue);
376 : }
377 :
378 119976 : void tevent_queue_stop(struct tevent_queue *queue)
379 : {
380 119976 : queue->running = false;
381 119976 : }
382 :
383 75611814 : size_t tevent_queue_length(struct tevent_queue *queue)
384 : {
385 75611814 : return queue->length;
386 : }
387 :
388 0 : bool tevent_queue_running(struct tevent_queue *queue)
389 : {
390 0 : return queue->running;
391 : }
392 :
393 : struct tevent_queue_wait_state {
394 : uint8_t dummy;
395 : };
396 :
397 : static void tevent_queue_wait_trigger(struct tevent_req *req,
398 : void *private_data);
399 :
400 421452 : struct tevent_req *tevent_queue_wait_send(TALLOC_CTX *mem_ctx,
401 : struct tevent_context *ev,
402 : struct tevent_queue *queue)
403 : {
404 7199 : struct tevent_req *req;
405 7199 : struct tevent_queue_wait_state *state;
406 7199 : bool ok;
407 :
408 421452 : req = tevent_req_create(mem_ctx, &state,
409 : struct tevent_queue_wait_state);
410 421452 : if (req == NULL) {
411 0 : return NULL;
412 : }
413 :
414 421452 : ok = _tevent_queue_add(queue, ev, req,
415 : tevent_queue_wait_trigger,
416 : "tevent_queue_wait_trigger",
417 : NULL);
418 421452 : if (!ok) {
419 0 : tevent_req_oom(req);
420 0 : return tevent_req_post(req, ev);
421 : }
422 :
423 414253 : return req;
424 : }
425 :
426 421341 : static void tevent_queue_wait_trigger(struct tevent_req *req,
427 : void *private_data)
428 : {
429 421341 : tevent_req_done(req);
430 421341 : }
431 :
432 421239 : bool tevent_queue_wait_recv(struct tevent_req *req)
433 : {
434 7192 : enum tevent_req_state state;
435 7192 : uint64_t err;
436 :
437 421239 : if (tevent_req_is_error(req, &state, &err)) {
438 0 : tevent_req_received(req);
439 0 : return false;
440 : }
441 :
442 421239 : tevent_req_received(req);
443 421239 : return true;
444 : }
445 :
446 410158 : void tevent_queue_entry_set_tag(struct tevent_queue_entry *qe, uint64_t tag)
447 : {
448 410158 : if (qe == NULL) {
449 0 : return;
450 : }
451 :
452 410158 : qe->tag = tag;
453 : }
454 :
455 410170 : uint64_t tevent_queue_entry_get_tag(const struct tevent_queue_entry *qe)
456 : {
457 410170 : if (qe == NULL) {
458 1 : return 0;
459 : }
460 :
461 410169 : return qe->tag;
462 : }
|