Skip to content

groupBy / aggregate #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .babelrc
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{
"plugins": [
"transform-strict-mode",
"transform-es2015-spread",
"transform-object-rest-spread"
]
}
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ const sortBy = 'priority';
const expiration = 30000; // ms

// perform op
const currentTime = Date.now();

redis
.fsort('set-of-ids', 'metadata*', sortBy, 'DESC', filter, offset, limit, expiration)
.fsort('set-of-ids', 'metadata*', sortBy, 'DESC', filter, currentTime, offset, limit, expiration)
.then(data => {
// how many items in the complete list
// rest of the data is ids from the 'set-of-ids'
Expand All @@ -56,3 +58,17 @@ redis
});
});
```

### Aggregate functions

1. use fsort to generate list of ids
2. pass that id to aggregate function and receive results back

```js
redis
.fsortAggregate(ID_LIST_KEY, META_KEY_PATTERN, mod.filter({
age: 'sum'
}))
.then(JSON.parse)
.get('age')
```
53 changes: 53 additions & 0 deletions groupped-list.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
-- cached id list key
local idListKey = KEYS[1];
-- meta key
local metadataKey = KEYS[2];
-- stringified [key]: [aggregateMethod] pairs
local aggregates = ARGV[1];

-- local cache
local rcall = redis.call;
local tinsert = table.insert;

local jsonAggregates = cjson.decode(aggregates);
local aggregateKeys = {};
local result = {};

local function aggregateSum(value1, value2)
value1 = value1 or 0;
value2 = value2 or 0;
return value1 + value2;
end

local aggregateType = {
sum = aggregateSum
};

for key, method in pairs(jsonAggregates) do
tinsert(aggregateKeys, key);
result[key] = 0;

if type(aggregateType[method]) ~= "function" then
return error("not supported op: " .. method);
end
end

local valuesToGroup = rcall("LRANGE", idListKey, 0, -1);

-- group
for _, id in ipairs(valuesToGroup) do
-- metadata is stored here
local metaKey = metadataKey:gsub("*", id, 1);
-- pull information about required aggregate keys
-- only 1 operation is supported now - sum
-- but we can calculate multiple values
local values = rcall("HMGET", metaKey, unpack(aggregateKeys));

for i, aggregateKey in ipairs(aggregateKeys) do
local aggregateMethod = aggregateType[jsonAggregates[aggregateKey]];
local value = tonumber(values[i] or 0);
result[aggregateKey] = aggregateMethod(result[aggregateKey], value);
end
end

return cjson.encode(result);
4 changes: 3 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const snakeCase = require('lodash/snakeCase');

const lua = fs.readFileSync(path.join(__dirname, 'sorted-filtered-list.lua'));
const fsortBust = fs.readFileSync(path.join(__dirname, 'filtered-list-bust.lua'));
const aggregateScript = fs.readFileSync(path.join(__dirname, 'groupped-list.lua'));

// cached vars
const regexp = /[\^\$\(\)\%\.\[\]\*\+\-\?]/g;
Expand Down Expand Up @@ -37,9 +38,11 @@ const fsortBustScript = luaWrapper(fsortBust);
exports.attach = function attachToRedis(redis, _name, useSnakeCase = false) {
const name = _name || 'sortedFilteredList';
const bustName = (useSnakeCase ? snakeCase : camelCase)(`${name}Bust`);
const aggregateName = (useSnakeCase ? snakeCase : camelCase)(`${name}Aggregate`);

redis.defineCommand(name, { numberOfKeys: 2, lua: fsortScript });
redis.defineCommand(bustName, { numberOfKeys: 1, lua: fsortBustScript });
redis.defineCommand(aggregateName, { numberOfKeys: 2, lua: aggregateScript });
};

/**
Expand Down Expand Up @@ -96,4 +99,3 @@ exports.filter = function filter(obj) {
* @type {Buffer}
*/
exports.script = lua;

21 changes: 11 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
{
"name": "redis-filtered-sort",
"version": "2.0.6",
"version": "2.1.0",
"description": "Exports LUA script, which is able to perform multi filter operations, as well as sorts",
"main": "./lib",
"engine": {
"node": ">= 7.6.0"
},
"scripts": {
"test": "./test/docker.sh",
"compile": "make all",
Expand All @@ -27,20 +30,18 @@
},
"homepage": "https://github.com/makeomatic/redis-filtered-sort#readme",
"peerDependencies": {
"ioredis": "1.x.x || 2.x.x"
"ioredis": "2.x.x || 3.x.x"
},
"devDependencies": {
"babel-register": "^6.16.3",
"babel-cli": "^6.24.1",
"babel-plugin-transform-object-rest-spread": "^6.23.0",
"babel-plugin-transform-strict-mode": "^6.24.1",
"babel-register": "^6.24.1",
"chai": "^3.4.1",
"faker": "^3.0.1",
"faker": "^4.1.0",
"ioredis": "^2.4.0",
"lodash": "^4.16.3",
"mocha": "^3.1.0",
"babel-cli": "^6.14.0",
"babel-plugin-syntax-async-functions": "^6.13.0",
"babel-plugin-transform-es2015-spread": "^6.0.0",
"babel-plugin-transform-object-rest-spread": "^6.0.0",
"babel-plugin-transform-strict-mode": "^6.11.3"
"mocha": "^3.3.0"
},
"dependencies": {
"cluster-key-slot": "^1.0.6"
Expand Down
11 changes: 11 additions & 0 deletions sorted-filtered-list.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ local offset = tonumber(ARGV[5] or 0);
local limit = tonumber(ARGV[6] or 10);
-- caching time
local expiration = tonumber(ARGV[7] or 30000);
-- return the key only
local returnKeyOnly = ARGV[8] or false;

-- local caches
local tempKeysSet = getIndexTempKeys(idSet);
Expand Down Expand Up @@ -105,8 +107,17 @@ local function storeCacheBuster(key)
end

local function updateExpireAndReturnWithSize(key)
-- stores key for cache busting
storeCacheBuster(key);

-- lengthens cache
rcall("PEXPIRE", key, expiration);

-- returns either results or key where it's stored
if returnKeyOnly ~= false then
return key;
end

local ret = rcall("LRANGE", key, offset, offset + limit - 1);
tinsert(ret, rcall("LLEN", key));
return ret;
Expand Down
37 changes: 34 additions & 3 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ const ld = require('lodash');
describe('filtered sort suite', function suite() {
const idSetKey = 'id-set';
const metaKeyPattern = '*-metadata';
const keyPrefix = 'fsort-test:';
const redis = new Redis({
port: process.env.REDIS_PORT_6379_TCP_PORT,
host: process.env.REDIS_PORT_6379_TCP_ADDR,
keyPrefix: 'fsort-test:',
keyPrefix,
});
const monitor = redis.duplicate();
const mod = require('../index.js');
Expand Down Expand Up @@ -142,7 +143,7 @@ describe('filtered sort suite', function suite() {

describe('cache invalidation', function invalidationSuite() {
it('should invalidate cache: sort', function test() {

return redis.fsort(idSetKey, null, null, 'ASC', '{}', Date.now())
.then(() => redis.zrangebyscore(`${idSetKey}::${mod.FSORT_TEMP_KEYSET}`, '-inf', '+inf'))
.tap(keys => expect(keys.length).to.be.eq(1))
Expand All @@ -156,7 +157,7 @@ describe('filtered sort suite', function suite() {
expect(keys).to.be.eq(0);
});
});

it('should invalidate cache: sort + filter', function test() {
const fieldName = 'fieldExists';
const filter = mod.filter({
Expand Down Expand Up @@ -511,4 +512,34 @@ describe('filtered sort suite', function suite() {
});
});
});

describe('aggregate filter', function suite() {
it('sums up age and returns key', function test() {
const filter = mod.filter({
age: { gte: 10, lte: 40 },
});

return redis
.fsort(idSetKey, metaKeyPattern, 'age', 'DESC', filter, Date.now(), 0, 0, 30000, 1)
.then((response) => {
// key where the resulting data is stored
expect(response).to.be.equal('fsort-test:id-set:DESC:fsort-test:*-metadata:age:{"age":{"gte":10,"lte":40}}');
this.response = response;
});
});

it('returns aggregate', function test() {
const aggregate = mod.filter({
age: 'sum',
});

return redis
.fsortAggregate(this.response.slice(keyPrefix.length), metaKeyPattern, aggregate)
.then(JSON.parse)
.then((response) => {
// this would average to 15000+ due to random ranges
expect(response.age).to.be.gt(15000);
});
});
});
});
Loading