Line data Source code
1 : #include <stdio.h>
2 : #include <string.h>
3 : #include <poll.h>
4 : #include <errno.h>
5 : #include <stdlib.h>
6 : #include <limits.h>
7 : #include <pthread.h>
8 : #include <unistd.h>
9 : #include <sys/types.h>
10 : #include <sys/wait.h>
11 : #include <signal.h>
12 : #include "pthreadpool_pipe.h"
13 : #include "pthreadpool_tevent.h"
14 :
15 1 : static int test_init(void)
16 : {
17 1 : struct pthreadpool_pipe *p;
18 1 : int ret;
19 :
20 1 : ret = pthreadpool_pipe_init(1, &p);
21 1 : if (ret != 0) {
22 0 : fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
23 : strerror(ret));
24 0 : return -1;
25 : }
26 1 : ret = pthreadpool_pipe_destroy(p);
27 1 : if (ret != 0) {
28 0 : fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
29 : strerror(ret));
30 0 : return -1;
31 : }
32 : return 0;
33 : }
34 :
35 10001 : static void test_sleep(void *ptr)
36 : {
37 10001 : int *ptimeout = (int *)ptr;
38 10001 : int ret;
39 10001 : ret = poll(NULL, 0, *ptimeout);
40 9956 : if (ret != 0) {
41 0 : fprintf(stderr, "poll returned %d (%s)\n",
42 0 : ret, strerror(errno));
43 : }
44 9956 : }
45 :
46 1 : static int test_jobs(int num_threads, int num_jobs)
47 : {
48 1 : char *finished;
49 1 : struct pthreadpool_pipe *p;
50 1 : int timeout = 1;
51 1 : int i, ret;
52 :
53 1 : finished = (char *)calloc(1, num_jobs);
54 1 : if (finished == NULL) {
55 0 : fprintf(stderr, "calloc failed\n");
56 0 : return -1;
57 : }
58 :
59 1 : ret = pthreadpool_pipe_init(num_threads, &p);
60 1 : if (ret != 0) {
61 0 : fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
62 : strerror(ret));
63 0 : free(finished);
64 0 : return -1;
65 : }
66 :
67 10001 : for (i=0; i<num_jobs; i++) {
68 10000 : ret = pthreadpool_pipe_add_job(p, i, test_sleep, &timeout);
69 10000 : if (ret != 0) {
70 0 : fprintf(stderr, "pthreadpool_pipe_add_job failed: "
71 : "%s\n", strerror(ret));
72 0 : free(finished);
73 0 : return -1;
74 : }
75 : }
76 :
77 10001 : for (i=0; i<num_jobs; i++) {
78 10000 : int jobid = -1;
79 10000 : ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
80 10000 : if (ret < 0) {
81 0 : fprintf(stderr, "pthreadpool_pipe_finished_jobs "
82 : "failed: %s\n", strerror(-ret));
83 0 : free(finished);
84 0 : return -1;
85 : }
86 10000 : if ((ret != 1) || (jobid >= num_jobs)) {
87 0 : fprintf(stderr, "invalid job number %d\n", jobid);
88 0 : free(finished);
89 0 : return -1;
90 : }
91 10000 : finished[jobid] += 1;
92 : }
93 :
94 10001 : for (i=0; i<num_jobs; i++) {
95 10000 : if (finished[i] != 1) {
96 0 : fprintf(stderr, "finished[%d] = %d\n",
97 : i, finished[i]);
98 0 : free(finished);
99 0 : return -1;
100 : }
101 : }
102 :
103 1 : ret = pthreadpool_pipe_destroy(p);
104 1 : if (ret != 0) {
105 0 : fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
106 : strerror(ret));
107 0 : free(finished);
108 0 : return -1;
109 : }
110 :
111 1 : free(finished);
112 1 : return 0;
113 : }
114 :
115 1 : static int test_busydestroy(void)
116 : {
117 1 : struct pthreadpool_pipe *p;
118 1 : int timeout = 50;
119 1 : struct pollfd pfd;
120 1 : int ret, jobid;
121 :
122 1 : ret = pthreadpool_pipe_init(1, &p);
123 1 : if (ret != 0) {
124 0 : fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
125 : strerror(ret));
126 0 : return -1;
127 : }
128 1 : ret = pthreadpool_pipe_add_job(p, 1, test_sleep, &timeout);
129 1 : if (ret != 0) {
130 0 : fprintf(stderr, "pthreadpool_pipe_add_job failed: %s\n",
131 : strerror(ret));
132 0 : return -1;
133 : }
134 1 : ret = pthreadpool_pipe_destroy(p);
135 1 : if (ret != EBUSY) {
136 0 : fprintf(stderr, "Could destroy a busy pool\n");
137 0 : return -1;
138 : }
139 :
140 1 : pfd.fd = pthreadpool_pipe_signal_fd(p);
141 1 : pfd.events = POLLIN|POLLERR;
142 :
143 1 : do {
144 1 : ret = poll(&pfd, 1, -1);
145 1 : } while ((ret == -1) && (errno == EINTR));
146 :
147 1 : ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
148 1 : if (ret < 0) {
149 0 : fprintf(stderr, "pthreadpool_pipe_finished_jobs failed: %s\n",
150 : strerror(-ret));
151 0 : return -1;
152 : }
153 :
154 1 : ret = pthreadpool_pipe_destroy(p);
155 1 : if (ret != 0) {
156 0 : fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
157 : strerror(ret));
158 0 : return -1;
159 : }
160 : return 0;
161 : }
162 :
163 1 : static int test_fork(void)
164 : {
165 1 : struct pthreadpool_pipe *p;
166 1 : pid_t child, waited;
167 1 : int status, ret;
168 :
169 1 : ret = pthreadpool_pipe_init(1, &p);
170 1 : if (ret != 0) {
171 0 : fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
172 : strerror(ret));
173 0 : return -1;
174 : }
175 1 : ret = pthreadpool_pipe_destroy(p);
176 1 : if (ret != 0) {
177 0 : fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
178 : strerror(ret));
179 0 : return -1;
180 : }
181 :
182 1 : child = fork();
183 2 : if (child < 0) {
184 0 : perror("fork failed");
185 0 : return -1;
186 : }
187 2 : if (child == 0) {
188 1 : exit(0);
189 : }
190 1 : waited = wait(&status);
191 1 : if (waited == -1) {
192 0 : perror("wait failed");
193 0 : return -1;
194 : }
195 1 : if (waited != child) {
196 0 : fprintf(stderr, "expected child %d, got %d\n",
197 : (int)child, (int)waited);
198 0 : return -1;
199 : }
200 : return 0;
201 : }
202 :
203 3 : static void busyfork_job(void *private_data)
204 : {
205 3 : return;
206 : }
207 :
208 1 : static int test_busyfork(void)
209 : {
210 1 : struct pthreadpool_pipe *p;
211 1 : int fds[2];
212 1 : struct pollfd pfd;
213 1 : pid_t child, waitret;
214 1 : int ret, jobnum, wstatus;
215 :
216 1 : ret = pipe(fds);
217 1 : if (ret == -1) {
218 0 : perror("pipe failed");
219 0 : return -1;
220 : }
221 :
222 1 : ret = pthreadpool_pipe_init(1, &p);
223 1 : if (ret != 0) {
224 0 : fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
225 : strerror(ret));
226 0 : return -1;
227 : }
228 :
229 1 : ret = pthreadpool_pipe_add_job(p, 1, busyfork_job, NULL);
230 1 : if (ret != 0) {
231 0 : fprintf(stderr, "pthreadpool_add_job failed: %s\n",
232 : strerror(ret));
233 0 : return -1;
234 : }
235 :
236 1 : ret = pthreadpool_pipe_finished_jobs(p, &jobnum, 1);
237 1 : if (ret != 1) {
238 0 : fprintf(stderr, "pthreadpool_pipe_finished_jobs failed\n");
239 0 : return -1;
240 : }
241 :
242 1 : ret = poll(NULL, 0, 200);
243 1 : if (ret == -1) {
244 0 : perror("poll failed");
245 0 : return -1;
246 : }
247 :
248 1 : child = fork();
249 2 : if (child < 0) {
250 0 : perror("fork failed");
251 0 : return -1;
252 : }
253 :
254 2 : if (child == 0) {
255 1 : ret = pthreadpool_pipe_destroy(p);
256 1 : if (ret != 0) {
257 0 : fprintf(stderr, "pthreadpool_pipe_destroy failed: "
258 : "%s\n", strerror(ret));
259 0 : exit(1);
260 : }
261 1 : exit(0);
262 : }
263 :
264 1 : ret = close(fds[1]);
265 1 : if (ret == -1) {
266 0 : perror("close failed");
267 0 : return -1;
268 : }
269 :
270 1 : pfd = (struct pollfd) { .fd = fds[0], .events = POLLIN };
271 :
272 1 : ret = poll(&pfd, 1, 5000);
273 1 : if (ret == -1) {
274 0 : perror("poll failed");
275 0 : return -1;
276 : }
277 1 : if (ret == 0) {
278 0 : fprintf(stderr, "Child did not exit for 5 seconds\n");
279 : /*
280 : * The child might hang forever in
281 : * pthread_cond_destroy for example. Be kind to the
282 : * system and kill it.
283 : */
284 0 : kill(child, SIGTERM);
285 0 : return -1;
286 : }
287 1 : if (ret != 1) {
288 0 : fprintf(stderr, "poll returned %d -- huh??\n", ret);
289 0 : return -1;
290 : }
291 :
292 1 : ret = poll(NULL, 0, 200);
293 1 : if (ret == -1) {
294 0 : perror("poll failed");
295 0 : return -1;
296 : }
297 :
298 1 : waitret = waitpid(child, &wstatus, WNOHANG);
299 1 : if (waitret != child) {
300 0 : fprintf(stderr, "waitpid returned %d\n", (int)waitret);
301 0 : return -1;
302 : }
303 :
304 1 : if (!WIFEXITED(wstatus)) {
305 0 : fprintf(stderr, "child did not properly exit\n");
306 0 : return -1;
307 : }
308 :
309 1 : ret = WEXITSTATUS(wstatus);
310 1 : if (ret != 0) {
311 0 : fprintf(stderr, "child returned %d\n", ret);
312 0 : return -1;
313 : }
314 :
315 : return 0;
316 : }
317 :
318 1 : static int test_busyfork2(void)
319 : {
320 1 : struct pthreadpool_pipe *p;
321 1 : pid_t child;
322 1 : int ret, jobnum;
323 1 : struct pollfd pfd;
324 :
325 1 : ret = pthreadpool_pipe_init(1, &p);
326 1 : if (ret != 0) {
327 0 : fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
328 : strerror(ret));
329 0 : return -1;
330 : }
331 :
332 1 : ret = pthreadpool_pipe_add_job(p, 1, busyfork_job, NULL);
333 1 : if (ret != 0) {
334 0 : fprintf(stderr, "pthreadpool_add_job failed: %s\n",
335 : strerror(ret));
336 0 : return -1;
337 : }
338 :
339 1 : ret = pthreadpool_pipe_finished_jobs(p, &jobnum, 1);
340 1 : if (ret != 1) {
341 0 : fprintf(stderr, "pthreadpool_pipe_finished_jobs failed\n");
342 0 : return -1;
343 : }
344 :
345 1 : ret = poll(NULL, 0, 10);
346 1 : if (ret == -1) {
347 0 : perror("poll failed");
348 0 : return -1;
349 : }
350 :
351 1 : ret = pthreadpool_pipe_add_job(p, 1, busyfork_job, NULL);
352 1 : if (ret != 0) {
353 0 : fprintf(stderr, "pthreadpool_add_job failed: %s\n",
354 : strerror(ret));
355 0 : return -1;
356 : }
357 :
358 : /*
359 : * Do the fork right after the add_job. This tests a race
360 : * where the atfork prepare handler gets all idle threads off
361 : * the condvar. If we are faster doing the fork than the
362 : * existing idle thread could get out of idle and take the
363 : * job, after the fork we end up with no threads to take care
364 : * of the job.
365 : */
366 :
367 1 : child = fork();
368 2 : if (child < 0) {
369 0 : perror("fork failed");
370 0 : return -1;
371 : }
372 :
373 2 : if (child == 0) {
374 1 : exit(0);
375 : }
376 :
377 1 : pfd = (struct pollfd) {
378 1 : .fd = pthreadpool_pipe_signal_fd(p),
379 : .events = POLLIN|POLLERR
380 : };
381 :
382 1 : do {
383 1 : ret = poll(&pfd, 1, 5000);
384 1 : } while ((ret == -1) && (errno == EINTR));
385 :
386 1 : if (ret == 0) {
387 0 : fprintf(stderr, "job unfinished after 5 seconds\n");
388 0 : return -1;
389 : }
390 :
391 : return 0;
392 : }
393 :
394 2 : static void test_tevent_wait(void *private_data)
395 : {
396 2 : int *timeout = private_data;
397 2 : poll(NULL, 0, *timeout);
398 2 : }
399 :
400 1 : static int test_tevent_1(void)
401 : {
402 1 : struct tevent_context *ev;
403 1 : struct pthreadpool_tevent *pool;
404 1 : struct tevent_req *req1, *req2;
405 1 : int timeout10 = 10;
406 1 : int timeout100 = 100;
407 1 : int ret;
408 1 : bool ok;
409 :
410 1 : ev = tevent_context_init(NULL);
411 1 : if (ev == NULL) {
412 0 : ret = errno;
413 0 : fprintf(stderr, "tevent_context_init failed: %s\n",
414 : strerror(ret));
415 0 : return ret;
416 : }
417 1 : ret = pthreadpool_tevent_init(ev, UINT_MAX, &pool);
418 1 : if (ret != 0) {
419 0 : fprintf(stderr, "pthreadpool_tevent_init failed: %s\n",
420 : strerror(ret));
421 0 : TALLOC_FREE(ev);
422 0 : return ret;
423 : }
424 1 : req1 = pthreadpool_tevent_job_send(
425 : ev, ev, pool, test_tevent_wait, &timeout10);
426 1 : if (req1 == NULL) {
427 0 : fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
428 0 : TALLOC_FREE(ev);
429 0 : return ENOMEM;
430 : }
431 1 : req2 = pthreadpool_tevent_job_send(
432 : ev, ev, pool, test_tevent_wait, &timeout100);
433 1 : if (req2 == NULL) {
434 0 : fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
435 0 : TALLOC_FREE(ev);
436 0 : return ENOMEM;
437 : }
438 1 : ok = tevent_req_poll(req2, ev);
439 1 : if (!ok) {
440 0 : ret = errno;
441 0 : fprintf(stderr, "tevent_req_poll failed: %s\n",
442 : strerror(ret));
443 0 : TALLOC_FREE(ev);
444 0 : return ret;
445 : }
446 1 : ret = pthreadpool_tevent_job_recv(req1);
447 1 : TALLOC_FREE(req1);
448 1 : if (ret != 0) {
449 0 : fprintf(stderr, "tevent_req_poll failed: %s\n",
450 : strerror(ret));
451 0 : TALLOC_FREE(ev);
452 0 : return ret;
453 : }
454 :
455 1 : TALLOC_FREE(req2);
456 :
457 1 : ret = tevent_loop_wait(ev);
458 1 : if (ret != 0) {
459 0 : fprintf(stderr, "tevent_loop_wait failed\n");
460 0 : return ret;
461 : }
462 :
463 1 : TALLOC_FREE(pool);
464 1 : TALLOC_FREE(ev);
465 1 : return 0;
466 : }
467 :
468 1 : int main(void)
469 : {
470 1 : int ret;
471 :
472 1 : ret = test_tevent_1();
473 1 : if (ret != 0) {
474 0 : fprintf(stderr, "test_event_1 failed: %s\n",
475 : strerror(ret));
476 0 : return 1;
477 : }
478 :
479 1 : ret = test_init();
480 1 : if (ret != 0) {
481 0 : fprintf(stderr, "test_init failed\n");
482 0 : return 1;
483 : }
484 :
485 1 : ret = test_fork();
486 1 : if (ret != 0) {
487 0 : fprintf(stderr, "test_fork failed\n");
488 0 : return 1;
489 : }
490 :
491 1 : ret = test_jobs(10, 10000);
492 1 : if (ret != 0) {
493 0 : fprintf(stderr, "test_jobs failed\n");
494 0 : return 1;
495 : }
496 :
497 1 : ret = test_busydestroy();
498 1 : if (ret != 0) {
499 0 : fprintf(stderr, "test_busydestroy failed\n");
500 0 : return 1;
501 : }
502 :
503 1 : ret = test_busyfork();
504 1 : if (ret != 0) {
505 0 : fprintf(stderr, "test_busyfork failed\n");
506 0 : return 1;
507 : }
508 :
509 1 : ret = test_busyfork2();
510 1 : if (ret != 0) {
511 0 : fprintf(stderr, "test_busyfork2 failed\n");
512 0 : return 1;
513 : }
514 :
515 1 : printf("success\n");
516 1 : return 0;
517 : }
|