Line data Source code
1 : /*
2 : * Unix SMB/CIFS implementation.
3 : * Samba internal messaging functions
4 : * Copyright (C) 2013 by Volker Lendecke
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : */
19 :
20 : #include "replace.h"
21 : #include "util/util.h"
22 : #include "system/network.h"
23 : #include "system/filesys.h"
24 : #include "system/dir.h"
25 : #include "system/select.h"
26 : #include "lib/util/debug.h"
27 : #include "messages_dgm.h"
28 : #include "lib/util/genrand.h"
29 : #include "lib/util/dlinklist.h"
30 : #include "lib/pthreadpool/pthreadpool_tevent.h"
31 : #include "lib/util/msghdr.h"
32 : #include "lib/util/iov_buf.h"
33 : #include "lib/util/blocking.h"
34 : #include "lib/util/tevent_unix.h"
35 : #include "lib/util/smb_strtox.h"
36 :
/*
 * Maximum size in bytes of one datagram we send. This includes the
 * 64-bit cookie and, for fragmented messages, the struct
 * messaging_dgm_fragment_hdr that precede the payload.
 */
#define MESSAGING_DGM_FRAGMENT_LENGTH 1024

/*
 * Fixed-size buffer used for directory names and socket/lockfile path
 * names.
 */
struct sun_path_buf {
	/*
	 * This will carry enough for a socket path
	 */
	char buf[sizeof(struct sockaddr_un)];
};

/*
 * We can only have one tevent_fd per dgm_context and per
 * tevent_context. Maintain a list of registered tevent_contexts per
 * dgm_context.
 */
struct messaging_dgm_fde_ev {
	struct messaging_dgm_fde_ev *prev, *next;

	/*
	 * Backreference to enable DLIST_REMOVE from our
	 * destructor. Also, set to NULL when the dgm_context dies
	 * before the messaging_dgm_fde_ev.
	 */
	struct messaging_dgm_context *ctx;

	struct tevent_context *ev;
	struct tevent_fd *fde;
};

/*
 * Outgoing connection to one destination process. Instances are kept
 * on messaging_dgm_context->outsocks. An idle timer frees them once
 * nothing has been queued for them for about one second.
 */
struct messaging_dgm_out {
	struct messaging_dgm_out *prev, *next;
	struct messaging_dgm_context *ctx;

	pid_t pid;		/* destination process */
	int sock;		/* SOCK_DGRAM socket connected to its path */
	bool is_blocking;	/* cached blocking state of sock */
	uint64_t cookie;	/* cookie of the next fragmented message, never 0 */

	struct tevent_queue *queue;	/* fragments waiting for a helper thread */
	struct tevent_timer *idle_timer;	/* frees us when idle, may be NULL */
};

/*
 * NOTE(review): presumably a fragmented message being reassembled on
 * the receive side. msglen would be the announced total length and
 * received the bytes collected so far, matched by
 * sender_pid/sender_sock/cookie. Confirm against the receive path.
 */
struct messaging_dgm_in_msg {
	struct messaging_dgm_in_msg *prev, *next;
	struct messaging_dgm_context *ctx;
	size_t msglen;
	size_t received;
	pid_t sender_pid;
	int sender_sock;
	uint64_t cookie;
	uint8_t buf[];
};

/*
 * Per-process messaging state, created by messaging_dgm_init().
 */
struct messaging_dgm_context {
	struct tevent_context *ev;
	pid_t pid;			/* process that created the context */
	struct sun_path_buf socket_dir;
	struct sun_path_buf lockfile_dir;
	int lockfile_fd;		/* holds the fcntl() write lock on our lockfile */

	int sock;			/* our bound receive socket */
	struct messaging_dgm_in_msg *in_msgs;

	struct messaging_dgm_fde_ev *fde_evs;
	void (*recv_cb)(struct tevent_context *ev,
			const uint8_t *msg,
			size_t msg_len,
			int *fds,
			size_t num_fds,
			void *private_data);
	void *recv_cb_private_data;

	/* points at the "context exists" flag inside messaging_dgm_init() */
	bool *have_dgm_context;

	struct pthreadpool_tevent *pool;	/* runs blocking sends */
	struct messaging_dgm_out *outsocks;	/* outgoing connections */
};
113 :
/*
 * Set the close-on-exec flag on a descriptor.
 *
 * Returns 0 on success. Also returns 0 when FD_CLOEXEC does not exist
 * on this platform, in which case nothing is done. If either fcntl()
 * call fails, returns its errno value. The error is a positive errno
 * value, never -1.
 */
static int prepare_socket_cloexec(int sock)
{
#ifdef FD_CLOEXEC
	int fd_flags = fcntl(sock, F_GETFD, 0);

	if ((fd_flags == -1) ||
	    (fcntl(sock, F_SETFD, fd_flags | FD_CLOEXEC) == -1)) {
		return errno;
	}
#endif
	return 0;
}
131 :
/*
 * Close every descriptor in fds[0 .. num_fds-1] and overwrite each
 * closed slot with -1, so a second call is harmless. Slots that
 * already hold -1 are skipped.
 */
static void close_fd_array(int *fds, size_t num_fds)
{
	size_t n = 0;

	while (n < num_fds) {
		int fd = fds[n];

		if (fd != -1) {
			close(fd);
			fds[n] = -1;
		}
		n += 1;
	}
}
145 :
146 : /*
147 : * The idle handler can free the struct messaging_dgm_out *,
148 : * if it's unused (qlen of zero) which closes the socket.
149 : */
150 :
151 12431 : static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
152 : struct tevent_timer *te,
153 : struct timeval current_time,
154 : void *private_data)
155 : {
156 12431 : struct messaging_dgm_out *out = talloc_get_type_abort(
157 : private_data, struct messaging_dgm_out);
158 28 : size_t qlen;
159 :
160 12431 : out->idle_timer = NULL;
161 :
162 12431 : qlen = tevent_queue_length(out->queue);
163 12431 : if (qlen == 0) {
164 12431 : TALLOC_FREE(out);
165 : }
166 12431 : }
167 :
168 : /*
169 : * Setup the idle handler to fire after 1 second if the
170 : * queue is zero.
171 : */
172 :
173 581367 : static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
174 : {
175 137016 : size_t qlen;
176 :
177 581367 : qlen = tevent_queue_length(out->queue);
178 581367 : if (qlen != 0) {
179 136370 : TALLOC_FREE(out->idle_timer);
180 136370 : return;
181 : }
182 :
183 444997 : if (out->idle_timer != NULL) {
184 420944 : tevent_update_timer(out->idle_timer,
185 : tevent_timeval_current_ofs(1, 0));
186 420944 : return;
187 : }
188 :
189 24053 : out->idle_timer = tevent_add_timer(
190 : out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
191 : messaging_dgm_out_idle_handler, out);
192 : /*
193 : * No NULL check, we'll come back here. Worst case we're
194 : * leaking a bit.
195 : */
196 : }
197 :
198 : static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
199 : static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
200 : struct tevent_timer *te,
201 : struct timeval current_time,
202 : void *private_data);
203 :
204 : /*
205 : * Connect to an existing rendezvous point for another
206 : * pid - wrapped inside a struct messaging_dgm_out *.
207 : */
208 :
209 30054 : static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
210 : struct messaging_dgm_context *ctx,
211 : pid_t pid, struct messaging_dgm_out **pout)
212 : {
213 552 : struct messaging_dgm_out *out;
214 30054 : struct sockaddr_un addr = { .sun_family = AF_UNIX };
215 30054 : int ret = ENOMEM;
216 552 : int out_pathlen;
217 552 : char addr_buf[sizeof(addr.sun_path) + (3 * sizeof(unsigned) + 2)];
218 :
219 30054 : out = talloc(mem_ctx, struct messaging_dgm_out);
220 30054 : if (out == NULL) {
221 0 : goto fail;
222 : }
223 :
224 30054 : *out = (struct messaging_dgm_out) {
225 : .pid = pid,
226 : .ctx = ctx,
227 : .cookie = 1
228 : };
229 :
230 30054 : out_pathlen = snprintf(addr_buf, sizeof(addr_buf),
231 30054 : "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
232 30054 : if (out_pathlen < 0) {
233 0 : goto errno_fail;
234 : }
235 30054 : if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
236 0 : ret = ENAMETOOLONG;
237 0 : goto fail;
238 : }
239 :
240 30054 : memcpy(addr.sun_path, addr_buf, out_pathlen + 1);
241 :
242 30054 : out->queue = tevent_queue_create(out, addr.sun_path);
243 30054 : if (out->queue == NULL) {
244 0 : ret = ENOMEM;
245 0 : goto fail;
246 : }
247 :
248 30054 : out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
249 30054 : if (out->sock == -1) {
250 0 : goto errno_fail;
251 : }
252 :
253 30054 : DLIST_ADD(ctx->outsocks, out);
254 30054 : talloc_set_destructor(out, messaging_dgm_out_destructor);
255 :
256 552 : do {
257 30054 : ret = connect(out->sock,
258 : (const struct sockaddr *)(const void *)&addr,
259 : sizeof(addr));
260 30054 : } while ((ret == -1) && (errno == EINTR));
261 :
262 30054 : if (ret == -1) {
263 6188 : goto errno_fail;
264 : }
265 :
266 23866 : ret = set_blocking(out->sock, false);
267 23866 : if (ret == -1) {
268 0 : goto errno_fail;
269 : }
270 23866 : out->is_blocking = false;
271 :
272 23866 : *pout = out;
273 23866 : return 0;
274 6188 : errno_fail:
275 6188 : ret = errno;
276 6188 : fail:
277 6188 : TALLOC_FREE(out);
278 6188 : return ret;
279 : }
280 :
281 60555 : static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
282 : {
283 60555 : DLIST_REMOVE(out->ctx->outsocks, out);
284 :
285 60555 : if ((tevent_queue_length(out->queue) != 0) &&
286 3 : (tevent_cached_getpid() == out->ctx->pid)) {
287 : /*
288 : * We have pending jobs. We can't close the socket,
289 : * this has been handed over to messaging_dgm_out_queue_state.
290 : */
291 0 : return 0;
292 : }
293 :
294 60552 : if (out->sock != -1) {
295 60552 : close(out->sock);
296 60552 : out->sock = -1;
297 : }
298 59526 : return 0;
299 : }
300 :
301 : /*
302 : * Find the struct messaging_dgm_out * to talk to pid.
303 : * If we don't have one, create it. Set the timer to
304 : * delete after 1 sec.
305 : */
306 :
307 518603 : static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
308 : struct messaging_dgm_out **pout)
309 : {
310 68762 : struct messaging_dgm_out *out;
311 68762 : int ret;
312 :
313 945232 : for (out = ctx->outsocks; out != NULL; out = out->next) {
314 915178 : if (out->pid == pid) {
315 420339 : break;
316 : }
317 : }
318 :
319 518603 : if (out == NULL) {
320 30054 : ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
321 30054 : if (ret != 0) {
322 6188 : return ret;
323 : }
324 : }
325 :
326 : /*
327 : * shouldn't be possible, should be set if messaging_dgm_out_create
328 : * succeeded. This check is to satisfy static checker
329 : */
330 512415 : if (out == NULL) {
331 0 : return EINVAL;
332 : }
333 512415 : messaging_dgm_out_rearm_idle_timer(out);
334 :
335 512415 : *pout = out;
336 512415 : return 0;
337 : }
338 :
339 : /*
340 : * This function is called directly to send a message fragment
341 : * when the outgoing queue is zero, and from a pthreadpool
342 : * job thread when messages are being queued (qlen != 0).
343 : * Make sure *ONLY* thread-safe functions are called within.
344 : */
345 :
346 526312 : static ssize_t messaging_dgm_sendmsg(int sock,
347 : const struct iovec *iov, int iovlen,
348 : const int *fds, size_t num_fds,
349 : int *perrno)
350 : {
351 69779 : struct msghdr msg;
352 69779 : ssize_t fdlen, ret;
353 :
354 : /*
355 : * Do the actual sendmsg syscall. This will be called from a
356 : * pthreadpool helper thread, so be careful what you do here.
357 : */
358 :
359 526312 : msg = (struct msghdr) {
360 : .msg_iov = discard_const_p(struct iovec, iov),
361 : .msg_iovlen = iovlen
362 : };
363 :
364 526312 : fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
365 526312 : if (fdlen == -1) {
366 0 : *perrno = EINVAL;
367 0 : return -1;
368 : }
369 :
370 526312 : {
371 526312 : uint8_t buf[fdlen];
372 :
373 526312 : msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
374 :
375 69779 : do {
376 526312 : ret = sendmsg(sock, &msg, 0);
377 526309 : } while ((ret == -1) && (errno == EINTR));
378 : }
379 :
380 526309 : if (ret == -1) {
381 346 : *perrno = errno;
382 : }
383 456530 : return ret;
384 : }
385 :
/*
 * State of one fragment queued by messaging_dgm_out_queue_send() and
 * sent from a pthreadpool helper thread.
 */
struct messaging_dgm_out_queue_state {
	struct tevent_context *ev;
	struct pthreadpool_tevent *pool;

	struct tevent_req *req;		/* the request owning this state */
	struct tevent_req *subreq;	/* outstanding pthreadpool job, or NULL */

	int sock;			/* copy of messaging_dgm_out->sock */

	int *fds;			/* dup()ed descriptors, closed by the destructor */
	uint8_t *buf;			/* flattened copy of the fragment */

	ssize_t sent;			/* sendmsg() result from the helper thread */
	int err;			/* errno from the helper thread when sent == -1 */
};

static int messaging_dgm_out_queue_state_destructor(
	struct messaging_dgm_out_queue_state *state);
static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
					    void *private_data);
static void messaging_dgm_out_threaded_job(void *private_data);
static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
408 :
409 : /*
410 : * Push a message fragment onto a queue to be sent by a
411 : * threadpool job. Makes copies of data/fd's to be sent.
412 : * The running tevent_queue internally creates an immediate
413 : * event to schedule the write.
414 : */
415 :
416 69947 : static struct tevent_req *messaging_dgm_out_queue_send(
417 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
418 : struct messaging_dgm_out *out,
419 : const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
420 : {
421 69246 : struct tevent_req *req;
422 69246 : struct messaging_dgm_out_queue_state *state;
423 69246 : struct tevent_queue_entry *e;
424 69246 : size_t i;
425 69246 : ssize_t buflen;
426 :
427 69947 : req = tevent_req_create(out, &state,
428 : struct messaging_dgm_out_queue_state);
429 69947 : if (req == NULL) {
430 0 : return NULL;
431 : }
432 69947 : state->ev = ev;
433 69947 : state->pool = out->ctx->pool;
434 69947 : state->sock = out->sock;
435 69947 : state->req = req;
436 :
437 : /*
438 : * Go blocking in a thread
439 : */
440 69947 : if (!out->is_blocking) {
441 242 : int ret = set_blocking(out->sock, true);
442 242 : if (ret == -1) {
443 0 : tevent_req_error(req, errno);
444 0 : return tevent_req_post(req, ev);
445 : }
446 242 : out->is_blocking = true;
447 : }
448 :
449 69947 : buflen = iov_buflen(iov, iovlen);
450 69947 : if (buflen == -1) {
451 0 : tevent_req_error(req, EMSGSIZE);
452 0 : return tevent_req_post(req, ev);
453 : }
454 :
455 69947 : state->buf = talloc_array(state, uint8_t, buflen);
456 69947 : if (tevent_req_nomem(state->buf, req)) {
457 0 : return tevent_req_post(req, ev);
458 : }
459 69947 : iov_buf(iov, iovlen, state->buf, buflen);
460 :
461 69947 : state->fds = talloc_array(state, int, num_fds);
462 69947 : if (tevent_req_nomem(state->fds, req)) {
463 0 : return tevent_req_post(req, ev);
464 : }
465 :
466 69985 : for (i=0; i<num_fds; i++) {
467 38 : state->fds[i] = -1;
468 : }
469 :
470 69985 : for (i=0; i<num_fds; i++) {
471 :
472 38 : state->fds[i] = dup(fds[i]);
473 :
474 38 : if (state->fds[i] == -1) {
475 0 : int ret = errno;
476 :
477 0 : close_fd_array(state->fds, num_fds);
478 :
479 0 : tevent_req_error(req, ret);
480 0 : return tevent_req_post(req, ev);
481 : }
482 : }
483 :
484 69947 : talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);
485 :
486 69947 : e = tevent_queue_add_entry(out->queue, ev, req,
487 69246 : messaging_dgm_out_queue_trigger, req);
488 69947 : if (tevent_req_nomem(e, req)) {
489 0 : return tevent_req_post(req, ev);
490 : }
491 701 : return req;
492 : }
493 :
494 69944 : static int messaging_dgm_out_queue_state_destructor(
495 : struct messaging_dgm_out_queue_state *state)
496 : {
497 69246 : int *fds;
498 69246 : size_t num_fds;
499 :
500 69944 : if (state->subreq != NULL) {
501 : /*
502 : * We're scheduled, but we're destroyed. This happens
503 : * if the messaging_dgm_context is destroyed while
504 : * we're stuck in a blocking send. There's nothing we
505 : * can do but to leak memory.
506 : */
507 3 : TALLOC_FREE(state->subreq);
508 3 : (void)talloc_reparent(state->req, NULL, state);
509 3 : return -1;
510 : }
511 :
512 69941 : fds = state->fds;
513 69941 : num_fds = talloc_array_length(fds);
514 69941 : close_fd_array(fds, num_fds);
515 69941 : return 0;
516 : }
517 :
518 : /*
519 : * tevent_queue callback that schedules the pthreadpool to actually
520 : * send the queued message fragment.
521 : */
522 :
523 68935 : static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
524 : void *private_data)
525 : {
526 68935 : struct messaging_dgm_out_queue_state *state = tevent_req_data(
527 : req, struct messaging_dgm_out_queue_state);
528 :
529 68935 : tevent_req_reset_endtime(req);
530 :
531 68935 : state->subreq = pthreadpool_tevent_job_send(
532 : state, state->ev, state->pool,
533 : messaging_dgm_out_threaded_job, state);
534 68935 : if (tevent_req_nomem(state->subreq, req)) {
535 0 : return;
536 : }
537 68935 : tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
538 : req);
539 : }
540 :
541 : /*
542 : * Wrapper function run by the pthread that calls
543 : * messaging_dgm_sendmsg() to actually do the sendmsg().
544 : */
545 :
546 68935 : static void messaging_dgm_out_threaded_job(void *private_data)
547 : {
548 68935 : struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
549 : private_data, struct messaging_dgm_out_queue_state);
550 :
551 137870 : struct iovec iov = { .iov_base = state->buf,
552 68935 : .iov_len = talloc_get_size(state->buf) };
553 68935 : size_t num_fds = talloc_array_length(state->fds);
554 68935 : int msec = 1;
555 :
556 68257 : while (true) {
557 68257 : int ret;
558 :
559 137867 : state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
560 68935 : state->fds, num_fds, &state->err);
561 :
562 68932 : if (state->sent != -1) {
563 675 : return;
564 : }
565 1 : if (state->err != ENOBUFS) {
566 0 : return;
567 : }
568 :
569 : /*
570 : * ENOBUFS is the FreeBSD way of saying "Try
571 : * again". We have to do polling.
572 : */
573 0 : do {
574 0 : ret = poll(NULL, 0, msec);
575 0 : } while ((ret == -1) && (errno == EINTR));
576 :
577 : /*
578 : * Exponential backoff up to once a second
579 : */
580 0 : msec *= 2;
581 0 : msec = MIN(msec, 1000);
582 : }
583 : }
584 :
585 : /*
586 : * Pickup the results of the pthread sendmsg().
587 : */
588 :
589 68929 : static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
590 : {
591 68929 : struct tevent_req *req = tevent_req_callback_data(
592 : subreq, struct tevent_req);
593 68929 : struct messaging_dgm_out_queue_state *state = tevent_req_data(
594 : req, struct messaging_dgm_out_queue_state);
595 68254 : int ret;
596 :
597 68929 : if (subreq != state->subreq) {
598 0 : abort();
599 : }
600 :
601 68929 : ret = pthreadpool_tevent_job_recv(subreq);
602 :
603 68929 : TALLOC_FREE(subreq);
604 68929 : state->subreq = NULL;
605 :
606 68929 : if (tevent_req_error(req, ret)) {
607 0 : return;
608 : }
609 68929 : if (state->sent == -1) {
610 0 : tevent_req_error(req, state->err);
611 0 : return;
612 : }
613 68929 : tevent_req_done(req);
614 : }
615 :
/*
 * Collect the result of messaging_dgm_out_queue_send(). Returns 0 on
 * success, otherwise an errno value.
 */
static int messaging_dgm_out_queue_recv(struct tevent_req *req)
{
	int err;

	err = tevent_req_simple_recv_unix(req);
	return err;
}
620 :
621 : static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
622 :
623 : /*
624 : * Core function to send a message fragment given a
625 : * connected struct messaging_dgm_out * destination.
626 : * If no current queue tries to send nonblocking
627 : * directly. If not, queues the fragment (which makes
628 : * a copy of it) and adds a 60-second timeout on the send.
629 : */
630 :
631 527082 : static int messaging_dgm_out_send_fragment(
632 : struct tevent_context *ev, struct messaging_dgm_out *out,
633 : const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
634 : {
635 70762 : struct tevent_req *req;
636 70762 : size_t qlen;
637 70762 : bool ok;
638 :
639 527082 : qlen = tevent_queue_length(out->queue);
640 527082 : if (qlen == 0) {
641 1522 : ssize_t nsent;
642 457377 : int err = 0;
643 :
644 457377 : if (out->is_blocking) {
645 149 : int ret = set_blocking(out->sock, false);
646 149 : if (ret == -1) {
647 457135 : return errno;
648 : }
649 149 : out->is_blocking = false;
650 : }
651 :
652 457377 : nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
653 : num_fds, &err);
654 457377 : if (nsent >= 0) {
655 455615 : return 0;
656 : }
657 :
658 345 : if (err == ENOBUFS) {
659 : /*
660 : * FreeBSD's way of telling us the dst socket
661 : * is full. EWOULDBLOCK makes us spawn a
662 : * polling helper thread.
663 : */
664 0 : err = EWOULDBLOCK;
665 : }
666 :
667 345 : if (err != EWOULDBLOCK) {
668 4 : return err;
669 : }
670 : }
671 :
672 69947 : req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
673 : fds, num_fds);
674 69947 : if (req == NULL) {
675 0 : return ENOMEM;
676 : }
677 69947 : tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
678 :
679 69947 : ok = tevent_req_set_endtime(req, ev,
680 : tevent_timeval_current_ofs(60, 0));
681 69947 : if (!ok) {
682 0 : TALLOC_FREE(req);
683 0 : return ENOMEM;
684 : }
685 :
686 701 : return 0;
687 : }
688 :
689 : /*
690 : * Pickup the result of the fragment send. Reset idle timer
691 : * if queue empty.
692 : */
693 :
694 68952 : static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
695 : {
696 68952 : struct messaging_dgm_out *out = tevent_req_callback_data(
697 : req, struct messaging_dgm_out);
698 68254 : int ret;
699 :
700 68952 : ret = messaging_dgm_out_queue_recv(req);
701 68952 : TALLOC_FREE(req);
702 :
703 68952 : if (ret != 0) {
704 23 : DBG_WARNING("messaging_out_queue_recv returned %s\n",
705 : strerror(ret));
706 : }
707 :
708 68952 : messaging_dgm_out_rearm_idle_timer(out);
709 68952 : }
710 :
711 :
/*
 * Header that follows the 64-bit cookie in every fragment of a
 * fragmented message (see messaging_dgm_out_send_fragmented()).
 */
struct messaging_dgm_fragment_hdr {
	size_t msglen;	/* total payload length of the whole message */
	pid_t pid;	/* sender's pid */
	int sock;	/* sender's socket to this destination */
};
717 :
718 : /*
719 : * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
720 : * size chunks and send it.
721 : *
722 : * Message fragments are prefixed by a 64-bit cookie that
723 : * stays the same for all fragments. This allows the receiver
724 : * to recognise fragments of the same message and re-assemble
725 : * them on the other end.
726 : *
727 : * Note that this allows other message fragments from other
728 : * senders to be interleaved in the receive read processing,
729 : * the combination of the cookie and header info allows unique
730 : * identification of the message from a specific sender in
731 : * re-assembly.
732 : *
733 : * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
734 : * then send a single message with cookie set to zero.
735 : *
736 : * Otherwise the message is fragmented into chunks and added
737 : * to the sending queue. Any file descriptors are passed only
738 : * in the last fragment.
739 : *
740 : * Finally the cookie is incremented (wrap over zero) to
741 : * prepare for the next message sent to this channel.
742 : *
743 : */
744 :
745 512415 : static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
746 : struct messaging_dgm_out *out,
747 : const struct iovec *iov,
748 : int iovlen,
749 : const int *fds, size_t num_fds)
750 512415 : {
751 68762 : ssize_t msglen, sent;
752 512415 : int ret = 0;
753 512415 : struct iovec iov_copy[iovlen+2];
754 68762 : struct messaging_dgm_fragment_hdr hdr;
755 68762 : struct iovec src_iov;
756 :
757 512415 : if (iovlen < 0) {
758 0 : return EINVAL;
759 : }
760 :
761 512415 : msglen = iov_buflen(iov, iovlen);
762 512415 : if (msglen == -1) {
763 0 : return EMSGSIZE;
764 : }
765 512415 : if (num_fds > INT8_MAX) {
766 0 : return EINVAL;
767 : }
768 :
769 512415 : if ((size_t) msglen <=
770 : (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
771 499869 : uint64_t cookie = 0;
772 :
773 499869 : iov_copy[0].iov_base = &cookie;
774 499869 : iov_copy[0].iov_len = sizeof(cookie);
775 499869 : if (iovlen > 0) {
776 499869 : memcpy(&iov_copy[1], iov,
777 : sizeof(struct iovec) * iovlen);
778 : }
779 :
780 499869 : return messaging_dgm_out_send_fragment(
781 : ev, out, iov_copy, iovlen+1, fds, num_fds);
782 :
783 : }
784 :
785 25092 : hdr = (struct messaging_dgm_fragment_hdr) {
786 : .msglen = msglen,
787 12546 : .pid = tevent_cached_getpid(),
788 12546 : .sock = out->sock
789 : };
790 :
791 12546 : iov_copy[0].iov_base = &out->cookie;
792 12546 : iov_copy[0].iov_len = sizeof(out->cookie);
793 12546 : iov_copy[1].iov_base = &hdr;
794 12546 : iov_copy[1].iov_len = sizeof(hdr);
795 :
796 12546 : sent = 0;
797 12546 : src_iov = iov[0];
798 :
799 : /*
800 : * The following write loop sends the user message in pieces. We have
801 : * filled the first two iovecs above with "cookie" and "hdr". In the
802 : * following loops we pull message chunks from the user iov array and
803 : * fill iov_copy piece by piece, possibly truncating chunks from the
804 : * caller's iov array. Ugly, but hopefully efficient.
805 : */
806 :
807 39759 : while (sent < msglen) {
808 : size_t fragment_len;
809 25211 : size_t iov_index = 2;
810 :
811 25211 : fragment_len = sizeof(out->cookie) + sizeof(hdr);
812 :
813 54426 : while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
814 2004 : size_t space, chunk;
815 :
816 39759 : space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
817 39759 : chunk = MIN(space, src_iov.iov_len);
818 :
819 39759 : iov_copy[iov_index].iov_base = src_iov.iov_base;
820 39759 : iov_copy[iov_index].iov_len = chunk;
821 39759 : iov_index += 1;
822 :
823 39759 : src_iov.iov_base = (char *)src_iov.iov_base + chunk;
824 39759 : src_iov.iov_len -= chunk;
825 39759 : fragment_len += chunk;
826 :
827 39759 : if (src_iov.iov_len == 0) {
828 25092 : iov += 1;
829 25092 : iovlen -= 1;
830 25092 : if (iovlen == 0) {
831 12544 : break;
832 : }
833 12546 : src_iov = iov[0];
834 : }
835 : }
836 27213 : sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));
837 :
838 : /*
839 : * only the last fragment should pass the fd array.
840 : * That simplifies the receiver a lot.
841 : */
842 27213 : if (sent < msglen) {
843 14667 : ret = messaging_dgm_out_send_fragment(
844 : ev, out, iov_copy, iov_index, NULL, 0);
845 : } else {
846 12546 : ret = messaging_dgm_out_send_fragment(
847 : ev, out, iov_copy, iov_index, fds, num_fds);
848 : }
849 27213 : if (ret != 0) {
850 0 : break;
851 : }
852 : }
853 :
854 12546 : out->cookie += 1;
855 12546 : if (out->cookie == 0) {
856 0 : out->cookie += 1;
857 : }
858 :
859 12544 : return ret;
860 : }
861 :
/*
 * The context set up by the last successful messaging_dgm_init().
 */
static struct messaging_dgm_context *global_dgm_context;

static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
865 :
866 59896 : static int messaging_dgm_lockfile_create(struct messaging_dgm_context *ctx,
867 : pid_t pid, int *plockfile_fd,
868 : uint64_t *punique)
869 : {
870 1133 : char buf[64];
871 1133 : int lockfile_fd;
872 1133 : struct sun_path_buf lockfile_name;
873 1133 : struct flock lck;
874 1133 : uint64_t unique;
875 1133 : int unique_len, ret;
876 1133 : ssize_t written;
877 :
878 59896 : ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
879 59896 : "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
880 59896 : if (ret < 0) {
881 0 : return errno;
882 : }
883 59896 : if ((unsigned)ret >= sizeof(lockfile_name.buf)) {
884 0 : return ENAMETOOLONG;
885 : }
886 :
887 : /* no O_EXCL, existence check is via the fcntl lock */
888 :
889 59896 : lockfile_fd = open(lockfile_name.buf, O_NONBLOCK|O_CREAT|O_RDWR,
890 : 0644);
891 :
892 59896 : if ((lockfile_fd == -1) &&
893 0 : ((errno == ENXIO) /* Linux */ ||
894 0 : (errno == ENODEV) /* Linux kernel bug */ ||
895 0 : (errno == EOPNOTSUPP) /* FreeBSD */)) {
896 : /*
897 : * Huh -- a socket? This might be a stale socket from
898 : * an upgrade of Samba. Just unlink and retry, nobody
899 : * else is supposed to be here at this time.
900 : *
901 : * Yes, this is racy, but I don't see a way to deal
902 : * with this properly.
903 : */
904 0 : unlink(lockfile_name.buf);
905 :
906 0 : lockfile_fd = open(lockfile_name.buf,
907 : O_NONBLOCK|O_CREAT|O_WRONLY,
908 : 0644);
909 : }
910 :
911 59896 : if (lockfile_fd == -1) {
912 0 : ret = errno;
913 0 : DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
914 0 : return ret;
915 : }
916 :
917 59896 : lck = (struct flock) {
918 : .l_type = F_WRLCK,
919 : .l_whence = SEEK_SET
920 : };
921 :
922 59896 : ret = fcntl(lockfile_fd, F_SETLK, &lck);
923 59896 : if (ret == -1) {
924 0 : ret = errno;
925 0 : DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
926 0 : goto fail_close;
927 : }
928 :
929 : /*
930 : * Directly using the binary value for
931 : * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
932 : * violation. But including all of ndr here just for this
933 : * seems to be a bit overkill to me. Also, messages_dgm might
934 : * be replaced sooner or later by something streams-based,
935 : * where unique_id generation will be handled differently.
936 : */
937 :
938 1133 : do {
939 59896 : generate_random_buffer((uint8_t *)&unique, sizeof(unique));
940 59896 : } while (unique == UINT64_C(0xFFFFFFFFFFFFFFFF));
941 :
942 59896 : unique_len = snprintf(buf, sizeof(buf), "%"PRIu64"\n", unique);
943 :
944 : /* shorten a potentially preexisting file */
945 :
946 59896 : ret = ftruncate(lockfile_fd, unique_len);
947 59896 : if (ret == -1) {
948 0 : ret = errno;
949 0 : DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
950 : strerror(ret)));
951 0 : goto fail_unlink;
952 : }
953 :
954 59896 : written = write(lockfile_fd, buf, unique_len);
955 59896 : if (written != unique_len) {
956 0 : ret = errno;
957 0 : DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
958 0 : goto fail_unlink;
959 : }
960 :
961 59896 : *plockfile_fd = lockfile_fd;
962 59896 : *punique = unique;
963 59896 : return 0;
964 :
965 0 : fail_unlink:
966 0 : unlink(lockfile_name.buf);
967 0 : fail_close:
968 0 : close(lockfile_fd);
969 0 : return ret;
970 : }
971 :
972 : static void messaging_dgm_read_handler(struct tevent_context *ev,
973 : struct tevent_fd *fde,
974 : uint16_t flags,
975 : void *private_data);
976 :
977 : /*
978 : * Create the rendezvous point in the file system
979 : * that other processes can use to send messages to
980 : * this pid.
981 : */
982 :
983 59909 : int messaging_dgm_init(struct tevent_context *ev,
984 : uint64_t *punique,
985 : const char *socket_dir,
986 : const char *lockfile_dir,
987 : void (*recv_cb)(struct tevent_context *ev,
988 : const uint8_t *msg,
989 : size_t msg_len,
990 : int *fds,
991 : size_t num_fds,
992 : void *private_data),
993 : void *recv_cb_private_data)
994 : {
995 1140 : struct messaging_dgm_context *ctx;
996 1140 : int ret;
997 1140 : struct sockaddr_un socket_address;
998 1140 : size_t len;
999 1140 : static bool have_dgm_context = false;
1000 :
1001 59909 : if (have_dgm_context) {
1002 0 : return EEXIST;
1003 : }
1004 :
1005 59909 : if ((socket_dir == NULL) || (lockfile_dir == NULL)) {
1006 0 : return EINVAL;
1007 : }
1008 :
1009 59909 : ctx = talloc_zero(NULL, struct messaging_dgm_context);
1010 59909 : if (ctx == NULL) {
1011 0 : goto fail_nomem;
1012 : }
1013 59909 : ctx->ev = ev;
1014 59909 : ctx->pid = tevent_cached_getpid();
1015 59909 : ctx->recv_cb = recv_cb;
1016 59909 : ctx->recv_cb_private_data = recv_cb_private_data;
1017 :
1018 59909 : len = strlcpy(ctx->lockfile_dir.buf, lockfile_dir,
1019 : sizeof(ctx->lockfile_dir.buf));
1020 59909 : if (len >= sizeof(ctx->lockfile_dir.buf)) {
1021 5 : TALLOC_FREE(ctx);
1022 5 : return ENAMETOOLONG;
1023 : }
1024 :
1025 59904 : len = strlcpy(ctx->socket_dir.buf, socket_dir,
1026 : sizeof(ctx->socket_dir.buf));
1027 59904 : if (len >= sizeof(ctx->socket_dir.buf)) {
1028 8 : TALLOC_FREE(ctx);
1029 8 : return ENAMETOOLONG;
1030 : }
1031 :
1032 59896 : socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
1033 59896 : len = snprintf(socket_address.sun_path,
1034 : sizeof(socket_address.sun_path),
1035 59896 : "%s/%u", socket_dir, (unsigned)ctx->pid);
1036 59896 : if (len >= sizeof(socket_address.sun_path)) {
1037 0 : TALLOC_FREE(ctx);
1038 0 : return ENAMETOOLONG;
1039 : }
1040 :
1041 59896 : ret = messaging_dgm_lockfile_create(ctx, ctx->pid, &ctx->lockfile_fd,
1042 : punique);
1043 59896 : if (ret != 0) {
1044 0 : DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
1045 : __func__, strerror(ret)));
1046 0 : TALLOC_FREE(ctx);
1047 0 : return ret;
1048 : }
1049 :
1050 59896 : unlink(socket_address.sun_path);
1051 :
1052 59896 : ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
1053 59896 : if (ctx->sock == -1) {
1054 0 : ret = errno;
1055 0 : DBG_WARNING("socket failed: %s\n", strerror(ret));
1056 0 : TALLOC_FREE(ctx);
1057 0 : return ret;
1058 : }
1059 :
1060 59896 : ret = prepare_socket_cloexec(ctx->sock);
1061 59896 : if (ret == -1) {
1062 0 : ret = errno;
1063 0 : DBG_WARNING("prepare_socket_cloexec failed: %s\n",
1064 : strerror(ret));
1065 0 : TALLOC_FREE(ctx);
1066 0 : return ret;
1067 : }
1068 :
1069 59896 : ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
1070 : sizeof(socket_address));
1071 59896 : if (ret == -1) {
1072 0 : ret = errno;
1073 0 : DBG_WARNING("bind failed: %s\n", strerror(ret));
1074 0 : TALLOC_FREE(ctx);
1075 0 : return ret;
1076 : }
1077 :
1078 59896 : talloc_set_destructor(ctx, messaging_dgm_context_destructor);
1079 :
1080 59896 : ctx->have_dgm_context = &have_dgm_context;
1081 :
1082 59896 : ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool);
1083 59896 : if (ret != 0) {
1084 0 : DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
1085 : strerror(ret));
1086 0 : TALLOC_FREE(ctx);
1087 0 : return ret;
1088 : }
1089 :
1090 59896 : global_dgm_context = ctx;
1091 59896 : return 0;
1092 :
1093 0 : fail_nomem:
1094 0 : TALLOC_FREE(ctx);
1095 0 : return ENOMEM;
1096 : }
1097 :
1098 : /*
1099 : * Remove the rendezvous point in the filesystem
1100 : * if we're the owner.
1101 : */
1102 :
1103 100002 : static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
1104 : {
1105 157478 : while (c->outsocks != NULL) {
1106 38607 : TALLOC_FREE(c->outsocks);
1107 : }
1108 120950 : while (c->in_msgs != NULL) {
1109 1181 : TALLOC_FREE(c->in_msgs);
1110 : }
1111 123215 : while (c->fde_evs != NULL) {
1112 23213 : tevent_fd_set_flags(c->fde_evs->fde, 0);
1113 23213 : c->fde_evs->ctx = NULL;
1114 24394 : DLIST_REMOVE(c->fde_evs, c->fde_evs);
1115 : }
1116 :
1117 100002 : close(c->sock);
1118 :
1119 100002 : if (tevent_cached_getpid() == c->pid) {
1120 267 : struct sun_path_buf name;
1121 267 : int ret;
1122 :
1123 51628 : ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1124 51628 : c->socket_dir.buf, (unsigned)c->pid);
1125 51628 : if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1126 : /*
1127 : * We've checked the length when creating, so this
1128 : * should never happen
1129 : */
1130 0 : abort();
1131 : }
1132 51628 : unlink(name.buf);
1133 :
1134 51628 : ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1135 51628 : c->lockfile_dir.buf, (unsigned)c->pid);
1136 51628 : if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1137 : /*
1138 : * We've checked the length when creating, so this
1139 : * should never happen
1140 : */
1141 0 : abort();
1142 : }
1143 51628 : unlink(name.buf);
1144 : }
1145 100002 : close(c->lockfile_fd);
1146 :
1147 100002 : if (c->have_dgm_context != NULL) {
1148 100002 : *c->have_dgm_context = false;
1149 : }
1150 :
1151 100002 : return 0;
1152 : }
1153 :
/*
 * DEVELOPER-build sanity check that ctx belongs to the current process.
 *
 * Verifies that ctx->sock is an AF_UNIX socket bound to
 * <socket_dir>/<our pid> and that ctx->lockfile_fd refers to the same
 * file (same st_dev/st_ino) as <lockfile_dir>/<our pid>. Any mismatch or
 * failure is logged and the process aborts. Without DEVELOPER this is a
 * no-op.
 */
static void messaging_dgm_validate(struct messaging_dgm_context *ctx)
{
#ifdef DEVELOPER
	pid_t pid = tevent_cached_getpid();
	struct sockaddr_storage addr;
	socklen_t addrlen = sizeof(addr);
	struct sockaddr_un *un_addr;
	struct sun_path_buf pathbuf;
	struct stat st1, st2;
	int ret;

	/*
	 * Protect against using the wrong messaging context after a
	 * fork without reinit_after_fork.
	 */

	ret = getsockname(ctx->sock, (struct sockaddr *)&addr, &addrlen);
	if (ret == -1) {
		DBG_ERR("getsockname failed: %s\n", strerror(errno));
		goto fail;
	}
	if (addr.ss_family != AF_UNIX) {
		DBG_ERR("getsockname returned family %d\n",
			(int)addr.ss_family);
		goto fail;
	}
	un_addr = (struct sockaddr_un *)&addr;

	ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
		       "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
	if (ret < 0) {
		DBG_ERR("snprintf failed: %s\n", strerror(errno));
		goto fail;
	}
	if ((size_t)ret >= sizeof(pathbuf.buf)) {
		DBG_ERR("snprintf returned %d chars\n", (int)ret);
		goto fail;
	}

	if (strcmp(pathbuf.buf, un_addr->sun_path) != 0) {
		DBG_ERR("sockname wrong: Expected %s, got %s\n",
			pathbuf.buf, un_addr->sun_path);
		goto fail;
	}

	ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
		       "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
	if (ret < 0) {
		DBG_ERR("snprintf failed: %s\n", strerror(errno));
		goto fail;
	}
	if ((size_t)ret >= sizeof(pathbuf.buf)) {
		DBG_ERR("snprintf returned %d chars\n", (int)ret);
		goto fail;
	}

	ret = stat(pathbuf.buf, &st1);
	if (ret == -1) {
		DBG_ERR("stat failed: %s\n", strerror(errno));
		goto fail;
	}
	ret = fstat(ctx->lockfile_fd, &st2);
	if (ret == -1) {
		DBG_ERR("fstat failed: %s\n", strerror(errno));
		goto fail;
	}

	if ((st1.st_dev != st2.st_dev) || (st1.st_ino != st2.st_ino)) {
		/*
		 * Print the full values: st_dev/st_ino are commonly wider
		 * than int, and truncating them could make two different
		 * files look identical in exactly this diagnostic.
		 */
		DBG_ERR("lockfile differs, expected (%llu/%llu), "
			"got (%llu/%llu)\n",
			(unsigned long long)st2.st_dev,
			(unsigned long long)st2.st_ino,
			(unsigned long long)st1.st_dev,
			(unsigned long long)st1.st_ino);
		goto fail;
	}

	return;
fail:
	abort();
#else
	return;
#endif
}
1235 :
1236 : static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1237 : struct tevent_context *ev,
1238 : uint8_t *msg, size_t msg_len,
1239 : int *fds, size_t num_fds);
1240 :
1241 : /*
1242 : * Raw read callback handler - passes to messaging_dgm_recv()
1243 : * for fragment reassembly processing.
1244 : */
1245 :
1246 203371 : static void messaging_dgm_read_handler(struct tevent_context *ev,
1247 : struct tevent_fd *fde,
1248 : uint16_t flags,
1249 : void *private_data)
1250 203371 : {
1251 203371 : struct messaging_dgm_context *ctx = talloc_get_type_abort(
1252 : private_data, struct messaging_dgm_context);
1253 68684 : ssize_t received;
1254 68684 : struct msghdr msg;
1255 68684 : struct iovec iov;
1256 203371 : size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
1257 203371 : uint8_t msgbuf[msgbufsize];
1258 68684 : uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
1259 68684 : size_t num_fds;
1260 :
1261 203371 : messaging_dgm_validate(ctx);
1262 :
1263 203371 : if ((flags & TEVENT_FD_READ) == 0) {
1264 0 : return;
1265 : }
1266 :
1267 203371 : iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
1268 203371 : msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };
1269 :
1270 203371 : msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);
1271 :
1272 : #ifdef MSG_CMSG_CLOEXEC
1273 203371 : msg.msg_flags |= MSG_CMSG_CLOEXEC;
1274 : #endif
1275 :
1276 203371 : received = recvmsg(ctx->sock, &msg, 0);
1277 203371 : if (received == -1) {
1278 0 : if ((errno == EAGAIN) ||
1279 0 : (errno == EWOULDBLOCK) ||
1280 0 : (errno == EINTR) ||
1281 0 : (errno == ENOMEM)) {
1282 : /* Not really an error - just try again. */
1283 0 : return;
1284 : }
1285 : /* Problem with the socket. Set it unreadable. */
1286 0 : tevent_fd_set_flags(fde, 0);
1287 0 : return;
1288 : }
1289 :
1290 203371 : if ((size_t)received > sizeof(buf)) {
1291 : /* More than we expected, not for us */
1292 0 : return;
1293 : }
1294 :
1295 203371 : num_fds = msghdr_extract_fds(&msg, NULL, 0);
1296 203371 : if (num_fds == 0) {
1297 68629 : int fds[1];
1298 :
1299 160703 : messaging_dgm_recv(ctx, ev, buf, received, fds, 0);
1300 42668 : } else {
1301 55 : size_t i;
1302 42668 : int fds[num_fds];
1303 :
1304 42668 : msghdr_extract_fds(&msg, fds, num_fds);
1305 :
1306 85394 : for (i = 0; i < num_fds; i++) {
1307 58 : int err;
1308 :
1309 42671 : err = prepare_socket_cloexec(fds[i]);
1310 42671 : if (err != 0) {
1311 0 : close_fd_array(fds, num_fds);
1312 0 : num_fds = 0;
1313 : }
1314 : }
1315 :
1316 42668 : messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
1317 : }
1318 : }
1319 :
1320 0 : static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
1321 : {
1322 0 : DLIST_REMOVE(m->ctx->in_msgs, m);
1323 0 : return 0;
1324 : }
1325 :
/*
 * Close every descriptor in fds[0..num_fds) that the receive callback
 * did not take over. A consumer signals ownership by setting the slot
 * to -1; every closed slot is set to -1 as well.
 */
static void messaging_dgm_close_unconsumed(int *fds, size_t num_fds)
{
	size_t n;

	for (n = 0; n < num_fds; n++) {
		int *slot = &fds[n];

		if (*slot == -1) {
			continue;	/* consumed by the callback */
		}
		close(*slot);
		*slot = -1;
	}
}
1337 :
1338 : /*
1339 : * Deal with identification of fragmented messages and
1340 : * re-assembly into full messages sent, then calls the
1341 : * callback.
1342 : */
1343 :
1344 203371 : static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1345 : struct tevent_context *ev,
1346 : uint8_t *buf, size_t buflen,
1347 : int *fds, size_t num_fds)
1348 : {
1349 68684 : struct messaging_dgm_fragment_hdr hdr;
1350 68684 : struct messaging_dgm_in_msg *msg;
1351 68684 : size_t space;
1352 68684 : uint64_t cookie;
1353 :
1354 203371 : if (buflen < sizeof(cookie)) {
1355 0 : goto close_fds;
1356 : }
1357 203371 : memcpy(&cookie, buf, sizeof(cookie));
1358 203371 : buf += sizeof(cookie);
1359 203371 : buflen -= sizeof(cookie);
1360 :
1361 203371 : if (cookie == 0) {
1362 178542 : ctx->recv_cb(ev, buf, buflen, fds, num_fds,
1363 : ctx->recv_cb_private_data);
1364 178542 : messaging_dgm_close_unconsumed(fds, num_fds);
1365 256578 : return;
1366 : }
1367 :
1368 24829 : if (buflen < sizeof(hdr)) {
1369 0 : goto close_fds;
1370 : }
1371 24829 : memcpy(&hdr, buf, sizeof(hdr));
1372 24829 : buf += sizeof(hdr);
1373 24829 : buflen -= sizeof(hdr);
1374 :
1375 24829 : for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
1376 13475 : if ((msg->sender_pid == hdr.pid) &&
1377 11475 : (msg->sender_sock == hdr.sock)) {
1378 11475 : break;
1379 : }
1380 : }
1381 :
1382 24829 : if ((msg != NULL) && (msg->cookie != cookie)) {
1383 2002 : TALLOC_FREE(msg);
1384 : }
1385 :
1386 24829 : if (msg == NULL) {
1387 2 : size_t msglen;
1388 11354 : msglen = offsetof(struct messaging_dgm_in_msg, buf) +
1389 11352 : hdr.msglen;
1390 :
1391 11354 : msg = talloc_size(ctx, msglen);
1392 11354 : if (msg == NULL) {
1393 0 : goto close_fds;
1394 : }
1395 11354 : talloc_set_name_const(msg, "struct messaging_dgm_in_msg");
1396 :
1397 11354 : *msg = (struct messaging_dgm_in_msg) {
1398 11352 : .ctx = ctx, .msglen = hdr.msglen,
1399 11352 : .sender_pid = hdr.pid, .sender_sock = hdr.sock,
1400 : .cookie = cookie
1401 : };
1402 11354 : DLIST_ADD(ctx->in_msgs, msg);
1403 11354 : talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
1404 : }
1405 :
1406 24829 : space = msg->msglen - msg->received;
1407 24829 : if (buflen > space) {
1408 0 : goto close_fds;
1409 : }
1410 :
1411 24829 : memcpy(msg->buf + msg->received, buf, buflen);
1412 24829 : msg->received += buflen;
1413 :
1414 24829 : if (msg->received < msg->msglen) {
1415 : /*
1416 : * Any valid sender will send the fds in the last
1417 : * block. Invalid senders might have sent fd's that we
1418 : * need to close here.
1419 : */
1420 13475 : goto close_fds;
1421 : }
1422 :
1423 11354 : DLIST_REMOVE(ctx->in_msgs, msg);
1424 11354 : talloc_set_destructor(msg, NULL);
1425 :
1426 11354 : ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
1427 : ctx->recv_cb_private_data);
1428 11354 : messaging_dgm_close_unconsumed(fds, num_fds);
1429 :
1430 11354 : TALLOC_FREE(msg);
1431 11352 : return;
1432 :
1433 13475 : close_fds:
1434 13475 : close_fd_array(fds, num_fds);
1435 : }
1436 :
1437 100002 : void messaging_dgm_destroy(void)
1438 : {
1439 100002 : TALLOC_FREE(global_dgm_context);
1440 100002 : }
1441 :
1442 518500 : int messaging_dgm_send(pid_t pid,
1443 : const struct iovec *iov, int iovlen,
1444 : const int *fds, size_t num_fds)
1445 : {
1446 518500 : struct messaging_dgm_context *ctx = global_dgm_context;
1447 68663 : struct messaging_dgm_out *out;
1448 68663 : int ret;
1449 518500 : unsigned retries = 0;
1450 :
1451 518500 : if (ctx == NULL) {
1452 0 : return ENOTCONN;
1453 : }
1454 :
1455 518500 : messaging_dgm_validate(ctx);
1456 :
1457 518603 : again:
1458 518603 : ret = messaging_dgm_out_get(ctx, pid, &out);
1459 518603 : if (ret != 0) {
1460 6188 : return ret;
1461 : }
1462 :
1463 512415 : DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
1464 :
1465 512415 : ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
1466 : fds, num_fds);
1467 512415 : if (ret == ECONNREFUSED) {
1468 : /*
1469 : * We cache outgoing sockets. If the receiver has
1470 : * closed and re-opened the socket since our last
1471 : * message, we get connection refused. Retry.
1472 : */
1473 :
1474 103 : TALLOC_FREE(out);
1475 :
1476 103 : if (retries < 5) {
1477 103 : retries += 1;
1478 103 : goto again;
1479 : }
1480 : }
1481 443649 : return ret;
1482 : }
1483 :
1484 561553 : static int messaging_dgm_read_unique(int fd, uint64_t *punique)
1485 : {
1486 17642 : char buf[25];
1487 17642 : ssize_t rw_ret;
1488 561553 : int error = 0;
1489 17642 : unsigned long long unique;
1490 17642 : char *endptr;
1491 :
1492 561553 : rw_ret = pread(fd, buf, sizeof(buf)-1, 0);
1493 561553 : if (rw_ret == -1) {
1494 0 : return errno;
1495 : }
1496 561553 : buf[rw_ret] = '\0';
1497 :
1498 561553 : unique = smb_strtoull(buf, &endptr, 10, &error, SMB_STR_STANDARD);
1499 561553 : if (error != 0) {
1500 0 : return error;
1501 : }
1502 :
1503 561553 : if (endptr[0] != '\n') {
1504 0 : return EINVAL;
1505 : }
1506 561553 : *punique = unique;
1507 561553 : return 0;
1508 : }
1509 :
1510 561553 : int messaging_dgm_get_unique(pid_t pid, uint64_t *unique)
1511 : {
1512 561553 : struct messaging_dgm_context *ctx = global_dgm_context;
1513 17642 : struct sun_path_buf lockfile_name;
1514 17642 : int ret, fd;
1515 :
1516 561553 : if (ctx == NULL) {
1517 0 : return EBADF;
1518 : }
1519 :
1520 561553 : messaging_dgm_validate(ctx);
1521 :
1522 561553 : if (pid == tevent_cached_getpid()) {
1523 : /*
1524 : * Protect against losing our own lock
1525 : */
1526 472476 : return messaging_dgm_read_unique(ctx->lockfile_fd, unique);
1527 : }
1528 :
1529 89077 : ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
1530 89077 : "%s/%u", ctx->lockfile_dir.buf, (int)pid);
1531 89077 : if (ret < 0) {
1532 0 : return errno;
1533 : }
1534 89077 : if ((size_t)ret >= sizeof(lockfile_name.buf)) {
1535 0 : return ENAMETOOLONG;
1536 : }
1537 :
1538 89077 : fd = open(lockfile_name.buf, O_NONBLOCK|O_RDONLY, 0);
1539 89077 : if (fd == -1) {
1540 0 : return errno;
1541 : }
1542 :
1543 89077 : ret = messaging_dgm_read_unique(fd, unique);
1544 89077 : close(fd);
1545 89077 : return ret;
1546 : }
1547 :
1548 15426 : int messaging_dgm_cleanup(pid_t pid)
1549 : {
1550 15426 : struct messaging_dgm_context *ctx = global_dgm_context;
1551 0 : struct sun_path_buf lockfile_name, socket_name;
1552 0 : int fd, len, ret;
1553 15426 : struct flock lck = {
1554 : .l_pid = 0,
1555 : };
1556 :
1557 15426 : if (ctx == NULL) {
1558 0 : return ENOTCONN;
1559 : }
1560 :
1561 15426 : len = snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/%u",
1562 15426 : ctx->socket_dir.buf, (unsigned)pid);
1563 15426 : if (len < 0) {
1564 0 : return errno;
1565 : }
1566 15426 : if ((size_t)len >= sizeof(socket_name.buf)) {
1567 0 : return ENAMETOOLONG;
1568 : }
1569 :
1570 15426 : len = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf), "%s/%u",
1571 15426 : ctx->lockfile_dir.buf, (unsigned)pid);
1572 15426 : if (len < 0) {
1573 0 : return errno;
1574 : }
1575 15426 : if ((size_t)len >= sizeof(lockfile_name.buf)) {
1576 0 : return ENAMETOOLONG;
1577 : }
1578 :
1579 15426 : fd = open(lockfile_name.buf, O_NONBLOCK|O_WRONLY, 0);
1580 15426 : if (fd == -1) {
1581 15384 : ret = errno;
1582 15384 : if (ret != ENOENT) {
1583 0 : DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
1584 : lockfile_name.buf, strerror(ret)));
1585 : }
1586 15384 : return ret;
1587 : }
1588 :
1589 42 : lck.l_type = F_WRLCK;
1590 42 : lck.l_whence = SEEK_SET;
1591 42 : lck.l_start = 0;
1592 42 : lck.l_len = 0;
1593 :
1594 42 : ret = fcntl(fd, F_SETLK, &lck);
1595 42 : if (ret != 0) {
1596 0 : ret = errno;
1597 0 : if ((ret != EACCES) && (ret != EAGAIN)) {
1598 0 : DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
1599 : strerror(ret)));
1600 : }
1601 0 : close(fd);
1602 0 : return ret;
1603 : }
1604 :
1605 42 : DEBUG(10, ("%s: Cleaning up : %s\n", __func__, strerror(ret)));
1606 :
1607 42 : (void)unlink(socket_name.buf);
1608 42 : (void)unlink(lockfile_name.buf);
1609 42 : (void)close(fd);
1610 42 : return 0;
1611 : }
1612 :
/*
 * messaging_dgm_forall() callback used by messaging_dgm_wipe().
 *
 * private_data points to our own pid_t. Every other pid is passed to
 * messaging_dgm_cleanup() and the result is logged. Always returns 0 so
 * the directory walk continues.
 */
static int messaging_dgm_wipe_fn(pid_t pid, void *private_data)
{
	const pid_t self = *(pid_t *)private_data;

	/*
	 * Skip ourselves: fcntl(F_GETLK) will succeed for ourselves,
	 * we hold that lock ourselves.
	 */
	if (pid != self) {
		int ret = messaging_dgm_cleanup(pid);

		DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
			   (unsigned long)pid, ret ? strerror(ret) : "ok"));
	}

	return 0;
}
1632 :
1633 0 : int messaging_dgm_wipe(void)
1634 : {
1635 0 : pid_t pid = tevent_cached_getpid();
1636 0 : messaging_dgm_forall(messaging_dgm_wipe_fn, &pid);
1637 0 : return 0;
1638 : }
1639 :
1640 225 : int messaging_dgm_forall(int (*fn)(pid_t pid, void *private_data),
1641 : void *private_data)
1642 : {
1643 225 : struct messaging_dgm_context *ctx = global_dgm_context;
1644 1 : DIR *msgdir;
1645 1 : struct dirent *dp;
1646 225 : int error = 0;
1647 :
1648 225 : if (ctx == NULL) {
1649 0 : return ENOTCONN;
1650 : }
1651 :
1652 225 : messaging_dgm_validate(ctx);
1653 :
1654 : /*
1655 : * We scan the socket directory and not the lock directory. Otherwise
1656 : * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
1657 : * and fcntl(SETLK).
1658 : */
1659 :
1660 225 : msgdir = opendir(ctx->socket_dir.buf);
1661 225 : if (msgdir == NULL) {
1662 0 : return errno;
1663 : }
1664 :
1665 14938 : while ((dp = readdir(msgdir)) != NULL) {
1666 8 : unsigned long pid;
1667 8 : int ret;
1668 :
1669 14713 : pid = smb_strtoul(dp->d_name, NULL, 10, &error, SMB_STR_STANDARD);
1670 14713 : if ((pid == 0) || (error != 0)) {
1671 : /*
1672 : * . and .. and other malformed entries
1673 : */
1674 450 : continue;
1675 : }
1676 :
1677 14263 : ret = fn(pid, private_data);
1678 14263 : if (ret != 0) {
1679 0 : break;
1680 : }
1681 : }
1682 225 : closedir(msgdir);
1683 :
1684 225 : return 0;
1685 : }
1686 :
/*
 * Handle returned by messaging_dgm_register_tevent_context().
 *
 * fde is the read event of the messaging_dgm_fde_ev registered for the
 * caller's tevent_context. Several handles registered on the same
 * tevent_context share the same tevent_fd. messaging_dgm_fde_active()
 * inspects its flags.
 */
struct messaging_dgm_fde {
	struct tevent_fd *fde;
};
1690 :
1691 160119 : static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
1692 : {
1693 160119 : if (fde_ev->ctx != NULL) {
1694 144213 : DLIST_REMOVE(fde_ev->ctx->fde_evs, fde_ev);
1695 144213 : fde_ev->ctx = NULL;
1696 : }
1697 160119 : return 0;
1698 : }
1699 :
1700 : /*
1701 : * Reference counter for a struct tevent_fd messaging read event
1702 : * (with callback function) on a struct tevent_context registered
1703 : * on a messaging context.
1704 : *
1705 : * If we've already registered this struct tevent_context before
1706 : * (so already have a read event), just increase the reference count.
1707 : *
1708 : * Otherwise create a new struct tevent_fd messaging read event on the
1709 : * previously unseen struct tevent_context - this is what drives
1710 : * the message receive processing.
1711 : *
1712 : */
1713 :
1714 646980 : struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
1715 : TALLOC_CTX *mem_ctx, struct tevent_context *ev)
1716 : {
1717 646980 : struct messaging_dgm_context *ctx = global_dgm_context;
1718 21982 : struct messaging_dgm_fde_ev *fde_ev;
1719 21982 : struct messaging_dgm_fde *fde;
1720 :
1721 646980 : if (ctx == NULL) {
1722 0 : return NULL;
1723 : }
1724 :
1725 646980 : fde = talloc(mem_ctx, struct messaging_dgm_fde);
1726 646980 : if (fde == NULL) {
1727 0 : return NULL;
1728 : }
1729 :
1730 845961 : for (fde_ev = ctx->fde_evs; fde_ev != NULL; fde_ev = fde_ev->next) {
1731 726204 : if (tevent_fd_get_flags(fde_ev->fde) == 0) {
1732 : /*
1733 : * If the event context got deleted,
1734 : * tevent_fd_get_flags() will return 0
1735 : * for the stale fde.
1736 : *
1737 : * In that case we should not
1738 : * use fde_ev->ev anymore.
1739 : */
1740 60034 : continue;
1741 : }
1742 666170 : if (fde_ev->ev == ev) {
1743 510155 : break;
1744 : }
1745 : }
1746 :
1747 646980 : if (fde_ev == NULL) {
1748 119757 : fde_ev = talloc(fde, struct messaging_dgm_fde_ev);
1749 119757 : if (fde_ev == NULL) {
1750 0 : return NULL;
1751 : }
1752 119757 : fde_ev->fde = tevent_add_fd(
1753 : ev, fde_ev, ctx->sock, TEVENT_FD_READ,
1754 : messaging_dgm_read_handler, ctx);
1755 119757 : if (fde_ev->fde == NULL) {
1756 0 : TALLOC_FREE(fde);
1757 0 : return NULL;
1758 : }
1759 119757 : fde_ev->ev = ev;
1760 119757 : fde_ev->ctx = ctx;
1761 119757 : DLIST_ADD(ctx->fde_evs, fde_ev);
1762 119757 : talloc_set_destructor(
1763 : fde_ev, messaging_dgm_fde_ev_destructor);
1764 : } else {
1765 : /*
1766 : * Same trick as with tdb_wrap: The caller will never
1767 : * see the talloc_referenced object, the
1768 : * messaging_dgm_fde_ev, so problems with
1769 : * talloc_unlink will not happen.
1770 : */
1771 527223 : if (talloc_reference(fde, fde_ev) == NULL) {
1772 0 : TALLOC_FREE(fde);
1773 0 : return NULL;
1774 : }
1775 : }
1776 :
1777 646980 : fde->fde = fde_ev->fde;
1778 646980 : return fde;
1779 : }
1780 :
1781 503436 : bool messaging_dgm_fde_active(struct messaging_dgm_fde *fde)
1782 : {
1783 132931 : uint16_t flags;
1784 :
1785 503436 : if (fde == NULL) {
1786 0 : return false;
1787 : }
1788 503436 : flags = tevent_fd_get_flags(fde->fde);
1789 503436 : return (flags != 0);
1790 : }
|