This repository has been archived by the owner on May 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
redis_feature_store.js
186 lines (164 loc) · 5.34 KB
/
redis_feature_store.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
const base = require('./redis_base');
const dataKind = require('launchdarkly-node-server-sdk/versioned_data_kind');
const CachingStoreWrapper = require('launchdarkly-node-server-sdk/caching_store_wrapper');
const noop = function() {};
function RedisFeatureStore(redisOpts, cacheTTL, prefix, logger, client) {
let options;
if (cacheTTL || prefix || logger || client) {
// convert from older syntax
options = { redisOpts, cacheTTL, prefix, client };
} else {
// first parameter could be either options (newer syntax) or redisOpts
options = redisOpts || {};
if (options.redisOpts === undefined && options.cacheTTL === undefined
&& options.prefix === undefined && options.client === undefined) {
options = { redisOpts };
}
}
return config =>
new CachingStoreWrapper(
redisFeatureStoreInternal(options, logger || config.logger),
options.cacheTTL,
'Redis'
);
// Note, config.logger is guaranteed to be defined - the SDK will have provided a default one if necessary
}
function redisFeatureStoreInternal(options, logger) {
const state = base.initState(options, logger);
const client = state.client;
const itemsPrefix = state.prefix;
const initedKey = itemsPrefix + '$inited';
const store = {};
function itemsKey(kind) {
return itemsPrefix + kind.namespace;
}
// A helper that performs a get with the redis client
function doGet(kind, key, maybeCallback) {
const cb = maybeCallback || noop;
if (!state.connected && !state.initialConnect) {
logger.warn('Attempted to fetch key ' + key + ' while Redis connection is down');
cb(null);
return;
}
client.hget(itemsKey(kind), key, (err, obj) => {
if (err) {
logger.error('Error fetching key ' + key + " from Redis in '" + kind.namespace + "'", err); // eslint-disable-line quotes
cb(null);
} else {
const item = JSON.parse(obj);
cb(item);
}
});
}
store.getInternal = (kind, key, maybeCallback) => {
const cb = maybeCallback || noop;
doGet(kind, key, item => {
if (item && !item.deleted) {
cb(item);
} else {
cb(null);
}
});
};
store.getAllInternal = (kind, maybeCallback) => {
const cb = maybeCallback || noop;
if (!state.connected && !state.initialConnect) {
logger.warn('Attempted to fetch all keys while Redis connection is down');
cb(null);
return;
}
client.hgetall(itemsKey(kind), (err, obj) => {
if (err) {
logger.error("Error fetching '" + kind.namespace + "' from Redis", err); // eslint-disable-line quotes
cb(null);
} else {
const results = {},
items = obj;
for (const key in items) {
if (Object.hasOwnProperty.call(items, key)) {
results[key] = JSON.parse(items[key]);
}
}
cb(results);
}
});
};
store.initInternal = (allData, cb) => {
const multi = client.multi();
for (const kindNamespace in allData) {
if (Object.hasOwnProperty.call(allData, kindNamespace)) {
const kind = dataKind[kindNamespace];
const baseKey = itemsKey(kind);
const items = allData[kindNamespace];
const stringified = {};
multi.del(baseKey);
for (const key in items) {
if (Object.hasOwnProperty.call(items, key)) {
stringified[key] = JSON.stringify(items[key]);
}
}
// Redis does not allow hmset() with an empty object
if (Object.keys(stringified).length > 0) {
multi.hmset(baseKey, stringified);
}
}
}
multi.set(initedKey, '');
multi.exec(err => {
if (err) {
logger.error('Error initializing Redis store', err);
}
cb();
});
};
store.upsertInternal = (kind, item, cb) => {
updateItemWithVersioning(kind, item, (err, attemptedWrite) => {
if (err) {
logger.error('Error upserting key ' + item.key + " in '" + kind.namespace + "'", err); // eslint-disable-line quotes
}
cb(err, attemptedWrite);
});
};
function updateItemWithVersioning(kind, newItem, cb) {
client.watch(itemsKey(kind));
const multi = client.multi();
// testUpdateHook is instrumentation, used only by the unit tests
const prepare =
store.testUpdateHook ||
function(prepareCb) {
prepareCb();
};
prepare(() => {
doGet(kind, newItem.key, oldItem => {
if (oldItem && oldItem.version >= newItem.version) {
multi.discard();
cb(null, oldItem);
} else {
multi.hset(itemsKey(kind), newItem.key, JSON.stringify(newItem));
multi.exec((err, replies) => {
if (!err && replies === null) {
// This means the EXEC failed because someone modified the watched key
logger.debug('Concurrent modification detected, retrying');
updateItemWithVersioning(kind, newItem, cb);
} else {
cb(err, newItem);
}
});
}
});
});
}
store.initializedInternal = maybeCallback => {
const cb = maybeCallback || noop;
client.exists(initedKey, (err, obj) => {
cb(Boolean(!err && obj));
});
};
store.close = () => {
if (state.stopClientOnClose) {
client.quit();
}
};
return store;
}
module.exports = RedisFeatureStore;