-
Notifications
You must be signed in to change notification settings - Fork 94
/
Copy pathmaint_global.T
131 lines (116 loc) · 3.55 KB
/
maint_global.T
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// -*-c++-*-
// vim: filetype=cpp foldmethod=marker
//
// TODO:
// Remove keys that are successfully repaired.
// Remove keys that the remote side already has (would handle above case).
// - We want to have a list of objects to save... those are the
// ones that are going to be repaired. I want something like SQS.
#include <arpc.h>
#include <misc_utils.h>
#include <id_utils.h>
#include <rpclib.h>
#include <chord_prot.h>
#include <merkle.h>
#include <maint_policy.h>
// Per-key callback handed to sync_with () by maint_global::next (), with
// the sync partner bound as d.
//
//   d             - the node we synchronized with.
//   key           - the key the sync reported as differing.
//   missing_local - true when the key is the one we lack locally; false
//                   when the remote side lacks it.
//
// Keys missing on the remote side are appended to maintqueue, with the
// destination recorded at the same index of maintdest, unless the key is
// already queued.  Locally missing keys are ignored.
void
maint_global::handle_missing (ptr<locationcc> d,
			      chordID key, bool missing_local)
{
  // XXX optimize merkle implementation to not request locally
  //     missing keys so that we don't have to waste this bandwidth
  //     on sync?
  if (!missing_local) {
    // The key is missing remotely, so it has to be pushed to the remote
    // node -- but only queue it once.
    bool queued = false;
    for (size_t idx = 0; !queued && idx < maintqueue.size (); idx++)
      queued = (maintqueue[idx] == key);

    if (!queued) {
      warn << "global enqueue " << key
	   << " from " << m->host
	   << " to " << d->chordnode ()
	   << "\n";
      maintqueue.push_back (key);
      maintdest.push_back (d);
    }
  }
  // The maintainer will check this queue when someone asks for repairs.
  // dhblock_srv will check whether or not it is responsible for the
  // key to decide if it is okay to delete at the end of a repair.
}
// Build the global-maintenance state for maintainer m.
// Both ends of the scan range (rngmin and rngmax) start out at
// incID (m->host.x); next () moves them as the scan proceeds.
// Note: inside the initializer list the bare name m is the constructor
// parameter, so m->host.x reads through the argument, not the member.
maint_global::maint_global (maintainer *m) :
m (m),
rngmin (incID (m->host.x)),
rngmax (incID (m->host.x))
{
}
// Destructor: the body is intentionally empty; no explicit cleanup is
// performed here.
maint_global::~maint_global ()
{
}
// Run one step of global maintenance, then invoke donecb.
//
// Steps, in order:
//   1. Ask the local tree for one key starting at rngmin (wrapping
//      around 0 by hand when rngmin is not below m->host.x).
//   2. Issue a CHORDPROC_FINDROUTE RPC to m->host for that key and take
//      the last node of the returned list as the key's successor.
//   3. Record the successor's ID as rngmax.
//   4. If the successor is this node or one of m->preds, skip the sync.
//   5. Otherwise sync (rngmin, rngmax) with the successor; differing
//      keys are reported to handle_missing ().
//   6. On every path through nextOUT: if a successor was resolved,
//      advance rngmin to incID (succ->id ()); then call donecb.
//
// donecb is called exactly once on every path.
//
// This is a tame function: VARS/BLOCK/@ are tame preprocessor constructs.
// Presumably the VARS locals persist across the BLOCK suspension points
// -- confirm against the tame documentation before restructuring.
TAMED void
maint_global::next (cbv donecb)
{
VARS {
vec<chordID> keys;
ptr<locationcc> succ (NULL);
ptr<chord_findarg> arg (NULL);
ptr<chord_nodelistres> keysucc (NULL);
clnt_stat err;
};
// Look up first successor key of rngmin in m->db.
// NB: get_keyrange doesn't support wrapping around 0
// so the wrap is done by hand: first try [rngmin, maxID], then fall
// back to [0, m->host.x] if that range yields nothing.
if (rngmin < m->host.x) {
keys = m->localtree ()->get_keyrange (rngmin, m->host.x, 1);
} else {
keys = m->localtree ()->get_keyrange (rngmin, maxID, 1);
if (!keys.size ())
keys = m->localtree ()->get_keyrange (0, m->host.x, 1);
}
if (!keys.size ()) {
// No key found: succ is still NULL, so rngmin is left unchanged below.
// NOTE(review): if rngmin < m->host.x and no key ever appears in that
// range, every later call repeats this same empty lookup -- confirm
// that this is intended.
goto nextOUT;
}
// warn << m->host.x << " maint_global: searching for " << keys[0] << "\n";
// Find its successor.
arg = New refcounted<chord_findarg>;
arg->x = keys[0];
arg->return_succs = false;
keysucc = New refcounted<chord_nodelistres>;
// Suspend until the RPC completes; err receives the RPC status.
BLOCK {
doRPC (m->host, chord_program_1, CHORDPROC_FINDROUTE,
arg, keysucc, @(err));
}
if (err || keysucc->status) {
// doRPC already printed out an error
goto nextOUT;
}
{
// The last entry of the returned node list is used as the successor.
// NOTE(review): nlist.back () is not guarded against an empty list;
// presumably a successful FINDROUTE always returns at least one node
// -- confirm.
chord_node n = make_chord_node (keysucc->resok->nlist.back ());
succ = locationcc::alloc (n);
}
// Prep for next round
rngmax = succ->id ();
// ...and continue if this range is skippable.
if (succ->id () == m->host.x) {
// warn << m->host.x << " maint_global: succ is self\n";
goto nextOUT;
} else {
// warn << m->host.x << " maint_global: succ is " << succ->id () << "\n";
// Anything in the range of our predecessor list should be handled by
// regular maintenance.
for (size_t i = 0; i < m->preds.size (); i++) {
if (succ->id () == m->preds[i]->id ())
goto nextOUT;
}
}
// sync_with (successor) ... for its primary range
// queue up a list of objects to send this host.
// make this queue available for get_repairs.
// handle_missing is wrapped with succ bound as its first argument, so
// each reported key is associated with this sync partner.
BLOCK {
warn << m->host.x << " maint_global: syncing (" << rngmin << ", " << rngmax << ") with " << succ->id () << "\n";
m->sync->sync_with (succ, rngmin, rngmax,
m->localtree (),
wrap (this, &maint_global::handle_missing, succ),
@());
}
// Common exit: every path above (success, skip, or error) lands here.
nextOUT:
// Only advance the scan start when a successor was actually resolved.
if (succ)
rngmin = incID (succ->id ());
(*donecb) ();
}