Line data Source code
1 : /*
2 : * Unix SMB/CIFS implementation.
3 : *
4 : * Copyright (C) Volker Lendecke 2014
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : */
19 :
20 : #include "replace.h"
21 : #include <tevent.h>
22 : #include "notifyd_private.h"
23 : #include "lib/util/server_id.h"
24 : #include "lib/util/data_blob.h"
25 : #include "librpc/gen_ndr/notify.h"
26 : #include "librpc/gen_ndr/messaging.h"
27 : #include "librpc/gen_ndr/server_id.h"
28 : #include "lib/dbwrap/dbwrap.h"
29 : #include "lib/dbwrap/dbwrap_rbt.h"
30 : #include "messages.h"
31 : #include "tdb.h"
32 : #include "util_tdb.h"
33 : #include "notifyd.h"
34 : #include "lib/util/server_id_db.h"
35 : #include "lib/util/tevent_unix.h"
36 : #include "lib/util/tevent_ntstatus.h"
37 : #include "ctdbd_conn.h"
38 : #include "ctdb_srvids.h"
39 : #include "server_id_db_util.h"
40 : #include "lib/util/iov_buf.h"
41 : #include "messages_util.h"
42 :
43 : #ifdef CLUSTER_SUPPORT
44 : #include "ctdb_protocol.h"
45 : #endif
46 :
47 : struct notifyd_peer;
48 :
49 : /*
50 : * All of notifyd's state
51 : */
52 :
53 : struct notifyd_state { /* request state created by notifyd_send() */
54 : struct tevent_context *ev; /* event context passed to notifyd_send() */
55 : struct messaging_context *msg_ctx; /* messaging context the handlers are registered on */
56 : struct ctdbd_connection *ctdbd_conn; /* NULL when there is no cluster around */
57 :
58 : /*
59 : * Database of everything clients show interest in. Indexed by
60 : * absolute path. The database keys are not 0-terminated
61 : * to allow the critical operation, notifyd_trigger, to walk
62 : * the structure from the top without adding intermediate 0s.
63 : * The database records contain an array of
64 : *
65 : * struct notifyd_instance
66 : *
67 : * to be maintained and parsed by notifyd_parse_entry()
68 : */
69 : struct db_context *entries;
70 :
71 : /*
72 : * In the cluster case, this is the place where we store a log
73 : * of all MSG_SMB_NOTIFY_REC_CHANGE messages. We just 1:1
74 : * forward them to our peer notifyd's in the cluster once a
75 : * second or when the log grows too large (100 records, see
76 : * notifyd_rec_change). Stays NULL without a cluster.
77 : */
78 : struct messaging_reclog *log;
79 :
80 : /*
81 : * Array of companion notifyd's in a cluster. Every notifyd
82 : * broadcasts its messaging_reclog to every other notifyd in
83 : * the cluster. This is done by making ctdb send a message to
84 : * srvid CTDB_SRVID_SAMBA_NOTIFY_PROXY with destination node
85 : * number CTDB_BROADCAST_CONNECTED. Everybody in the cluster who
86 : * has called register_with_ctdbd for this srvid will receive
87 : * the broadcasts.
88 : *
89 : * Database replication happens via these broadcasts. Also,
90 : * they serve as liveness indication. If a notifyd receives a
91 : * broadcast from an unknown peer, it will create one for this
92 : * srvid. Also when we don't hear anything from a peer for a
93 : * while, we will discard it.
94 : */
95 :
96 : struct notifyd_peer **peers;
97 : size_t num_peers; /* number of elements in peers */
98 :
99 : sys_notify_watch_fn sys_notify_watch; /* never NULL: notifyd_send() falls back to sys_notify_watch_dummy */
100 : struct sys_notify_context *sys_notify_ctx; /* handed through to sys_notify_watch */
101 : };
102 :
103 : struct notifyd_peer { /* one companion notifyd in the cluster */
104 : struct notifyd_state *state; /* presumably the owning notifyd state - TODO confirm */
105 : struct server_id pid; /* matched against the sender of MSG_SMB_NOTIFY_DB */
106 : uint64_t rec_index; /* read from the first 8 bytes of the peer's MSG_SMB_NOTIFY_DB */
107 : struct db_context *db; /* rbt copy of the peer's entries, NULL until a db was received */
108 : time_t last_broadcast; /* peers silent for more than 60 seconds are called dead */
109 : };
110 :
111 : static void notifyd_rec_change(struct messaging_context *msg_ctx,
112 : void *private_data, uint32_t msg_type,
113 : struct server_id src, DATA_BLOB *data);
114 : static void notifyd_trigger(struct messaging_context *msg_ctx,
115 : void *private_data, uint32_t msg_type,
116 : struct server_id src, DATA_BLOB *data);
117 : static void notifyd_get_db(struct messaging_context *msg_ctx,
118 : void *private_data, uint32_t msg_type,
119 : struct server_id src, DATA_BLOB *data);
120 :
121 : #ifdef CLUSTER_SUPPORT
122 : static void notifyd_got_db(struct messaging_context *msg_ctx,
123 : void *private_data, uint32_t msg_type,
124 : struct server_id src, DATA_BLOB *data);
125 : static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
126 : struct server_id src,
127 : struct messaging_reclog *log);
128 : #endif
129 : static void notifyd_sys_callback(struct sys_notify_context *ctx,
130 : void *private_data, struct notify_event *ev,
131 : uint32_t filter);
132 :
133 : #ifdef CLUSTER_SUPPORT
134 : static struct tevent_req *notifyd_broadcast_reclog_send(
135 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
136 : struct ctdbd_connection *ctdbd_conn, struct server_id src,
137 : struct messaging_reclog *log);
138 : static int notifyd_broadcast_reclog_recv(struct tevent_req *req);
139 :
140 : static struct tevent_req *notifyd_clean_peers_send(
141 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
142 : struct notifyd_state *notifyd);
143 : static int notifyd_clean_peers_recv(struct tevent_req *req);
144 : #endif
145 :
146 0 : static int sys_notify_watch_dummy(
147 : TALLOC_CTX *mem_ctx,
148 : struct sys_notify_context *ctx,
149 : const char *path,
150 : uint32_t *filter,
151 : uint32_t *subdir_filter,
152 : void (*callback)(struct sys_notify_context *ctx,
153 : void *private_data,
154 : struct notify_event *ev,
155 : uint32_t filter),
156 : void *private_data,
157 : void *handle_p)
158 : {
159 0 : void **handle = handle_p;
160 0 : *handle = NULL;
161 0 : return 0;
162 : }
163 :
164 : #ifdef CLUSTER_SUPPORT
165 : static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq);
166 : static void notifyd_clean_peers_finished(struct tevent_req *subreq);
167 : static int notifyd_snoop_broadcast(struct tevent_context *ev,
168 : uint32_t src_vnn, uint32_t dst_vnn,
169 : uint64_t dst_srvid,
170 : const uint8_t *msg, size_t msglen,
171 : void *private_data);
172 : #endif
173 :
174 0 : struct tevent_req *notifyd_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
175 : struct messaging_context *msg_ctx,
176 : struct ctdbd_connection *ctdbd_conn,
177 : sys_notify_watch_fn sys_notify_watch,
178 : struct sys_notify_context *sys_notify_ctx)
179 : {
180 0 : struct tevent_req *req;
181 : #ifdef CLUSTER_SUPPORT
182 : struct tevent_req *subreq;
183 : #endif
184 0 : struct notifyd_state *state;
185 0 : struct server_id_db *names_db;
186 0 : NTSTATUS status;
187 0 : int ret;
188 :
189 0 : req = tevent_req_create(mem_ctx, &state, struct notifyd_state);
190 0 : if (req == NULL) {
191 0 : return NULL;
192 : }
193 0 : state->ev = ev;
194 0 : state->msg_ctx = msg_ctx;
195 0 : state->ctdbd_conn = ctdbd_conn;
196 :
197 0 : if (sys_notify_watch == NULL) {
198 0 : sys_notify_watch = sys_notify_watch_dummy;
199 : }
200 :
201 0 : state->sys_notify_watch = sys_notify_watch;
202 0 : state->sys_notify_ctx = sys_notify_ctx;
203 :
204 0 : state->entries = db_open_rbt(state);
205 0 : if (tevent_req_nomem(state->entries, req)) {
206 0 : return tevent_req_post(req, ev);
207 : }
208 :
209 0 : status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_REC_CHANGE,
210 : notifyd_rec_change);
211 0 : if (tevent_req_nterror(req, status)) {
212 0 : return tevent_req_post(req, ev);
213 : }
214 :
215 0 : status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_TRIGGER,
216 : notifyd_trigger);
217 0 : if (tevent_req_nterror(req, status)) {
218 0 : goto deregister_rec_change;
219 : }
220 :
221 0 : status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_GET_DB,
222 : notifyd_get_db);
223 0 : if (tevent_req_nterror(req, status)) {
224 0 : goto deregister_trigger;
225 : }
226 :
227 0 : names_db = messaging_names_db(msg_ctx);
228 :
229 0 : ret = server_id_db_set_exclusive(names_db, "notify-daemon");
230 0 : if (ret != 0) {
231 0 : DBG_DEBUG("server_id_db_set_exclusive() failed: %s\n",
232 : strerror(ret));
233 0 : tevent_req_error(req, ret);
234 0 : goto deregister_get_db;
235 : }
236 :
237 0 : if (ctdbd_conn == NULL) {
238 : /*
239 : * No cluster around, skip the database replication
240 : * engine
241 : */
242 0 : return req;
243 : }
244 :
245 : #ifdef CLUSTER_SUPPORT
246 : status = messaging_register(msg_ctx, state, MSG_SMB_NOTIFY_DB,
247 : notifyd_got_db);
248 : if (tevent_req_nterror(req, status)) {
249 : goto deregister_get_db;
250 : }
251 :
252 : state->log = talloc_zero(state, struct messaging_reclog);
253 : if (tevent_req_nomem(state->log, req)) {
254 : goto deregister_db;
255 : }
256 :
257 : subreq = notifyd_broadcast_reclog_send(
258 : state->log, ev, ctdbd_conn,
259 : messaging_server_id(msg_ctx),
260 : state->log);
261 : if (tevent_req_nomem(subreq, req)) {
262 : goto deregister_db;
263 : }
264 : tevent_req_set_callback(subreq,
265 : notifyd_broadcast_reclog_finished,
266 : req);
267 :
268 : subreq = notifyd_clean_peers_send(state, ev, state);
269 : if (tevent_req_nomem(subreq, req)) {
270 : goto deregister_db;
271 : }
272 : tevent_req_set_callback(subreq, notifyd_clean_peers_finished,
273 : req);
274 :
275 : ret = register_with_ctdbd(ctdbd_conn,
276 : CTDB_SRVID_SAMBA_NOTIFY_PROXY,
277 : notifyd_snoop_broadcast, state);
278 : if (ret != 0) {
279 : tevent_req_error(req, ret);
280 : goto deregister_db;
281 : }
282 : #endif
283 :
284 0 : return req;
285 :
286 : #ifdef CLUSTER_SUPPORT
287 : deregister_db:
288 : messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_DB, state);
289 : #endif
290 0 : deregister_get_db:
291 0 : messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_GET_DB, state);
292 0 : deregister_trigger:
293 0 : messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_TRIGGER, state);
294 0 : deregister_rec_change:
295 0 : messaging_deregister(msg_ctx, MSG_SMB_NOTIFY_REC_CHANGE, state);
296 0 : return tevent_req_post(req, ev);
297 : }
298 :
299 : #ifdef CLUSTER_SUPPORT
300 :
301 : static void notifyd_broadcast_reclog_finished(struct tevent_req *subreq)
302 : {
303 : struct tevent_req *req = tevent_req_callback_data(
304 : subreq, struct tevent_req);
305 : int ret;
306 :
307 : ret = notifyd_broadcast_reclog_recv(subreq);
308 : TALLOC_FREE(subreq);
309 : tevent_req_error(req, ret);
310 : }
311 :
312 : static void notifyd_clean_peers_finished(struct tevent_req *subreq)
313 : {
314 : struct tevent_req *req = tevent_req_callback_data(
315 : subreq, struct tevent_req);
316 : int ret;
317 :
318 : ret = notifyd_clean_peers_recv(subreq);
319 : TALLOC_FREE(subreq);
320 : tevent_req_error(req, ret);
321 : }
322 :
323 : #endif
324 :
/*
 * Collect the result of a request started with notifyd_send().
 * Returns whatever tevent_req_simple_recv_unix() reports for req.
 */
int notifyd_recv(struct tevent_req *req)
{
	int err = tevent_req_simple_recv_unix(req);

	return err;
}
329 :
330 0 : static bool notifyd_apply_rec_change(
331 : const struct server_id *client,
332 : const char *path, size_t pathlen,
333 : const struct notify_instance *chg,
334 : struct db_context *entries,
335 : sys_notify_watch_fn sys_notify_watch,
336 : struct sys_notify_context *sys_notify_ctx,
337 : struct messaging_context *msg_ctx)
338 : {
339 0 : struct db_record *rec = NULL;
340 0 : struct notifyd_instance *instances = NULL;
341 0 : size_t num_instances;
342 0 : size_t i;
343 0 : struct notifyd_instance *instance = NULL;
344 0 : TDB_DATA value;
345 0 : NTSTATUS status;
346 0 : bool ok = false;
347 :
348 0 : if (pathlen == 0) {
349 0 : DBG_WARNING("pathlen==0\n");
350 0 : return false;
351 : }
352 0 : if (path[pathlen-1] != '\0') {
353 0 : DBG_WARNING("path not 0-terminated\n");
354 0 : return false;
355 : }
356 :
357 0 : DBG_DEBUG("path=%s, filter=%"PRIu32", subdir_filter=%"PRIu32", "
358 : "private_data=%p\n",
359 : path,
360 : chg->filter,
361 : chg->subdir_filter,
362 : chg->private_data);
363 :
364 0 : rec = dbwrap_fetch_locked(
365 : entries, entries,
366 : make_tdb_data((const uint8_t *)path, pathlen-1));
367 :
368 0 : if (rec == NULL) {
369 0 : DBG_WARNING("dbwrap_fetch_locked failed\n");
370 0 : goto fail;
371 : }
372 :
373 0 : num_instances = 0;
374 0 : value = dbwrap_record_get_value(rec);
375 :
376 0 : if (value.dsize != 0) {
377 0 : if (!notifyd_parse_entry(value.dptr, value.dsize, NULL,
378 : &num_instances)) {
379 0 : goto fail;
380 : }
381 : }
382 :
383 : /*
384 : * Overallocate by one instance to avoid a realloc when adding
385 : */
386 0 : instances = talloc_array(rec, struct notifyd_instance,
387 : num_instances + 1);
388 0 : if (instances == NULL) {
389 0 : DBG_WARNING("talloc failed\n");
390 0 : goto fail;
391 : }
392 :
393 0 : if (value.dsize != 0) {
394 0 : memcpy(instances, value.dptr, value.dsize);
395 : }
396 :
397 0 : for (i=0; i<num_instances; i++) {
398 0 : instance = &instances[i];
399 :
400 0 : if (server_id_equal(&instance->client, client) &&
401 0 : (instance->instance.private_data == chg->private_data)) {
402 0 : break;
403 : }
404 : }
405 :
406 0 : if (i < num_instances) {
407 0 : instance->instance = *chg;
408 : } else {
409 : /*
410 : * We've overallocated for one instance
411 : */
412 0 : instance = &instances[num_instances];
413 :
414 0 : *instance = (struct notifyd_instance) {
415 0 : .client = *client,
416 0 : .instance = *chg,
417 0 : .internal_filter = chg->filter,
418 0 : .internal_subdir_filter = chg->subdir_filter
419 : };
420 :
421 0 : num_instances += 1;
422 : }
423 :
424 0 : if ((instance->instance.filter != 0) ||
425 0 : (instance->instance.subdir_filter != 0)) {
426 0 : int ret;
427 :
428 0 : TALLOC_FREE(instance->sys_watch);
429 :
430 0 : ret = sys_notify_watch(entries, sys_notify_ctx, path,
431 : &instance->internal_filter,
432 : &instance->internal_subdir_filter,
433 : notifyd_sys_callback, msg_ctx,
434 0 : &instance->sys_watch);
435 0 : if (ret != 0) {
436 0 : DBG_WARNING("sys_notify_watch for [%s] returned %s\n",
437 : path, strerror(errno));
438 : }
439 : }
440 :
441 0 : if ((instance->instance.filter == 0) &&
442 0 : (instance->instance.subdir_filter == 0)) {
443 : /* This is a delete request */
444 0 : TALLOC_FREE(instance->sys_watch);
445 0 : *instance = instances[num_instances-1];
446 0 : num_instances -= 1;
447 : }
448 :
449 0 : DBG_DEBUG("%s has %zu instances\n", path, num_instances);
450 :
451 0 : if (num_instances == 0) {
452 0 : status = dbwrap_record_delete(rec);
453 0 : if (!NT_STATUS_IS_OK(status)) {
454 0 : DBG_WARNING("dbwrap_record_delete returned %s\n",
455 : nt_errstr(status));
456 0 : goto fail;
457 : }
458 : } else {
459 0 : value = make_tdb_data(
460 : (uint8_t *)instances,
461 : sizeof(struct notifyd_instance) * num_instances);
462 :
463 0 : status = dbwrap_record_store(rec, value, 0);
464 0 : if (!NT_STATUS_IS_OK(status)) {
465 0 : DBG_WARNING("dbwrap_record_store returned %s\n",
466 : nt_errstr(status));
467 0 : goto fail;
468 : }
469 : }
470 :
471 0 : ok = true;
472 0 : fail:
473 0 : TALLOC_FREE(rec);
474 0 : return ok;
475 : }
476 :
477 0 : static void notifyd_sys_callback(struct sys_notify_context *ctx,
478 : void *private_data, struct notify_event *ev,
479 : uint32_t filter)
480 : {
481 0 : struct messaging_context *msg_ctx = talloc_get_type_abort(
482 : private_data, struct messaging_context);
483 0 : struct notify_trigger_msg msg;
484 0 : struct iovec iov[4];
485 0 : char slash = '/';
486 :
487 0 : msg = (struct notify_trigger_msg) {
488 0 : .when = timespec_current(),
489 0 : .action = ev->action,
490 : .filter = filter,
491 : };
492 :
493 0 : iov[0].iov_base = &msg;
494 0 : iov[0].iov_len = offsetof(struct notify_trigger_msg, path);
495 0 : iov[1].iov_base = discard_const_p(char, ev->dir);
496 0 : iov[1].iov_len = strlen(ev->dir);
497 0 : iov[2].iov_base = &slash;
498 0 : iov[2].iov_len = 1;
499 0 : iov[3].iov_base = discard_const_p(char, ev->path);
500 0 : iov[3].iov_len = strlen(ev->path)+1;
501 :
502 0 : messaging_send_iov(
503 : msg_ctx, messaging_server_id(msg_ctx),
504 : MSG_SMB_NOTIFY_TRIGGER, iov, ARRAY_SIZE(iov), NULL, 0);
505 0 : }
506 :
507 0 : static bool notifyd_parse_rec_change(uint8_t *buf, size_t bufsize,
508 : struct notify_rec_change_msg **pmsg,
509 : size_t *pathlen)
510 : {
511 0 : struct notify_rec_change_msg *msg;
512 :
513 0 : if (bufsize < offsetof(struct notify_rec_change_msg, path) + 1) {
514 0 : DBG_WARNING("message too short, ignoring: %zu\n", bufsize);
515 0 : return false;
516 : }
517 :
518 0 : *pmsg = msg = (struct notify_rec_change_msg *)buf;
519 0 : *pathlen = bufsize - offsetof(struct notify_rec_change_msg, path);
520 :
521 0 : DBG_DEBUG("Got rec_change_msg filter=%"PRIu32", "
522 : "subdir_filter=%"PRIu32", private_data=%p, path=%.*s\n",
523 : msg->instance.filter,
524 : msg->instance.subdir_filter,
525 : msg->instance.private_data,
526 : (int)(*pathlen),
527 : msg->path);
528 :
529 0 : return true;
530 : }
531 :
532 0 : static void notifyd_rec_change(struct messaging_context *msg_ctx,
533 : void *private_data, uint32_t msg_type,
534 : struct server_id src, DATA_BLOB *data)
535 : {
536 0 : struct notifyd_state *state = talloc_get_type_abort(
537 : private_data, struct notifyd_state);
538 0 : struct server_id_buf idbuf;
539 0 : struct notify_rec_change_msg *msg;
540 0 : size_t pathlen;
541 0 : bool ok;
542 0 : struct notify_instance instance;
543 :
544 0 : DBG_DEBUG("Got %zu bytes from %s\n", data->length,
545 : server_id_str_buf(src, &idbuf));
546 :
547 0 : ok = notifyd_parse_rec_change(data->data, data->length,
548 : &msg, &pathlen);
549 0 : if (!ok) {
550 0 : return;
551 : }
552 :
553 0 : memcpy(&instance, &msg->instance, sizeof(instance)); /* avoid SIGBUS */
554 :
555 0 : ok = notifyd_apply_rec_change(
556 0 : &src, msg->path, pathlen, &instance,
557 : state->entries, state->sys_notify_watch, state->sys_notify_ctx,
558 : state->msg_ctx);
559 0 : if (!ok) {
560 0 : DBG_DEBUG("notifyd_apply_rec_change failed, ignoring\n");
561 0 : return;
562 : }
563 :
564 0 : if ((state->log == NULL) || (state->ctdbd_conn == NULL)) {
565 0 : return;
566 : }
567 :
568 : #ifdef CLUSTER_SUPPORT
569 : {
570 :
571 : struct messaging_rec **tmp;
572 : struct messaging_reclog *log;
573 : struct iovec iov = { .iov_base = data->data, .iov_len = data->length };
574 :
575 : log = state->log;
576 :
577 : tmp = talloc_realloc(log, log->recs, struct messaging_rec *,
578 : log->num_recs+1);
579 : if (tmp == NULL) {
580 : DBG_WARNING("talloc_realloc failed, ignoring\n");
581 : return;
582 : }
583 : log->recs = tmp;
584 :
585 : log->recs[log->num_recs] = messaging_rec_create(
586 : log->recs, src, messaging_server_id(msg_ctx),
587 : msg_type, &iov, 1, NULL, 0);
588 :
589 : if (log->recs[log->num_recs] == NULL) {
590 : DBG_WARNING("messaging_rec_create failed, ignoring\n");
591 : return;
592 : }
593 :
594 : log->num_recs += 1;
595 :
596 : if (log->num_recs >= 100) {
597 : /*
598 : * Don't let the log grow too large
599 : */
600 : notifyd_broadcast_reclog(state->ctdbd_conn,
601 : messaging_server_id(msg_ctx), log);
602 : }
603 :
604 : }
605 : #endif
606 : }
607 :
608 : struct notifyd_trigger_state { /* context notifyd_trigger hands to notifyd_trigger_parser */
609 : struct messaging_context *msg_ctx; /* used to send MSG_PVFS_NOTIFY to the clients */
610 : struct notify_trigger_msg *msg; /* the received trigger message, carries the full path */
611 : bool recursive; /* true if another '/' follows the directory currently looked up */
612 : bool covered_by_sys_notify; /* sender is on our vnn but is not us: match against internal_* filters */
613 : };
614 :
615 : static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
616 : void *private_data);
617 :
618 0 : static void notifyd_trigger(struct messaging_context *msg_ctx,
619 : void *private_data, uint32_t msg_type,
620 : struct server_id src, DATA_BLOB *data)
621 : {
622 0 : struct notifyd_state *state = talloc_get_type_abort(
623 : private_data, struct notifyd_state);
624 0 : struct server_id my_id = messaging_server_id(msg_ctx);
625 0 : struct notifyd_trigger_state tstate;
626 0 : const char *path;
627 0 : const char *p, *next_p;
628 :
629 0 : if (data->length < offsetof(struct notify_trigger_msg, path) + 1) {
630 0 : DBG_WARNING("message too short, ignoring: %zu\n",
631 : data->length);
632 0 : return;
633 : }
634 0 : if (data->data[data->length-1] != 0) {
635 0 : DBG_WARNING("path not 0-terminated, ignoring\n");;
636 0 : return;
637 : }
638 :
639 0 : tstate.msg_ctx = msg_ctx;
640 :
641 0 : tstate.covered_by_sys_notify = (src.vnn == my_id.vnn);
642 0 : tstate.covered_by_sys_notify &= !server_id_equal(&src, &my_id);
643 :
644 0 : tstate.msg = (struct notify_trigger_msg *)data->data;
645 0 : path = tstate.msg->path;
646 :
647 0 : DBG_DEBUG("Got trigger_msg action=%"PRIu32", filter=%"PRIu32", "
648 : "path=%s\n",
649 : tstate.msg->action,
650 : tstate.msg->filter,
651 : path);
652 :
653 0 : if (path[0] != '/') {
654 0 : DBG_WARNING("path %s does not start with /, ignoring\n",
655 : path);
656 0 : return;
657 : }
658 :
659 0 : for (p = strchr(path+1, '/'); p != NULL; p = next_p) {
660 0 : ptrdiff_t path_len = p - path;
661 0 : TDB_DATA key;
662 0 : uint32_t i;
663 :
664 0 : next_p = strchr(p+1, '/');
665 0 : tstate.recursive = (next_p != NULL);
666 :
667 0 : DBG_DEBUG("Trying path %.*s\n", (int)path_len, path);
668 :
669 0 : key = (TDB_DATA) { .dptr = discard_const_p(uint8_t, path),
670 : .dsize = path_len };
671 :
672 0 : dbwrap_parse_record(state->entries, key,
673 : notifyd_trigger_parser, &tstate);
674 :
675 0 : if (state->peers == NULL) {
676 0 : continue;
677 : }
678 :
679 0 : if (src.vnn != my_id.vnn) {
680 0 : continue;
681 : }
682 :
683 0 : for (i=0; i<state->num_peers; i++) {
684 0 : if (state->peers[i]->db == NULL) {
685 : /*
686 : * Inactive peer, did not get a db yet
687 : */
688 0 : continue;
689 : }
690 0 : dbwrap_parse_record(state->peers[i]->db, key,
691 : notifyd_trigger_parser, &tstate);
692 : }
693 : }
694 : }
695 :
696 : static void notifyd_send_delete(struct messaging_context *msg_ctx,
697 : TDB_DATA key,
698 : struct notifyd_instance *instance);
699 :
700 0 : static void notifyd_trigger_parser(TDB_DATA key, TDB_DATA data,
701 : void *private_data)
702 :
703 : {
704 0 : struct notifyd_trigger_state *tstate = private_data;
705 0 : struct notify_event_msg msg = { .action = tstate->msg->action,
706 0 : .when = tstate->msg->when };
707 0 : struct iovec iov[2];
708 0 : size_t path_len = key.dsize;
709 0 : struct notifyd_instance *instances = NULL;
710 0 : size_t num_instances = 0;
711 0 : size_t i;
712 :
713 0 : if (!notifyd_parse_entry(data.dptr, data.dsize, &instances,
714 : &num_instances)) {
715 0 : DBG_DEBUG("Could not parse notifyd_entry\n");
716 0 : return;
717 : }
718 :
719 0 : DBG_DEBUG("Found %zu instances for %.*s\n",
720 : num_instances,
721 : (int)key.dsize,
722 : (char *)key.dptr);
723 :
724 0 : iov[0].iov_base = &msg;
725 0 : iov[0].iov_len = offsetof(struct notify_event_msg, path);
726 0 : iov[1].iov_base = tstate->msg->path + path_len + 1;
727 0 : iov[1].iov_len = strlen((char *)(iov[1].iov_base)) + 1;
728 :
729 0 : for (i=0; i<num_instances; i++) {
730 0 : struct notifyd_instance *instance = &instances[i];
731 0 : struct server_id_buf idbuf;
732 0 : uint32_t i_filter;
733 0 : NTSTATUS status;
734 :
735 0 : if (tstate->covered_by_sys_notify) {
736 0 : if (tstate->recursive) {
737 0 : i_filter = instance->internal_subdir_filter;
738 : } else {
739 0 : i_filter = instance->internal_filter;
740 : }
741 : } else {
742 0 : if (tstate->recursive) {
743 0 : i_filter = instance->instance.subdir_filter;
744 : } else {
745 0 : i_filter = instance->instance.filter;
746 : }
747 : }
748 :
749 0 : if ((i_filter & tstate->msg->filter) == 0) {
750 0 : continue;
751 : }
752 :
753 0 : msg.private_data = instance->instance.private_data;
754 :
755 0 : status = messaging_send_iov(
756 : tstate->msg_ctx, instance->client,
757 : MSG_PVFS_NOTIFY, iov, ARRAY_SIZE(iov), NULL, 0);
758 :
759 0 : DBG_DEBUG("messaging_send_iov to %s returned %s\n",
760 : server_id_str_buf(instance->client, &idbuf),
761 : nt_errstr(status));
762 :
763 0 : if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND) &&
764 0 : procid_is_local(&instance->client)) {
765 : /*
766 : * That process has died
767 : */
768 0 : notifyd_send_delete(tstate->msg_ctx, key, instance);
769 0 : continue;
770 : }
771 :
772 0 : if (!NT_STATUS_IS_OK(status)) {
773 0 : DBG_WARNING("messaging_send_iov returned %s\n",
774 : nt_errstr(status));
775 : }
776 : }
777 : }
778 :
779 : /*
780 : * Send a delete request to ourselves to properly discard a notify
781 : * record for an smbd that has died.
782 : */
783 :
784 0 : static void notifyd_send_delete(struct messaging_context *msg_ctx,
785 : TDB_DATA key,
786 : struct notifyd_instance *instance)
787 : {
788 0 : struct notify_rec_change_msg msg = {
789 0 : .instance.private_data = instance->instance.private_data
790 : };
791 0 : uint8_t nul = 0;
792 0 : struct iovec iov[3];
793 0 : int ret;
794 :
795 : /*
796 : * Send a rec_change to ourselves to delete a dead entry
797 : */
798 :
799 0 : iov[0] = (struct iovec) {
800 : .iov_base = &msg,
801 : .iov_len = offsetof(struct notify_rec_change_msg, path) };
802 0 : iov[1] = (struct iovec) { .iov_base = key.dptr, .iov_len = key.dsize };
803 0 : iov[2] = (struct iovec) { .iov_base = &nul, .iov_len = sizeof(nul) };
804 :
805 0 : ret = messaging_send_iov_from(
806 : msg_ctx, instance->client, messaging_server_id(msg_ctx),
807 : MSG_SMB_NOTIFY_REC_CHANGE, iov, ARRAY_SIZE(iov), NULL, 0);
808 :
809 0 : if (ret != 0) {
810 0 : DBG_WARNING("messaging_send_iov_from returned %s\n",
811 : strerror(ret));
812 : }
813 0 : }
814 :
815 0 : static void notifyd_get_db(struct messaging_context *msg_ctx,
816 : void *private_data, uint32_t msg_type,
817 : struct server_id src, DATA_BLOB *data)
818 : {
819 0 : struct notifyd_state *state = talloc_get_type_abort(
820 : private_data, struct notifyd_state);
821 0 : struct server_id_buf id1, id2;
822 0 : NTSTATUS status;
823 0 : uint64_t rec_index = UINT64_MAX;
824 0 : uint8_t index_buf[sizeof(uint64_t)];
825 0 : size_t dbsize;
826 0 : uint8_t *buf;
827 0 : struct iovec iov[2];
828 :
829 0 : dbsize = dbwrap_marshall(state->entries, NULL, 0);
830 :
831 0 : buf = talloc_array(talloc_tos(), uint8_t, dbsize);
832 0 : if (buf == NULL) {
833 0 : DBG_WARNING("talloc_array(%zu) failed\n", dbsize);
834 0 : return;
835 : }
836 :
837 0 : dbsize = dbwrap_marshall(state->entries, buf, dbsize);
838 :
839 0 : if (dbsize != talloc_get_size(buf)) {
840 0 : DBG_DEBUG("dbsize changed: %zu->%zu\n",
841 : talloc_get_size(buf),
842 : dbsize);
843 0 : TALLOC_FREE(buf);
844 0 : return;
845 : }
846 :
847 0 : if (state->log != NULL) {
848 0 : rec_index = state->log->rec_index;
849 : }
850 0 : SBVAL(index_buf, 0, rec_index);
851 :
852 0 : iov[0] = (struct iovec) { .iov_base = index_buf,
853 : .iov_len = sizeof(index_buf) };
854 0 : iov[1] = (struct iovec) { .iov_base = buf,
855 : .iov_len = dbsize };
856 :
857 0 : DBG_DEBUG("Sending %zu bytes to %s->%s\n",
858 : iov_buflen(iov, ARRAY_SIZE(iov)),
859 : server_id_str_buf(messaging_server_id(msg_ctx), &id1),
860 : server_id_str_buf(src, &id2));
861 :
862 0 : status = messaging_send_iov(msg_ctx, src, MSG_SMB_NOTIFY_DB,
863 : iov, ARRAY_SIZE(iov), NULL, 0);
864 0 : TALLOC_FREE(buf);
865 0 : if (!NT_STATUS_IS_OK(status)) {
866 0 : DBG_WARNING("messaging_send_iov failed: %s\n",
867 : nt_errstr(status));
868 : }
869 : }
870 :
871 : #ifdef CLUSTER_SUPPORT
872 :
873 : static int notifyd_add_proxy_syswatches(struct db_record *rec,
874 : void *private_data);
875 :
876 : static void notifyd_got_db(struct messaging_context *msg_ctx,
877 : void *private_data, uint32_t msg_type,
878 : struct server_id src, DATA_BLOB *data)
879 : {
880 : struct notifyd_state *state = talloc_get_type_abort(
881 : private_data, struct notifyd_state);
882 : struct notifyd_peer *p = NULL;
883 : struct server_id_buf idbuf;
884 : NTSTATUS status;
885 : int count;
886 : size_t i;
887 :
888 : for (i=0; i<state->num_peers; i++) {
889 : if (server_id_equal(&src, &state->peers[i]->pid)) {
890 : p = state->peers[i];
891 : break;
892 : }
893 : }
894 :
895 : if (p == NULL) {
896 : DBG_DEBUG("Did not find peer for db from %s\n",
897 : server_id_str_buf(src, &idbuf));
898 : return;
899 : }
900 :
901 : if (data->length < 8) {
902 : DBG_DEBUG("Got short db length %zu from %s\n", data->length,
903 : server_id_str_buf(src, &idbuf));
904 : TALLOC_FREE(p);
905 : return;
906 : }
907 :
908 : p->rec_index = BVAL(data->data, 0);
909 :
910 : p->db = db_open_rbt(p);
911 : if (p->db == NULL) {
912 : DBG_DEBUG("db_open_rbt failed\n");
913 : TALLOC_FREE(p);
914 : return;
915 : }
916 :
917 : status = dbwrap_unmarshall(p->db, data->data + 8,
918 : data->length - 8);
919 : if (!NT_STATUS_IS_OK(status)) {
920 : DBG_DEBUG("dbwrap_unmarshall returned %s for db %s\n",
921 : nt_errstr(status),
922 : server_id_str_buf(src, &idbuf));
923 : TALLOC_FREE(p);
924 : return;
925 : }
926 :
927 : dbwrap_traverse_read(p->db, notifyd_add_proxy_syswatches, state,
928 : &count);
929 :
930 : DBG_DEBUG("Database from %s contained %d records\n",
931 : server_id_str_buf(src, &idbuf),
932 : count);
933 : }
934 :
935 : static void notifyd_broadcast_reclog(struct ctdbd_connection *ctdbd_conn,
936 : struct server_id src,
937 : struct messaging_reclog *log)
938 : {
939 : enum ndr_err_code ndr_err;
940 : uint8_t msghdr[MESSAGE_HDR_LENGTH];
941 : DATA_BLOB blob;
942 : struct iovec iov[2];
943 : int ret;
944 :
945 : if (log == NULL) {
946 : return;
947 : }
948 :
949 : DBG_DEBUG("rec_index=%"PRIu64", num_recs=%"PRIu32"\n",
950 : log->rec_index,
951 : log->num_recs);
952 :
953 : message_hdr_put(msghdr, MSG_SMB_NOTIFY_REC_CHANGES, src,
954 : (struct server_id) {0 });
955 : iov[0] = (struct iovec) { .iov_base = msghdr,
956 : .iov_len = sizeof(msghdr) };
957 :
958 : ndr_err = ndr_push_struct_blob(
959 : &blob, log, log,
960 : (ndr_push_flags_fn_t)ndr_push_messaging_reclog);
961 : if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
962 : DBG_WARNING("ndr_push_messaging_recs failed: %s\n",
963 : ndr_errstr(ndr_err));
964 : goto done;
965 : }
966 : iov[1] = (struct iovec) { .iov_base = blob.data,
967 : .iov_len = blob.length };
968 :
969 : ret = ctdbd_messaging_send_iov(
970 : ctdbd_conn, CTDB_BROADCAST_CONNECTED,
971 : CTDB_SRVID_SAMBA_NOTIFY_PROXY, iov, ARRAY_SIZE(iov));
972 : TALLOC_FREE(blob.data);
973 : if (ret != 0) {
974 : DBG_WARNING("ctdbd_messaging_send failed: %s\n",
975 : strerror(ret));
976 : goto done;
977 : }
978 :
979 : log->rec_index += 1;
980 :
981 : done:
982 : log->num_recs = 0;
983 : TALLOC_FREE(log->recs);
984 : }
985 :
986 : struct notifyd_broadcast_reclog_state { /* state of the periodic reclog broadcast request */
987 : struct tevent_context *ev; /* used to schedule the next wakeup */
988 : struct ctdbd_connection *ctdbd_conn; /* connection the broadcast is sent over */
989 : struct server_id src; /* sender id put into the message header */
990 : struct messaging_reclog *log; /* the log that gets broadcast and reset */
991 : };
992 :
993 : static void notifyd_broadcast_reclog_next(struct tevent_req *subreq);
994 :
995 : static struct tevent_req *notifyd_broadcast_reclog_send(
996 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
997 : struct ctdbd_connection *ctdbd_conn, struct server_id src,
998 : struct messaging_reclog *log)
999 : {
1000 : struct tevent_req *req, *subreq;
1001 : struct notifyd_broadcast_reclog_state *state;
1002 :
1003 : req = tevent_req_create(mem_ctx, &state,
1004 : struct notifyd_broadcast_reclog_state);
1005 : if (req == NULL) {
1006 : return NULL;
1007 : }
1008 : state->ev = ev;
1009 : state->ctdbd_conn = ctdbd_conn;
1010 : state->src = src;
1011 : state->log = log;
1012 :
1013 : subreq = tevent_wakeup_send(state, state->ev,
1014 : timeval_current_ofs_msec(1000));
1015 : if (tevent_req_nomem(subreq, req)) {
1016 : return tevent_req_post(req, ev);
1017 : }
1018 : tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1019 : return req;
1020 : }
1021 :
1022 : static void notifyd_broadcast_reclog_next(struct tevent_req *subreq)
1023 : {
1024 : struct tevent_req *req = tevent_req_callback_data(
1025 : subreq, struct tevent_req);
1026 : struct notifyd_broadcast_reclog_state *state = tevent_req_data(
1027 : req, struct notifyd_broadcast_reclog_state);
1028 : bool ok;
1029 :
1030 : ok = tevent_wakeup_recv(subreq);
1031 : TALLOC_FREE(subreq);
1032 : if (!ok) {
1033 : tevent_req_oom(req);
1034 : return;
1035 : }
1036 :
1037 : notifyd_broadcast_reclog(state->ctdbd_conn, state->src, state->log);
1038 :
1039 : subreq = tevent_wakeup_send(state, state->ev,
1040 : timeval_current_ofs_msec(1000));
1041 : if (tevent_req_nomem(subreq, req)) {
1042 : return;
1043 : }
1044 : tevent_req_set_callback(subreq, notifyd_broadcast_reclog_next, req);
1045 : }
1046 :
/*
 * Collect the result of the reclog broadcast request: hands back
 * whatever tevent_req_simple_recv_unix() reports for it.
 */
static int notifyd_broadcast_reclog_recv(struct tevent_req *req)
{
	int err;

	err = tevent_req_simple_recv_unix(req);
	return err;
}
1051 :
/*
 * State of the periodic dead-peer cleanup request
 */
struct notifyd_clean_peers_state {
	/* Event context the recurring wakeup timer is armed on */
	struct tevent_context *ev;
	/* The notifyd whose peers array is scanned on every tick */
	struct notifyd_state *notifyd;
};
1056 :
1057 : static void notifyd_clean_peers_next(struct tevent_req *subreq);
1058 :
1059 : static struct tevent_req *notifyd_clean_peers_send(
1060 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1061 : struct notifyd_state *notifyd)
1062 : {
1063 : struct tevent_req *req, *subreq;
1064 : struct notifyd_clean_peers_state *state;
1065 :
1066 : req = tevent_req_create(mem_ctx, &state,
1067 : struct notifyd_clean_peers_state);
1068 : if (req == NULL) {
1069 : return NULL;
1070 : }
1071 : state->ev = ev;
1072 : state->notifyd = notifyd;
1073 :
1074 : subreq = tevent_wakeup_send(state, state->ev,
1075 : timeval_current_ofs_msec(30000));
1076 : if (tevent_req_nomem(subreq, req)) {
1077 : return tevent_req_post(req, ev);
1078 : }
1079 : tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1080 : return req;
1081 : }
1082 :
1083 : static void notifyd_clean_peers_next(struct tevent_req *subreq)
1084 : {
1085 : struct tevent_req *req = tevent_req_callback_data(
1086 : subreq, struct tevent_req);
1087 : struct notifyd_clean_peers_state *state = tevent_req_data(
1088 : req, struct notifyd_clean_peers_state);
1089 : struct notifyd_state *notifyd = state->notifyd;
1090 : size_t i;
1091 : bool ok;
1092 : time_t now = time(NULL);
1093 :
1094 : ok = tevent_wakeup_recv(subreq);
1095 : TALLOC_FREE(subreq);
1096 : if (!ok) {
1097 : tevent_req_oom(req);
1098 : return;
1099 : }
1100 :
1101 : i = 0;
1102 : while (i < notifyd->num_peers) {
1103 : struct notifyd_peer *p = notifyd->peers[i];
1104 :
1105 : if ((now - p->last_broadcast) > 60) {
1106 : struct server_id_buf idbuf;
1107 :
1108 : /*
1109 : * Haven't heard for more than 60 seconds. Call this
1110 : * peer dead
1111 : */
1112 :
1113 : DBG_DEBUG("peer %s died\n",
1114 : server_id_str_buf(p->pid, &idbuf));
1115 : /*
1116 : * This implicitly decrements notifyd->num_peers
1117 : */
1118 : TALLOC_FREE(p);
1119 : } else {
1120 : i += 1;
1121 : }
1122 : }
1123 :
1124 : subreq = tevent_wakeup_send(state, state->ev,
1125 : timeval_current_ofs_msec(30000));
1126 : if (tevent_req_nomem(subreq, req)) {
1127 : return;
1128 : }
1129 : tevent_req_set_callback(subreq, notifyd_clean_peers_next, req);
1130 : }
1131 :
/*
 * Collect the result of the dead-peer cleanup request: hands back
 * whatever tevent_req_simple_recv_unix() reports for it.
 */
static int notifyd_clean_peers_recv(struct tevent_req *req)
{
	int err;

	err = tevent_req_simple_recv_unix(req);
	return err;
}
1136 :
1137 : static int notifyd_add_proxy_syswatches(struct db_record *rec,
1138 : void *private_data)
1139 : {
1140 : struct notifyd_state *state = talloc_get_type_abort(
1141 : private_data, struct notifyd_state);
1142 : struct db_context *db = dbwrap_record_get_db(rec);
1143 : TDB_DATA key = dbwrap_record_get_key(rec);
1144 : TDB_DATA value = dbwrap_record_get_value(rec);
1145 : struct notifyd_instance *instances = NULL;
1146 : size_t num_instances = 0;
1147 : size_t i;
1148 : char path[key.dsize+1];
1149 : bool ok;
1150 :
1151 : memcpy(path, key.dptr, key.dsize);
1152 : path[key.dsize] = '\0';
1153 :
1154 : ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1155 : &num_instances);
1156 : if (!ok) {
1157 : DBG_WARNING("Could not parse notifyd entry for %s\n", path);
1158 : return 0;
1159 : }
1160 :
1161 : for (i=0; i<num_instances; i++) {
1162 : struct notifyd_instance *instance = &instances[i];
1163 : uint32_t filter = instance->instance.filter;
1164 : uint32_t subdir_filter = instance->instance.subdir_filter;
1165 : int ret;
1166 :
1167 : /*
1168 : * This is a remote database. Pointers that we were
1169 : * given don't make sense locally. Initialize to NULL
1170 : * in case sys_notify_watch fails.
1171 : */
1172 : instances[i].sys_watch = NULL;
1173 :
1174 : ret = state->sys_notify_watch(
1175 : db, state->sys_notify_ctx, path,
1176 : &filter, &subdir_filter,
1177 : notifyd_sys_callback, state->msg_ctx,
1178 : &instance->sys_watch);
1179 : if (ret != 0) {
1180 : DBG_WARNING("inotify_watch returned %s\n",
1181 : strerror(errno));
1182 : }
1183 : }
1184 :
1185 : return 0;
1186 : }
1187 :
1188 : static int notifyd_db_del_syswatches(struct db_record *rec, void *private_data)
1189 : {
1190 : TDB_DATA key = dbwrap_record_get_key(rec);
1191 : TDB_DATA value = dbwrap_record_get_value(rec);
1192 : struct notifyd_instance *instances = NULL;
1193 : size_t num_instances = 0;
1194 : size_t i;
1195 : bool ok;
1196 :
1197 : ok = notifyd_parse_entry(value.dptr, value.dsize, &instances,
1198 : &num_instances);
1199 : if (!ok) {
1200 : DBG_WARNING("Could not parse notifyd entry for %.*s\n",
1201 : (int)key.dsize, (char *)key.dptr);
1202 : return 0;
1203 : }
1204 : for (i=0; i<num_instances; i++) {
1205 : TALLOC_FREE(instances[i].sys_watch);
1206 : }
1207 : return 0;
1208 : }
1209 :
1210 : static int notifyd_peer_destructor(struct notifyd_peer *p)
1211 : {
1212 : struct notifyd_state *state = p->state;
1213 : size_t i;
1214 :
1215 : if (p->db != NULL) {
1216 : dbwrap_traverse_read(p->db, notifyd_db_del_syswatches,
1217 : NULL, NULL);
1218 : }
1219 :
1220 : for (i = 0; i<state->num_peers; i++) {
1221 : if (p == state->peers[i]) {
1222 : state->peers[i] = state->peers[state->num_peers-1];
1223 : state->num_peers -= 1;
1224 : break;
1225 : }
1226 : }
1227 : return 0;
1228 : }
1229 :
1230 : static struct notifyd_peer *notifyd_peer_new(
1231 : struct notifyd_state *state, struct server_id pid)
1232 : {
1233 : struct notifyd_peer *p, **tmp;
1234 :
1235 : tmp = talloc_realloc(state, state->peers, struct notifyd_peer *,
1236 : state->num_peers+1);
1237 : if (tmp == NULL) {
1238 : return NULL;
1239 : }
1240 : state->peers = tmp;
1241 :
1242 : p = talloc_zero(state->peers, struct notifyd_peer);
1243 : if (p == NULL) {
1244 : return NULL;
1245 : }
1246 : p->state = state;
1247 : p->pid = pid;
1248 :
1249 : state->peers[state->num_peers] = p;
1250 : state->num_peers += 1;
1251 :
1252 : talloc_set_destructor(p, notifyd_peer_destructor);
1253 :
1254 : return p;
1255 : }
1256 :
1257 : static void notifyd_apply_reclog(struct notifyd_peer *peer,
1258 : const uint8_t *msg, size_t msglen)
1259 : {
1260 : struct notifyd_state *state = peer->state;
1261 : DATA_BLOB blob = { .data = discard_const_p(uint8_t, msg),
1262 : .length = msglen };
1263 : struct server_id_buf idbuf;
1264 : struct messaging_reclog *log;
1265 : enum ndr_err_code ndr_err;
1266 : uint32_t i;
1267 :
1268 : if (peer->db == NULL) {
1269 : /*
1270 : * No db yet
1271 : */
1272 : return;
1273 : }
1274 :
1275 : log = talloc(peer, struct messaging_reclog);
1276 : if (log == NULL) {
1277 : DBG_DEBUG("talloc failed\n");
1278 : return;
1279 : }
1280 :
1281 : ndr_err = ndr_pull_struct_blob_all(
1282 : &blob, log, log,
1283 : (ndr_pull_flags_fn_t)ndr_pull_messaging_reclog);
1284 : if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
1285 : DBG_DEBUG("ndr_pull_messaging_reclog failed: %s\n",
1286 : ndr_errstr(ndr_err));
1287 : goto fail;
1288 : }
1289 :
1290 : DBG_DEBUG("Got %"PRIu32" recs index %"PRIu64" from %s\n",
1291 : log->num_recs,
1292 : log->rec_index,
1293 : server_id_str_buf(peer->pid, &idbuf));
1294 :
1295 : if (log->rec_index != peer->rec_index) {
1296 : DBG_INFO("Got rec index %"PRIu64" from %s, "
1297 : "expected %"PRIu64"\n",
1298 : log->rec_index,
1299 : server_id_str_buf(peer->pid, &idbuf),
1300 : peer->rec_index);
1301 : goto fail;
1302 : }
1303 :
1304 : for (i=0; i<log->num_recs; i++) {
1305 : struct messaging_rec *r = log->recs[i];
1306 : struct notify_rec_change_msg *chg;
1307 : size_t pathlen;
1308 : bool ok;
1309 : struct notify_instance instance;
1310 :
1311 : ok = notifyd_parse_rec_change(r->buf.data, r->buf.length,
1312 : &chg, &pathlen);
1313 : if (!ok) {
1314 : DBG_INFO("notifyd_parse_rec_change failed\n");
1315 : goto fail;
1316 : }
1317 :
1318 : /* avoid SIGBUS */
1319 : memcpy(&instance, &chg->instance, sizeof(instance));
1320 :
1321 : ok = notifyd_apply_rec_change(&r->src, chg->path, pathlen,
1322 : &instance, peer->db,
1323 : state->sys_notify_watch,
1324 : state->sys_notify_ctx,
1325 : state->msg_ctx);
1326 : if (!ok) {
1327 : DBG_INFO("notifyd_apply_rec_change failed\n");
1328 : goto fail;
1329 : }
1330 : }
1331 :
1332 : peer->rec_index += 1;
1333 : peer->last_broadcast = time(NULL);
1334 :
1335 : TALLOC_FREE(log);
1336 : return;
1337 :
1338 : fail:
1339 : DBG_DEBUG("Dropping peer %s\n",
1340 : server_id_str_buf(peer->pid, &idbuf));
1341 : TALLOC_FREE(peer);
1342 : }
1343 :
1344 : /*
1345 : * Receive messaging_reclog (log of MSG_SMB_NOTIFY_REC_CHANGE
1346 : * messages) broadcasts by other notifyds. Several cases:
1347 : *
1348 : * We don't know the source. This creates a new peer. Creating a peer
1349 : * involves asking the peer for its full database. We assume ordered
1350 : * messages, so the new database will arrive before the next broadcast
1351 : * will.
1352 : *
1353 : * We know the source and the log index matches. We will apply the log
1354 : * locally to our peer's db as if we had received it from a local
1355 : * client.
1356 : *
1357 : * We know the source but the log index does not match. This means we
1358 : * lost a message. We just drop the whole peer and wait for the next
1359 : * broadcast, which will then trigger a fresh database pull.
1360 : */
1361 :
1362 : static int notifyd_snoop_broadcast(struct tevent_context *ev,
1363 : uint32_t src_vnn, uint32_t dst_vnn,
1364 : uint64_t dst_srvid,
1365 : const uint8_t *msg, size_t msglen,
1366 : void *private_data)
1367 : {
1368 : struct notifyd_state *state = talloc_get_type_abort(
1369 : private_data, struct notifyd_state);
1370 : struct server_id my_id = messaging_server_id(state->msg_ctx);
1371 : struct notifyd_peer *p;
1372 : uint32_t i;
1373 : uint32_t msg_type;
1374 : struct server_id src, dst;
1375 : struct server_id_buf idbuf;
1376 : NTSTATUS status;
1377 :
1378 : if (msglen < MESSAGE_HDR_LENGTH) {
1379 : DBG_DEBUG("Got short broadcast\n");
1380 : return 0;
1381 : }
1382 : message_hdr_get(&msg_type, &src, &dst, msg);
1383 :
1384 : if (msg_type != MSG_SMB_NOTIFY_REC_CHANGES) {
1385 : DBG_DEBUG("Got message %"PRIu32", ignoring\n", msg_type);
1386 : return 0;
1387 : }
1388 : if (server_id_equal(&src, &my_id)) {
1389 : DBG_DEBUG("Ignoring my own broadcast\n");
1390 : return 0;
1391 : }
1392 :
1393 : DBG_DEBUG("Got MSG_SMB_NOTIFY_REC_CHANGES from %s\n",
1394 : server_id_str_buf(src, &idbuf));
1395 :
1396 : for (i=0; i<state->num_peers; i++) {
1397 : if (server_id_equal(&state->peers[i]->pid, &src)) {
1398 :
1399 : DBG_DEBUG("Applying changes to peer %"PRIu32"\n", i);
1400 :
1401 : notifyd_apply_reclog(state->peers[i],
1402 : msg + MESSAGE_HDR_LENGTH,
1403 : msglen - MESSAGE_HDR_LENGTH);
1404 : return 0;
1405 : }
1406 : }
1407 :
1408 : DBG_DEBUG("Creating new peer for %s\n",
1409 : server_id_str_buf(src, &idbuf));
1410 :
1411 : p = notifyd_peer_new(state, src);
1412 : if (p == NULL) {
1413 : DBG_DEBUG("notifyd_peer_new failed\n");
1414 : return 0;
1415 : }
1416 :
1417 : status = messaging_send_buf(state->msg_ctx, src, MSG_SMB_NOTIFY_GET_DB,
1418 : NULL, 0);
1419 : if (!NT_STATUS_IS_OK(status)) {
1420 : DBG_DEBUG("messaging_send_buf failed: %s\n",
1421 : nt_errstr(status));
1422 : TALLOC_FREE(p);
1423 : return 0;
1424 : }
1425 :
1426 : return 0;
1427 : }
1428 : #endif
|