-
Notifications
You must be signed in to change notification settings - Fork 94
/
Copy pathmaint_sync.T
89 lines (82 loc) · 1.87 KB
/
maint_sync.T
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// -*-c++-*-
// vim: filetype=cpp foldmethod=marker
#include <arpc.h>
#include <comm.h>
#include <merkle.h>
#include <merkle_tree_disk.h>
#include "maint_policy.h"
static void
doRPCer (ptr<aclnt> c, RPC_delay_args *args)
{
assert ((args->prog.progno == c->rp.progno) &&
(args->prog.versno == c->rp.versno));
c->call (args->procno, args->in, args->out, args->cb);
}
const rpc_program &
merkle_sync::sync_program ()
{
return merklesync_program_1;
}
ref<syncer>
merkle_sync::produce_syncer (dhash_ctype c)
{
return New refcounted<merkle_sync> (c);
}
TAMED void
merkle_sync::sync_with (ptr<locationcc> who,
chordID rngmin, chordID rngmax,
ptr<merkle_tree> localtree,
missingfnc_t missing,
cbv cb)
{
VARS {
ptr<aclnt> client (NULL);
ptr<merkle_syncer> msyncer (NULL);
int err (0);
}
BLOCK {
who->get_stream_aclnt (merklesync_program_1, @(client));
}
if (client) {
BLOCK {
msyncer = New refcounted<merkle_syncer> (
who->vnode (), ctype,
localtree,
wrap (&doRPCer, client),
missing);
msyncer->sync (rngmin, rngmax, @(err));
}
}
// Ignore !client and any syncer err; we'll retry later.
cb ();
}
void
merkle_sync::dispatch (ptr<merkle_tree> ltree, svccb *sbp)
{
if (!sbp)
return;
if (sbp->prog () != merklesync_program_1.progno) {
sbp->reject (PROG_UNAVAIL);
}
switch (sbp->proc ()) {
case MERKLESYNC_SENDNODE:
{
sendnode_arg *arg = sbp->Xtmpl getarg<sendnode_arg> ();
sendnode_res res (MERKLE_OK);
merkle_server::handle_send_node (ltree, arg, &res);
sbp->reply (&res);
}
break;
case MERKLESYNC_GETKEYS:
{
getkeys_arg *arg = sbp->Xtmpl getarg<getkeys_arg> ();
getkeys_res res (MERKLE_OK);
merkle_server::handle_get_keys (ltree, arg, &res);
sbp->reply (&res);
}
break;
default:
sbp->reject (PROC_UNAVAIL);
break;
}
}