LCOV - code coverage report
Current view: top level - lib/messaging - messages_dgm.c (source / functions) Hit Total Coverage
Test: coverage report for master 2f515e9b Lines: 612 795 77.0 %
Date: 2024-04-21 15:09:00 Functions: 33 36 91.7 %

          Line data    Source code
       1             : /*
       2             :  * Unix SMB/CIFS implementation.
       3             :  * Samba internal messaging functions
       4             :  * Copyright (C) 2013 by Volker Lendecke
       5             :  *
       6             :  * This program is free software; you can redistribute it and/or modify
       7             :  * it under the terms of the GNU General Public License as published by
       8             :  * the Free Software Foundation; either version 3 of the License, or
       9             :  * (at your option) any later version.
      10             :  *
      11             :  * This program is distributed in the hope that it will be useful,
      12             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      13             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      14             :  * GNU General Public License for more details.
      15             :  *
      16             :  * You should have received a copy of the GNU General Public License
      17             :  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
      18             :  */
      19             : 
      20             : #include "replace.h"
      21             : #include "util/util.h"
      22             : #include "system/network.h"
      23             : #include "system/filesys.h"
      24             : #include "system/dir.h"
      25             : #include "system/select.h"
      26             : #include "lib/util/debug.h"
      27             : #include "messages_dgm.h"
      28             : #include "lib/util/genrand.h"
      29             : #include "lib/util/dlinklist.h"
      30             : #include "lib/pthreadpool/pthreadpool_tevent.h"
      31             : #include "lib/util/msghdr.h"
      32             : #include "lib/util/iov_buf.h"
      33             : #include "lib/util/blocking.h"
      34             : #include "lib/util/tevent_unix.h"
      35             : #include "lib/util/smb_strtox.h"
      36             : 
      37             : #define MESSAGING_DGM_FRAGMENT_LENGTH 1024
      38             : 
      39             : struct sun_path_buf {
      40             :         /*
      41             :          * This will carry enough for a socket path
      42             :          */
      43             :         char buf[sizeof(struct sockaddr_un)];
      44             : };
      45             : 
      46             : /*
      47             :  * We can only have one tevent_fd per dgm_context and per
      48             :  * tevent_context. Maintain a list of registered tevent_contexts per
      49             :  * dgm_context.
      50             :  */
      51             : struct messaging_dgm_fde_ev {
      52             :         struct messaging_dgm_fde_ev *prev, *next;
      53             : 
      54             :         /*
      55             :          * Backreference to enable DLIST_REMOVE from our
      56             :          * destructor. Also, set to NULL when the dgm_context dies
      57             :          * before the messaging_dgm_fde_ev.
      58             :          */
      59             :         struct messaging_dgm_context *ctx;
      60             : 
      61             :         struct tevent_context *ev;
      62             :         struct tevent_fd *fde;
      63             : };
      64             : 
      65             : struct messaging_dgm_out {
      66             :         struct messaging_dgm_out *prev, *next;
      67             :         struct messaging_dgm_context *ctx;
      68             : 
      69             :         pid_t pid;
      70             :         int sock;
      71             :         bool is_blocking;
      72             :         uint64_t cookie;
      73             : 
      74             :         struct tevent_queue *queue;
      75             :         struct tevent_timer *idle_timer;
      76             : };
      77             : 
      78             : struct messaging_dgm_in_msg {
      79             :         struct messaging_dgm_in_msg *prev, *next;
      80             :         struct messaging_dgm_context *ctx;
      81             :         size_t msglen;
      82             :         size_t received;
      83             :         pid_t sender_pid;
      84             :         int sender_sock;
      85             :         uint64_t cookie;
      86             :         uint8_t buf[];
      87             : };
      88             : 
      89             : struct messaging_dgm_context {
      90             :         struct tevent_context *ev;
      91             :         pid_t pid;
      92             :         struct sun_path_buf socket_dir;
      93             :         struct sun_path_buf lockfile_dir;
      94             :         int lockfile_fd;
      95             : 
      96             :         int sock;
      97             :         struct messaging_dgm_in_msg *in_msgs;
      98             : 
      99             :         struct messaging_dgm_fde_ev *fde_evs;
     100             :         void (*recv_cb)(struct tevent_context *ev,
     101             :                         const uint8_t *msg,
     102             :                         size_t msg_len,
     103             :                         int *fds,
     104             :                         size_t num_fds,
     105             :                         void *private_data);
     106             :         void *recv_cb_private_data;
     107             : 
     108             :         bool *have_dgm_context;
     109             : 
     110             :         struct pthreadpool_tevent *pool;
     111             :         struct messaging_dgm_out *outsocks;
     112             : };
     113             : 
     114             : /* Set socket close on exec. */
     115      102567 : static int prepare_socket_cloexec(int sock)
     116             : {
     117             : #ifdef FD_CLOEXEC
     118        1191 :         int flags;
     119             : 
     120      102567 :         flags = fcntl(sock, F_GETFD, 0);
     121      102567 :         if (flags == -1) {
     122           0 :                 return errno;
     123             :         }
     124      102567 :         flags |= FD_CLOEXEC;
     125      102567 :         if (fcntl(sock, F_SETFD, flags) == -1) {
     126           0 :                 return errno;
     127             :         }
     128             : #endif
     129      101376 :         return 0;
     130             : }
     131             : 
     132       83416 : static void close_fd_array(int *fds, size_t num_fds)
     133             : {
     134       71243 :         size_t i;
     135             : 
     136       83454 :         for (i = 0; i < num_fds; i++) {
     137          38 :                 if (fds[i] == -1) {
     138           0 :                         continue;
     139             :                 }
     140             : 
     141          38 :                 close(fds[i]);
     142          38 :                 fds[i] = -1;
     143             :         }
     144       83416 : }
     145             : 
     146             : /*
     147             :  * The idle handler can free the struct messaging_dgm_out *,
     148             :  * if it's unused (qlen of zero) which closes the socket.
     149             :  */
     150             : 
     151       12431 : static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
     152             :                                            struct tevent_timer *te,
     153             :                                            struct timeval current_time,
     154             :                                            void *private_data)
     155             : {
     156       12431 :         struct messaging_dgm_out *out = talloc_get_type_abort(
     157             :                 private_data, struct messaging_dgm_out);
     158          28 :         size_t qlen;
     159             : 
     160       12431 :         out->idle_timer = NULL;
     161             : 
     162       12431 :         qlen = tevent_queue_length(out->queue);
     163       12431 :         if (qlen == 0) {
     164       12431 :                 TALLOC_FREE(out);
     165             :         }
     166       12431 : }
     167             : 
     168             : /*
     169             :  * Setup the idle handler to fire after 1 second if the
     170             :  * queue is zero.
     171             :  */
     172             : 
     173      581367 : static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
     174             : {
     175      137016 :         size_t qlen;
     176             : 
     177      581367 :         qlen = tevent_queue_length(out->queue);
     178      581367 :         if (qlen != 0) {
     179      136370 :                 TALLOC_FREE(out->idle_timer);
     180      136370 :                 return;
     181             :         }
     182             : 
     183      444997 :         if (out->idle_timer != NULL) {
     184      420944 :                 tevent_update_timer(out->idle_timer,
     185             :                                     tevent_timeval_current_ofs(1, 0));
     186      420944 :                 return;
     187             :         }
     188             : 
     189       24053 :         out->idle_timer = tevent_add_timer(
     190             :                 out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
     191             :                 messaging_dgm_out_idle_handler, out);
     192             :         /*
     193             :          * No NULL check, we'll come back here. Worst case we're
     194             :          * leaking a bit.
     195             :          */
     196             : }
     197             : 
     198             : static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
     199             : static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
     200             :                                            struct tevent_timer *te,
     201             :                                            struct timeval current_time,
     202             :                                            void *private_data);
     203             : 
     204             : /*
     205             :  * Connect to an existing rendezvous point for another
     206             :  * pid - wrapped inside a struct messaging_dgm_out *.
     207             :  */
     208             : 
     209       30054 : static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
     210             :                                     struct messaging_dgm_context *ctx,
     211             :                                     pid_t pid, struct messaging_dgm_out **pout)
     212             : {
     213         552 :         struct messaging_dgm_out *out;
     214       30054 :         struct sockaddr_un addr = { .sun_family = AF_UNIX };
     215       30054 :         int ret = ENOMEM;
     216         552 :         int out_pathlen;
     217         552 :         char addr_buf[sizeof(addr.sun_path) + (3 * sizeof(unsigned) + 2)];
     218             : 
     219       30054 :         out = talloc(mem_ctx, struct messaging_dgm_out);
     220       30054 :         if (out == NULL) {
     221           0 :                 goto fail;
     222             :         }
     223             : 
     224       30054 :         *out = (struct messaging_dgm_out) {
     225             :                 .pid = pid,
     226             :                 .ctx = ctx,
     227             :                 .cookie = 1
     228             :         };
     229             : 
     230       30054 :         out_pathlen = snprintf(addr_buf, sizeof(addr_buf),
     231       30054 :                                "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
     232       30054 :         if (out_pathlen < 0) {
     233           0 :                 goto errno_fail;
     234             :         }
     235       30054 :         if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
     236           0 :                 ret = ENAMETOOLONG;
     237           0 :                 goto fail;
     238             :         }
     239             : 
     240       30054 :         memcpy(addr.sun_path, addr_buf, out_pathlen + 1);
     241             : 
     242       30054 :         out->queue = tevent_queue_create(out, addr.sun_path);
     243       30054 :         if (out->queue == NULL) {
     244           0 :                 ret = ENOMEM;
     245           0 :                 goto fail;
     246             :         }
     247             : 
     248       30054 :         out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
     249       30054 :         if (out->sock == -1) {
     250           0 :                 goto errno_fail;
     251             :         }
     252             : 
     253       30054 :         DLIST_ADD(ctx->outsocks, out);
     254       30054 :         talloc_set_destructor(out, messaging_dgm_out_destructor);
     255             : 
     256         552 :         do {
     257       30054 :                 ret = connect(out->sock,
     258             :                               (const struct sockaddr *)(const void *)&addr,
     259             :                               sizeof(addr));
     260       30054 :         } while ((ret == -1) && (errno == EINTR));
     261             : 
     262       30054 :         if (ret == -1) {
     263        6188 :                 goto errno_fail;
     264             :         }
     265             : 
     266       23866 :         ret = set_blocking(out->sock, false);
     267       23866 :         if (ret == -1) {
     268           0 :                 goto errno_fail;
     269             :         }
     270       23866 :         out->is_blocking = false;
     271             : 
     272       23866 :         *pout = out;
     273       23866 :         return 0;
     274        6188 : errno_fail:
     275        6188 :         ret = errno;
     276        6188 : fail:
     277        6188 :         TALLOC_FREE(out);
     278        6188 :         return ret;
     279             : }
     280             : 
     281       60555 : static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
     282             : {
     283       60555 :         DLIST_REMOVE(out->ctx->outsocks, out);
     284             : 
     285       60555 :         if ((tevent_queue_length(out->queue) != 0) &&
     286           3 :             (tevent_cached_getpid() == out->ctx->pid)) {
     287             :                 /*
     288             :                  * We have pending jobs. We can't close the socket,
     289             :                  * this has been handed over to messaging_dgm_out_queue_state.
     290             :                  */
     291           0 :                 return 0;
     292             :         }
     293             : 
     294       60552 :         if (out->sock != -1) {
     295       60552 :                 close(out->sock);
     296       60552 :                 out->sock = -1;
     297             :         }
     298       59526 :         return 0;
     299             : }
     300             : 
     301             : /*
     302             :  * Find the struct messaging_dgm_out * to talk to pid.
     303             :  * If we don't have one, create it. Set the timer to
     304             :  * delete after 1 sec.
     305             :  */
     306             : 
     307      518603 : static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
     308             :                                  struct messaging_dgm_out **pout)
     309             : {
     310       68762 :         struct messaging_dgm_out *out;
     311       68762 :         int ret;
     312             : 
     313      945232 :         for (out = ctx->outsocks; out != NULL; out = out->next) {
     314      915178 :                 if (out->pid == pid) {
     315      420339 :                         break;
     316             :                 }
     317             :         }
     318             : 
     319      518603 :         if (out == NULL) {
     320       30054 :                 ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
     321       30054 :                 if (ret != 0) {
     322        6188 :                         return ret;
     323             :                 }
     324             :         }
     325             : 
     326             :         /*
     327             :          * shouldn't be possible, should be set if messaging_dgm_out_create
     328             :          * succeeded. This check is to satisfy static checker
     329             :          */
     330      512415 :         if (out == NULL) {
     331           0 :                 return EINVAL;
     332             :         }
     333      512415 :         messaging_dgm_out_rearm_idle_timer(out);
     334             : 
     335      512415 :         *pout = out;
     336      512415 :         return 0;
     337             : }
     338             : 
     339             : /*
     340             :  * This function is called directly to send a message fragment
     341             :  * when the outgoing queue is zero, and from a pthreadpool
     342             :  * job thread when messages are being queued (qlen != 0).
     343             :  * Make sure *ONLY* thread-safe functions are called within.
     344             :  */
     345             : 
     346      526312 : static ssize_t messaging_dgm_sendmsg(int sock,
     347             :                                      const struct iovec *iov, int iovlen,
     348             :                                      const int *fds, size_t num_fds,
     349             :                                      int *perrno)
     350             : {
     351       69779 :         struct msghdr msg;
     352       69779 :         ssize_t fdlen, ret;
     353             : 
     354             :         /*
     355             :          * Do the actual sendmsg syscall. This will be called from a
     356             :          * pthreadpool helper thread, so be careful what you do here.
     357             :          */
     358             : 
     359      526312 :         msg = (struct msghdr) {
     360             :                 .msg_iov = discard_const_p(struct iovec, iov),
     361             :                 .msg_iovlen = iovlen
     362             :         };
     363             : 
     364      526312 :         fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
     365      526312 :         if (fdlen == -1) {
     366           0 :                 *perrno = EINVAL;
     367           0 :                 return -1;
     368             :         }
     369             : 
     370      526312 :         {
     371      526312 :                 uint8_t buf[fdlen];
     372             : 
     373      526312 :                 msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
     374             : 
     375       69779 :                 do {
     376      526312 :                         ret = sendmsg(sock, &msg, 0);
     377      526309 :                 } while ((ret == -1) && (errno == EINTR));
     378             :         }
     379             : 
     380      526309 :         if (ret == -1) {
     381         346 :                 *perrno = errno;
     382             :         }
     383      456530 :         return ret;
     384             : }
     385             : 
     386             : struct messaging_dgm_out_queue_state {
     387             :         struct tevent_context *ev;
     388             :         struct pthreadpool_tevent *pool;
     389             : 
     390             :         struct tevent_req *req;
     391             :         struct tevent_req *subreq;
     392             : 
     393             :         int sock;
     394             : 
     395             :         int *fds;
     396             :         uint8_t *buf;
     397             : 
     398             :         ssize_t sent;
     399             :         int err;
     400             : };
     401             : 
     402             : static int messaging_dgm_out_queue_state_destructor(
     403             :         struct messaging_dgm_out_queue_state *state);
     404             : static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
     405             :                                            void *private_data);
     406             : static void messaging_dgm_out_threaded_job(void *private_data);
     407             : static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
     408             : 
     409             : /*
     410             :  * Push a message fragment onto a queue to be sent by a
     411             :  * threadpool job. Makes copies of data/fd's to be sent.
     412             :  * The running tevent_queue internally creates an immediate
     413             :  * event to schedule the write.
     414             :  */
     415             : 
     416       69947 : static struct tevent_req *messaging_dgm_out_queue_send(
     417             :         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
     418             :         struct messaging_dgm_out *out,
     419             :         const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
     420             : {
     421       69246 :         struct tevent_req *req;
     422       69246 :         struct messaging_dgm_out_queue_state *state;
     423       69246 :         struct tevent_queue_entry *e;
     424       69246 :         size_t i;
     425       69246 :         ssize_t buflen;
     426             : 
     427       69947 :         req = tevent_req_create(out, &state,
     428             :                                 struct messaging_dgm_out_queue_state);
     429       69947 :         if (req == NULL) {
     430           0 :                 return NULL;
     431             :         }
     432       69947 :         state->ev = ev;
     433       69947 :         state->pool = out->ctx->pool;
     434       69947 :         state->sock = out->sock;
     435       69947 :         state->req = req;
     436             : 
     437             :         /*
     438             :          * Go blocking in a thread
     439             :          */
     440       69947 :         if (!out->is_blocking) {
     441         242 :                 int ret = set_blocking(out->sock, true);
     442         242 :                 if (ret == -1) {
     443           0 :                         tevent_req_error(req, errno);
     444           0 :                         return tevent_req_post(req, ev);
     445             :                 }
     446         242 :                 out->is_blocking = true;
     447             :         }
     448             : 
     449       69947 :         buflen = iov_buflen(iov, iovlen);
     450       69947 :         if (buflen == -1) {
     451           0 :                 tevent_req_error(req, EMSGSIZE);
     452           0 :                 return tevent_req_post(req, ev);
     453             :         }
     454             : 
     455       69947 :         state->buf = talloc_array(state, uint8_t, buflen);
     456       69947 :         if (tevent_req_nomem(state->buf, req)) {
     457           0 :                 return tevent_req_post(req, ev);
     458             :         }
     459       69947 :         iov_buf(iov, iovlen, state->buf, buflen);
     460             : 
     461       69947 :         state->fds = talloc_array(state, int, num_fds);
     462       69947 :         if (tevent_req_nomem(state->fds, req)) {
     463           0 :                 return tevent_req_post(req, ev);
     464             :         }
     465             : 
     466       69985 :         for (i=0; i<num_fds; i++) {
     467          38 :                 state->fds[i] = -1;
     468             :         }
     469             : 
     470       69985 :         for (i=0; i<num_fds; i++) {
     471             : 
     472          38 :                 state->fds[i] = dup(fds[i]);
     473             : 
     474          38 :                 if (state->fds[i] == -1) {
     475           0 :                         int ret = errno;
     476             : 
     477           0 :                         close_fd_array(state->fds, num_fds);
     478             : 
     479           0 :                         tevent_req_error(req, ret);
     480           0 :                         return tevent_req_post(req, ev);
     481             :                 }
     482             :         }
     483             : 
     484       69947 :         talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);
     485             : 
     486       69947 :         e = tevent_queue_add_entry(out->queue, ev, req,
     487       69246 :                                    messaging_dgm_out_queue_trigger, req);
     488       69947 :         if (tevent_req_nomem(e, req)) {
     489           0 :                 return tevent_req_post(req, ev);
     490             :         }
     491         701 :         return req;
     492             : }
     493             : 
     494       69944 : static int messaging_dgm_out_queue_state_destructor(
     495             :         struct messaging_dgm_out_queue_state *state)
     496             : {
     497       69246 :         int *fds;
     498       69246 :         size_t num_fds;
     499             : 
     500       69944 :         if (state->subreq != NULL) {
     501             :                 /*
     502             :                  * We're scheduled, but we're destroyed. This happens
     503             :                  * if the messaging_dgm_context is destroyed while
     504             :                  * we're stuck in a blocking send. There's nothing we
     505             :                  * can do but to leak memory.
     506             :                  */
     507           3 :                 TALLOC_FREE(state->subreq);
     508           3 :                 (void)talloc_reparent(state->req, NULL, state);
     509           3 :                 return -1;
     510             :         }
     511             : 
     512       69941 :         fds = state->fds;
     513       69941 :         num_fds = talloc_array_length(fds);
     514       69941 :         close_fd_array(fds, num_fds);
     515       69941 :         return 0;
     516             : }
     517             : 
     518             : /*
     519             :  * tevent_queue callback that schedules the pthreadpool to actually
     520             :  * send the queued message fragment.
     521             :  */
     522             : 
     523       68935 : static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
     524             :                                            void *private_data)
     525             : {
     526       68935 :         struct messaging_dgm_out_queue_state *state = tevent_req_data(
     527             :                 req, struct messaging_dgm_out_queue_state);
     528             : 
     529       68935 :         tevent_req_reset_endtime(req);
     530             : 
     531       68935 :         state->subreq = pthreadpool_tevent_job_send(
     532             :                 state, state->ev, state->pool,
     533             :                 messaging_dgm_out_threaded_job, state);
     534       68935 :         if (tevent_req_nomem(state->subreq, req)) {
     535           0 :                 return;
     536             :         }
     537       68935 :         tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
     538             :                                 req);
     539             : }
     540             : 
     541             : /*
     542             :  * Wrapper function run by the pthread that calls
     543             :  * messaging_dgm_sendmsg() to actually do the sendmsg().
     544             :  */
     545             : 
     546       68935 : static void messaging_dgm_out_threaded_job(void *private_data)
     547             : {
     548       68935 :         struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
     549             :                 private_data, struct messaging_dgm_out_queue_state);
     550             : 
     551      137870 :         struct iovec iov = { .iov_base = state->buf,
     552       68935 :                              .iov_len = talloc_get_size(state->buf) };
     553       68935 :         size_t num_fds = talloc_array_length(state->fds);
     554       68935 :         int msec = 1;
     555             : 
     556       68257 :         while (true) {
     557       68257 :                 int ret;
     558             : 
     559      137867 :                 state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
     560       68935 :                                             state->fds, num_fds, &state->err);
     561             : 
     562       68932 :                 if (state->sent != -1) {
     563         675 :                         return;
     564             :                 }
     565           1 :                 if (state->err != ENOBUFS) {
     566           0 :                         return;
     567             :                 }
     568             : 
     569             :                 /*
     570             :                  * ENOBUFS is the FreeBSD way of saying "Try
     571             :                  * again". We have to do polling.
     572             :                  */
     573           0 :                 do {
     574           0 :                         ret = poll(NULL, 0, msec);
     575           0 :                 } while ((ret == -1) && (errno == EINTR));
     576             : 
     577             :                 /*
     578             :                  * Exponential backoff up to once a second
     579             :                  */
     580           0 :                 msec *= 2;
     581           0 :                 msec = MIN(msec, 1000);
     582             :         }
     583             : }
     584             : 
     585             : /*
     586             :  * Pickup the results of the pthread sendmsg().
     587             :  */
     588             : 
     589       68929 : static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
     590             : {
     591       68929 :         struct tevent_req *req = tevent_req_callback_data(
     592             :                 subreq, struct tevent_req);
     593       68929 :         struct messaging_dgm_out_queue_state *state = tevent_req_data(
     594             :                 req, struct messaging_dgm_out_queue_state);
     595       68254 :         int ret;
     596             : 
     597       68929 :         if (subreq != state->subreq) {
     598           0 :                 abort();
     599             :         }
     600             : 
     601       68929 :         ret = pthreadpool_tevent_job_recv(subreq);
     602             : 
     603       68929 :         TALLOC_FREE(subreq);
     604       68929 :         state->subreq = NULL;
     605             : 
     606       68929 :         if (tevent_req_error(req, ret)) {
     607           0 :                 return;
     608             :         }
     609       68929 :         if (state->sent == -1) {
     610           0 :                 tevent_req_error(req, state->err);
     611           0 :                 return;
     612             :         }
     613       68929 :         tevent_req_done(req);
     614             : }
     615             : 
     616       68952 : static int messaging_dgm_out_queue_recv(struct tevent_req *req)
     617             : {
     618       68952 :         return tevent_req_simple_recv_unix(req);
     619             : }
     620             : 
     621             : static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
     622             : 
     623             : /*
     624             :  * Core function to send a message fragment given a
     625             :  * connected struct messaging_dgm_out * destination.
     626             :  * If no current queue tries to send nonblocking
     627             :  * directly. If not, queues the fragment (which makes
     628             :  * a copy of it) and adds a 60-second timeout on the send.
     629             :  */
     630             : 
     631      527082 : static int messaging_dgm_out_send_fragment(
     632             :         struct tevent_context *ev, struct messaging_dgm_out *out,
     633             :         const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
     634             : {
     635       70762 :         struct tevent_req *req;
     636       70762 :         size_t qlen;
     637       70762 :         bool ok;
     638             : 
     639      527082 :         qlen = tevent_queue_length(out->queue);
     640      527082 :         if (qlen == 0) {
     641        1522 :                 ssize_t nsent;
     642      457377 :                 int err = 0;
     643             : 
     644      457377 :                 if (out->is_blocking) {
     645         149 :                         int ret = set_blocking(out->sock, false);
     646         149 :                         if (ret == -1) {
     647      457135 :                                 return errno;
     648             :                         }
     649         149 :                         out->is_blocking = false;
     650             :                 }
     651             : 
     652      457377 :                 nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
     653             :                                               num_fds, &err);
     654      457377 :                 if (nsent >= 0) {
     655      455615 :                         return 0;
     656             :                 }
     657             : 
     658         345 :                 if (err == ENOBUFS) {
     659             :                         /*
     660             :                          * FreeBSD's way of telling us the dst socket
     661             :                          * is full. EWOULDBLOCK makes us spawn a
     662             :                          * polling helper thread.
     663             :                          */
     664           0 :                         err = EWOULDBLOCK;
     665             :                 }
     666             : 
     667         345 :                 if (err != EWOULDBLOCK) {
     668           4 :                         return err;
     669             :                 }
     670             :         }
     671             : 
     672       69947 :         req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
     673             :                                            fds, num_fds);
     674       69947 :         if (req == NULL) {
     675           0 :                 return ENOMEM;
     676             :         }
     677       69947 :         tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
     678             : 
     679       69947 :         ok = tevent_req_set_endtime(req, ev,
     680             :                                     tevent_timeval_current_ofs(60, 0));
     681       69947 :         if (!ok) {
     682           0 :                 TALLOC_FREE(req);
     683           0 :                 return ENOMEM;
     684             :         }
     685             : 
     686         701 :         return 0;
     687             : }
     688             : 
     689             : /*
     690             :  * Pickup the result of the fragment send. Reset idle timer
     691             :  * if queue empty.
     692             :  */
     693             : 
     694       68952 : static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
     695             : {
     696       68952 :         struct messaging_dgm_out *out = tevent_req_callback_data(
     697             :                 req, struct messaging_dgm_out);
     698       68254 :         int ret;
     699             : 
     700       68952 :         ret = messaging_dgm_out_queue_recv(req);
     701       68952 :         TALLOC_FREE(req);
     702             : 
     703       68952 :         if (ret != 0) {
     704          23 :                 DBG_WARNING("messaging_out_queue_recv returned %s\n",
     705             :                             strerror(ret));
     706             :         }
     707             : 
     708       68952 :         messaging_dgm_out_rearm_idle_timer(out);
     709       68952 : }
     710             : 
     711             : 
     712             : struct messaging_dgm_fragment_hdr {
     713             :         size_t msglen;
     714             :         pid_t pid;
     715             :         int sock;
     716             : };
     717             : 
     718             : /*
     719             :  * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
     720             :  * size chunks and send it.
     721             :  *
     722             :  * Message fragments are prefixed by a 64-bit cookie that
     723             :  * stays the same for all fragments. This allows the receiver
     724             :  * to recognise fragments of the same message and re-assemble
     725             :  * them on the other end.
     726             :  *
     727             :  * Note that this allows other message fragments from other
     728             :  * senders to be interleaved in the receive read processing,
     729             :  * the combination of the cookie and header info allows unique
     730             :  * identification of the message from a specific sender in
     731             :  * re-assembly.
     732             :  *
     733             :  * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
     734             :  * then send a single message with cookie set to zero.
     735             :  *
     736             :  * Otherwise the message is fragmented into chunks and added
     737             :  * to the sending queue. Any file descriptors are passed only
     738             :  * in the last fragment.
     739             :  *
     740             :  * Finally the cookie is incremented (wrap over zero) to
     741             :  * prepare for the next message sent to this channel.
     742             :  *
     743             :  */
     744             : 
     745      512415 : static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
     746             :                                              struct messaging_dgm_out *out,
     747             :                                              const struct iovec *iov,
     748             :                                              int iovlen,
     749             :                                              const int *fds, size_t num_fds)
     750      512415 : {
     751       68762 :         ssize_t msglen, sent;
     752      512415 :         int ret = 0;
     753      512415 :         struct iovec iov_copy[iovlen+2];
     754       68762 :         struct messaging_dgm_fragment_hdr hdr;
     755       68762 :         struct iovec src_iov;
     756             : 
     757      512415 :         if (iovlen < 0) {
     758           0 :                 return EINVAL;
     759             :         }
     760             : 
     761      512415 :         msglen = iov_buflen(iov, iovlen);
     762      512415 :         if (msglen == -1) {
     763           0 :                 return EMSGSIZE;
     764             :         }
     765      512415 :         if (num_fds > INT8_MAX) {
     766           0 :                 return EINVAL;
     767             :         }
     768             : 
     769      512415 :         if ((size_t) msglen <=
     770             :             (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
     771      499869 :                 uint64_t cookie = 0;
     772             : 
     773      499869 :                 iov_copy[0].iov_base = &cookie;
     774      499869 :                 iov_copy[0].iov_len = sizeof(cookie);
     775      499869 :                 if (iovlen > 0) {
     776      499869 :                         memcpy(&iov_copy[1], iov,
     777             :                                sizeof(struct iovec) * iovlen);
     778             :                 }
     779             : 
     780      499869 :                 return messaging_dgm_out_send_fragment(
     781             :                         ev, out, iov_copy, iovlen+1, fds, num_fds);
     782             : 
     783             :         }
     784             : 
     785       25092 :         hdr = (struct messaging_dgm_fragment_hdr) {
     786             :                 .msglen = msglen,
     787       12546 :                 .pid = tevent_cached_getpid(),
     788       12546 :                 .sock = out->sock
     789             :         };
     790             : 
     791       12546 :         iov_copy[0].iov_base = &out->cookie;
     792       12546 :         iov_copy[0].iov_len = sizeof(out->cookie);
     793       12546 :         iov_copy[1].iov_base = &hdr;
     794       12546 :         iov_copy[1].iov_len = sizeof(hdr);
     795             : 
     796       12546 :         sent = 0;
     797       12546 :         src_iov = iov[0];
     798             : 
     799             :         /*
     800             :          * The following write loop sends the user message in pieces. We have
     801             :          * filled the first two iovecs above with "cookie" and "hdr". In the
     802             :          * following loops we pull message chunks from the user iov array and
     803             :          * fill iov_copy piece by piece, possibly truncating chunks from the
     804             :          * caller's iov array. Ugly, but hopefully efficient.
     805             :          */
     806             : 
     807       39759 :         while (sent < msglen) {
     808             :                 size_t fragment_len;
     809       25211 :                 size_t iov_index = 2;
     810             : 
     811       25211 :                 fragment_len = sizeof(out->cookie) + sizeof(hdr);
     812             : 
     813       54426 :                 while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
     814        2004 :                         size_t space, chunk;
     815             : 
     816       39759 :                         space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
     817       39759 :                         chunk = MIN(space, src_iov.iov_len);
     818             : 
     819       39759 :                         iov_copy[iov_index].iov_base = src_iov.iov_base;
     820       39759 :                         iov_copy[iov_index].iov_len = chunk;
     821       39759 :                         iov_index += 1;
     822             : 
     823       39759 :                         src_iov.iov_base = (char *)src_iov.iov_base + chunk;
     824       39759 :                         src_iov.iov_len -= chunk;
     825       39759 :                         fragment_len += chunk;
     826             : 
     827       39759 :                         if (src_iov.iov_len == 0) {
     828       25092 :                                 iov += 1;
     829       25092 :                                 iovlen -= 1;
     830       25092 :                                 if (iovlen == 0) {
     831       12544 :                                         break;
     832             :                                 }
     833       12546 :                                 src_iov = iov[0];
     834             :                         }
     835             :                 }
     836       27213 :                 sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));
     837             : 
     838             :                 /*
     839             :                  * only the last fragment should pass the fd array.
     840             :                  * That simplifies the receiver a lot.
     841             :                  */
     842       27213 :                 if (sent < msglen) {
     843       14667 :                         ret = messaging_dgm_out_send_fragment(
     844             :                                 ev, out, iov_copy, iov_index, NULL, 0);
     845             :                 } else {
     846       12546 :                         ret = messaging_dgm_out_send_fragment(
     847             :                                 ev, out, iov_copy, iov_index, fds, num_fds);
     848             :                 }
     849       27213 :                 if (ret != 0) {
     850           0 :                         break;
     851             :                 }
     852             :         }
     853             : 
     854       12546 :         out->cookie += 1;
     855       12546 :         if (out->cookie == 0) {
     856           0 :                 out->cookie += 1;
     857             :         }
     858             : 
     859       12544 :         return ret;
     860             : }
     861             : 
     862             : static struct messaging_dgm_context *global_dgm_context;
     863             : 
     864             : static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
     865             : 
     866       59896 : static int messaging_dgm_lockfile_create(struct messaging_dgm_context *ctx,
     867             :                                          pid_t pid, int *plockfile_fd,
     868             :                                          uint64_t *punique)
     869             : {
     870        1133 :         char buf[64];
     871        1133 :         int lockfile_fd;
     872        1133 :         struct sun_path_buf lockfile_name;
     873        1133 :         struct flock lck;
     874        1133 :         uint64_t unique;
     875        1133 :         int unique_len, ret;
     876        1133 :         ssize_t written;
     877             : 
     878       59896 :         ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
     879       59896 :                        "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
     880       59896 :         if (ret < 0) {
     881           0 :                 return errno;
     882             :         }
     883       59896 :         if ((unsigned)ret >= sizeof(lockfile_name.buf)) {
     884           0 :                 return ENAMETOOLONG;
     885             :         }
     886             : 
     887             :         /* no O_EXCL, existence check is via the fcntl lock */
     888             : 
     889       59896 :         lockfile_fd = open(lockfile_name.buf, O_NONBLOCK|O_CREAT|O_RDWR,
     890             :                            0644);
     891             : 
     892       59896 :         if ((lockfile_fd == -1) &&
     893           0 :             ((errno == ENXIO) /* Linux */ ||
     894           0 :              (errno == ENODEV) /* Linux kernel bug */ ||
     895           0 :              (errno == EOPNOTSUPP) /* FreeBSD */)) {
     896             :                 /*
     897             :                  * Huh -- a socket? This might be a stale socket from
     898             :                  * an upgrade of Samba. Just unlink and retry, nobody
     899             :                  * else is supposed to be here at this time.
     900             :                  *
     901             :                  * Yes, this is racy, but I don't see a way to deal
     902             :                  * with this properly.
     903             :                  */
     904           0 :                 unlink(lockfile_name.buf);
     905             : 
     906           0 :                 lockfile_fd = open(lockfile_name.buf,
     907             :                                    O_NONBLOCK|O_CREAT|O_WRONLY,
     908             :                                    0644);
     909             :         }
     910             : 
     911       59896 :         if (lockfile_fd == -1) {
     912           0 :                 ret = errno;
     913           0 :                 DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
     914           0 :                 return ret;
     915             :         }
     916             : 
     917       59896 :         lck = (struct flock) {
     918             :                 .l_type = F_WRLCK,
     919             :                 .l_whence = SEEK_SET
     920             :         };
     921             : 
     922       59896 :         ret = fcntl(lockfile_fd, F_SETLK, &lck);
     923       59896 :         if (ret == -1) {
     924           0 :                 ret = errno;
     925           0 :                 DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
     926           0 :                 goto fail_close;
     927             :         }
     928             : 
     929             :         /*
     930             :          * Directly using the binary value for
     931             :          * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
     932             :          * violation. But including all of ndr here just for this
     933             :          * seems to be a bit overkill to me. Also, messages_dgm might
     934             :          * be replaced sooner or later by something streams-based,
     935             :          * where unique_id generation will be handled differently.
     936             :          */
     937             : 
     938        1133 :         do {
     939       59896 :                 generate_random_buffer((uint8_t *)&unique, sizeof(unique));
     940       59896 :         } while (unique == UINT64_C(0xFFFFFFFFFFFFFFFF));
     941             : 
     942       59896 :         unique_len = snprintf(buf, sizeof(buf), "%"PRIu64"\n", unique);
     943             : 
     944             :         /* shorten a potentially preexisting file */
     945             : 
     946       59896 :         ret = ftruncate(lockfile_fd, unique_len);
     947       59896 :         if (ret == -1) {
     948           0 :                 ret = errno;
     949           0 :                 DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
     950             :                           strerror(ret)));
     951           0 :                 goto fail_unlink;
     952             :         }
     953             : 
     954       59896 :         written = write(lockfile_fd, buf, unique_len);
     955       59896 :         if (written != unique_len) {
     956           0 :                 ret = errno;
     957           0 :                 DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
     958           0 :                 goto fail_unlink;
     959             :         }
     960             : 
     961       59896 :         *plockfile_fd = lockfile_fd;
     962       59896 :         *punique = unique;
     963       59896 :         return 0;
     964             : 
     965           0 : fail_unlink:
     966           0 :         unlink(lockfile_name.buf);
     967           0 : fail_close:
     968           0 :         close(lockfile_fd);
     969           0 :         return ret;
     970             : }
     971             : 
     972             : static void messaging_dgm_read_handler(struct tevent_context *ev,
     973             :                                        struct tevent_fd *fde,
     974             :                                        uint16_t flags,
     975             :                                        void *private_data);
     976             : 
     977             : /*
     978             :  * Create the rendezvous point in the file system
     979             :  * that other processes can use to send messages to
     980             :  * this pid.
     981             :  */
     982             : 
     983       59909 : int messaging_dgm_init(struct tevent_context *ev,
     984             :                        uint64_t *punique,
     985             :                        const char *socket_dir,
     986             :                        const char *lockfile_dir,
     987             :                        void (*recv_cb)(struct tevent_context *ev,
     988             :                                        const uint8_t *msg,
     989             :                                        size_t msg_len,
     990             :                                        int *fds,
     991             :                                        size_t num_fds,
     992             :                                        void *private_data),
     993             :                        void *recv_cb_private_data)
     994             : {
     995        1140 :         struct messaging_dgm_context *ctx;
     996        1140 :         int ret;
     997        1140 :         struct sockaddr_un socket_address;
     998        1140 :         size_t len;
     999        1140 :         static bool have_dgm_context = false;
    1000             : 
    1001       59909 :         if (have_dgm_context) {
    1002           0 :                 return EEXIST;
    1003             :         }
    1004             : 
    1005       59909 :         if ((socket_dir == NULL) || (lockfile_dir == NULL)) {
    1006           0 :                 return EINVAL;
    1007             :         }
    1008             : 
    1009       59909 :         ctx = talloc_zero(NULL, struct messaging_dgm_context);
    1010       59909 :         if (ctx == NULL) {
    1011           0 :                 goto fail_nomem;
    1012             :         }
    1013       59909 :         ctx->ev = ev;
    1014       59909 :         ctx->pid = tevent_cached_getpid();
    1015       59909 :         ctx->recv_cb = recv_cb;
    1016       59909 :         ctx->recv_cb_private_data = recv_cb_private_data;
    1017             : 
    1018       59909 :         len = strlcpy(ctx->lockfile_dir.buf, lockfile_dir,
    1019             :                       sizeof(ctx->lockfile_dir.buf));
    1020       59909 :         if (len >= sizeof(ctx->lockfile_dir.buf)) {
    1021           5 :                 TALLOC_FREE(ctx);
    1022           5 :                 return ENAMETOOLONG;
    1023             :         }
    1024             : 
    1025       59904 :         len = strlcpy(ctx->socket_dir.buf, socket_dir,
    1026             :                       sizeof(ctx->socket_dir.buf));
    1027       59904 :         if (len >= sizeof(ctx->socket_dir.buf)) {
    1028           8 :                 TALLOC_FREE(ctx);
    1029           8 :                 return ENAMETOOLONG;
    1030             :         }
    1031             : 
    1032       59896 :         socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
    1033       59896 :         len = snprintf(socket_address.sun_path,
    1034             :                        sizeof(socket_address.sun_path),
    1035       59896 :                        "%s/%u", socket_dir, (unsigned)ctx->pid);
    1036       59896 :         if (len >= sizeof(socket_address.sun_path)) {
    1037           0 :                 TALLOC_FREE(ctx);
    1038           0 :                 return ENAMETOOLONG;
    1039             :         }
    1040             : 
    1041       59896 :         ret = messaging_dgm_lockfile_create(ctx, ctx->pid, &ctx->lockfile_fd,
    1042             :                                             punique);
    1043       59896 :         if (ret != 0) {
    1044           0 :                 DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
    1045             :                           __func__, strerror(ret)));
    1046           0 :                 TALLOC_FREE(ctx);
    1047           0 :                 return ret;
    1048             :         }
    1049             : 
    1050       59896 :         unlink(socket_address.sun_path);
    1051             : 
    1052       59896 :         ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
    1053       59896 :         if (ctx->sock == -1) {
    1054           0 :                 ret = errno;
    1055           0 :                 DBG_WARNING("socket failed: %s\n", strerror(ret));
    1056           0 :                 TALLOC_FREE(ctx);
    1057           0 :                 return ret;
    1058             :         }
    1059             : 
    1060       59896 :         ret = prepare_socket_cloexec(ctx->sock);
    1061       59896 :         if (ret == -1) {
    1062           0 :                 ret = errno;
    1063           0 :                 DBG_WARNING("prepare_socket_cloexec failed: %s\n",
    1064             :                             strerror(ret));
    1065           0 :                 TALLOC_FREE(ctx);
    1066           0 :                 return ret;
    1067             :         }
    1068             : 
    1069       59896 :         ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
    1070             :                    sizeof(socket_address));
    1071       59896 :         if (ret == -1) {
    1072           0 :                 ret = errno;
    1073           0 :                 DBG_WARNING("bind failed: %s\n", strerror(ret));
    1074           0 :                 TALLOC_FREE(ctx);
    1075           0 :                 return ret;
    1076             :         }
    1077             : 
    1078       59896 :         talloc_set_destructor(ctx, messaging_dgm_context_destructor);
    1079             : 
    1080       59896 :         ctx->have_dgm_context = &have_dgm_context;
    1081             : 
    1082       59896 :         ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool);
    1083       59896 :         if (ret != 0) {
    1084           0 :                 DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
    1085             :                             strerror(ret));
    1086           0 :                 TALLOC_FREE(ctx);
    1087           0 :                 return ret;
    1088             :         }
    1089             : 
    1090       59896 :         global_dgm_context = ctx;
    1091       59896 :         return 0;
    1092             : 
    1093           0 : fail_nomem:
    1094           0 :         TALLOC_FREE(ctx);
    1095           0 :         return ENOMEM;
    1096             : }
    1097             : 
    1098             : /*
    1099             :  * Remove the rendezvous point in the filesystem
    1100             :  * if we're the owner.
    1101             :  */
    1102             : 
    1103      100002 : static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
    1104             : {
    1105      157478 :         while (c->outsocks != NULL) {
    1106       38607 :                 TALLOC_FREE(c->outsocks);
    1107             :         }
    1108      120950 :         while (c->in_msgs != NULL) {
    1109        1181 :                 TALLOC_FREE(c->in_msgs);
    1110             :         }
    1111      123215 :         while (c->fde_evs != NULL) {
    1112       23213 :                 tevent_fd_set_flags(c->fde_evs->fde, 0);
    1113       23213 :                 c->fde_evs->ctx = NULL;
    1114       24394 :                 DLIST_REMOVE(c->fde_evs, c->fde_evs);
    1115             :         }
    1116             : 
    1117      100002 :         close(c->sock);
    1118             : 
    1119      100002 :         if (tevent_cached_getpid() == c->pid) {
    1120         267 :                 struct sun_path_buf name;
    1121         267 :                 int ret;
    1122             : 
    1123       51628 :                 ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
    1124       51628 :                                c->socket_dir.buf, (unsigned)c->pid);
    1125       51628 :                 if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
    1126             :                         /*
    1127             :                          * We've checked the length when creating, so this
    1128             :                          * should never happen
    1129             :                          */
    1130           0 :                         abort();
    1131             :                 }
    1132       51628 :                 unlink(name.buf);
    1133             : 
    1134       51628 :                 ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
    1135       51628 :                                c->lockfile_dir.buf, (unsigned)c->pid);
    1136       51628 :                 if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
    1137             :                         /*
    1138             :                          * We've checked the length when creating, so this
    1139             :                          * should never happen
    1140             :                          */
    1141           0 :                         abort();
    1142             :                 }
    1143       51628 :                 unlink(name.buf);
    1144             :         }
    1145      100002 :         close(c->lockfile_fd);
    1146             : 
    1147      100002 :         if (c->have_dgm_context != NULL) {
    1148      100002 :                 *c->have_dgm_context = false;
    1149             :         }
    1150             : 
    1151      100002 :         return 0;
    1152             : }
    1153             : 
    1154     1283649 : static void messaging_dgm_validate(struct messaging_dgm_context *ctx)
    1155             : {
                               /*
                                * Sanity-check that ctx still belongs to the calling process.
                                *
                                * Only active when DEVELOPER is defined; otherwise the function
                                * returns immediately. With DEVELOPER it verifies that
                                *  - ctx->sock is an AF_UNIX socket whose bound name is
                                *    "<ctx->socket_dir>/<pid>", and
                                *  - "<ctx->lockfile_dir>/<pid>" has the same device and inode
                                *    as the open ctx->lockfile_fd,
                                * where <pid> is tevent_cached_getpid(). Every failed call or
                                * mismatch is logged with DBG_ERR and ends in abort().
                                */
    1156             : #ifdef DEVELOPER
    1157     1283649 :         pid_t pid = tevent_cached_getpid();
    1158      154990 :         struct sockaddr_storage addr;
    1159     1283649 :         socklen_t addrlen = sizeof(addr);
    1160      154990 :         struct sockaddr_un *un_addr;
    1161      154990 :         struct sun_path_buf pathbuf;
    1162      154990 :         struct stat st1, st2;
    1163      154990 :         int ret;
    1164             : 
    1165             :         /*
    1166             :          * Protect against using the wrong messaging context after a
    1167             :          * fork without reinit_after_fork.
    1168             :          */
    1169             : 
    1170     1283649 :         ret = getsockname(ctx->sock, (struct sockaddr *)&addr, &addrlen);
    1171     1283649 :         if (ret == -1) {
    1172           0 :                 DBG_ERR("getsockname failed: %s\n", strerror(errno));
    1173           0 :                 goto fail;
    1174             :         }
    1175     1283649 :         if (addr.ss_family != AF_UNIX) {
    1176           0 :                 DBG_ERR("getsockname returned family %d\n",
    1177             :                         (int)addr.ss_family);
    1178           0 :                 goto fail;
    1179             :         }
    1180     1283649 :         un_addr = (struct sockaddr_un *)&addr;
    1181             : 
                               /* Expected socket name for the current pid. */
    1182     1283649 :         ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
    1183     1283649 :                        "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
    1184     1283649 :         if (ret < 0) {
    1185           0 :                 DBG_ERR("snprintf failed: %s\n", strerror(errno));
    1186           0 :                 goto fail;
    1187             :         }
    1188     1283649 :         if ((size_t)ret >= sizeof(pathbuf.buf)) {
    1189           0 :                 DBG_ERR("snprintf returned %d chars\n", (int)ret);
    1190           0 :                 goto fail;
    1191             :         }
    1192             : 
    1193     1283649 :         if (strcmp(pathbuf.buf, un_addr->sun_path) != 0) {
    1194           0 :                 DBG_ERR("sockname wrong: Expected %s, got %s\n",
    1195             :                         pathbuf.buf, un_addr->sun_path);
    1196           0 :                 goto fail;
    1197             :         }
    1198             : 
                               /* pathbuf is reused: now the expected lockfile path. */
    1199     1283649 :         ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
    1200     1283649 :                        "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
    1201     1283649 :         if (ret < 0) {
    1202           0 :                 DBG_ERR("snprintf failed: %s\n", strerror(errno));
    1203           0 :                 goto fail;
    1204             :         }
    1205     1283649 :         if ((size_t)ret >= sizeof(pathbuf.buf)) {
    1206           0 :                 DBG_ERR("snprintf returned %d chars\n", (int)ret);
    1207           0 :                 goto fail;
    1208             :         }
    1209             : 
                               /* st1: the file the path names now; st2: the file we hold open. */
    1210     1283649 :         ret = stat(pathbuf.buf, &st1);
    1211     1283649 :         if (ret == -1) {
    1212           0 :                 DBG_ERR("stat failed: %s\n", strerror(errno));
    1213           0 :                 goto fail;
    1214             :         }
    1215     1283649 :         ret = fstat(ctx->lockfile_fd, &st2);
    1216     1283649 :         if (ret == -1) {
    1217           0 :                 DBG_ERR("fstat failed: %s\n", strerror(errno));
    1218           0 :                 goto fail;
    1219             :         }
    1220             : 
    1221     1283649 :         if ((st1.st_dev != st2.st_dev) || (st1.st_ino != st2.st_ino)) {
    1222           0 :                 DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
    1223             :                         (int)st2.st_dev, (int)st2.st_ino,
    1224             :                         (int)st1.st_dev, (int)st1.st_ino);
    1225           0 :                 goto fail;
    1226             :         }
    1227             : 
    1228     1283649 :         return;
    1229           0 : fail:
    1230           0 :         abort();
    1231             : #else
    1232             :         return;
    1233             : #endif
    1234             : }
    1235             : 
    1236             : static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
    1237             :                                struct tevent_context *ev,
    1238             :                                uint8_t *msg, size_t msg_len,
    1239             :                                int *fds, size_t num_fds);
                       /*
                        * Forward declaration only: messaging_dgm_read_handler() calls this
                        * function before its definition appears. The definition names the
                        * third and fourth parameters buf and buflen.
                        */
    1240             : 
    1241             : /*
    1242             :  * Raw read callback handler - passes to messaging_dgm_recv()
    1243             :  * for fragment reassembly processing.
    1244             :  */
    1245             : 
    1246      203371 : static void messaging_dgm_read_handler(struct tevent_context *ev,
    1247             :                                        struct tevent_fd *fde,
    1248             :                                        uint16_t flags,
    1249             :                                        void *private_data)
    1250      203371 : {
                               /*
                                * tevent fd callback; private_data must be the
                                * struct messaging_dgm_context (enforced by
                                * talloc_get_type_abort).
                                *
                                * Receives one datagram of at most
                                * MESSAGING_DGM_FRAGMENT_LENGTH bytes from ctx->sock with
                                * recvmsg(), extracts any file descriptors that came with it
                                * and passes payload and fds to messaging_dgm_recv().
                                *
                                * recvmsg() failing with EAGAIN, EWOULDBLOCK, EINTR or ENOMEM
                                * is ignored; any other recvmsg() error clears the flags on
                                * fde. Nothing is returned to the caller.
                                */
    1251      203371 :         struct messaging_dgm_context *ctx = talloc_get_type_abort(
    1252             :                 private_data, struct messaging_dgm_context);
    1253       68684 :         ssize_t received;
    1254       68684 :         struct msghdr msg;
    1255       68684 :         struct iovec iov;
                               /*
                                * Presumably the control-buffer size needed to receive up to
                                * INT8_MAX fds; it sizes msgbuf -- TODO confirm against
                                * msghdr_prep_recv_fds().
                                */
    1256      203371 :         size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
    1257      203371 :         uint8_t msgbuf[msgbufsize];
    1258       68684 :         uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
    1259       68684 :         size_t num_fds;
    1260             : 
    1261      203371 :         messaging_dgm_validate(ctx);
    1262             : 
    1263      203371 :         if ((flags & TEVENT_FD_READ) == 0) {
    1264           0 :                 return;
    1265             :         }
    1266             : 
    1267      203371 :         iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
    1268      203371 :         msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };
    1269             : 
    1270      203371 :         msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);
    1271             : 
                               /*
                                * NOTE(review): the flag is OR-ed into msg.msg_flags while
                                * recvmsg() below is called with a flags argument of 0 --
                                * confirm this has the intended effect. Received fds are run
                                * through prepare_socket_cloexec() further down in any case.
                                */
    1272             : #ifdef MSG_CMSG_CLOEXEC
    1273      203371 :         msg.msg_flags |= MSG_CMSG_CLOEXEC;
    1274             : #endif
    1275             : 
    1276      203371 :         received = recvmsg(ctx->sock, &msg, 0);
    1277      203371 :         if (received == -1) {
    1278           0 :                 if ((errno == EAGAIN) ||
    1279           0 :                     (errno == EWOULDBLOCK) ||
    1280           0 :                     (errno == EINTR) ||
    1281           0 :                     (errno == ENOMEM)) {
    1282             :                         /* Not really an error - just try again. */
    1283           0 :                         return;
    1284             :                 }
    1285             :                 /* Problem with the socket. Set it unreadable. */
    1286           0 :                 tevent_fd_set_flags(fde, 0);
    1287           0 :                 return;
    1288             :         }
    1289             : 
    1290      203371 :         if ((size_t)received > sizeof(buf)) {
    1291             :                 /* More than we expected, not for us */
    1292           0 :                 return;
    1293             :         }
    1294             : 
                               /*
                                * First call passes no output array; its result is used as
                                * the fd count and sizes fds[] in the else branch.
                                */
    1295      203371 :         num_fds = msghdr_extract_fds(&msg, NULL, 0);
    1296      203371 :         if (num_fds == 0) {
                                       /* Placeholder array so a valid pointer goes along with count 0. */
    1297       68629 :                 int fds[1];
    1298             : 
    1299      160703 :                 messaging_dgm_recv(ctx, ev, buf, received, fds, 0);
    1300       42668 :         } else {
    1301          55 :                 size_t i;
    1302       42668 :                 int fds[num_fds];
    1303             : 
    1304       42668 :                 msghdr_extract_fds(&msg, fds, num_fds);
    1305             : 
    1306       85394 :                 for (i = 0; i < num_fds; i++) {
    1307          58 :                         int err;
    1308             : 
    1309       42671 :                         err = prepare_socket_cloexec(fds[i]);
    1310       42671 :                         if (err != 0) {
                                                       /*
                                                        * All fds go to close_fd_array(); num_fds = 0
                                                        * also ends the loop, so the message is then
                                                        * delivered without any fds.
                                                        */
    1311           0 :                                 close_fd_array(fds, num_fds);
    1312           0 :                                 num_fds = 0;
    1313             :                         }
    1314             :                 }
    1315             : 
    1316       42668 :                 messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
    1317             :         }
    1318             : }
    1319             : 
    1320           0 : static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
    1321             : {
                               /*
                                * talloc destructor installed by messaging_dgm_recv() on a
                                * partially received message: unlinks m from the in_msgs list
                                * of its context. Always returns 0.
                                */
    1322           0 :         DLIST_REMOVE(m->ctx->in_msgs, m);
    1323           0 :         return 0;
    1324             : }
    1325             : 
    1326      189896 : static void messaging_dgm_close_unconsumed(int *fds, size_t num_fds)
    1327             : {
                               /*
                                * Closes every entry of fds[0 .. num_fds-1] that is not -1 and
                                * overwrites it with -1. Called right after ctx->recv_cb();
                                * presumably the callback marks fds it has taken over by
                                * setting them to -1 -- confirm against the recv_cb contract.
                                */
    1328       66684 :         size_t i;
    1329             : 
    1330      232567 :         for (i=0; i<num_fds; i++) {
    1331       42671 :                 if (fds[i] != -1) {
    1332           6 :                         close(fds[i]);
    1333           6 :                         fds[i] = -1;
    1334             :                 }
    1335             :         }
    1336      189896 : }
    1337             : 
    1338             : /*
    1339             :  * Deal with identification of fragmented messages and
    1340             :  * re-assembly into full messages sent, then calls the
    1341             :  * callback.
    1342             :  */
    1343             : 
    1344      203371 : static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
    1345             :                                struct tevent_context *ev,
    1346             :                                uint8_t *buf, size_t buflen,
    1347             :                                int *fds, size_t num_fds)
    1348             : {
                               /*
                                * Handles one received datagram (buf/buflen) plus the fds that
                                * arrived with it.
                                *
                                * Layout as parsed here: a uint64_t cookie comes first.
                                *  - cookie == 0: the remainder is passed to ctx->recv_cb()
                                *    directly.
                                *  - cookie != 0: a struct messaging_dgm_fragment_hdr follows,
                                *    then fragment payload, which is appended to a pending
                                *    struct messaging_dgm_in_msg looked up by sender pid and
                                *    sock. Once msg->received reaches msg->msglen the whole
                                *    buffer is passed to ctx->recv_cb() and the pending
                                *    message is freed.
                                *
                                * After a callback, fds still != -1 are closed through
                                * messaging_dgm_close_unconsumed(). On every other exit
                                * (short datagram, allocation failure, oversized fragment,
                                * message still incomplete) all fds go to close_fd_array().
                                */
    1349       68684 :         struct messaging_dgm_fragment_hdr hdr;
    1350       68684 :         struct messaging_dgm_in_msg *msg;
    1351       68684 :         size_t space;
    1352       68684 :         uint64_t cookie;
    1353             : 
    1354      203371 :         if (buflen < sizeof(cookie)) {
    1355           0 :                 goto close_fds;
    1356             :         }
                               /* memcpy instead of a pointer cast: buf is a plain byte buffer. */
    1357      203371 :         memcpy(&cookie, buf, sizeof(cookie));
    1358      203371 :         buf += sizeof(cookie);
    1359      203371 :         buflen -= sizeof(cookie);
    1360             : 
    1361      203371 :         if (cookie == 0) {
    1362      178542 :                 ctx->recv_cb(ev, buf, buflen, fds, num_fds,
    1363             :                              ctx->recv_cb_private_data);
    1364      178542 :                 messaging_dgm_close_unconsumed(fds, num_fds);
    1365      256578 :                 return;
    1366             :         }
    1367             : 
    1368       24829 :         if (buflen < sizeof(hdr)) {
    1369           0 :                 goto close_fds;
    1370             :         }
    1371       24829 :         memcpy(&hdr, buf, sizeof(hdr));
    1372       24829 :         buf += sizeof(hdr);
    1373       24829 :         buflen -= sizeof(hdr);
    1374             : 
                               /* Find a pending message from the same sender (pid and sock). */
    1375       24829 :         for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
    1376       13475 :                 if ((msg->sender_pid == hdr.pid) &&
    1377       11475 :                     (msg->sender_sock == hdr.sock)) {
    1378       11475 :                         break;
    1379             :                 }
    1380             :         }
    1381             : 
                               /*
                                * A pending message with a different cookie is discarded.
                                * Presumably TALLOC_FREE() also resets msg to NULL, so the
                                * allocation below starts a fresh message -- TODO confirm.
                                */
    1382       24829 :         if ((msg != NULL) && (msg->cookie != cookie)) {
    1383        2002 :                 TALLOC_FREE(msg);
    1384             :         }
    1385             : 
    1386       24829 :         if (msg == NULL) {
    1387           2 :                 size_t msglen;
                                       /*
                                        * NOTE(review): hdr.msglen was copied from the datagram
                                        * and is added without a visible overflow or upper-bound
                                        * check -- confirm whether senders are trusted here.
                                        */
    1388       11354 :                 msglen = offsetof(struct messaging_dgm_in_msg, buf) +
    1389       11352 :                         hdr.msglen;
    1390             : 
    1391       11354 :                 msg = talloc_size(ctx, msglen);
    1392       11354 :                 if (msg == NULL) {
    1393           0 :                         goto close_fds;
    1394             :                 }
    1395       11354 :                 talloc_set_name_const(msg, "struct messaging_dgm_in_msg");
    1396             : 
                                       /* Members not named in the initializer (e.g. received) become 0. */
    1397       11354 :                 *msg = (struct messaging_dgm_in_msg) {
    1398       11352 :                         .ctx = ctx, .msglen = hdr.msglen,
    1399       11352 :                         .sender_pid = hdr.pid, .sender_sock = hdr.sock,
    1400             :                         .cookie = cookie
    1401             :                 };
    1402       11354 :                 DLIST_ADD(ctx->in_msgs, msg);
    1403       11354 :                 talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
    1404             :         }
    1405             : 
                               /* Refuse a fragment that would not fit into the remaining space. */
    1406       24829 :         space = msg->msglen - msg->received;
    1407       24829 :         if (buflen > space) {
    1408           0 :                 goto close_fds;
    1409             :         }
    1410             : 
    1411       24829 :         memcpy(msg->buf + msg->received, buf, buflen);
    1412       24829 :         msg->received += buflen;
    1413             : 
    1414       24829 :         if (msg->received < msg->msglen) {
    1415             :                 /*
    1416             :                  * Any valid sender will send the fds in the last
    1417             :                  * block. Invalid senders might have sent fd's that we
    1418             :                  * need to close here.
    1419             :                  */
    1420       13475 :                 goto close_fds;
    1421             :         }
    1422             : 
                               /*
                                * Complete: msg is unlinked and its destructor cleared before
                                * the callback runs; it is freed after the callback returns.
                                */
    1423       11354 :         DLIST_REMOVE(ctx->in_msgs, msg);
    1424       11354 :         talloc_set_destructor(msg, NULL);
    1425             : 
    1426       11354 :         ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
    1427             :                      ctx->recv_cb_private_data);
    1428       11354 :         messaging_dgm_close_unconsumed(fds, num_fds);
    1429             : 
    1430       11354 :         TALLOC_FREE(msg);
    1431       11352 :         return;
    1432             : 
    1433       13475 : close_fds:
    1434       13475 :         close_fd_array(fds, num_fds);
    1435             : }
    1436             : 
    1437      100002 : void messaging_dgm_destroy(void)
    1438             : {
                               /* Releases the process-wide context by calling TALLOC_FREE() on global_dgm_context. */
    1439      100002 :         TALLOC_FREE(global_dgm_context);
    1440      100002 : }
    1441             : 
    1442      518500 : int messaging_dgm_send(pid_t pid,
    1443             :                        const struct iovec *iov, int iovlen,
    1444             :                        const int *fds, size_t num_fds)
    1445             : {
                               /*
                                * Sends the data described by iov/iovlen, together with the
                                * given fds, to process pid using the global context.
                                *
                                * Returns ENOTCONN when there is no global context, the error
                                * from messaging_dgm_out_get() if that fails, and otherwise
                                * the result of messaging_dgm_out_send_fragmented().
                                *
                                * On ECONNREFUSED the outgoing object is freed and the send
                                * is retried, at most 5 times; once the retries are used up
                                * ECONNREFUSED itself is returned.
                                */
    1446      518500 :         struct messaging_dgm_context *ctx = global_dgm_context;
    1447       68663 :         struct messaging_dgm_out *out;
    1448       68663 :         int ret;
    1449      518500 :         unsigned retries = 0;
    1450             : 
    1451      518500 :         if (ctx == NULL) {
    1452           0 :                 return ENOTCONN;
    1453             :         }
    1454             : 
    1455      518500 :         messaging_dgm_validate(ctx);
    1456             : 
    1457      518603 : again:
    1458      518603 :         ret = messaging_dgm_out_get(ctx, pid, &out);
    1459      518603 :         if (ret != 0) {
    1460        6188 :                 return ret;
    1461             :         }
    1462             : 
    1463      512415 :         DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
    1464             : 
    1465      512415 :         ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
    1466             :                                                 fds, num_fds);
    1467      512415 :         if (ret == ECONNREFUSED) {
    1468             :                 /*
    1469             :                  * We cache outgoing sockets. If the receiver has
    1470             :                  * closed and re-opened the socket since our last
    1471             :                  * message, we get connection refused. Retry.
    1472             :                  */
    1473             : 
    1474         103 :                 TALLOC_FREE(out);
    1475             : 
    1476         103 :                 if (retries < 5) {
    1477         103 :                         retries += 1;
    1478         103 :                         goto again;
    1479             :                 }
    1480             :         }
    1481      443649 :         return ret;
    1482             : }
    1483             : 
    1484      561553 : static int messaging_dgm_read_unique(int fd, uint64_t *punique)
    1485             : {
                               /*
                                * Reads a decimal number terminated by '\n' from the start of
                                * fd and stores it in *punique.
                                *
                                * Returns 0 on success, errno if pread() fails, the error
                                * reported by smb_strtoull() if parsing fails, and EINVAL if
                                * the character after the parsed number is not a newline.
                                * *punique is only written on success.
                                */
    1486       17642 :         char buf[25];
    1487       17642 :         ssize_t rw_ret;
    1488      561553 :         int error = 0;
    1489       17642 :         unsigned long long unique;
    1490       17642 :         char *endptr;
    1491             : 
                               /* At most sizeof(buf)-1 bytes are read, leaving room for the NUL below. */
    1492      561553 :         rw_ret = pread(fd, buf, sizeof(buf)-1, 0);
    1493      561553 :         if (rw_ret == -1) {
    1494           0 :                 return errno;
    1495             :         }
    1496      561553 :         buf[rw_ret] = '\0';
    1497             : 
    1498      561553 :         unique = smb_strtoull(buf, &endptr, 10, &error, SMB_STR_STANDARD);
    1499      561553 :         if (error != 0) {
    1500           0 :                 return error;
    1501             :         }
    1502             : 
    1503      561553 :         if (endptr[0] != '\n') {
    1504           0 :                 return EINVAL;
    1505             :         }
    1506      561553 :         *punique = unique;
    1507      561553 :         return 0;
    1508             : }
    1509             : 
    1510      561553 : int messaging_dgm_get_unique(pid_t pid, uint64_t *unique)
    1511             : {
                               /*
                                * Fetches the unique number stored in the lockfile of process
                                * pid into *unique.
                                *
                                * For the calling process the already open ctx->lockfile_fd
                                * is read; for any other pid "<ctx->lockfile_dir>/<pid>" is
                                * opened read-only, read and closed again.
                                *
                                * Returns 0 on success, EBADF when there is no global
                                * context, ENAMETOOLONG if the path does not fit, errno from
                                * snprintf()/open(), or the result of
                                * messaging_dgm_read_unique().
                                */
    1512      561553 :         struct messaging_dgm_context *ctx = global_dgm_context;
    1513       17642 :         struct sun_path_buf lockfile_name;
    1514       17642 :         int ret, fd;
    1515             : 
    1516      561553 :         if (ctx == NULL) {
    1517           0 :                 return EBADF;
    1518             :         }
    1519             : 
    1520      561553 :         messaging_dgm_validate(ctx);
    1521             : 
    1522      561553 :         if (pid == tevent_cached_getpid()) {
    1523             :                 /*
    1524             :                  * Protect against losing our own lock
    1525             :                  */
    1526      472476 :                 return messaging_dgm_read_unique(ctx->lockfile_fd, unique);
    1527             :         }
    1528             : 
                               /*
                                * NOTE(review): "%u" is paired with an (int) cast here, while
                                * messaging_dgm_validate() and messaging_dgm_cleanup() cast
                                * the pid to (unsigned) for the same format -- confirm and
                                * align.
                                */
    1529       89077 :         ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
    1530       89077 :                        "%s/%u", ctx->lockfile_dir.buf, (int)pid);
    1531       89077 :         if (ret < 0) {
    1532           0 :                 return errno;
    1533             :         }
    1534       89077 :         if ((size_t)ret >= sizeof(lockfile_name.buf)) {
    1535           0 :                 return ENAMETOOLONG;
    1536             :         }
    1537             : 
    1538       89077 :         fd = open(lockfile_name.buf, O_NONBLOCK|O_RDONLY, 0);
    1539       89077 :         if (fd == -1) {
    1540           0 :                 return errno;
    1541             :         }
    1542             : 
    1543       89077 :         ret = messaging_dgm_read_unique(fd, unique);
    1544       89077 :         close(fd);
    1545       89077 :         return ret;
    1546             : }
    1547             : 
    1548       15426 : int messaging_dgm_cleanup(pid_t pid)
    1549             : {
                               /*
                                * Removes the socket "<ctx->socket_dir>/<pid>" and the
                                * lockfile "<ctx->lockfile_dir>/<pid>" of process pid, but
                                * only if a write lock on the lockfile can be obtained with
                                * fcntl(F_SETLK). Presumably a live owner still holds that
                                * lock, so this only succeeds for stale entries -- confirm
                                * against the lockfile creation code.
                                *
                                * Returns 0 after unlinking, ENOTCONN when there is no global
                                * context, ENAMETOOLONG if a path does not fit, or the errno
                                * of the failing snprintf()/open()/fcntl() call. The results
                                * of unlink() and close() are deliberately ignored.
                                */
    1550       15426 :         struct messaging_dgm_context *ctx = global_dgm_context;
    1551           0 :         struct sun_path_buf lockfile_name, socket_name;
    1552           0 :         int fd, len, ret;
    1553       15426 :         struct flock lck = {
    1554             :                 .l_pid = 0,
    1555             :         };
    1556             : 
    1557       15426 :         if (ctx == NULL) {
    1558           0 :                 return ENOTCONN;
    1559             :         }
    1560             : 
    1561       15426 :         len = snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/%u",
    1562       15426 :                        ctx->socket_dir.buf, (unsigned)pid);
    1563       15426 :         if (len < 0) {
    1564           0 :                 return errno;
    1565             :         }
    1566       15426 :         if ((size_t)len >= sizeof(socket_name.buf)) {
    1567           0 :                 return ENAMETOOLONG;
    1568             :         }
    1569             : 
    1570       15426 :         len = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf), "%s/%u",
    1571       15426 :                        ctx->lockfile_dir.buf, (unsigned)pid);
    1572       15426 :         if (len < 0) {
    1573           0 :                 return errno;
    1574             :         }
    1575       15426 :         if ((size_t)len >= sizeof(lockfile_name.buf)) {
    1576           0 :                 return ENAMETOOLONG;
    1577             :         }
    1578             : 
    1579       15426 :         fd = open(lockfile_name.buf, O_NONBLOCK|O_WRONLY, 0);
    1580       15426 :         if (fd == -1) {
                                       /* errno is saved first; ENOENT is returned without a log line. */
    1581       15384 :                 ret = errno;
    1582       15384 :                 if (ret != ENOENT) {
    1583           0 :                         DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
    1584             :                                    lockfile_name.buf, strerror(ret)));
    1585             :                 }
    1586       15384 :                 return ret;
    1587             :         }
    1588             : 
                               /* Write lock request starting at offset 0 with l_len 0. */
    1589          42 :         lck.l_type = F_WRLCK;
    1590          42 :         lck.l_whence = SEEK_SET;
    1591          42 :         lck.l_start = 0;
    1592          42 :         lck.l_len = 0;
    1593             : 
    1594          42 :         ret = fcntl(fd, F_SETLK, &lck);
    1595          42 :         if (ret != 0) {
                                       /* EACCES and EAGAIN are returned without a log line. */
    1596           0 :                 ret = errno;
    1597           0 :                 if ((ret != EACCES) && (ret != EAGAIN)) {
    1598           0 :                         DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
    1599             :                                    strerror(ret)));
    1600             :                 }
    1601           0 :                 close(fd);
    1602           0 :                 return ret;
    1603             :         }
    1604             : 
                               /*
                                * NOTE(review): ret is always 0 at this point (fcntl()
                                * succeeded), so the message prints strerror(0) rather than
                                * anything about the entry being cleaned -- confirm intent.
                                */
    1605          42 :         DEBUG(10, ("%s: Cleaning up : %s\n", __func__, strerror(ret)));
    1606             : 
    1607          42 :         (void)unlink(socket_name.buf);
    1608          42 :         (void)unlink(lockfile_name.buf);
    1609          42 :         (void)close(fd);
    1610          42 :         return 0;
    1611             : }
    1612             : 
    1613           0 : static int messaging_dgm_wipe_fn(pid_t pid, void *private_data)
    1614             : {
                               /*
                                * Per-pid callback passed to messaging_dgm_forall() by
                                * messaging_dgm_wipe(). private_data points to the caller's
                                * own pid, which is skipped; for every other pid
                                * messaging_dgm_cleanup() is called and its result logged.
                                *
                                * Always returns 0, so a failed cleanup never stops the
                                * directory scan (messaging_dgm_forall() only breaks on a
                                * non-zero return).
                                */
    1615           0 :         pid_t *our_pid = (pid_t *)private_data;
    1616           0 :         int ret;
    1617             : 
    1618           0 :         if (pid == *our_pid) {
    1619             :                 /*
    1620             :                  * fcntl(F_GETLK) will succeed for ourselves, we hold
    1621             :                  * that lock ourselves.
    1622             :                  */
    1623           0 :                 return 0;
    1624             :         }
    1625             : 
    1626           0 :         ret = messaging_dgm_cleanup(pid);
    1627           0 :         DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
    1628             :                    (unsigned long)pid, ret ? strerror(ret) : "ok"));
    1629             : 
    1630           0 :         return 0;
    1631             : }
    1632             : 
    1633           0 : int messaging_dgm_wipe(void)
    1634             : {
                               /*
                                * Runs messaging_dgm_wipe_fn() over all entries found by
                                * messaging_dgm_forall(), passing the current pid so the
                                * caller's own entry is skipped. The result of
                                * messaging_dgm_forall() is not checked; 0 is always
                                * returned.
                                */
    1635           0 :         pid_t pid = tevent_cached_getpid();
    1636           0 :         messaging_dgm_forall(messaging_dgm_wipe_fn, &pid);
    1637           0 :         return 0;
    1638             : }
    1639             : 
    1640         225 : int messaging_dgm_forall(int (*fn)(pid_t pid, void *private_data),
    1641             :                          void *private_data)
    1642             : {
                               /*
                                * Calls fn(pid, private_data) for every entry in
                                * ctx->socket_dir whose name parses as a non-zero decimal
                                * number. The scan stops early when fn returns non-zero.
                                *
                                * Returns ENOTCONN when there is no global context, errno if
                                * opendir() fails, and 0 otherwise.
                                * NOTE(review): 0 is also returned when fn stopped the scan;
                                * fn's return value is not propagated -- confirm callers do
                                * not need it.
                                */
    1643         225 :         struct messaging_dgm_context *ctx = global_dgm_context;
    1644           1 :         DIR *msgdir;
    1645           1 :         struct dirent *dp;
    1646         225 :         int error = 0;
    1647             : 
    1648         225 :         if (ctx == NULL) {
    1649           0 :                 return ENOTCONN;
    1650             :         }
    1651             : 
    1652         225 :         messaging_dgm_validate(ctx);
    1653             : 
    1654             :         /*
    1655             :          * We scan the socket directory and not the lock directory. Otherwise
    1656             :          * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
    1657             :          * and fcntl(SETLK).
    1658             :          */
    1659             : 
    1660         225 :         msgdir = opendir(ctx->socket_dir.buf);
    1661         225 :         if (msgdir == NULL) {
    1662           0 :                 return errno;
    1663             :         }
    1664             : 
    1665       14938 :         while ((dp = readdir(msgdir)) != NULL) {
    1666           8 :                 unsigned long pid;
    1667           8 :                 int ret;
    1668             : 
                                       /*
                                        * error is declared once outside the loop; presumably
                                        * smb_strtoul() resets it on every call -- TODO confirm,
                                        * otherwise one bad name would skip all later entries.
                                        */
    1669       14713 :                 pid = smb_strtoul(dp->d_name, NULL, 10, &error, SMB_STR_STANDARD);
    1670       14713 :                 if ((pid == 0) || (error != 0)) {
    1671             :                         /*
    1672             :                          * . and .. and other malformed entries
    1673             :                          */
    1674         450 :                         continue;
    1675             :                 }
    1676             : 
    1677       14263 :                 ret = fn(pid, private_data);
    1678       14263 :                 if (ret != 0) {
    1679           0 :                         break;
    1680             :                 }
    1681             :         }
    1682         225 :         closedir(msgdir);
    1683             : 
    1684         225 :         return 0;
    1685             : }
    1686             : 
    1687             : struct messaging_dgm_fde {
                               /*
                                * Handle returned by messaging_dgm_register_tevent_context().
                                * fde is a copy of the tevent_fd pointer held by the
                                * struct messaging_dgm_fde_ev for the registered event context.
                                */
    1688             :         struct tevent_fd *fde;
    1689             : };
    1690             : 
    1691      160119 : static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
    1692             : {
                               /*
                                * talloc destructor set in
                                * messaging_dgm_register_tevent_context(): if fde_ev is still
                                * attached to a context, unlinks it from that context's
                                * fde_evs list and clears the back pointer. Always returns 0.
                                */
    1693      160119 :         if (fde_ev->ctx != NULL) {
    1694      144213 :                 DLIST_REMOVE(fde_ev->ctx->fde_evs, fde_ev);
    1695      144213 :                 fde_ev->ctx = NULL;
    1696             :         }
    1697      160119 :         return 0;
    1698             : }
    1699             : 
    1700             : /*
    1701             :  * Reference counter for a struct tevent_fd messaging read event
    1702             :  * (with callback function) on a struct tevent_context registered
    1703             :  * on a messaging context.
    1704             :  *
    1705             :  * If we've already registered this struct tevent_context before
    1706             :  * (so already have a read event), just increase the reference count.
    1707             :  *
    1708             :  * Otherwise create a new struct tevent_fd messaging read event on the
    1709             :  * previously unseen struct tevent_context - this is what drives
    1710             :  * the message receive processing.
    1711             :  *
    1712             :  */
    1713             : 
    1714      646980 : struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
    1715             :         TALLOC_CTX *mem_ctx, struct tevent_context *ev)
    1716             : {
                               /*
                                * Registers ev for message reception on the global context
                                * and returns a handle allocated on mem_ctx.
                                *
                                * ctx->fde_evs is searched for an entry for ev whose fde still
                                * has non-zero flags. If none is found, a new
                                * struct messaging_dgm_fde_ev is allocated as a talloc child
                                * of the handle and a TEVENT_FD_READ event with
                                * messaging_dgm_read_handler is added on ctx->sock. If one is
                                * found, the handle takes a talloc reference on it instead.
                                *
                                * Returns NULL when there is no global context or when an
                                * allocation, tevent_add_fd() or talloc_reference() fails.
                                */
    1717      646980 :         struct messaging_dgm_context *ctx = global_dgm_context;
    1718       21982 :         struct messaging_dgm_fde_ev *fde_ev;
    1719       21982 :         struct messaging_dgm_fde *fde;
    1720             : 
    1721      646980 :         if (ctx == NULL) {
    1722           0 :                 return NULL;
    1723             :         }
    1724             : 
    1725      646980 :         fde = talloc(mem_ctx, struct messaging_dgm_fde);
    1726      646980 :         if (fde == NULL) {
    1727           0 :                 return NULL;
    1728             :         }
    1729             : 
    1730      845961 :         for (fde_ev = ctx->fde_evs; fde_ev != NULL; fde_ev = fde_ev->next) {
    1731      726204 :                 if (tevent_fd_get_flags(fde_ev->fde) == 0) {
    1732             :                         /*
    1733             :                          * If the event context got deleted,
    1734             :                          * tevent_fd_get_flags() will return 0
    1735             :                          * for the stale fde.
    1736             :                          *
    1737             :                          * In that case we should not
    1738             :                          * use fde_ev->ev anymore.
    1739             :                          */
    1740       60034 :                         continue;
    1741             :                 }
    1742      666170 :                 if (fde_ev->ev == ev) {
    1743      510155 :                         break;
    1744             :                 }
    1745             :         }
    1746             : 
    1747      646980 :         if (fde_ev == NULL) {
    1748      119757 :                 fde_ev = talloc(fde, struct messaging_dgm_fde_ev);
    1749      119757 :                 if (fde_ev == NULL) {
                                               /*
                                                * NOTE(review): this path returns without
                                                * TALLOC_FREE(fde), unlike the two failure paths
                                                * below; fde looks like it stays allocated on
                                                * mem_ctx -- confirm and align.
                                                */
    1750           0 :                         return NULL;
    1751             :                 }
    1752      119757 :                 fde_ev->fde = tevent_add_fd(
    1753             :                         ev, fde_ev, ctx->sock, TEVENT_FD_READ,
    1754             :                         messaging_dgm_read_handler, ctx);
    1755      119757 :                 if (fde_ev->fde == NULL) {
    1756           0 :                         TALLOC_FREE(fde);
    1757           0 :                         return NULL;
    1758             :                 }
    1759      119757 :                 fde_ev->ev = ev;
    1760      119757 :                 fde_ev->ctx = ctx;
    1761      119757 :                 DLIST_ADD(ctx->fde_evs, fde_ev);
    1762      119757 :                 talloc_set_destructor(
    1763             :                         fde_ev, messaging_dgm_fde_ev_destructor);
    1764             :         } else {
    1765             :                 /*
    1766             :                  * Same trick as with tdb_wrap: The caller will never
    1767             :                  * see the talloc_referenced object, the
    1768             :                  * messaging_dgm_fde_ev, so problems with
    1769             :                  * talloc_unlink will not happen.
    1770             :                  */
    1771      527223 :                 if (talloc_reference(fde, fde_ev) == NULL) {
    1772           0 :                         TALLOC_FREE(fde);
    1773           0 :                         return NULL;
    1774             :                 }
    1775             :         }
    1776             : 
                               /* In both cases the handle carries the shared tevent_fd pointer. */
    1777      646980 :         fde->fde = fde_ev->fde;
    1778      646980 :         return fde;
    1779             : }
    1780             : 
    1781      503436 : bool messaging_dgm_fde_active(struct messaging_dgm_fde *fde)
    1782             : {
                               /*
                                * Returns false for a NULL handle; otherwise true exactly
                                * when tevent_fd_get_flags() on the handle's tevent_fd is
                                * non-zero.
                                */
    1783      132931 :         uint16_t flags;
    1784             : 
    1785      503436 :         if (fde == NULL) {
    1786           0 :                 return false;
    1787             :         }
    1788      503436 :         flags = tevent_fd_get_flags(fde->fde);
    1789      503436 :         return (flags != 0);
    1790             : }

Generated by: LCOV version 1.14