LCOV - code coverage report
Current view: top level - source3/lib - messages.c (source / functions) Hit Total Coverage
Test: coverage report for master 2f515e9b Lines: 468 586 79.9 %
Date: 2024-04-21 15:09:00 Functions: 38 42 90.5 %

          Line data    Source code
       1             : /* 
       2             :    Unix SMB/CIFS implementation.
       3             :    Samba internal messaging functions
       4             :    Copyright (C) Andrew Tridgell 2000
       5             :    Copyright (C) 2001 by Martin Pool
       6             :    Copyright (C) 2002 by Jeremy Allison
       7             :    Copyright (C) 2007 by Volker Lendecke
       8             : 
       9             :    This program is free software; you can redistribute it and/or modify
      10             :    it under the terms of the GNU General Public License as published by
      11             :    the Free Software Foundation; either version 3 of the License, or
      12             :    (at your option) any later version.
      13             : 
      14             :    This program is distributed in the hope that it will be useful,
      15             :    but WITHOUT ANY WARRANTY; without even the implied warranty of
      16             :    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      17             :    GNU General Public License for more details.
      18             : 
      19             :    You should have received a copy of the GNU General Public License
      20             :    along with this program.  If not, see <http://www.gnu.org/licenses/>.
      21             : */
      22             : 
      23             : /**
      24             :   @defgroup messages Internal messaging framework
      25             :   @{
      26             :   @file messages.c
      27             : 
      28             :   @brief  Module for internal messaging between Samba daemons. 
      29             : 
      30             :    The idea is that if a part of Samba wants to do communication with
      31             :    another Samba process then it will do a message_register() of a
      32             :    dispatch function, and use message_send_pid() to send messages to
      33             :    that process.
      34             : 
      35             :    The dispatch function is given the pid of the sender, and it can
      36             :    use that to reply by message_send_pid().  See ping_message() for a
      37             :    simple example.
      38             : 
      39             :    @caution Dispatch functions must be able to cope with incoming
      40             :    messages on an *odd* byte boundary.
      41             : 
      42             :    This system doesn't have any inherent size limitations but is not
      43             :    very efficient for large messages or when messages are sent in very
      44             :    quick succession.
      45             : 
      46             : */
      47             : 
      48             : #include "includes.h"
      49             : #include "lib/util/server_id.h"
      50             : #include "dbwrap/dbwrap.h"
      51             : #include "serverid.h"
      52             : #include "messages.h"
      53             : #include "lib/util/tevent_unix.h"
      54             : #include "lib/background.h"
      55             : #include "lib/messaging/messages_dgm.h"
      56             : #include "lib/util/iov_buf.h"
      57             : #include "lib/util/server_id_db.h"
      58             : #include "lib/messaging/messages_dgm_ref.h"
      59             : #include "lib/messages_ctdb.h"
      60             : #include "lib/messages_ctdb_ref.h"
      61             : #include "lib/messages_util.h"
      62             : #include "cluster_support.h"
      63             : #include "ctdbd_conn.h"
      64             : #include "ctdb_srvids.h"
      65             : #include "source3/lib/tallocmsg.h"
      66             : 
      67             : #ifdef CLUSTER_SUPPORT
      68             : #include "ctdb_protocol.h"
      69             : #endif
      70             : 
      71             : struct messaging_callback {
      72             :         struct messaging_callback *prev, *next;
      73             :         uint32_t msg_type;
      74             :         void (*fn)(struct messaging_context *msg, void *private_data, 
      75             :                    uint32_t msg_type, 
      76             :                    struct server_id server_id, DATA_BLOB *data);
      77             :         void *private_data;
      78             : };
      79             : 
      80             : struct messaging_registered_ev {
      81             :         struct tevent_context *ev;
      82             :         struct tevent_immediate *im;
      83             :         size_t refcount;
      84             : };
      85             : 
      86             : struct messaging_context {
      87             :         struct server_id id;
      88             :         struct tevent_context *event_ctx;
      89             :         struct messaging_callback *callbacks;
      90             : 
      91             :         struct messaging_rec *posted_msgs;
      92             : 
      93             :         struct messaging_registered_ev *event_contexts;
      94             : 
      95             :         struct tevent_req **new_waiters;
      96             :         size_t num_new_waiters;
      97             : 
      98             :         struct tevent_req **waiters;
      99             :         size_t num_waiters;
     100             : 
     101             :         struct server_id_db *names_db;
     102             : 
     103             :         TALLOC_CTX *per_process_talloc_ctx;
     104             : };
     105             : 
     106             : static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
     107             :                                                struct messaging_rec *rec);
     108             : static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
     109             :                                        struct messaging_rec *rec);
     110             : static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
     111             :                                        struct tevent_context *ev,
     112             :                                        struct messaging_rec *rec);
     113             : static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
     114             :                                    struct tevent_context *ev,
     115             :                                    struct messaging_rec *rec);
     116             : 
     117             : /****************************************************************************
     118             :  A useful function for testing the message system.
     119             : ****************************************************************************/
     120             : 
     121         105 : static void ping_message(struct messaging_context *msg_ctx,
     122             :                          void *private_data,
     123             :                          uint32_t msg_type,
     124             :                          struct server_id src,
     125             :                          DATA_BLOB *data)
     126             : {
     127         105 :         struct server_id_buf idbuf;
     128             : 
     129         210 :         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
     130             :                   server_id_str_buf(src, &idbuf), (int)data->length,
     131             :                   data->data ? (char *)data->data : ""));
     132             : 
     133         105 :         messaging_send(msg_ctx, src, MSG_PONG, data);
     134         105 : }
     135             : 
     136      172884 : struct messaging_rec *messaging_rec_create(
     137             :         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
     138             :         uint32_t msg_type, const struct iovec *iov, int iovlen,
     139             :         const int *fds, size_t num_fds)
     140             : {
     141         369 :         ssize_t buflen;
     142         369 :         uint8_t *buf;
     143         369 :         struct messaging_rec *result;
     144             : 
     145      172884 :         if (num_fds > INT8_MAX) {
     146           0 :                 return NULL;
     147             :         }
     148             : 
     149      172884 :         buflen = iov_buflen(iov, iovlen);
     150      172884 :         if (buflen == -1) {
     151           0 :                 return NULL;
     152             :         }
     153      172884 :         buf = talloc_array(mem_ctx, uint8_t, buflen);
     154      172884 :         if (buf == NULL) {
     155           0 :                 return NULL;
     156             :         }
     157      172884 :         iov_buf(iov, iovlen, buf, buflen);
     158             : 
     159      172884 :         {
     160         369 :                 struct messaging_rec rec;
     161      172884 :                 int64_t fds64[MAX(1, num_fds)];
     162         369 :                 size_t i;
     163             : 
     164      172885 :                 for (i=0; i<num_fds; i++) {
     165           1 :                         fds64[i] = fds[i];
     166             :                 }
     167             : 
     168      172884 :                 rec = (struct messaging_rec) {
     169             :                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
     170             :                         .src = src, .dest = dst,
     171             :                         .buf.data = buf, .buf.length = buflen,
     172             :                         .num_fds = num_fds, .fds = fds64,
     173             :                 };
     174             : 
     175      172884 :                 result = messaging_rec_dup(mem_ctx, &rec);
     176             :         }
     177             : 
     178      172884 :         TALLOC_FREE(buf);
     179             : 
     180      172884 :         return result;
     181             : }
     182             : 
     183      143566 : static bool messaging_register_event_context(struct messaging_context *ctx,
     184             :                                              struct tevent_context *ev)
     185             : {
     186        3936 :         size_t i, num_event_contexts;
     187      143566 :         struct messaging_registered_ev *free_reg = NULL;
     188        3936 :         struct messaging_registered_ev *tmp;
     189             : 
     190      143566 :         num_event_contexts = talloc_array_length(ctx->event_contexts);
     191             : 
     192      143599 :         for (i=0; i<num_event_contexts; i++) {
     193      138367 :                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
     194             : 
     195      138367 :                 if (reg->refcount == 0) {
     196           5 :                         if (reg->ev != NULL) {
     197           0 :                                 abort();
     198             :                         }
     199           5 :                         free_reg = reg;
     200             :                         /*
     201             :                          * We continue here and may find another
     202             :                          * free_req, but the important thing is
     203             :                          * that we continue to search for an
     204             :                          * existing registration in the loop.
     205             :                          */
     206           5 :                         continue;
     207             :                 }
     208             : 
     209      138362 :                 if (reg->ev == ev) {
     210      138334 :                         reg->refcount += 1;
     211      138334 :                         return true;
     212             :                 }
     213             :         }
     214             : 
     215        5232 :         if (free_reg == NULL) {
     216        5227 :                 struct tevent_immediate *im = NULL;
     217             : 
     218        5227 :                 im = tevent_create_immediate(ctx);
     219        5227 :                 if (im == NULL) {
     220           0 :                         return false;
     221             :                 }
     222             : 
     223        5227 :                 tmp = talloc_realloc(ctx, ctx->event_contexts,
     224             :                                      struct messaging_registered_ev,
     225             :                                      num_event_contexts+1);
     226        5227 :                 if (tmp == NULL) {
     227           0 :                         return false;
     228             :                 }
     229        5227 :                 ctx->event_contexts = tmp;
     230             : 
     231        5227 :                 free_reg = &ctx->event_contexts[num_event_contexts];
     232        5227 :                 free_reg->im = talloc_move(ctx->event_contexts, &im);
     233             :         }
     234             : 
     235             :         /*
     236             :          * free_reg->im might be cached
     237             :          */
     238        5232 :         free_reg->ev = ev;
     239        5232 :         free_reg->refcount = 1;
     240             : 
     241        5232 :         return true;
     242             : }
     243             : 
     244      131872 : static bool messaging_deregister_event_context(struct messaging_context *ctx,
     245             :                                                struct tevent_context *ev)
     246             : {
     247        3664 :         size_t i, num_event_contexts;
     248             : 
     249      131872 :         num_event_contexts = talloc_array_length(ctx->event_contexts);
     250             : 
     251      131900 :         for (i=0; i<num_event_contexts; i++) {
     252      131900 :                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
     253             : 
     254      131900 :                 if (reg->refcount == 0) {
     255           0 :                         continue;
     256             :                 }
     257             : 
     258      131900 :                 if (reg->ev == ev) {
     259      131872 :                         reg->refcount -= 1;
     260             : 
     261      131872 :                         if (reg->refcount == 0) {
     262             :                                 /*
     263             :                                  * The primary event context
     264             :                                  * is never unregistered using
     265             :                                  * messaging_deregister_event_context()
     266             :                                  * it's only registered using
     267             :                                  * messaging_register_event_context().
     268             :                                  */
     269          28 :                                 SMB_ASSERT(ev != ctx->event_ctx);
     270          28 :                                 SMB_ASSERT(reg->ev != ctx->event_ctx);
     271             : 
     272             :                                 /*
     273             :                                  * Not strictly necessary, just
     274             :                                  * paranoia
     275             :                                  */
     276          28 :                                 reg->ev = NULL;
     277             : 
     278             :                                 /*
     279             :                                  * Do not talloc_free(reg->im),
     280             :                                  * recycle immediates events.
     281             :                                  *
     282             :                                  * We just invalidate it using
     283             :                                  * the primary event context,
     284             :                                  * which is never unregistered.
     285             :                                  */
     286          28 :                                 tevent_schedule_immediate(reg->im,
     287             :                                                           ctx->event_ctx,
     288        3664 :                                                           NULL, NULL);
     289             :                         }
     290      131872 :                         return true;
     291             :                 }
     292             :         }
     293           0 :         return false;
     294             : }
     295             : 
     296      172396 : static void messaging_post_main_event_context(struct tevent_context *ev,
     297             :                                               struct tevent_immediate *im,
     298             :                                               void *private_data)
     299             : {
     300      172396 :         struct messaging_context *ctx = talloc_get_type_abort(
     301             :                 private_data, struct messaging_context);
     302             : 
     303      373568 :         while (ctx->posted_msgs != NULL) {
     304      172829 :                 struct messaging_rec *rec = ctx->posted_msgs;
     305         366 :                 bool consumed;
     306             : 
     307      172829 :                 DLIST_REMOVE(ctx->posted_msgs, rec);
     308             : 
     309      172829 :                 consumed = messaging_dispatch_classic(ctx, rec);
     310      172825 :                 if (!consumed) {
     311       22173 :                         consumed = messaging_dispatch_waiters(
     312             :                                 ctx, ctx->event_ctx, rec);
     313             :                 }
     314             : 
     315      172825 :                 if (!consumed) {
     316             :                         uint8_t i;
     317             : 
     318       20728 :                         for (i=0; i<rec->num_fds; i++) {
     319           0 :                                 close(rec->fds[i]);
     320             :                         }
     321             :                 }
     322             : 
     323      173178 :                 TALLOC_FREE(rec);
     324             :         }
     325      172392 : }
     326             : 
     327           0 : static void messaging_post_sub_event_context(struct tevent_context *ev,
     328             :                                              struct tevent_immediate *im,
     329             :                                              void *private_data)
     330             : {
     331           0 :         struct messaging_context *ctx = talloc_get_type_abort(
     332             :                 private_data, struct messaging_context);
     333           0 :         struct messaging_rec *rec, *next;
     334             : 
     335           0 :         for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
     336           0 :                 bool consumed;
     337             : 
     338           0 :                 next = rec->next;
     339             : 
     340           0 :                 consumed = messaging_dispatch_waiters(ctx, ev, rec);
     341           0 :                 if (consumed) {
     342           0 :                         DLIST_REMOVE(ctx->posted_msgs, rec);
     343           0 :                         TALLOC_FREE(rec);
     344             :                 }
     345             :         }
     346           0 : }
     347             : 
     348      172884 : static bool messaging_alert_event_contexts(struct messaging_context *ctx)
     349             : {
     350         369 :         size_t i, num_event_contexts;
     351             : 
     352      172884 :         num_event_contexts = talloc_array_length(ctx->event_contexts);
     353             : 
     354      345792 :         for (i=0; i<num_event_contexts; i++) {
     355      172908 :                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
     356             : 
     357      172908 :                 if (reg->refcount == 0) {
     358          20 :                         continue;
     359             :                 }
     360             : 
     361             :                 /*
     362             :                  * We depend on schedule_immediate to work
     363             :                  * multiple times. Might be a bit inefficient,
     364             :                  * but this needs to be proven in tests. The
     365             :                  * alternatively would be to track whether the
     366             :                  * immediate has already been scheduled. For
     367             :                  * now, avoid that complexity here.
     368             :                  */
     369             : 
     370      172888 :                 if (reg->ev == ctx->event_ctx) {
     371      172884 :                         tevent_schedule_immediate(
     372             :                                 reg->im, reg->ev,
     373             :                                 messaging_post_main_event_context,
     374         369 :                                 ctx);
     375             :                 } else {
     376           4 :                         tevent_schedule_immediate(
     377             :                                 reg->im, reg->ev,
     378             :                                 messaging_post_sub_event_context,
     379         371 :                                 ctx);
     380             :                 }
     381             : 
     382             :         }
     383      172884 :         return true;
     384             : }
     385             : 
     386       85799 : static void messaging_recv_cb(struct tevent_context *ev,
     387             :                               const uint8_t *msg, size_t msg_len,
     388             :                               int *fds, size_t num_fds,
     389             :                               void *private_data)
     390       85799 : {
     391       85799 :         struct messaging_context *msg_ctx = talloc_get_type_abort(
     392             :                 private_data, struct messaging_context);
     393         345 :         struct server_id_buf idbuf;
     394         345 :         struct messaging_rec rec;
     395       85799 :         int64_t fds64[MAX(1, MIN(num_fds, INT8_MAX))];
     396         345 :         size_t i;
     397             : 
     398       85799 :         if (msg_len < MESSAGE_HDR_LENGTH) {
     399           0 :                 DBG_WARNING("message too short: %zu\n", msg_len);
     400           0 :                 return;
     401             :         }
     402             : 
     403       85799 :         if (num_fds > INT8_MAX) {
     404           0 :                 DBG_WARNING("too many fds: %zu\n", num_fds);
     405           0 :                 return;
     406             :         }
     407             : 
     408      121641 :         for (i=0; i < num_fds; i++) {
     409       35842 :                 fds64[i] = fds[i];
     410             :         }
     411             : 
     412       85799 :         rec = (struct messaging_rec) {
     413             :                 .msg_version = MESSAGE_VERSION,
     414       85799 :                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
     415       85799 :                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
     416             :                 .num_fds = num_fds,
     417             :                 .fds = fds64,
     418             :         };
     419             : 
     420       85799 :         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
     421             : 
     422       85799 :         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
     423             :                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
     424             :                   server_id_str_buf(rec.src, &idbuf));
     425             : 
     426       85799 :         if (server_id_same_process(&rec.src, &msg_ctx->id)) {
     427           0 :                 DBG_DEBUG("Ignoring self-send\n");
     428           0 :                 return;
     429             :         }
     430             : 
     431       85799 :         messaging_dispatch_rec(msg_ctx, ev, &rec);
     432             : 
     433      121986 :         for (i=0; i<num_fds; i++) {
     434       35842 :                 fds[i] = fds64[i];
     435             :         }
     436             : }
     437             : 
     438       36135 : static int messaging_context_destructor(struct messaging_context *ctx)
     439             : {
     440         966 :         size_t i;
     441             : 
     442      200500 :         for (i=0; i<ctx->num_new_waiters; i++) {
     443      164365 :                 if (ctx->new_waiters[i] != NULL) {
     444       33688 :                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
     445       33688 :                         ctx->new_waiters[i] = NULL;
     446             :                 }
     447             :         }
     448       56151 :         for (i=0; i<ctx->num_waiters; i++) {
     449       20016 :                 if (ctx->waiters[i] != NULL) {
     450        2454 :                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
     451        2454 :                         ctx->waiters[i] = NULL;
     452             :                 }
     453             :         }
     454             : 
     455             :         /*
     456             :          * The immediates from messaging_alert_event_contexts
     457             :          * reference "ctx". Don't let them outlive the
     458             :          * messaging_context we're destroying here.
     459             :          */
     460       36135 :         TALLOC_FREE(ctx->event_contexts);
     461             : 
     462       36135 :         return 0;
     463             : }
     464             : 
     465       37016 : static const char *private_path(const char *name)
     466             : {
     467       37016 :         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
     468             : }
     469             : 
     470        5204 : static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
     471             :                                         struct tevent_context *ev,
     472             :                                         struct messaging_context **pmsg_ctx)
     473             : {
     474         137 :         TALLOC_CTX *frame;
     475         137 :         struct messaging_context *ctx;
     476         137 :         NTSTATUS status;
     477         137 :         int ret;
     478         137 :         const char *lck_path;
     479         137 :         const char *priv_path;
     480         137 :         void *ref;
     481         137 :         bool ok;
     482             : 
     483             :         /*
     484             :          * sec_init() *must* be called before any other
     485             :          * functions that use sec_XXX(). e.g. sec_initial_uid().
     486             :          */
     487             : 
     488        5204 :         sec_init();
     489             : 
     490        5204 :         lck_path = lock_path(talloc_tos(), "msg.lock");
     491        5204 :         if (lck_path == NULL) {
     492           0 :                 return NT_STATUS_NO_MEMORY;
     493             :         }
     494             : 
     495        5204 :         ok = directory_create_or_exist_strict(lck_path,
     496             :                                               sec_initial_uid(),
     497             :                                               0755);
     498        5204 :         if (!ok) {
     499           0 :                 DBG_DEBUG("Could not create lock directory: %s\n",
     500             :                           strerror(errno));
     501           0 :                 return NT_STATUS_ACCESS_DENIED;
     502             :         }
     503             : 
     504        5204 :         priv_path = private_path("msg.sock");
     505        5204 :         if (priv_path == NULL) {
     506           0 :                 return NT_STATUS_NO_MEMORY;
     507             :         }
     508             : 
     509        5204 :         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
     510             :                                               0700);
     511        5204 :         if (!ok) {
     512           0 :                 DBG_DEBUG("Could not create msg directory: %s\n",
     513             :                           strerror(errno));
     514           0 :                 return NT_STATUS_ACCESS_DENIED;
     515             :         }
     516             : 
     517        5204 :         frame = talloc_stackframe();
     518        5204 :         if (frame == NULL) {
     519           0 :                 return NT_STATUS_NO_MEMORY;
     520             :         }
     521             : 
     522        5204 :         ctx = talloc_zero(frame, struct messaging_context);
     523        5204 :         if (ctx == NULL) {
     524           0 :                 status = NT_STATUS_NO_MEMORY;
     525           0 :                 goto done;
     526             :         }
     527             : 
     528        5341 :         ctx->id = (struct server_id) {
     529        5204 :                 .pid = tevent_cached_getpid(), .vnn = NONCLUSTER_VNN
     530             :         };
     531             : 
     532        5204 :         ctx->event_ctx = ev;
     533             : 
     534        5204 :         ctx->per_process_talloc_ctx = talloc_new(ctx);
     535        5204 :         if (ctx->per_process_talloc_ctx == NULL) {
     536           0 :                 status = NT_STATUS_NO_MEMORY;
     537           0 :                 goto done;
     538             :         }
     539             : 
     540        5204 :         ok = messaging_register_event_context(ctx, ev);
     541        5204 :         if (!ok) {
     542           0 :                 status = NT_STATUS_NO_MEMORY;
     543           0 :                 goto done;
     544             :         }
     545             : 
     546        5204 :         ref = messaging_dgm_ref(
     547             :                 ctx->per_process_talloc_ctx,
     548             :                 ctx->event_ctx,
     549             :                 &ctx->id.unique_id,
     550             :                 priv_path,
     551             :                 lck_path,
     552             :                 messaging_recv_cb,
     553             :                 ctx,
     554             :                 &ret);
     555        5204 :         if (ref == NULL) {
     556           5 :                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
     557           5 :                 status = map_nt_error_from_unix(ret);
     558           5 :                 goto done;
     559             :         }
     560        5199 :         talloc_set_destructor(ctx, messaging_context_destructor);
     561             : 
     562             : #ifdef CLUSTER_SUPPORT
     563             :         if (lp_clustering()) {
     564             :                 ref = messaging_ctdb_ref(
     565             :                         ctx->per_process_talloc_ctx,
     566             :                         ctx->event_ctx,
     567             :                         lp_ctdbd_socket(),
     568             :                         lp_ctdb_timeout(),
     569             :                         ctx->id.unique_id,
     570             :                         messaging_recv_cb,
     571             :                         ctx,
     572             :                         &ret);
     573             :                 if (ref == NULL) {
     574             :                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
     575             :                                    strerror(ret));
     576             :                         status = map_nt_error_from_unix(ret);
     577             :                         goto done;
     578             :                 }
     579             :         }
     580             : #endif
     581             : 
     582        5199 :         ctx->id.vnn = get_my_vnn();
     583             : 
     584        5199 :         ctx->names_db = server_id_db_init(ctx,
     585             :                                           ctx->id,
     586             :                                           lp_lock_directory(),
     587             :                                           0,
     588             :                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
     589        5199 :         if (ctx->names_db == NULL) {
     590           0 :                 DBG_DEBUG("server_id_db_init failed\n");
     591           0 :                 status = NT_STATUS_NO_MEMORY;
     592           0 :                 goto done;
     593             :         }
     594             : 
     595        5199 :         messaging_register(ctx, NULL, MSG_PING, ping_message);
     596             : 
     597             :         /* Register some debugging related messages */
     598             : 
     599        5199 :         register_msg_pool_usage(ctx->per_process_talloc_ctx, ctx);
     600        5199 :         register_dmalloc_msgs(ctx);
     601        5199 :         debug_register_msgs(ctx);
     602             : 
     603             :         {
     604         132 :                 struct server_id_buf tmp;
     605        5199 :                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
     606             :         }
     607             : 
     608        5199 :         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
     609             : 
     610        5199 :         status = NT_STATUS_OK;
     611        5204 : done:
     612        5204 :         TALLOC_FREE(frame);
     613             : 
     614        5204 :         return status;
     615             : }
     616             : 
     617        5204 : struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
     618             :                                          struct tevent_context *ev)
     619             : {
     620        5204 :         struct messaging_context *ctx = NULL;
     621         137 :         NTSTATUS status;
     622             : 
     623        5204 :         status = messaging_init_internal(mem_ctx,
     624             :                                          ev,
     625             :                                          &ctx);
     626        5204 :         if (!NT_STATUS_IS_OK(status)) {
     627           0 :                 return NULL;
     628             :         }
     629             : 
     630        5199 :         return ctx;
     631             : }
     632             : 
     633     5881850 : struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
     634             : {
     635     5881850 :         return msg_ctx->id;
     636             : }
     637             : 
     638             : /*
     639             :  * re-init after a fork
     640             :  */
     641       31812 : NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
     642             : {
     643         855 :         int ret;
     644         855 :         char *lck_path;
     645         855 :         void *ref;
     646             : 
     647       31812 :         TALLOC_FREE(msg_ctx->per_process_talloc_ctx);
     648             : 
     649       31812 :         msg_ctx->per_process_talloc_ctx = talloc_new(msg_ctx);
     650       31812 :         if (msg_ctx->per_process_talloc_ctx == NULL) {
     651           0 :                 return NT_STATUS_NO_MEMORY;
     652             :         }
     653             : 
     654       32667 :         msg_ctx->id = (struct server_id) {
     655       31812 :                 .pid = tevent_cached_getpid(), .vnn = msg_ctx->id.vnn
     656             :         };
     657             : 
     658       31812 :         lck_path = lock_path(talloc_tos(), "msg.lock");
     659       31812 :         if (lck_path == NULL) {
     660           0 :                 return NT_STATUS_NO_MEMORY;
     661             :         }
     662             : 
     663       31812 :         ref = messaging_dgm_ref(
     664             :                 msg_ctx->per_process_talloc_ctx,
     665             :                 msg_ctx->event_ctx,
     666             :                 &msg_ctx->id.unique_id,
     667             :                 private_path("msg.sock"),
     668             :                 lck_path,
     669             :                 messaging_recv_cb,
     670             :                 msg_ctx,
     671             :                 &ret);
     672             : 
     673       31812 :         if (ref == NULL) {
     674           0 :                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
     675           0 :                 return map_nt_error_from_unix(ret);
     676             :         }
     677             : 
     678       31812 :         if (lp_clustering()) {
     679           0 :                 ref = messaging_ctdb_ref(
     680             :                         msg_ctx->per_process_talloc_ctx,
     681             :                         msg_ctx->event_ctx,
     682             :                         lp_ctdbd_socket(),
     683             :                         lp_ctdb_timeout(),
     684             :                         msg_ctx->id.unique_id,
     685             :                         messaging_recv_cb,
     686             :                         msg_ctx,
     687             :                         &ret);
     688           0 :                 if (ref == NULL) {
     689           0 :                         DBG_NOTICE("messaging_ctdb_ref failed: %s\n",
     690             :                                    strerror(ret));
     691           0 :                         return map_nt_error_from_unix(ret);
     692             :                 }
     693             :         }
     694             : 
     695       31812 :         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
     696       31812 :         register_msg_pool_usage(msg_ctx->per_process_talloc_ctx, msg_ctx);
     697             : 
     698       31812 :         return NT_STATUS_OK;
     699             : }
     700             : 
     701             : 
     702             : /*
     703             :  * Register a dispatch function for a particular message type. Allow multiple
     704             :  * registrants
     705             : */
     706      473512 : NTSTATUS messaging_register(struct messaging_context *msg_ctx,
     707             :                             void *private_data,
     708             :                             uint32_t msg_type,
     709             :                             void (*fn)(struct messaging_context *msg,
     710             :                                        void *private_data, 
     711             :                                        uint32_t msg_type, 
     712             :                                        struct server_id server_id,
     713             :                                        DATA_BLOB *data))
     714             : {
     715       11928 :         struct messaging_callback *cb;
     716             : 
     717      473512 :         DEBUG(5, ("Registering messaging pointer for type %u - "
     718             :                   "private_data=%p\n",
     719             :                   (unsigned)msg_type, private_data));
     720             : 
     721             :         /*
     722             :          * Only one callback per type
     723             :          */
     724             : 
     725    10967693 :         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
     726             :                 /* we allow a second registration of the same message
     727             :                    type if it has a different private pointer. This is
     728             :                    needed in, for example, the internal notify code,
     729             :                    which creates a new notify context for each tree
     730             :                    connect, and expects to receive messages to each of
     731             :                    them. */
     732    10525789 :                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
     733       31608 :                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
     734             :                                   (unsigned)msg_type, private_data));
     735       31608 :                         cb->fn = fn;
     736       31608 :                         cb->private_data = private_data;
     737       31608 :                         return NT_STATUS_OK;
     738             :                 }
     739             :         }
     740             : 
     741      441904 :         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
     742           0 :                 return NT_STATUS_NO_MEMORY;
     743             :         }
     744             : 
     745      441904 :         cb->msg_type = msg_type;
     746      441904 :         cb->fn = fn;
     747      441904 :         cb->private_data = private_data;
     748             : 
     749      441904 :         DLIST_ADD(msg_ctx->callbacks, cb);
     750      441904 :         return NT_STATUS_OK;
     751             : }
     752             : 
     753             : /*
     754             :   De-register the function for a particular message type.
     755             : */
     756      210177 : void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
     757             :                           void *private_data)
     758             : {
     759        5396 :         struct messaging_callback *cb, *next;
     760             : 
     761     5801708 :         for (cb = ctx->callbacks; cb; cb = next) {
     762     5591531 :                 next = cb->next;
     763     5591531 :                 if ((cb->msg_type == msg_type)
     764      200532 :                     && (cb->private_data == private_data)) {
     765      200532 :                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
     766             :                                   (unsigned)msg_type, private_data));
     767      200532 :                         DLIST_REMOVE(ctx->callbacks, cb);
     768      200532 :                         TALLOC_FREE(cb);
     769             :                 }
     770             :         }
     771      210177 : }
     772             : 
     773             : /*
     774             :   Send a message to a particular server
     775             : */
     776      225459 : NTSTATUS messaging_send(struct messaging_context *msg_ctx,
     777             :                         struct server_id server, uint32_t msg_type,
     778             :                         const DATA_BLOB *data)
     779             : {
     780      225459 :         struct iovec iov = {0};
     781             : 
     782      225459 :         if (data != NULL) {
     783      224919 :                 iov.iov_base = data->data;
     784      224919 :                 iov.iov_len = data->length;
     785         585 :         };
     786             : 
     787      225459 :         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
     788             : }
     789             : 
     790       17875 : NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
     791             :                             struct server_id server, uint32_t msg_type,
     792             :                             const uint8_t *buf, size_t len)
     793             : {
     794       17875 :         DATA_BLOB blob = data_blob_const(buf, len);
     795       17875 :         return messaging_send(msg_ctx, server, msg_type, &blob);
     796             : }
     797             : 
     798      172884 : static int messaging_post_self(struct messaging_context *msg_ctx,
     799             :                                struct server_id src, struct server_id dst,
     800             :                                uint32_t msg_type,
     801             :                                const struct iovec *iov, int iovlen,
     802             :                                const int *fds, size_t num_fds)
     803             : {
     804         369 :         struct messaging_rec *rec;
     805         369 :         bool ok;
     806             : 
     807      172884 :         rec = messaging_rec_create(
     808             :                 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
     809      172884 :         if (rec == NULL) {
     810           0 :                 return ENOMEM;
     811             :         }
     812             : 
     813      172884 :         ok = messaging_alert_event_contexts(msg_ctx);
     814      172884 :         if (!ok) {
     815           0 :                 TALLOC_FREE(rec);
     816           0 :                 return ENOMEM;
     817             :         }
     818             : 
     819      172884 :         DLIST_ADD_END(msg_ctx->posted_msgs, rec);
     820             : 
     821      172515 :         return 0;
     822             : }
     823             : 
     824      587075 : int messaging_send_iov_from(struct messaging_context *msg_ctx,
     825             :                             struct server_id src, struct server_id dst,
     826             :                             uint32_t msg_type,
     827             :                             const struct iovec *iov, int iovlen,
     828             :                             const int *fds, size_t num_fds)
     829      587075 : {
     830        1686 :         int ret;
     831        1686 :         uint8_t hdr[MESSAGE_HDR_LENGTH];
     832      587075 :         struct iovec iov2[iovlen+1];
     833             : 
     834      587075 :         if (server_id_is_disconnected(&dst)) {
     835           0 :                 return EINVAL;
     836             :         }
     837             : 
     838      587075 :         if (num_fds > INT8_MAX) {
     839           0 :                 return EINVAL;
     840             :         }
     841             : 
     842      587075 :         if (server_id_equal(&dst, &msg_ctx->id)) {
     843      172485 :                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
     844             :                                           iov, iovlen, fds, num_fds);
     845      172485 :                 return ret;
     846             :         }
     847             : 
     848      414590 :         message_hdr_put(hdr, msg_type, src, dst);
     849      414590 :         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
     850      414590 :         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
     851             : 
     852      414590 :         if (dst.vnn != msg_ctx->id.vnn) {
     853           0 :                 if (num_fds > 0) {
     854           0 :                         return ENOSYS;
     855             :                 }
     856             : 
     857           0 :                 ret = messaging_ctdb_send(dst.vnn, dst.pid, iov2, iovlen+1);
     858           0 :                 return ret;
     859             :         }
     860             : 
     861      414590 :         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
     862             : 
     863      414590 :         if (ret == EACCES) {
     864           0 :                 become_root();
     865           0 :                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
     866             :                                          fds, num_fds);
     867           0 :                 unbecome_root();
     868             :         }
     869             : 
     870      414590 :         if (ret == ECONNREFUSED) {
     871             :                 /*
     872             :                  * Linux returns this when a socket exists in the file
     873             :                  * system without a listening process. This is not
     874             :                  * documented in susv4 or the linux manpages, but it's
     875             :                  * easily testable. For the higher levels this is the
     876             :                  * same as "destination does not exist"
     877             :                  */
     878        6167 :                 ret = ENOENT;
     879             :         }
     880             : 
     881      413273 :         return ret;
     882             : }
     883             : 
     884      587075 : NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
     885             :                             struct server_id dst, uint32_t msg_type,
     886             :                             const struct iovec *iov, int iovlen,
     887             :                             const int *fds, size_t num_fds)
     888             : {
     889        1686 :         int ret;
     890             : 
     891      587075 :         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
     892             :                                       iov, iovlen, fds, num_fds);
     893      587075 :         if (ret != 0) {
     894        6169 :                 return map_nt_error_from_unix(ret);
     895             :         }
     896      580906 :         return NT_STATUS_OK;
     897             : }
     898             : 
     899             : struct send_all_state {
     900             :         struct messaging_context *msg_ctx;
     901             :         int msg_type;
     902             :         const void *buf;
     903             :         size_t len;
     904             : };
     905             : 
     906       14290 : static int send_all_fn(pid_t pid, void *private_data)
     907             : {
     908       14290 :         struct send_all_state *state = private_data;
     909           6 :         NTSTATUS status;
     910             : 
     911       14290 :         if (pid == tevent_cached_getpid()) {
     912         228 :                 DBG_DEBUG("Skip ourselves in messaging_send_all\n");
     913         228 :                 return 0;
     914             :         }
     915             : 
     916       14062 :         status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
     917       14062 :                                     state->msg_type, state->buf, state->len);
     918       14062 :         if (!NT_STATUS_IS_OK(status)) {
     919        6167 :                 DBG_NOTICE("messaging_send_buf to %ju failed: %s\n",
     920             :                             (uintmax_t)pid, nt_errstr(status));
     921             :         }
     922             : 
     923       14057 :         return 0;
     924             : }
     925             : 
     926         228 : void messaging_send_all(struct messaging_context *msg_ctx,
     927             :                         int msg_type, const void *buf, size_t len)
     928             : {
     929         228 :         struct send_all_state state = {
     930             :                 .msg_ctx = msg_ctx, .msg_type = msg_type,
     931             :                 .buf = buf, .len = len
     932             :         };
     933           1 :         int ret;
     934             : 
     935             : #ifdef CLUSTER_SUPPORT
     936             :         if (lp_clustering()) {
     937             :                 struct ctdbd_connection *conn = messaging_ctdb_connection();
     938             :                 uint8_t msghdr[MESSAGE_HDR_LENGTH];
     939             :                 struct iovec iov[] = {
     940             :                         { .iov_base = msghdr,
     941             :                           .iov_len = sizeof(msghdr) },
     942             :                         { .iov_base = discard_const_p(void, buf),
     943             :                           .iov_len = len }
     944             :                 };
     945             : 
     946             :                 message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
     947             :                                 (struct server_id) {0});
     948             : 
     949             :                 ret = ctdbd_messaging_send_iov(
     950             :                         conn, CTDB_BROADCAST_CONNECTED,
     951             :                         CTDB_SRVID_SAMBA_PROCESS,
     952             :                         iov, ARRAY_SIZE(iov));
     953             :                 if (ret != 0) {
     954             :                         DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
     955             :                                     strerror(ret));
     956             :                 }
     957             : 
     958             :                 return;
     959             :         }
     960             : #endif
     961             : 
     962         228 :         ret = messaging_dgm_forall(send_all_fn, &state);
     963         228 :         if (ret != 0) {
     964           0 :                 DBG_WARNING("messaging_dgm_forall failed: %s\n",
     965             :                             strerror(ret));
     966             :         }
     967         228 : }
     968             : 
     969      177158 : static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
     970             :                                                struct messaging_rec *rec)
     971             : {
     972         597 :         struct messaging_rec *result;
     973      177158 :         size_t fds_size = sizeof(int64_t) * rec->num_fds;
     974         597 :         size_t payload_len;
     975             : 
     976      177158 :         payload_len = rec->buf.length + fds_size;
     977      177158 :         if (payload_len < rec->buf.length) {
     978             :                 /* overflow */
     979           0 :                 return NULL;
     980             :         }
     981             : 
     982      177158 :         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
     983             :                                       payload_len);
     984      177158 :         if (result == NULL) {
     985           0 :                 return NULL;
     986             :         }
     987      177158 :         *result = *rec;
     988             : 
     989             :         /* Doesn't fail, see talloc_pooled_object */
     990             : 
     991      177158 :         result->buf.data = talloc_memdup(result, rec->buf.data,
     992             :                                          rec->buf.length);
     993             : 
     994      177158 :         result->fds = NULL;
     995      177158 :         if (result->num_fds > 0) {
     996          56 :                 size_t i;
     997             : 
     998        1110 :                 result->fds = talloc_memdup(result, rec->fds, fds_size);
     999             : 
    1000        2223 :                 for (i=0; i<rec->num_fds; i++) {
    1001             :                         /*
    1002             :                          * fd's can only exist once
    1003             :                          */
    1004        1113 :                         rec->fds[i] = -1;
    1005             :                 }
    1006             :         }
    1007             : 
    1008      176561 :         return result;
    1009             : }
    1010             : 
    1011             : struct messaging_filtered_read_state {
    1012             :         struct tevent_context *ev;
    1013             :         struct messaging_context *msg_ctx;
    1014             :         struct messaging_dgm_fde *fde;
    1015             :         struct messaging_ctdb_fde *cluster_fde;
    1016             : 
    1017             :         bool (*filter)(struct messaging_rec *rec, void *private_data);
    1018             :         void *private_data;
    1019             : 
    1020             :         struct messaging_rec *rec;
    1021             : };
    1022             : 
    1023             : static void messaging_filtered_read_cleanup(struct tevent_req *req,
    1024             :                                             enum tevent_req_state req_state);
    1025             : 
    1026      138362 : struct tevent_req *messaging_filtered_read_send(
    1027             :         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
    1028             :         struct messaging_context *msg_ctx,
    1029             :         bool (*filter)(struct messaging_rec *rec, void *private_data),
    1030             :         void *private_data)
    1031             : {
    1032        3799 :         struct tevent_req *req;
    1033        3799 :         struct messaging_filtered_read_state *state;
    1034        3799 :         size_t new_waiters_len;
    1035        3799 :         bool ok;
    1036             : 
    1037      138362 :         req = tevent_req_create(mem_ctx, &state,
    1038             :                                 struct messaging_filtered_read_state);
    1039      138362 :         if (req == NULL) {
    1040           0 :                 return NULL;
    1041             :         }
    1042      138362 :         state->ev = ev;
    1043      138362 :         state->msg_ctx = msg_ctx;
    1044      138362 :         state->filter = filter;
    1045      138362 :         state->private_data = private_data;
    1046             : 
    1047             :         /*
    1048             :          * We have to defer the callback here, as we might be called from
    1049             :          * within a different tevent_context than state->ev
    1050             :          */
    1051      138362 :         tevent_req_defer_callback(req, state->ev);
    1052             : 
    1053      138362 :         state->fde = messaging_dgm_register_tevent_context(state, ev);
    1054      138362 :         if (tevent_req_nomem(state->fde, req)) {
    1055           0 :                 return tevent_req_post(req, ev);
    1056             :         }
    1057             : 
    1058      138362 :         if (lp_clustering()) {
    1059           0 :                 state->cluster_fde =
    1060           0 :                         messaging_ctdb_register_tevent_context(state, ev);
    1061           0 :                 if (tevent_req_nomem(state->cluster_fde, req)) {
    1062           0 :                         return tevent_req_post(req, ev);
    1063             :                 }
    1064             :         }
    1065             : 
    1066             :         /*
    1067             :          * We add ourselves to the "new_waiters" array, not the "waiters"
    1068             :          * array. If we are called from within messaging_read_done,
    1069             :          * messaging_dispatch_rec will be in an active for-loop on
    1070             :          * "waiters". We must be careful not to mess with this array, because
    1071             :          * it could mean that a single event is being delivered twice.
    1072             :          */
    1073             : 
    1074      138362 :         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
    1075             : 
    1076      138362 :         if (new_waiters_len == msg_ctx->num_new_waiters) {
    1077        3733 :                 struct tevent_req **tmp;
    1078             : 
    1079      117836 :                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
    1080             :                                      struct tevent_req *, new_waiters_len+1);
    1081      117836 :                 if (tevent_req_nomem(tmp, req)) {
    1082           0 :                         return tevent_req_post(req, ev);
    1083             :                 }
    1084      117836 :                 msg_ctx->new_waiters = tmp;
    1085             :         }
    1086             : 
    1087      138362 :         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
    1088      138362 :         msg_ctx->num_new_waiters += 1;
    1089      138362 :         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
    1090             : 
    1091      138362 :         ok = messaging_register_event_context(msg_ctx, ev);
    1092      138362 :         if (!ok) {
    1093           0 :                 tevent_req_oom(req);
    1094           0 :                 return tevent_req_post(req, ev);
    1095             :         }
    1096             : 
    1097      134563 :         return req;
    1098             : }
    1099             : 
    1100      131872 : static void messaging_filtered_read_cleanup(struct tevent_req *req,
    1101             :                                             enum tevent_req_state req_state)
    1102             : {
    1103      131872 :         struct messaging_filtered_read_state *state = tevent_req_data(
    1104             :                 req, struct messaging_filtered_read_state);
    1105      131872 :         struct messaging_context *msg_ctx = state->msg_ctx;
    1106        3664 :         size_t i;
    1107        3664 :         bool ok;
    1108             : 
    1109      131872 :         tevent_req_set_cleanup_fn(req, NULL);
    1110             : 
    1111      131872 :         TALLOC_FREE(state->fde);
    1112      131872 :         TALLOC_FREE(state->cluster_fde);
    1113             : 
    1114      131872 :         ok = messaging_deregister_event_context(msg_ctx, state->ev);
    1115      131872 :         if (!ok) {
    1116           0 :                 abort();
    1117             :         }
    1118             : 
    1119             :         /*
    1120             :          * Just set the [new_]waiters entry to NULL, be careful not to mess
    1121             :          * with the other "waiters" array contents. We are often called from
    1122             :          * within "messaging_dispatch_rec", which loops over
    1123             :          * "waiters". Messing with the "waiters" array will mess up that
    1124             :          * for-loop.
    1125             :          */
    1126             : 
    1127      193451 :         for (i=0; i<msg_ctx->num_waiters; i++) {
    1128       82406 :                 if (msg_ctx->waiters[i] == req) {
    1129       20827 :                         msg_ctx->waiters[i] = NULL;
    1130       20827 :                         return;
    1131             :                 }
    1132             :         }
    1133             : 
    1134      435496 :         for (i=0; i<msg_ctx->num_new_waiters; i++) {
    1135      435496 :                 if (msg_ctx->new_waiters[i] == req) {
    1136      111045 :                         msg_ctx->new_waiters[i] = NULL;
    1137      111045 :                         return;
    1138             :                 }
    1139             :         }
    1140             : }
    1141             : 
    1142        4274 : static void messaging_filtered_read_done(struct tevent_req *req,
    1143             :                                          struct messaging_rec *rec)
    1144             : {
    1145        4274 :         struct messaging_filtered_read_state *state = tevent_req_data(
    1146             :                 req, struct messaging_filtered_read_state);
    1147             : 
    1148        4274 :         state->rec = messaging_rec_dup(state, rec);
    1149        4274 :         if (tevent_req_nomem(state->rec, req)) {
    1150           0 :                 return;
    1151             :         }
    1152        4274 :         tevent_req_done(req);
    1153             : }
    1154             : 
    1155        4274 : int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
    1156             :                                  struct messaging_rec **presult)
    1157             : {
    1158        4274 :         struct messaging_filtered_read_state *state = tevent_req_data(
    1159             :                 req, struct messaging_filtered_read_state);
    1160         228 :         int err;
    1161             : 
    1162        4274 :         if (tevent_req_is_unix_error(req, &err)) {
    1163           0 :                 tevent_req_received(req);
    1164           0 :                 return err;
    1165             :         }
    1166        4274 :         if (presult != NULL) {
    1167        4274 :                 *presult = talloc_move(mem_ctx, &state->rec);
    1168             :         }
    1169        4274 :         tevent_req_received(req);
    1170        4274 :         return 0;
    1171             : }
    1172             : 
    1173             : struct messaging_read_state {
    1174             :         uint32_t msg_type;
    1175             :         struct messaging_rec *rec;
    1176             : };
    1177             : 
    1178             : static bool messaging_read_filter(struct messaging_rec *rec,
    1179             :                                   void *private_data);
    1180             : static void messaging_read_done(struct tevent_req *subreq);
    1181             : 
    1182       31034 : struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
    1183             :                                        struct tevent_context *ev,
    1184             :                                        struct messaging_context *msg,
    1185             :                                        uint32_t msg_type)
    1186             : {
    1187         949 :         struct tevent_req *req, *subreq;
    1188         949 :         struct messaging_read_state *state;
    1189             : 
    1190       31034 :         req = tevent_req_create(mem_ctx, &state,
    1191             :                                 struct messaging_read_state);
    1192       31034 :         if (req == NULL) {
    1193           0 :                 return NULL;
    1194             :         }
    1195       31034 :         state->msg_type = msg_type;
    1196             : 
    1197       31034 :         subreq = messaging_filtered_read_send(state, ev, msg,
    1198             :                                               messaging_read_filter, state);
    1199       31034 :         if (tevent_req_nomem(subreq, req)) {
    1200           0 :                 return tevent_req_post(req, ev);
    1201             :         }
    1202       31034 :         tevent_req_set_callback(subreq, messaging_read_done, req);
    1203       31034 :         return req;
    1204             : }
    1205             : 
    1206       25392 : static bool messaging_read_filter(struct messaging_rec *rec,
    1207             :                                   void *private_data)
    1208             : {
    1209       25392 :         struct messaging_read_state *state = talloc_get_type_abort(
    1210             :                 private_data, struct messaging_read_state);
    1211             : 
    1212       25392 :         if (rec->num_fds != 0) {
    1213         696 :                 return false;
    1214             :         }
    1215             : 
    1216       24694 :         return rec->msg_type == state->msg_type;
    1217             : }
    1218             : 
    1219         168 : static void messaging_read_done(struct tevent_req *subreq)
    1220             : {
    1221         168 :         struct tevent_req *req = tevent_req_callback_data(
    1222             :                 subreq, struct tevent_req);
    1223         168 :         struct messaging_read_state *state = tevent_req_data(
    1224             :                 req, struct messaging_read_state);
    1225         112 :         int ret;
    1226             : 
    1227         168 :         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
    1228         168 :         TALLOC_FREE(subreq);
    1229         168 :         if (tevent_req_error(req, ret)) {
    1230           0 :                 return;
    1231             :         }
    1232         168 :         tevent_req_done(req);
    1233             : }
    1234             : 
    1235         168 : int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
    1236             :                         struct messaging_rec **presult)
    1237             : {
    1238         168 :         struct messaging_read_state *state = tevent_req_data(
    1239             :                 req, struct messaging_read_state);
    1240         112 :         int err;
    1241             : 
    1242         168 :         if (tevent_req_is_unix_error(req, &err)) {
    1243           0 :                 return err;
    1244             :         }
    1245         168 :         if (presult != NULL) {
    1246          66 :                 *presult = talloc_move(mem_ctx, &state->rec);
    1247             :         }
    1248          56 :         return 0;
    1249             : }
    1250             : 
    1251       65080 : static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
    1252             : {
    1253       65080 :         if (msg_ctx->num_new_waiters == 0) {
    1254       60017 :                 return true;
    1255             :         }
    1256             : 
    1257        5059 :         if (talloc_array_length(msg_ctx->waiters) <
    1258        5059 :             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
    1259         217 :                 struct tevent_req **tmp;
    1260        3083 :                 tmp = talloc_realloc(
    1261             :                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
    1262             :                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
    1263        3083 :                 if (tmp == NULL) {
    1264           0 :                         DEBUG(1, ("%s: talloc failed\n", __func__));
    1265           0 :                         return false;
    1266             :                 }
    1267        3083 :                 msg_ctx->waiters = tmp;
    1268             :         }
    1269             : 
    1270        5059 :         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
    1271        5059 :                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
    1272             : 
    1273        5059 :         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
    1274        5059 :         msg_ctx->num_new_waiters = 0;
    1275             : 
    1276        5059 :         return true;
    1277             : }
    1278             : 
    1279      258207 : static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
    1280             :                                        struct messaging_rec *rec)
    1281             : {
    1282         711 :         struct messaging_callback *cb, *next;
    1283             : 
    1284     1435140 :         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
    1285        5414 :                 size_t j;
    1286             : 
    1287     1370481 :                 next = cb->next;
    1288     1370481 :                 if (cb->msg_type != rec->msg_type) {
    1289     1176933 :                         continue;
    1290             :                 }
    1291             : 
    1292             :                 /*
    1293             :                  * the old style callbacks don't support fd passing
    1294             :                  */
    1295      193548 :                 for (j=0; j < rec->num_fds; j++) {
    1296           0 :                         int fd = rec->fds[j];
    1297           0 :                         close(fd);
    1298             :                 }
    1299      193548 :                 rec->num_fds = 0;
    1300      193548 :                 rec->fds = NULL;
    1301             : 
    1302      193548 :                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
    1303             :                        rec->src, &rec->buf);
    1304             : 
    1305      193544 :                 return true;
    1306             :         }
    1307             : 
    1308       64424 :         return false;
    1309             : }
    1310             : 
    1311       65080 : static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
    1312             :                                        struct tevent_context *ev,
    1313             :                                        struct messaging_rec *rec)
    1314             : {
    1315         235 :         size_t i;
    1316             : 
    1317       65080 :         if (!messaging_append_new_waiters(msg_ctx)) {
    1318           0 :                 return false;
    1319             :         }
    1320             : 
    1321       64845 :         i = 0;
    1322      289927 :         while (i < msg_ctx->num_waiters) {
    1323         889 :                 struct tevent_req *req;
    1324         889 :                 struct messaging_filtered_read_state *state;
    1325             : 
    1326      229121 :                 req = msg_ctx->waiters[i];
    1327      229121 :                 if (req == NULL) {
    1328             :                         /*
    1329             :                          * This got cleaned up. In the meantime,
    1330             :                          * move everything down one. We need
    1331             :                          * to keep the order of waiters, as
    1332             :                          * other code may depend on this.
    1333             :                          */
    1334        6278 :                         ARRAY_DEL_ELEMENT(
    1335         227 :                                 msg_ctx->waiters, i, msg_ctx->num_waiters);
    1336        6278 :                         msg_ctx->num_waiters -= 1;
    1337        6278 :                         continue;
    1338             :                 }
    1339             : 
    1340      222843 :                 state = tevent_req_data(
    1341             :                         req, struct messaging_filtered_read_state);
    1342      444036 :                 if ((ev == state->ev) &&
    1343      221193 :                     state->filter(rec, state->private_data)) {
    1344        4274 :                         messaging_filtered_read_done(req, rec);
    1345        4274 :                         return true;
    1346             :                 }
    1347             : 
    1348      218569 :                 i += 1;
    1349             :         }
    1350             : 
    1351       60799 :         return false;
    1352             : }
    1353             : 
    1354             : /*
    1355             :   Dispatch one messaging_rec
    1356             : */
    1357       85799 : static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
    1358             :                                    struct tevent_context *ev,
    1359             :                                    struct messaging_rec *rec)
    1360             : {
    1361         345 :         bool consumed;
    1362         345 :         size_t i;
    1363             : 
    1364       85799 :         if (ev == msg_ctx->event_ctx) {
    1365       85378 :                 consumed = messaging_dispatch_classic(msg_ctx, rec);
    1366       85378 :                 if (consumed) {
    1367       42766 :                         return;
    1368             :                 }
    1369             :         }
    1370             : 
    1371       42907 :         consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
    1372       42907 :         if (consumed) {
    1373        2616 :                 return;
    1374             :         }
    1375             : 
    1376       40078 :         if (ev != msg_ctx->event_ctx) {
    1377           0 :                 struct iovec iov;
    1378         399 :                 int fds[MAX(1, rec->num_fds)];
    1379           0 :                 int ret;
    1380             : 
    1381             :                 /*
    1382             :                  * We've been listening on a nested event
    1383             :                  * context. Messages need to be handled in the main
    1384             :                  * event context, so post to ourselves
    1385             :                  */
    1386             : 
    1387         399 :                 iov.iov_base = rec->buf.data;
    1388         399 :                 iov.iov_len = rec->buf.length;
    1389             : 
    1390         399 :                 for (i=0; i<rec->num_fds; i++) {
    1391           0 :                         fds[i] = rec->fds[i];
    1392             :                 }
    1393             : 
    1394         399 :                 ret = messaging_post_self(
    1395         399 :                         msg_ctx, rec->src, rec->dest, rec->msg_type,
    1396         399 :                         &iov, 1, fds, rec->num_fds);
    1397         399 :                 if (ret == 0) {
    1398         399 :                         return;
    1399             :                 }
    1400             :         }
    1401             : }
    1402             : 
    1403             : static int mess_parent_dgm_cleanup(void *private_data);
    1404             : static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
    1405             : 
    1406          88 : bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
    1407             : {
    1408           0 :         struct tevent_req *req;
    1409             : 
    1410          88 :         req = background_job_send(
    1411             :                 msg, msg->event_ctx, msg, NULL, 0,
    1412          88 :                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
    1413             :                             60*15),
    1414             :                 mess_parent_dgm_cleanup, msg);
    1415          88 :         if (req == NULL) {
    1416           0 :                 DBG_WARNING("background_job_send failed\n");
    1417           0 :                 return false;
    1418             :         }
    1419          88 :         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
    1420          88 :         return true;
    1421             : }
    1422             : 
    1423           0 : static int mess_parent_dgm_cleanup(void *private_data)
    1424             : {
    1425           0 :         int ret;
    1426             : 
    1427           0 :         ret = messaging_dgm_wipe();
    1428           0 :         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
    1429             :                    ret ? strerror(ret) : "ok"));
    1430           0 :         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
    1431             :                            60*15);
    1432             : }
    1433             : 
    1434           0 : static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
    1435             : {
    1436           0 :         struct messaging_context *msg = tevent_req_callback_data(
    1437             :                 req, struct messaging_context);
    1438           0 :         NTSTATUS status;
    1439             : 
    1440           0 :         status = background_job_recv(req);
    1441           0 :         TALLOC_FREE(req);
    1442           0 :         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
    1443             :                   nt_errstr(status)));
    1444             : 
    1445           0 :         req = background_job_send(
    1446             :                 msg, msg->event_ctx, msg, NULL, 0,
    1447           0 :                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
    1448             :                             60*15),
    1449             :                 mess_parent_dgm_cleanup, msg);
    1450           0 :         if (req == NULL) {
    1451           0 :                 DEBUG(1, ("background_job_send failed\n"));
    1452           0 :                 return;
    1453             :         }
    1454           0 :         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
    1455             : }
    1456             : 
    1457           0 : int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
    1458             : {
    1459           0 :         int ret;
    1460             : 
    1461           0 :         if (pid == 0) {
    1462           0 :                 ret = messaging_dgm_wipe();
    1463             :         } else {
    1464           0 :                 ret = messaging_dgm_cleanup(pid);
    1465             :         }
    1466             : 
    1467           0 :         return ret;
    1468             : }
    1469             : 
    1470       75113 : struct tevent_context *messaging_tevent_context(
    1471             :         struct messaging_context *msg_ctx)
    1472             : {
    1473       75113 :         return msg_ctx->event_ctx;
    1474             : }
    1475             : 
    1476       20649 : struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
    1477             : {
    1478       20649 :         return msg_ctx->names_db;
    1479             : }
    1480             : 
    1481             : /** @} **/

Generated by: LCOV version 1.14