LCOV - code coverage report
Current view: top level - source4/wrepl_server - wrepl_out_helpers.c (source / functions) Hit Total Coverage
Test: coverage report for master 2f515e9b Lines: 270 594 45.5 %
Date: 2024-04-21 15:09:00 Functions: 22 41 53.7 %

          Line data    Source code
       1             : /* 
       2             :    Unix SMB/CIFS implementation.
       3             :    
       4             :    WINS Replication server
       5             :    
       6             :    Copyright (C) Stefan Metzmacher      2005
       7             :    
       8             :    This program is free software; you can redistribute it and/or modify
       9             :    it under the terms of the GNU General Public License as published by
      10             :    the Free Software Foundation; either version 3 of the License, or
      11             :    (at your option) any later version.
      12             :    
      13             :    This program is distributed in the hope that it will be useful,
      14             :    but WITHOUT ANY WARRANTY; without even the implied warranty of
      15             :    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      16             :    GNU General Public License for more details.
      17             :    
      18             :    You should have received a copy of the GNU General Public License
      19             :    along with this program.  If not, see <http://www.gnu.org/licenses/>.
      20             : */
      21             : 
      22             : #include "includes.h"
      23             : #include "lib/events/events.h"
      24             : #include "lib/socket/socket.h"
      25             : #include "samba/service_task.h"
      26             : #include "samba/service_stream.h"
      27             : #include "librpc/gen_ndr/winsrepl.h"
      28             : #include "wrepl_server/wrepl_server.h"
      29             : #include "nbt_server/wins/winsdb.h"
      30             : #include "libcli/composite/composite.h"
      31             : #include "libcli/wrepl/winsrepl.h"
      32             : #include "libcli/resolve/resolve.h"
      33             : #include "param/param.h"
      34             : 
      35             : enum wreplsrv_out_connect_stage { /* progress of one outgoing connect attempt; selects the step run by wreplsrv_out_connect_handler() */
      36             :         WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET, /* a connect request is pending on the socket */
      37             :         WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX, /* an associate request is pending */
      38             :         WREPLSRV_OUT_CONNECT_STAGE_DONE /* nothing pending; the handler reports NT_STATUS_INTERNAL_ERROR if run in this stage */
      39             : };
      40             : 
      41             : struct wreplsrv_out_connect_state { /* private state of one wreplsrv_out_connect_send() request */
      42             :         enum wreplsrv_out_connect_stage stage; /* which step the handler runs next */
      43             :         struct composite_context *c; /* composite request this state is allocated on */
      44             :         struct wrepl_associate assoc_io; /* in/out block of the associate request */
      45             :         enum winsrepl_partner_type type; /* requested partner type; decides where the connection is cached on the partner */
      46             :         struct wreplsrv_out_connection *wreplconn; /* connection being set up or reused; NULL when a caller-supplied connection was not connected */
      47             :         struct tevent_req *subreq; /* pending connect or associate request, freed once received */
      48             : };
      49             : 
      50             : static void wreplsrv_out_connect_handler_treq(struct tevent_req *subreq);
      51             : 
      52           0 : static NTSTATUS wreplsrv_out_connect_wait_socket(struct wreplsrv_out_connect_state *state)
      53             : {
      54           0 :         NTSTATUS status;
      55             : 
      56           0 :         status = wrepl_connect_recv(state->subreq);
      57           0 :         TALLOC_FREE(state->subreq);
      58           0 :         NT_STATUS_NOT_OK_RETURN(status);
      59             : 
      60           0 :         state->subreq = wrepl_associate_send(state,
      61           0 :                                              state->wreplconn->service->task->event_ctx,
      62           0 :                                              state->wreplconn->sock, &state->assoc_io);
      63           0 :         NT_STATUS_HAVE_NO_MEMORY(state->subreq);
      64             : 
      65           0 :         tevent_req_set_callback(state->subreq,
      66             :                                 wreplsrv_out_connect_handler_treq,
      67             :                                 state);
      68             : 
      69           0 :         state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX;
      70             : 
      71           0 :         return NT_STATUS_OK;
      72             : }
      73             : 
      74           0 : static NTSTATUS wreplsrv_out_connect_wait_assoc_ctx(struct wreplsrv_out_connect_state *state)
      75             : {
      76           0 :         NTSTATUS status;
      77             : 
      78           0 :         status = wrepl_associate_recv(state->subreq, &state->assoc_io);
      79           0 :         TALLOC_FREE(state->subreq);
      80           0 :         NT_STATUS_NOT_OK_RETURN(status);
      81             : 
      82           0 :         state->wreplconn->assoc_ctx.peer_ctx = state->assoc_io.out.assoc_ctx;
      83           0 :         state->wreplconn->assoc_ctx.peer_major = state->assoc_io.out.major_version;
      84             : 
      85           0 :         if (state->type == WINSREPL_PARTNER_PUSH) {
      86           0 :                 if (state->wreplconn->assoc_ctx.peer_major >= 5) {
      87           0 :                         state->wreplconn->partner->push.wreplconn = state->wreplconn;
      88           0 :                         talloc_steal(state->wreplconn->partner, state->wreplconn);
      89             :                 } else {
      90           0 :                         state->type = WINSREPL_PARTNER_NONE;
      91             :                 }
      92           0 :         } else if (state->type == WINSREPL_PARTNER_PULL) {
      93           0 :                 state->wreplconn->partner->pull.wreplconn = state->wreplconn;
      94           0 :                 talloc_steal(state->wreplconn->partner, state->wreplconn);
      95             :         }
      96             : 
      97           0 :         state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
      98             : 
      99           0 :         return NT_STATUS_OK;
     100             : }
     101             : 
     102           0 : static void wreplsrv_out_connect_handler(struct wreplsrv_out_connect_state *state)
     103             : {
     104           0 :         struct composite_context *c = state->c;
     105             : 
     106           0 :         switch (state->stage) {
     107           0 :         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET:
     108           0 :                 c->status = wreplsrv_out_connect_wait_socket(state);
     109           0 :                 break;
     110           0 :         case WREPLSRV_OUT_CONNECT_STAGE_WAIT_ASSOC_CTX:
     111           0 :                 c->status = wreplsrv_out_connect_wait_assoc_ctx(state);
     112           0 :                 c->state  = COMPOSITE_STATE_DONE;
     113           0 :                 break;
     114           0 :         case WREPLSRV_OUT_CONNECT_STAGE_DONE:
     115           0 :                 c->status = NT_STATUS_INTERNAL_ERROR;
     116             :         }
     117             : 
     118           0 :         if (!NT_STATUS_IS_OK(c->status)) {
     119           0 :                 c->state = COMPOSITE_STATE_ERROR;
     120             :         }
     121             : 
     122           0 :         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
     123           0 :                 c->async.fn(c);
     124             :         }
     125           0 : }
     126             : 
     127           0 : static void wreplsrv_out_connect_handler_treq(struct tevent_req *subreq)
     128             : {
     129           0 :         struct wreplsrv_out_connect_state *state = tevent_req_callback_data(subreq,
     130             :                                                    struct wreplsrv_out_connect_state);
     131           0 :         wreplsrv_out_connect_handler(state);
     132           0 :         return;
     133             : }
     134             : 
     135         675 : static struct composite_context *wreplsrv_out_connect_send(struct wreplsrv_partner *partner,
     136             :                                                            enum winsrepl_partner_type type,
     137             :                                                            struct wreplsrv_out_connection *wreplconn)
     138             : {
     139         675 :         struct composite_context *c = NULL;
     140         675 :         struct wreplsrv_service *service = partner->service;
     141         675 :         struct wreplsrv_out_connect_state *state = NULL;
     142         675 :         struct wreplsrv_out_connection **wreplconnp = &wreplconn;
     143         675 :         bool cached_connection = false;
     144             : 
     145         675 :         c = talloc_zero(partner, struct composite_context);
     146         675 :         if (!c) goto failed;
     147             : 
     148         675 :         state = talloc_zero(c, struct wreplsrv_out_connect_state);
     149         675 :         if (!state) goto failed;
     150         675 :         state->c     = c;
     151         675 :         state->type  = type;
     152             : 
     153         675 :         c->state     = COMPOSITE_STATE_IN_PROGRESS;
     154         675 :         c->event_ctx = service->task->event_ctx;
     155         675 :         c->private_data      = state;
     156             : 
     157         675 :         if (type == WINSREPL_PARTNER_PUSH) {
     158           0 :                 cached_connection       = true;
     159           0 :                 wreplconn               = partner->push.wreplconn;
     160           0 :                 wreplconnp              = &partner->push.wreplconn;
     161         675 :         } else if (type == WINSREPL_PARTNER_PULL) {
     162           0 :                 cached_connection       = true;
     163           0 :                 wreplconn               = partner->pull.wreplconn;
     164           0 :                 wreplconnp              = &partner->pull.wreplconn;
     165             :         }
     166             : 
     167             :         /* we have a connection already, so use it */
     168         675 :         if (wreplconn) {
     169         675 :                 if (wrepl_socket_is_connected(wreplconn->sock)) {
     170         675 :                         state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
     171         675 :                         state->wreplconn= wreplconn;
     172         675 :                         composite_done(c);
     173         675 :                         return c;
     174           0 :                 } else if (!cached_connection) {
     175           0 :                         state->stage = WREPLSRV_OUT_CONNECT_STAGE_DONE;
     176           0 :                         state->wreplconn= NULL;
     177           0 :                         composite_done(c);
     178           0 :                         return c;
     179             :                 } else {
     180           0 :                         talloc_free(wreplconn);
     181           0 :                         *wreplconnp = NULL;
     182             :                 }
     183             :         }
     184             : 
     185           0 :         wreplconn = talloc_zero(state, struct wreplsrv_out_connection);
     186           0 :         if (!wreplconn) goto failed;
     187             : 
     188           0 :         wreplconn->service   = service;
     189           0 :         wreplconn->partner   = partner;
     190           0 :         wreplconn->sock              = wrepl_socket_init(wreplconn, service->task->event_ctx);
     191           0 :         if (!wreplconn->sock) goto failed;
     192             : 
     193           0 :         state->stage = WREPLSRV_OUT_CONNECT_STAGE_WAIT_SOCKET;
     194           0 :         state->wreplconn= wreplconn;
     195           0 :         state->subreq        = wrepl_connect_send(state,
     196           0 :                                              service->task->event_ctx,
     197           0 :                                              wreplconn->sock,
     198           0 :                                              partner->our_address?partner->our_address:wrepl_best_ip(service->task->lp_ctx, partner->address),
     199             :                                              partner->address);
     200           0 :         if (!state->subreq) goto failed;
     201             : 
     202           0 :         tevent_req_set_callback(state->subreq,
     203             :                                 wreplsrv_out_connect_handler_treq,
     204             :                                 state);
     205             : 
     206           0 :         return c;
     207           0 : failed:
     208           0 :         talloc_free(c);
     209           0 :         return NULL;
     210             : }
     211             : 
     212         675 : static NTSTATUS wreplsrv_out_connect_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
     213             :                                           struct wreplsrv_out_connection **wreplconn)
     214             : {
     215           0 :         NTSTATUS status;
     216             : 
     217         675 :         status = composite_wait(c);
     218             : 
     219         675 :         if (NT_STATUS_IS_OK(status)) {
     220         675 :                 struct wreplsrv_out_connect_state *state = talloc_get_type(c->private_data,
     221             :                                                            struct wreplsrv_out_connect_state);
     222         675 :                 if (state->wreplconn) {
     223         675 :                         *wreplconn = talloc_reference(mem_ctx, state->wreplconn);
     224         675 :                         if (!*wreplconn) status = NT_STATUS_NO_MEMORY;
     225             :                 } else {
     226           0 :                         status = NT_STATUS_CONNECTION_DISCONNECTED;
     227             :                 }
     228             :         }
     229             : 
     230         675 :         talloc_free(c);
     231         675 :         return status;
     232             :         
     233             : }
     234             : 
     235             : struct wreplsrv_pull_table_io { /* in/out parameters of wreplsrv_pull_table_send()/_recv() */
     236             :         struct {
     237             :                 struct wreplsrv_partner *partner; /* partner whose owner table is wanted */
     238             :                 uint32_t num_owners; /* when non-zero, the owners below are copied to the result and no request is sent */
     239             :                 struct wrepl_wins_owner *owners; /* owner entries to copy; read only when num_owners is non-zero */
     240             :         } in;
     241             :         struct {
     242             :                 uint32_t num_owners; /* number of entries in owners */
     243             :                 struct wrepl_wins_owner *owners; /* resulting table, moved onto the mem_ctx passed to _recv() */
     244             :         } out;
     245             : };
     246             : 
     247             : enum wreplsrv_pull_table_stage { /* progress of one pull-table request; selects the step run by wreplsrv_pull_table_handler() */
     248             :         WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION, /* the outgoing connect request is pending */
     249             :         WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY, /* the pull-table request is pending */
     250             :         WREPLSRV_PULL_TABLE_STAGE_DONE /* nothing pending; the handler reports NT_STATUS_INTERNAL_ERROR if run in this stage */
     251             : };
     252             : 
     253             : struct wreplsrv_pull_table_state { /* private state of one wreplsrv_pull_table_send() request */
     254             :         enum wreplsrv_pull_table_stage stage; /* which step the handler runs next */
     255             :         struct composite_context *c; /* composite request this state is allocated on */
     256             :         struct wrepl_pull_table table_io; /* in/out block of the pull-table request; out.partners becomes the result */
     257             :         struct wreplsrv_pull_table_io *io; /* caller's parameter block */
     258             :         struct composite_context *creq; /* pending outgoing connect request */
     259             :         struct wreplsrv_out_connection *wreplconn; /* connection obtained from the connect request */
     260             :         struct tevent_req *subreq; /* pending pull-table request, freed once received */
     261             : };
     262             : 
     263             : static void wreplsrv_pull_table_handler_treq(struct tevent_req *subreq);
     264             : 
     265           0 : static NTSTATUS wreplsrv_pull_table_wait_connection(struct wreplsrv_pull_table_state *state)
     266             : {
     267           0 :         NTSTATUS status;
     268             : 
     269           0 :         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
     270           0 :         NT_STATUS_NOT_OK_RETURN(status);
     271             : 
     272           0 :         state->table_io.in.assoc_ctx = state->wreplconn->assoc_ctx.peer_ctx;
     273           0 :         state->subreq = wrepl_pull_table_send(state,
     274           0 :                                               state->wreplconn->service->task->event_ctx,
     275           0 :                                               state->wreplconn->sock, &state->table_io);
     276           0 :         NT_STATUS_HAVE_NO_MEMORY(state->subreq);
     277             : 
     278           0 :         tevent_req_set_callback(state->subreq,
     279             :                                 wreplsrv_pull_table_handler_treq,
     280             :                                 state);
     281             : 
     282           0 :         state->stage = WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY;
     283             : 
     284           0 :         return NT_STATUS_OK;
     285             : }
     286             : 
     287           0 : static NTSTATUS wreplsrv_pull_table_wait_table_reply(struct wreplsrv_pull_table_state *state)
     288             : {
     289           0 :         NTSTATUS status;
     290             : 
     291           0 :         status = wrepl_pull_table_recv(state->subreq, state, &state->table_io);
     292           0 :         TALLOC_FREE(state->subreq);
     293           0 :         NT_STATUS_NOT_OK_RETURN(status);
     294             : 
     295           0 :         state->stage = WREPLSRV_PULL_TABLE_STAGE_DONE;
     296             : 
     297           0 :         return NT_STATUS_OK;
     298             : }
     299             : 
     300           0 : static void wreplsrv_pull_table_handler(struct wreplsrv_pull_table_state *state)
     301             : {
     302           0 :         struct composite_context *c = state->c;
     303             : 
     304           0 :         switch (state->stage) {
     305           0 :         case WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION:
     306           0 :                 c->status = wreplsrv_pull_table_wait_connection(state);
     307           0 :                 break;
     308           0 :         case WREPLSRV_PULL_TABLE_STAGE_WAIT_TABLE_REPLY:
     309           0 :                 c->status = wreplsrv_pull_table_wait_table_reply(state);
     310           0 :                 c->state  = COMPOSITE_STATE_DONE;
     311           0 :                 break;
     312           0 :         case WREPLSRV_PULL_TABLE_STAGE_DONE:
     313           0 :                 c->status = NT_STATUS_INTERNAL_ERROR;
     314             :         }
     315             : 
     316           0 :         if (!NT_STATUS_IS_OK(c->status)) {
     317           0 :                 c->state = COMPOSITE_STATE_ERROR;
     318             :         }
     319             : 
     320           0 :         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
     321           0 :                 c->async.fn(c);
     322             :         }
     323           0 : }
     324             : 
     325           0 : static void wreplsrv_pull_table_handler_creq(struct composite_context *creq)
     326             : {
     327           0 :         struct wreplsrv_pull_table_state *state = talloc_get_type(creq->async.private_data,
     328             :                                                   struct wreplsrv_pull_table_state);
     329           0 :         wreplsrv_pull_table_handler(state);
     330           0 :         return;
     331             : }
     332             : 
     333           0 : static void wreplsrv_pull_table_handler_treq(struct tevent_req *subreq)
     334             : {
     335           0 :         struct wreplsrv_pull_table_state *state = tevent_req_callback_data(subreq,
     336             :                                                   struct wreplsrv_pull_table_state);
     337           0 :         wreplsrv_pull_table_handler(state);
     338           0 :         return;
     339             : }
     340             : 
     341         675 : static struct composite_context *wreplsrv_pull_table_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_table_io *io)
     342             : {
     343         675 :         struct composite_context *c = NULL;
     344         675 :         struct wreplsrv_service *service = io->in.partner->service;
     345         675 :         struct wreplsrv_pull_table_state *state = NULL;
     346             : 
     347         675 :         c = talloc_zero(mem_ctx, struct composite_context);
     348         675 :         if (!c) goto failed;
     349             : 
     350         675 :         state = talloc_zero(c, struct wreplsrv_pull_table_state);
     351         675 :         if (!state) goto failed;
     352         675 :         state->c     = c;
     353         675 :         state->io    = io;
     354             : 
     355         675 :         c->state     = COMPOSITE_STATE_IN_PROGRESS;
     356         675 :         c->event_ctx = service->task->event_ctx;
     357         675 :         c->private_data      = state;
     358             : 
     359         675 :         if (io->in.num_owners) {
     360           0 :                 struct wrepl_wins_owner *partners;
     361           0 :                 uint32_t i;
     362             : 
     363         675 :                 partners = talloc_array(state,
     364             :                                         struct wrepl_wins_owner,
     365             :                                         io->in.num_owners);
     366         675 :                 if (composite_nomem(partners, c)) goto failed;
     367             : 
     368        1350 :                 for (i=0; i < io->in.num_owners; i++) {
     369         675 :                         partners[i] = io->in.owners[i];
     370        1350 :                         partners[i].address = talloc_strdup(partners,
     371         675 :                                                 io->in.owners[i].address);
     372         675 :                         if (composite_nomem(partners[i].address, c)) goto failed;
     373             :                 }
     374             : 
     375         675 :                 state->table_io.out.num_partners     = io->in.num_owners;
     376         675 :                 state->table_io.out.partners         = partners;
     377         675 :                 state->stage                         = WREPLSRV_PULL_TABLE_STAGE_DONE;
     378         675 :                 composite_done(c);
     379         675 :                 return c;
     380             :         }
     381             : 
     382           0 :         state->stage    = WREPLSRV_PULL_TABLE_STAGE_WAIT_CONNECTION;
     383           0 :         state->creq  = wreplsrv_out_connect_send(io->in.partner, WINSREPL_PARTNER_PULL, NULL);
     384           0 :         if (!state->creq) goto failed;
     385             : 
     386           0 :         state->creq->async.fn             = wreplsrv_pull_table_handler_creq;
     387           0 :         state->creq->async.private_data   = state;
     388             : 
     389           0 :         return c;
     390           0 : failed:
     391           0 :         talloc_free(c);
     392           0 :         return NULL;
     393             : }
     394             : 
     395         675 : static NTSTATUS wreplsrv_pull_table_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
     396             :                                          struct wreplsrv_pull_table_io *io)
     397             : {
     398           0 :         NTSTATUS status;
     399             : 
     400         675 :         status = composite_wait(c);
     401             : 
     402         675 :         if (NT_STATUS_IS_OK(status)) {
     403         675 :                 struct wreplsrv_pull_table_state *state = talloc_get_type(c->private_data,
     404             :                                                           struct wreplsrv_pull_table_state);
     405         675 :                 io->out.num_owners   = state->table_io.out.num_partners;
     406         675 :                 io->out.owners               = talloc_move(mem_ctx, &state->table_io.out.partners);
     407             :         }
     408             : 
     409         675 :         talloc_free(c);
     410         675 :         return status;  
     411             : }
     412             : 
     413             : struct wreplsrv_pull_names_io { /* in/out parameters of wreplsrv_pull_names_send()/_recv() */
     414             :         struct {
     415             :                 struct wreplsrv_partner *partner; /* partner to pull the names from */
     416             :                 struct wreplsrv_out_connection *wreplconn; /* optional existing connection; when set, the partner's cached connections are not used */
     417             :                 struct wrepl_wins_owner owner; /* owner entry copied into the pull-names request */
     418             :         } in;
     419             :         struct {
     420             :                 uint32_t num_names; /* number of entries in names */
     421             :                 struct wrepl_name *names; /* resulting names, moved onto the mem_ctx passed to _recv() */
     422             :         } out;
     423             : };
     424             : 
     425             : enum wreplsrv_pull_names_stage { /* progress of one pull-names request; selects the step run by wreplsrv_pull_names_handler() */
     426             :         WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION, /* the outgoing connect request is pending */
     427             :         WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY, /* the pull-names request is pending */
     428             :         WREPLSRV_PULL_NAMES_STAGE_DONE /* nothing pending; the handler reports NT_STATUS_INTERNAL_ERROR if run in this stage */
     429             : };
     430             : 
     431             : struct wreplsrv_pull_names_state { /* private state of one wreplsrv_pull_names_send() request */
     432             :         enum wreplsrv_pull_names_stage stage; /* which step the handler runs next */
     433             :         struct composite_context *c; /* composite request this state is allocated on */
     434             :         struct wrepl_pull_names pull_io; /* in/out block of the pull-names request; out.names becomes the result */
     435             :         struct wreplsrv_pull_names_io *io; /* caller's parameter block */
     436             :         struct composite_context *creq; /* pending outgoing connect request */
     437             :         struct wreplsrv_out_connection *wreplconn; /* connection obtained from the connect request */
     438             :         struct tevent_req *subreq; /* pending pull-names request, freed once received */
     439             : };
     440             : 
     441             : static void wreplsrv_pull_names_handler_treq(struct tevent_req *subreq);
     442             : 
     443         675 : static NTSTATUS wreplsrv_pull_names_wait_connection(struct wreplsrv_pull_names_state *state)
     444             : {
     445           0 :         NTSTATUS status;
     446             : 
     447         675 :         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
     448         675 :         NT_STATUS_NOT_OK_RETURN(status);
     449             : 
     450         675 :         state->pull_io.in.assoc_ctx  = state->wreplconn->assoc_ctx.peer_ctx;
     451         675 :         state->pull_io.in.partner    = state->io->in.owner;
     452        1350 :         state->subreq = wrepl_pull_names_send(state,
     453         675 :                                               state->wreplconn->service->task->event_ctx,
     454         675 :                                               state->wreplconn->sock,
     455         675 :                                               &state->pull_io);
     456         675 :         NT_STATUS_HAVE_NO_MEMORY(state->subreq);
     457             : 
     458         675 :         tevent_req_set_callback(state->subreq,
     459             :                                 wreplsrv_pull_names_handler_treq,
     460             :                                 state);
     461             : 
     462         675 :         state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY;
     463             : 
     464         675 :         return NT_STATUS_OK;
     465             : }
     466             : 
     467         675 : static NTSTATUS wreplsrv_pull_names_wait_send_reply(struct wreplsrv_pull_names_state *state)
     468             : {
     469           0 :         NTSTATUS status;
     470             : 
     471         675 :         status = wrepl_pull_names_recv(state->subreq, state, &state->pull_io);
     472         675 :         TALLOC_FREE(state->subreq);
     473         675 :         NT_STATUS_NOT_OK_RETURN(status);
     474             : 
     475         675 :         state->stage = WREPLSRV_PULL_NAMES_STAGE_DONE;
     476             : 
     477         675 :         return NT_STATUS_OK;
     478             : }
     479             : 
     480        1350 : static void wreplsrv_pull_names_handler(struct wreplsrv_pull_names_state *state)
     481             : {
     482        1350 :         struct composite_context *c = state->c;
     483             : 
     484        1350 :         switch (state->stage) {
     485         675 :         case WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION:
     486         675 :                 c->status = wreplsrv_pull_names_wait_connection(state);
     487        1350 :                 break;
     488         675 :         case WREPLSRV_PULL_NAMES_STAGE_WAIT_SEND_REPLY:
     489         675 :                 c->status = wreplsrv_pull_names_wait_send_reply(state);
     490         675 :                 c->state  = COMPOSITE_STATE_DONE;
     491         675 :                 break;
     492           0 :         case WREPLSRV_PULL_NAMES_STAGE_DONE:
     493           0 :                 c->status = NT_STATUS_INTERNAL_ERROR;
     494             :         }
     495             : 
     496        1350 :         if (!NT_STATUS_IS_OK(c->status)) {
     497           0 :                 c->state = COMPOSITE_STATE_ERROR;
     498             :         }
     499             : 
     500        1350 :         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
     501         675 :                 c->async.fn(c);
     502             :         }
     503        1350 : }
     504             : 
     505         675 : static void wreplsrv_pull_names_handler_creq(struct composite_context *creq)
     506             : {
     507         675 :         struct wreplsrv_pull_names_state *state = talloc_get_type(creq->async.private_data,
     508             :                                                   struct wreplsrv_pull_names_state);
     509         675 :         wreplsrv_pull_names_handler(state);
     510         675 :         return;
     511             : }
     512             : 
     513         675 : static void wreplsrv_pull_names_handler_treq(struct tevent_req *subreq)
     514             : {
     515         675 :         struct wreplsrv_pull_names_state *state = tevent_req_callback_data(subreq,
     516             :                                                   struct wreplsrv_pull_names_state);
     517         675 :         wreplsrv_pull_names_handler(state);
     518         675 :         return;
     519             : }
     520             : 
     521         675 : static struct composite_context *wreplsrv_pull_names_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_names_io *io)
     522             : {
     523         675 :         struct composite_context *c = NULL;
     524         675 :         struct wreplsrv_service *service = io->in.partner->service;
     525         675 :         struct wreplsrv_pull_names_state *state = NULL;
     526         675 :         enum winsrepl_partner_type partner_type = WINSREPL_PARTNER_PULL;
     527             : 
     528         675 :         if (io->in.wreplconn) partner_type = WINSREPL_PARTNER_NONE;
     529             : 
     530         675 :         c = talloc_zero(mem_ctx, struct composite_context);
     531         675 :         if (!c) goto failed;
     532             : 
     533         675 :         state = talloc_zero(c, struct wreplsrv_pull_names_state);
     534         675 :         if (!state) goto failed;
     535         675 :         state->c     = c;
     536         675 :         state->io    = io;
     537             : 
     538         675 :         c->state     = COMPOSITE_STATE_IN_PROGRESS;
     539         675 :         c->event_ctx = service->task->event_ctx;
     540         675 :         c->private_data      = state;
     541             : 
     542         675 :         state->stage = WREPLSRV_PULL_NAMES_STAGE_WAIT_CONNECTION;
     543         675 :         state->creq  = wreplsrv_out_connect_send(io->in.partner, partner_type, io->in.wreplconn);
     544         675 :         if (!state->creq) goto failed;
     545             : 
     546         675 :         state->creq->async.fn             = wreplsrv_pull_names_handler_creq;
     547         675 :         state->creq->async.private_data   = state;
     548             : 
     549         675 :         return c;
     550           0 : failed:
     551           0 :         talloc_free(c);
     552           0 :         return NULL;
     553             : }
     554             : 
     555         675 : static NTSTATUS wreplsrv_pull_names_recv(struct composite_context *c, TALLOC_CTX *mem_ctx,
     556             :                                          struct wreplsrv_pull_names_io *io)
     557             : {
     558           0 :         NTSTATUS status;
     559             : 
     560         675 :         status = composite_wait(c);
     561             : 
     562         675 :         if (NT_STATUS_IS_OK(status)) {
     563         675 :                 struct wreplsrv_pull_names_state *state = talloc_get_type(c->private_data,
     564             :                                                           struct wreplsrv_pull_names_state);
     565         675 :                 io->out.num_names    = state->pull_io.out.num_names;
     566         675 :                 io->out.names                = talloc_move(mem_ctx, &state->pull_io.out.names);
     567             :         }
     568             : 
     569         675 :         talloc_free(c);
     570         675 :         return status;
     571             :         
     572             : }
     573             : 
     574             : enum wreplsrv_pull_cycle_stage {
     575             :         WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY,
     576             :         WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES,
     577             :         WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC,
     578             :         WREPLSRV_PULL_CYCLE_STAGE_DONE
     579             : };
     580             : 
     581             : struct wreplsrv_pull_cycle_state {
     582             :         enum wreplsrv_pull_cycle_stage stage;
     583             :         struct composite_context *c;
     584             :         struct wreplsrv_pull_cycle_io *io;
     585             :         struct wreplsrv_pull_table_io table_io;
     586             :         uint32_t current;
     587             :         struct wreplsrv_pull_names_io names_io;
     588             :         struct composite_context *creq;
     589             :         struct wrepl_associate_stop assoc_stop_io;
     590             :         struct tevent_req *subreq;
     591             : };
     592             : 
     593             : static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq);
     594             : static void wreplsrv_pull_cycle_handler_treq(struct tevent_req *subreq);
     595             : 
     596        1350 : static NTSTATUS wreplsrv_pull_cycle_next_owner_do_work(struct wreplsrv_pull_cycle_state *state)
     597             : {
     598        1350 :         struct wreplsrv_owner *current_owner=NULL;
     599           0 :         struct wreplsrv_owner *local_owner;
     600           0 :         uint32_t i;
     601        1350 :         uint64_t old_max_version = 0;
     602        1350 :         bool do_pull = false;
     603             : 
     604        2025 :         for (i=state->current; i < state->table_io.out.num_owners; i++) {
     605        1350 :                 current_owner = wreplsrv_find_owner(state->io->in.partner->service,
     606        1350 :                                                     state->io->in.partner->pull.table,
     607        1350 :                                                     state->table_io.out.owners[i].address);
     608             : 
     609        1350 :                 local_owner = wreplsrv_find_owner(state->io->in.partner->service,
     610        1350 :                                                   state->io->in.partner->service->table,
     611        1350 :                                                   state->table_io.out.owners[i].address);
     612             :                 /*
     613             :                  * this means we are ourself the current owner,
     614             :                  * and we don't want replicate ourself
     615             :                  */
     616        1350 :                 if (!current_owner) continue;
     617             : 
     618             :                 /*
     619             :                  * this means we don't have any records of this owner
     620             :                  * so fetch them
     621             :                  */
     622        1350 :                 if (!local_owner) {
     623           3 :                         do_pull         = true;
     624             :                         
     625           3 :                         break;
     626             :                 }
     627             : 
     628             :                 /*
     629             :                  * this means the remote partner has some new records of this owner
     630             :                  * fetch them
     631             :                  */
     632        1347 :                 if (current_owner->owner.max_version > local_owner->owner.max_version) {
     633         672 :                         do_pull         = true;
     634         672 :                         old_max_version = local_owner->owner.max_version;
     635         672 :                         break;
     636             :                 }
     637             :         }
     638        1350 :         state->current = i;
     639             : 
     640        1350 :         if (do_pull) {
     641         675 :                 state->names_io.in.partner           = state->io->in.partner;
     642         675 :                 state->names_io.in.wreplconn         = state->io->in.wreplconn;
     643         675 :                 state->names_io.in.owner             = current_owner->owner;
     644         675 :                 state->names_io.in.owner.min_version = old_max_version + 1;
     645         675 :                 state->creq = wreplsrv_pull_names_send(state, &state->names_io);
     646         675 :                 NT_STATUS_HAVE_NO_MEMORY(state->creq);
     647             : 
     648         675 :                 state->creq->async.fn             = wreplsrv_pull_cycle_handler_creq;
     649         675 :                 state->creq->async.private_data   = state;
     650             : 
     651         675 :                 return STATUS_MORE_ENTRIES;
     652             :         }
     653             : 
     654         675 :         return NT_STATUS_OK;
     655             : }
     656             : 
     657        1350 : static NTSTATUS wreplsrv_pull_cycle_next_owner_wrapper(struct wreplsrv_pull_cycle_state *state)
     658             : {
     659           0 :         NTSTATUS status;
     660             : 
     661        1350 :         status = wreplsrv_pull_cycle_next_owner_do_work(state);
     662        1350 :         if (NT_STATUS_IS_OK(status)) {
     663         675 :                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
     664         675 :         } else if (NT_STATUS_EQUAL(STATUS_MORE_ENTRIES, status)) {
     665         675 :                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES;
     666         675 :                 status = NT_STATUS_OK;
     667             :         }
     668             : 
     669        1350 :         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE && state->io->in.wreplconn) {
     670         675 :                 state->assoc_stop_io.in.assoc_ctx    = state->io->in.wreplconn->assoc_ctx.peer_ctx;
     671         675 :                 state->assoc_stop_io.in.reason               = 0;
     672        1350 :                 state->subreq = wrepl_associate_stop_send(state,
     673         675 :                                                           state->io->in.wreplconn->service->task->event_ctx,
     674         675 :                                                           state->io->in.wreplconn->sock,
     675         675 :                                                           &state->assoc_stop_io);
     676         675 :                 NT_STATUS_HAVE_NO_MEMORY(state->subreq);
     677             : 
     678         675 :                 tevent_req_set_callback(state->subreq,
     679             :                                         wreplsrv_pull_cycle_handler_treq,
     680             :                                         state);
     681             : 
     682         675 :                 state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC;
     683             :         }
     684             : 
     685        1350 :         return status;
     686             : }
     687             : 
     688         675 : static NTSTATUS wreplsrv_pull_cycle_wait_table_reply(struct wreplsrv_pull_cycle_state *state)
     689             : {
     690           0 :         NTSTATUS status;
     691           0 :         uint32_t i;
     692             : 
     693         675 :         status = wreplsrv_pull_table_recv(state->creq, state, &state->table_io);
     694         675 :         NT_STATUS_NOT_OK_RETURN(status);
     695             : 
     696             :         /* update partner table */
     697        1350 :         for (i=0; i < state->table_io.out.num_owners; i++) {
     698         675 :                 status = wreplsrv_add_table(state->io->in.partner->service,
     699         675 :                                             state->io->in.partner, 
     700         675 :                                             &state->io->in.partner->pull.table,
     701         675 :                                             state->table_io.out.owners[i].address,
     702         675 :                                             state->table_io.out.owners[i].max_version);
     703         675 :                 NT_STATUS_NOT_OK_RETURN(status);
     704             :         }
     705             : 
     706         675 :         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
     707         675 :         NT_STATUS_NOT_OK_RETURN(status);
     708             : 
     709         675 :         return status;
     710             : }
     711             : 
     712         675 : static NTSTATUS wreplsrv_pull_cycle_apply_records(struct wreplsrv_pull_cycle_state *state)
     713             : {
     714           0 :         NTSTATUS status;
     715             : 
     716         675 :         status = wreplsrv_apply_records(state->io->in.partner,
     717             :                                         &state->names_io.in.owner,
     718             :                                         state->names_io.out.num_names,
     719             :                                         state->names_io.out.names);
     720         675 :         NT_STATUS_NOT_OK_RETURN(status);
     721             : 
     722         675 :         talloc_free(state->names_io.out.names);
     723         675 :         ZERO_STRUCT(state->names_io);
     724             : 
     725         675 :         return NT_STATUS_OK;
     726             : }
     727             : 
     728         675 : static NTSTATUS wreplsrv_pull_cycle_wait_send_replies(struct wreplsrv_pull_cycle_state *state)
     729             : {
     730           0 :         NTSTATUS status;
     731             : 
     732         675 :         status = wreplsrv_pull_names_recv(state->creq, state, &state->names_io);
     733         675 :         NT_STATUS_NOT_OK_RETURN(status);
     734             : 
     735             :         /*
     736             :          * TODO: this should maybe an async call,
     737             :          *       because we may need some network access
     738             :          *       for conflict resolving
     739             :          */
     740         675 :         status = wreplsrv_pull_cycle_apply_records(state);
     741         675 :         NT_STATUS_NOT_OK_RETURN(status);
     742             : 
     743         675 :         status = wreplsrv_pull_cycle_next_owner_wrapper(state);
     744         675 :         NT_STATUS_NOT_OK_RETURN(status);
     745             : 
     746         675 :         return status;
     747             : }
     748             : 
     749         675 : static NTSTATUS wreplsrv_pull_cycle_wait_stop_assoc(struct wreplsrv_pull_cycle_state *state)
     750             : {
     751           0 :         NTSTATUS status;
     752             : 
     753         675 :         status = wrepl_associate_stop_recv(state->subreq, &state->assoc_stop_io);
     754         675 :         TALLOC_FREE(state->subreq);
     755         675 :         NT_STATUS_NOT_OK_RETURN(status);
     756             : 
     757         675 :         state->stage = WREPLSRV_PULL_CYCLE_STAGE_DONE;
     758             : 
     759         675 :         return status;
     760             : }
     761             : 
     762        2025 : static void wreplsrv_pull_cycle_handler(struct wreplsrv_pull_cycle_state *state)
     763             : {
     764        2025 :         struct composite_context *c = state->c;
     765             : 
     766        2025 :         switch (state->stage) {
     767         675 :         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY:
     768         675 :                 c->status = wreplsrv_pull_cycle_wait_table_reply(state);
     769        2025 :                 break;
     770         675 :         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_SEND_REPLIES:
     771         675 :                 c->status = wreplsrv_pull_cycle_wait_send_replies(state);
     772         675 :                 break;
     773         675 :         case WREPLSRV_PULL_CYCLE_STAGE_WAIT_STOP_ASSOC:
     774         675 :                 c->status = wreplsrv_pull_cycle_wait_stop_assoc(state);
     775         675 :                 break;
     776           0 :         case WREPLSRV_PULL_CYCLE_STAGE_DONE:
     777           0 :                 c->status = NT_STATUS_INTERNAL_ERROR;
     778             :         }
     779             : 
     780        2025 :         if (state->stage == WREPLSRV_PULL_CYCLE_STAGE_DONE) {
     781         675 :                 c->state  = COMPOSITE_STATE_DONE;
     782             :         }
     783             : 
     784        2025 :         if (!NT_STATUS_IS_OK(c->status)) {
     785           0 :                 c->state = COMPOSITE_STATE_ERROR;
     786             :         }
     787             : 
     788        2025 :         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
     789         675 :                 c->async.fn(c);
     790             :         }
     791        2025 : }
     792             : 
     793        1350 : static void wreplsrv_pull_cycle_handler_creq(struct composite_context *creq)
     794             : {
     795        1350 :         struct wreplsrv_pull_cycle_state *state = talloc_get_type(creq->async.private_data,
     796             :                                                   struct wreplsrv_pull_cycle_state);
     797        1350 :         wreplsrv_pull_cycle_handler(state);
     798        1350 :         return;
     799             : }
     800             : 
     801         675 : static void wreplsrv_pull_cycle_handler_treq(struct tevent_req *subreq)
     802             : {
     803         675 :         struct wreplsrv_pull_cycle_state *state = tevent_req_callback_data(subreq,
     804             :                                                   struct wreplsrv_pull_cycle_state);
     805         675 :         wreplsrv_pull_cycle_handler(state);
     806         675 :         return;
     807             : }
     808             : 
     809         675 : struct composite_context *wreplsrv_pull_cycle_send(TALLOC_CTX *mem_ctx, struct wreplsrv_pull_cycle_io *io)
     810             : {
     811         675 :         struct composite_context *c = NULL;
     812         675 :         struct wreplsrv_service *service = io->in.partner->service;
     813         675 :         struct wreplsrv_pull_cycle_state *state = NULL;
     814             : 
     815         675 :         c = talloc_zero(mem_ctx, struct composite_context);
     816         675 :         if (!c) goto failed;
     817             : 
     818         675 :         state = talloc_zero(c, struct wreplsrv_pull_cycle_state);
     819         675 :         if (!state) goto failed;
     820         675 :         state->c     = c;
     821         675 :         state->io    = io;
     822             : 
     823         675 :         c->state     = COMPOSITE_STATE_IN_PROGRESS;
     824         675 :         c->event_ctx = service->task->event_ctx;
     825         675 :         c->private_data      = state;
     826             : 
     827         675 :         state->stage = WREPLSRV_PULL_CYCLE_STAGE_WAIT_TABLE_REPLY;
     828         675 :         state->table_io.in.partner   = io->in.partner;
     829         675 :         state->table_io.in.num_owners        = io->in.num_owners;
     830         675 :         state->table_io.in.owners    = io->in.owners;
     831         675 :         state->creq = wreplsrv_pull_table_send(state, &state->table_io);
     832         675 :         if (!state->creq) goto failed;
     833             : 
     834         675 :         state->creq->async.fn             = wreplsrv_pull_cycle_handler_creq;
     835         675 :         state->creq->async.private_data   = state;
     836             : 
     837         675 :         return c;
     838           0 : failed:
     839           0 :         talloc_free(c);
     840           0 :         return NULL;
     841             : }
     842             : 
     843         675 : NTSTATUS wreplsrv_pull_cycle_recv(struct composite_context *c)
     844             : {
     845           0 :         NTSTATUS status;
     846             : 
     847         675 :         status = composite_wait(c);
     848             : 
     849         675 :         talloc_free(c);
     850         675 :         return status;
     851             : }
     852             : 
     853             : enum wreplsrv_push_notify_stage {
     854             :         WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT,
     855             :         WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE,
     856             :         WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM,
     857             :         WREPLSRV_PUSH_NOTIFY_STAGE_DONE
     858             : };
     859             : 
     860             : struct wreplsrv_push_notify_state {
     861             :         enum wreplsrv_push_notify_stage stage;
     862             :         struct composite_context *c;
     863             :         struct wreplsrv_push_notify_io *io;
     864             :         enum wrepl_replication_cmd command;
     865             :         bool full_table;
     866             :         struct wrepl_send_ctrl ctrl;
     867             :         struct wrepl_packet req_packet;
     868             :         struct wrepl_packet *rep_packet;
     869             :         struct composite_context *creq;
     870             :         struct wreplsrv_out_connection *wreplconn;
     871             :         struct tevent_req *subreq;
     872             : };
     873             : 
     874             : static void wreplsrv_push_notify_handler_creq(struct composite_context *creq);
     875             : static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq);
     876             : 
     877           0 : static NTSTATUS wreplsrv_push_notify_update(struct wreplsrv_push_notify_state *state)
     878             : {
     879           0 :         struct wreplsrv_service *service = state->io->in.partner->service;
     880           0 :         struct wrepl_packet *req = &state->req_packet;
     881           0 :         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
     882           0 :         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
     883           0 :         NTSTATUS status;
     884             : 
     885             :         /* prepare the outgoing request */
     886           0 :         req->opcode  = WREPL_OPCODE_BITS;
     887           0 :         req->assoc_ctx       = state->wreplconn->assoc_ctx.peer_ctx;
     888           0 :         req->mess_type       = WREPL_REPLICATION;
     889             : 
     890           0 :         repl_out->command = state->command;
     891             : 
     892           0 :         status = wreplsrv_fill_wrepl_table(service, state, table_out,
     893           0 :                                            service->wins_db->local_owner, state->full_table);
     894           0 :         NT_STATUS_NOT_OK_RETURN(status);
     895             : 
     896             :         /* queue the request */
     897           0 :         state->subreq = wrepl_request_send(state,
     898           0 :                                            state->wreplconn->service->task->event_ctx,
     899           0 :                                            state->wreplconn->sock, req, NULL);
     900           0 :         NT_STATUS_HAVE_NO_MEMORY(state->subreq);
     901             : 
     902           0 :         tevent_req_set_callback(state->subreq,
     903             :                                 wreplsrv_push_notify_handler_treq,
     904             :                                 state);
     905             : 
     906           0 :         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE;
     907             : 
     908           0 :         return NT_STATUS_OK;
     909             : }
     910             : 
     911           0 : static NTSTATUS wreplsrv_push_notify_inform(struct wreplsrv_push_notify_state *state)
     912             : {
     913           0 :         struct wreplsrv_service *service = state->io->in.partner->service;
     914           0 :         struct wrepl_packet *req = &state->req_packet;
     915           0 :         struct wrepl_replication *repl_out = &state->req_packet.message.replication;
     916           0 :         struct wrepl_table *table_out = &state->req_packet.message.replication.info.table;
     917           0 :         NTSTATUS status;
     918             : 
     919           0 :         req->opcode  = WREPL_OPCODE_BITS;
     920           0 :         req->assoc_ctx       = state->wreplconn->assoc_ctx.peer_ctx;
     921           0 :         req->mess_type       = WREPL_REPLICATION;
     922             : 
     923           0 :         repl_out->command = state->command;
     924             : 
     925           0 :         status = wreplsrv_fill_wrepl_table(service, state, table_out,
     926           0 :                                            service->wins_db->local_owner, state->full_table);
     927           0 :         NT_STATUS_NOT_OK_RETURN(status);
     928             : 
     929             :         /* we won't get a reply to a inform message */
     930           0 :         state->ctrl.send_only                = true;
     931             : 
     932           0 :         state->subreq = wrepl_request_send(state,
     933           0 :                                            state->wreplconn->service->task->event_ctx,
     934           0 :                                            state->wreplconn->sock, req, &state->ctrl);
     935           0 :         NT_STATUS_HAVE_NO_MEMORY(state->subreq);
     936             : 
     937           0 :         tevent_req_set_callback(state->subreq,
     938             :                                 wreplsrv_push_notify_handler_treq,
     939             :                                 state);
     940             : 
     941           0 :         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM;
     942             : 
     943           0 :         return NT_STATUS_OK;
     944             : }
     945             : 
     946           0 : static NTSTATUS wreplsrv_push_notify_wait_connect(struct wreplsrv_push_notify_state *state)
     947             : {
     948           0 :         NTSTATUS status;
     949             : 
     950           0 :         status = wreplsrv_out_connect_recv(state->creq, state, &state->wreplconn);
     951           0 :         NT_STATUS_NOT_OK_RETURN(status);
     952             : 
     953             :         /* is the peer doesn't support inform fallback to update */
     954           0 :         switch (state->command) {
     955           0 :         case WREPL_REPL_INFORM:
     956           0 :                 if (state->wreplconn->assoc_ctx.peer_major < 5) {
     957           0 :                         state->command = WREPL_REPL_UPDATE;
     958             :                 }
     959           0 :                 break;
     960           0 :         case WREPL_REPL_INFORM2:
     961           0 :                 if (state->wreplconn->assoc_ctx.peer_major < 5) {
     962           0 :                         state->command = WREPL_REPL_UPDATE2;
     963             :                 }
     964           0 :                 break;
     965           0 :         default:
     966           0 :                 break;
     967             :         }
     968             : 
     969           0 :         switch (state->command) {
     970           0 :         case WREPL_REPL_UPDATE:
     971           0 :                 state->full_table = true;
     972           0 :                 return wreplsrv_push_notify_update(state);
     973           0 :         case WREPL_REPL_UPDATE2:
     974           0 :                 state->full_table = false;
     975           0 :                 return wreplsrv_push_notify_update(state);
     976           0 :         case WREPL_REPL_INFORM:
     977           0 :                 state->full_table = true;
     978           0 :                 return wreplsrv_push_notify_inform(state);
     979           0 :         case WREPL_REPL_INFORM2:
     980           0 :                 state->full_table = false;
     981           0 :                 return wreplsrv_push_notify_inform(state);
     982           0 :         default:
     983           0 :                 return NT_STATUS_INTERNAL_ERROR;
     984             :         }
     985             : }
     986             : 
     987           0 : static NTSTATUS wreplsrv_push_notify_wait_update(struct wreplsrv_push_notify_state *state)
     988             : {
     989           0 :         struct wreplsrv_in_connection *wrepl_in;
     990           0 :         struct tstream_context *stream;
     991           0 :         void *process_context = NULL;
     992           0 :         NTSTATUS status;
     993             : 
     994           0 :         status = wrepl_request_recv(state->subreq, state, NULL);
     995           0 :         TALLOC_FREE(state->subreq);
     996           0 :         NT_STATUS_NOT_OK_RETURN(status);
     997             : 
     998             :         /*
     999             :          * now we need to convert the wrepl_socket (client connection)
    1000             :          * into a wreplsrv_in_connection (server connection), because
    1001             :          * we'll act as a server on this connection after the WREPL_REPL_UPDATE*
    1002             :          * message is received by the peer.
    1003             :          */
    1004             : 
    1005           0 :         status = wrepl_socket_split_stream(state->wreplconn->sock, state, &stream);
    1006           0 :         NT_STATUS_NOT_OK_RETURN(status);
    1007             : 
    1008             :         /*
    1009             :          * now create a wreplsrv_in_connection,
    1010             :          * on which we act as server
    1011             :          *
    1012             :          * NOTE: stream will be stolen by
    1013             :          *       wreplsrv_in_connection_merge()
    1014             :          */
    1015           0 :         process_context = state->io->in.partner->service->task->process_context;
    1016           0 :         status = wreplsrv_in_connection_merge(state->io->in.partner,
    1017           0 :                                               state->wreplconn->assoc_ctx.peer_ctx,
    1018             :                                               &stream,
    1019             :                                               &wrepl_in, process_context);
    1020           0 :         NT_STATUS_NOT_OK_RETURN(status);
    1021             : 
    1022             :         /* now we can free the wreplsrv_out_connection */
    1023           0 :         TALLOC_FREE(state->wreplconn);
    1024             : 
    1025           0 :         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
    1026           0 :         return NT_STATUS_OK;
    1027             : }
    1028             : 
    1029           0 : static NTSTATUS wreplsrv_push_notify_wait_inform(struct wreplsrv_push_notify_state *state)
    1030             : {
    1031           0 :         NTSTATUS status;
    1032             : 
    1033           0 :         status = wrepl_request_recv(state->subreq, state, NULL);
    1034           0 :         TALLOC_FREE(state->subreq);
    1035           0 :         NT_STATUS_NOT_OK_RETURN(status);
    1036             : 
    1037           0 :         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_DONE;
    1038           0 :         return status;
    1039             : }
    1040             : 
    1041           0 : static void wreplsrv_push_notify_handler(struct wreplsrv_push_notify_state *state)
    1042             : {
    1043           0 :         struct composite_context *c = state->c;
    1044             : 
    1045           0 :         switch (state->stage) {
    1046           0 :         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT:
    1047           0 :                 c->status = wreplsrv_push_notify_wait_connect(state);
    1048           0 :                 break;
    1049           0 :         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_UPDATE:
    1050           0 :                 c->status = wreplsrv_push_notify_wait_update(state);
    1051           0 :                 break;
    1052           0 :         case WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_INFORM:
    1053           0 :                 c->status = wreplsrv_push_notify_wait_inform(state);
    1054           0 :                 break;
    1055           0 :         case WREPLSRV_PUSH_NOTIFY_STAGE_DONE:
    1056           0 :                 c->status = NT_STATUS_INTERNAL_ERROR;
    1057             :         }
    1058             : 
    1059           0 :         if (state->stage == WREPLSRV_PUSH_NOTIFY_STAGE_DONE) {
    1060           0 :                 c->state  = COMPOSITE_STATE_DONE;
    1061             :         }
    1062             : 
    1063           0 :         if (!NT_STATUS_IS_OK(c->status)) {
    1064           0 :                 c->state = COMPOSITE_STATE_ERROR;
    1065             :         }
    1066             : 
    1067           0 :         if (c->state >= COMPOSITE_STATE_DONE && c->async.fn) {
    1068           0 :                 c->async.fn(c);
    1069             :         }
    1070           0 : }
    1071             : 
    1072           0 : static void wreplsrv_push_notify_handler_creq(struct composite_context *creq)
    1073             : {
    1074           0 :         struct wreplsrv_push_notify_state *state = talloc_get_type(creq->async.private_data,
    1075             :                                                    struct wreplsrv_push_notify_state);
    1076           0 :         wreplsrv_push_notify_handler(state);
    1077           0 :         return;
    1078             : }
    1079             : 
    1080           0 : static void wreplsrv_push_notify_handler_treq(struct tevent_req *subreq)
    1081             : {
    1082           0 :         struct wreplsrv_push_notify_state *state = tevent_req_callback_data(subreq,
    1083             :                                                    struct wreplsrv_push_notify_state);
    1084           0 :         wreplsrv_push_notify_handler(state);
    1085           0 :         return;
    1086             : }
    1087             : 
    1088           0 : struct composite_context *wreplsrv_push_notify_send(TALLOC_CTX *mem_ctx, struct wreplsrv_push_notify_io *io)
    1089             : {
    1090           0 :         struct composite_context *c = NULL;
    1091           0 :         struct wreplsrv_service *service = io->in.partner->service;
    1092           0 :         struct wreplsrv_push_notify_state *state = NULL;
    1093           0 :         enum winsrepl_partner_type partner_type;
    1094             : 
    1095           0 :         c = talloc_zero(mem_ctx, struct composite_context);
    1096           0 :         if (!c) goto failed;
    1097             : 
    1098           0 :         state = talloc_zero(c, struct wreplsrv_push_notify_state);
    1099           0 :         if (!state) goto failed;
    1100           0 :         state->c     = c;
    1101           0 :         state->io    = io;
    1102             : 
    1103           0 :         if (io->in.inform) {
    1104             :                 /* we can cache the connection in partner->push->wreplconn */
    1105           0 :                 partner_type = WINSREPL_PARTNER_PUSH;
    1106           0 :                 if (io->in.propagate) {
    1107           0 :                         state->command       = WREPL_REPL_INFORM2;
    1108             :                 } else {
    1109           0 :                         state->command       = WREPL_REPL_INFORM;
    1110             :                 }
    1111             :         } else {
    1112             :                 /* we can NOT cache the connection */
    1113           0 :                 partner_type = WINSREPL_PARTNER_NONE;
    1114           0 :                 if (io->in.propagate) {
    1115           0 :                         state->command       = WREPL_REPL_UPDATE2;
    1116             :                 } else {
    1117           0 :                         state->command       = WREPL_REPL_UPDATE;
    1118             :                 }       
    1119             :         }
    1120             : 
    1121           0 :         c->state     = COMPOSITE_STATE_IN_PROGRESS;
    1122           0 :         c->event_ctx = service->task->event_ctx;
    1123           0 :         c->private_data      = state;
    1124             : 
    1125           0 :         state->stage = WREPLSRV_PUSH_NOTIFY_STAGE_WAIT_CONNECT;
    1126           0 :         state->creq  = wreplsrv_out_connect_send(io->in.partner, partner_type, NULL);
    1127           0 :         if (!state->creq) goto failed;
    1128             : 
    1129           0 :         state->creq->async.fn             = wreplsrv_push_notify_handler_creq;
    1130           0 :         state->creq->async.private_data   = state;
    1131             : 
    1132           0 :         return c;
    1133           0 : failed:
    1134           0 :         talloc_free(c);
    1135           0 :         return NULL;
    1136             : }
    1137             : 
    1138           0 : NTSTATUS wreplsrv_push_notify_recv(struct composite_context *c)
    1139             : {
    1140           0 :         NTSTATUS status;
    1141             : 
    1142           0 :         status = composite_wait(c);
    1143             : 
    1144           0 :         talloc_free(c);
    1145           0 :         return status;
    1146             : }

Generated by: LCOV version 1.14