Skip to content

Commit f2aeccd

Browse files
committed
ft: ARSN-65 oplog pattern library
Snapshot-scan-oplog pattern with state persistence for applications requiring reading the oplog
1 parent 7b45124 commit f2aeccd

File tree

14 files changed

+1627
-14
lines changed

14 files changed

+1627
-14
lines changed

index.js

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,16 @@ module.exports = {
100100
},
101101
storage: {
102102
metadata: {
103-
MetadataFileServer:
104-
require('./lib/storage/metadata/file/MetadataFileServer'),
105-
MetadataFileClient:
106-
require('./lib/storage/metadata/file/MetadataFileClient'),
107-
LogConsumer:
108-
require('./lib/storage/metadata/bucketclient/LogConsumer'),
103+
bucketclient: {
104+
LogConsumer:
105+
require('./lib/storage/metadata/bucketclient/LogConsumer'),
106+
},
107+
file: {
108+
MetadataFileServer:
109+
require('./lib/storage/metadata/file/MetadataFileServer'),
110+
MetadataFileClient:
111+
require('./lib/storage/metadata/file/MetadataFileClient'),
112+
},
109113
},
110114
data: {
111115
file: {

lib/storage/metadata/MetadataWrapper.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ const errors = require('../../errors');
33
const BucketInfo = require('../../models/BucketInfo');
44

55
const BucketClientInterface = require('./bucketclient/BucketClientInterface');
6-
const BucketFileInterface = require('./file/BucketFileInterface');
7-
const MongoClientInterface = require('./mongoclient/MongoClientInterface');
8-
const metastore = require('./in_memory/metastore');
6+
// const BucketFileInterface = require('./file/BucketFileInterface');
7+
// const MongoClientInterface = require('./mongoclient/MongoClientInterface');
8+
const metastore = require('./in_memory/backend');
99

1010
let CdmiMetadata;
1111
try {
@@ -71,14 +71,14 @@ class MetadataWrapper {
7171
if (clientName === 'mem') {
7272
this.client = metastore;
7373
this.implName = 'memorybucket';
74-
} else if (clientName === 'file') {
74+
} /* else if (clientName === 'file') {
7575
this.client = new BucketFileInterface(params, logger);
7676
this.implName = 'bucketfile';
77-
} else if (clientName === 'scality') {
77+
} */ else if (clientName === 'scality') {
7878
this.client = new BucketClientInterface(params, bucketclient,
7979
logger);
8080
this.implName = 'bucketclient';
81-
} else if (clientName === 'mongodb') {
81+
} /* else if (clientName === 'mongodb') {
8282
this.client = new MongoClientInterface({
8383
replicaSetHosts: params.mongodb.replicaSetHosts,
8484
writeConcern: params.mongodb.writeConcern,
@@ -90,7 +90,7 @@ class MetadataWrapper {
9090
logger,
9191
});
9292
this.implName = 'mongoclient';
93-
} else if (clientName === 'cdmi') {
93+
} */ else if (clientName === 'cdmi') {
9494
if (!CdmiMetadata) {
9595
throw new Error('Unauthorized backend');
9696
}
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
/*
2+
* Main interface for bucketd oplog management
3+
*/
4+
const async = require('async');
5+
const { RESTClient: BucketClient } = require('bucketclient');
6+
const { jsutil, errors } = require('arsenal');
7+
const LogConsumer = require('arsenal/lib/storage/metadata/bucketclient/LogConsumer');
8+
const { isMasterKey } = require('arsenal/lib/versioning/Version');
9+
const OplogInterface = require('./OplogInterface');
10+
11+
/**
 * Oplog reader backed by bucketd.
 *
 * start() resolves a raft session id, loads (or acquires) the sequence
 * number to resume from, optionally initializes the persisted state, then
 * loops over the raft log and feeds additions/deletions to
 * this.persistData.updateState().
 *
 * NOTE(review): this.logger, this.persist and this.persistData are never
 * assigned in this class; presumably OplogInterface sets them from params
 * -- confirm against the parent class.
 */
class BucketdOplogInterface extends OplogInterface {

    /**
     * @param {object} [params] - forwarded as-is to the OplogInterface
     *   constructor
     * @param {*} [params.bootstrap=['localhost:9000']] - handed unchanged
     *   to the bucketd REST client constructor
     * @param {number} [params.stopAt=-1] - once the current sequence number
     *   is greater than this value start() stops reading; the default -1
     *   is treated as "never stop"
     */
    constructor(params) {
        super(params);
        this.stopAt = -1;
        this.backendRetryTimes = 3;
        this.backendRetryInterval = 300;
        this.bucketdOplogQuerySize = 20;
        let bkBootstrap = ['localhost:9000'];
        if (params && params.bootstrap !== undefined) {
            bkBootstrap = params.bootstrap;
        }
        if (params && params.stopAt !== undefined) {
            this.stopAt = params.stopAt;
        }
        this.bkClient = new BucketClient(bkBootstrap);
    }

    /**
     * Read the bucketd oplog and apply it to the persisted state.
     *
     * @param {object} filter - what to read
     * @param {string} filter.filterType - 'bucket' or 'raftSession'; any
     *   other value yields errors.NotImplemented
     * @param {string} filter.filterName - key used for persist.load/save
     *   and as a logging field
     * @param {object} [filter.bucket] - read when filterType is 'bucket'
     *   (filter.bucket.bucketName)
     * @param {object} [filter.raftSession] - read when filterType is
     *   'raftSession' (filter.raftSession.raftId)
     * @param {function} cb - cb(err) on failure, cb() once the loop stops
     * @return {undefined}
     */
    start(filter, cb) {
        if (!(filter.filterType === 'bucket' ||
              filter.filterType === 'raftSession')) {
            return cb(errors.NotImplemented);
        }
        const filterName = filter.filterName;
        async.waterfall([
            /*
             * In this step we get the raftId for filterName
             */
            next => {
                if (filter.filterType === 'raftSession') {
                    return next(null, filter.raftSession.raftId);
                }
                this.logger.info('obtaining raftId',
                    { filterName });
                async.retry(
                    {
                        times: this.backendRetryTimes,
                        interval: this.backendRetryInterval,
                    },
                    done => {
                        this.bkClient.getBucketInformation(
                            filter.bucket.bucketName,
                            null,
                            (err, info) => {
                                if (err) {
                                    this.logger.info('retrying getBucketInformation', { err, filterName });
                                    return done(err);
                                }
                                // A malformed body must go through the
                                // retry path, not throw out of the callback.
                                let parsed;
                                try {
                                    parsed = JSON.parse(info);
                                } catch (parseErr) {
                                    this.logger.info('retrying getBucketInformation',
                                        { err: parseErr, filterName });
                                    return done(parseErr);
                                }
                                return done(null, parsed);
                            });
                    },
                    (err, res) => {
                        if (err) {
                            this.logger.error('getBucketInformation too many failures', { err, filterName });
                            return next(err);
                        }
                        return next(null, res.raftSessionId);
                    });
                return undefined;
            },
            /*
             * In this step we get the stored offset if we have it
             */
            (raftId, next) => {
                this.persist.load(filterName, this.persistData, (err, offset) => {
                    if (err) {
                        return next(err);
                    }
                    // offset is undefined when nothing was stored yet
                    return next(null, raftId, offset);
                });
            },
            /*
             * In this step we acquire the offset if we don't already have it
             */
            (raftId, cseq, next) => {
                if (cseq !== undefined) {
                    this.logger.info(`skipping cseq acquisition (cseq=${cseq})`,
                        { filterName });
                    return next(null, raftId, cseq, true);
                }
                this.logger.info('cseq acquisition',
                    { filterName });
                async.retry(
                    {
                        times: this.backendRetryTimes,
                        interval: this.backendRetryInterval,
                    },
                    done => {
                        // 'error' and 'end' may both try to complete this
                        // attempt; make sure retry sees a single outcome.
                        const doneOnce = jsutil.once(done);
                        this.bkClient.getRaftLog(
                            raftId,
                            1,
                            1,
                            true,
                            null,
                            (err, stream) => {
                                if (err) {
                                    this.logger.info('retrying getRaftLog', { err, filterName });
                                    return doneOnce(err);
                                }
                                const chunks = [];
                                stream.on('data', chunk => {
                                    chunks.push(chunk);
                                });
                                stream.on('error', streamErr => {
                                    this.logger.info('retrying getRaftLog',
                                        { err: streamErr, filterName });
                                    return doneOnce(streamErr);
                                });
                                stream.on('end', () => {
                                    let info;
                                    try {
                                        info = JSON.parse(Buffer.concat(chunks));
                                    } catch (parseErr) {
                                        this.logger.info('retrying getRaftLog',
                                            { err: parseErr, filterName });
                                        return doneOnce(parseErr);
                                    }
                                    return doneOnce(null, info);
                                });
                                return undefined;
                            });
                    },
                    (err, res) => {
                        if (err) {
                            this.logger.error('getRaftLog too many failures', { err, filterName });
                            return next(err);
                        }
                        return next(null, raftId, res.info.cseq, false);
                    });
                return undefined;
            },
            /*
             * In this step we init the state (e.g. scan)
             */
            (raftId, cseq, skipInit, next) => {
                if (skipInit) {
                    this.logger.info(`skipping state initialization cseq=${cseq}`,
                        { filterName });
                    return next(null, raftId, cseq);
                }
                this.logger.info(`initializing state cseq=${cseq}`,
                    { filterName });
                this.persistData.initState(err => {
                    if (err) {
                        return next(err);
                    }
                    this.persist.save(
                        filterName, this.persistData, cseq, err => {
                            if (err) {
                                return next(err);
                            }
                            return next(null, raftId, cseq);
                        });
                    return undefined;
                });
                return undefined;
            },
            /*
             * In this step we loop over the oplog
             */
            (raftId, cseq, next) => {
                this.logger.info(`reading oplog raftId=${raftId} cseq=${cseq}`,
                    { filterName });
                // only way to get out of the loop in all cases
                const nextOnce = jsutil.once(next);
                let doStop = false;
                // resume reading the oplog from cseq. changes are idempotent
                const logConsumer = new LogConsumer({
                    bucketClient: this.bkClient,
                    raftSession: raftId,
                });
                let _cseq = cseq;
                async.until(
                    () => doStop,
                    _next => {
                        logConsumer.readRecords({
                            startSeq: _cseq,
                            limit: this.bucketdOplogQuerySize,
                        }, (err, record) => {
                            if (err) {
                                this.logger.error('readRecords error', { err, filterName });
                                return setTimeout(() => _next(), 5000);
                            }
                            if (!record.log) {
                                // nothing to read
                                return setTimeout(() => _next(), 5000);
                            }
                            const seqs = [];
                            // Guards against the iteration being completed
                            // twice if the stream emits both 'error' and
                            // 'end'.
                            let settled = false;
                            record.log.on('data', chunk => {
                                seqs.push(chunk);
                            });
                            record.log.on('error', logErr => {
                                if (settled) {
                                    return undefined;
                                }
                                settled = true;
                                // Same policy as a readRecords failure:
                                // log, wait, re-read from the same _cseq.
                                this.logger.error('readRecords error',
                                    { err: logErr, filterName });
                                return setTimeout(() => _next(), 5000);
                            });
                            record.log.on('end', () => {
                                if (settled) {
                                    return;
                                }
                                settled = true;
                                const addQueue = [];
                                const delQueue = [];
                                const wholeSession =
                                    filter.filterType === 'raftSession';
                                for (const seq of seqs) {
                                    const wanted = wholeSession ||
                                        (filter.filterType === 'bucket' &&
                                         seq.db === filter.bucket.bucketName);
                                    if (!wanted) {
                                        continue;
                                    }
                                    for (const entry of seq.entries) {
                                        const _item = {
                                            bucketName: seq.db,
                                            key: entry.key,
                                        };
                                        if (entry.type === 'del') {
                                            if (!isMasterKey(_item.key)) {
                                                // ignore for now: skip only
                                                // this entry. Returning here
                                                // would drop the rest of the
                                                // batch and never call
                                                // _next, stalling the loop.
                                                continue;
                                            }
                                            delQueue.push(_item);
                                        } else {
                                            _item.value = Object.assign({}, entry.value);
                                            addQueue.push(_item);
                                        }
                                    }
                                }
                                this.persistData.updateState(
                                    addQueue, delQueue, err => {
                                        if (err) {
                                            return _next(err);
                                        }
                                        _cseq += seqs.length;
                                        this.persist.save(
                                            filterName, this.persistData, _cseq, err => {
                                                if (err) {
                                                    return _next(err);
                                                }
                                                // -1 means "no stop point";
                                                // without this check the
                                                // default would end the loop
                                                // after the first batch.
                                                if (this.stopAt !== -1 &&
                                                    _cseq > this.stopAt) {
                                                    doStop = true;
                                                }
                                                return _next();
                                            });
                                        return undefined;
                                    });
                            });
                            return undefined;
                        });
                    }, err => {
                        if (err) {
                            return nextOnce(err);
                        }
                        return nextOnce();
                    });
            },
        ], err => {
            if (err) {
                return cb(err);
            }
            this.logger.info('returning',
                { filterName });
            return cb();
        });
        return undefined;
    }
}
256+
257+
module.exports = BucketdOplogInterface;

0 commit comments

Comments
 (0)