Skip to content

Commit

Permalink
Refactored csv parser
Browse files Browse the repository at this point in the history
  • Loading branch information
anvaka committed Jan 13, 2016
1 parent 3225c91 commit 3220301
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 54 deletions.
36 changes: 8 additions & 28 deletions import_watchers_to_redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
*
* Each line is stored as a set twice:
*
* repo:repository_name -> [list of star gazers]
* user:user_name -> [list of starred repositories]
* repo:repository_name -> [set of star gazers]
* user:user_name -> [set of starred repositories]
*/

var fileName = process.argv[2];
Expand All @@ -14,39 +14,19 @@ if (!fs.existsSync(fileName)) {
throw new Error('Cannot find input file with csv data: ' + fileName);
}

var inputFile = require('fs').createReadStream(fileName),
redis = require("redis"),
client = redis.createClient(),
csv = require('csv-parse'),
parser = csv();
var redis = require("redis");
var client = redis.createClient();
var forEachLine = require('./lib/for-each-line-in-csv.js');

var processed = 0;
parser.on('readable', saveLine);
parser.on('end', function() { client.unref(); });
forEachLine(fileName, saveLine, function() { client.unref(); });

inputFile.pipe(parser);

function saveLine() {
var line = parser.read();
if (!line) return;
var login = line[0];
var repo = line[1];

// we want to fix twitter. Normally we should not care about it
// but in this it has to be changed, since it is so popular
if (login === 'twitter' && repo === 'bootstrap') {
login = 'twbs';
}
if (!repo || repo.indexOf('/') <= 0) return; // ignore invalid data.

processed += 1;
if (processed % 10000 === 0) console.log('Saved: ', processed);
function saveLine(login, repo) {
client.sadd('repo:' + repo, login, printError('repo', repo, login));
client.sadd('user:' + login, repo, printError('user', repo, login));
}

function printError(type, repo, login) {
return function(err, res) {
return function(err) {
if (err) console.log('!! Failed to save ' + type + ': ' + repo + '/' + login, err);
};
}
38 changes: 38 additions & 0 deletions lib/for-each-line-in-csv.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Reads csv file of watch events line by line
*/
var fs = require('fs');
var csv = require('csv-parse');
module.exports = forEachLine;

/**
 * Streams a csv file of watch events and invokes `cb` once per accepted record.
 *
 * The first column of a record is read as the user and the second one as the
 * repository. A record is skipped when its repository is missing or does not
 * contain a `/` after its first character.
 *
 * @param {string} fileName - path of the csv file to read.
 * @param {function(string, string)} cb - called as `cb(user, repo)` for every accepted record.
 * @param {function()} done - called without arguments once the parser emits `end`.
 */
function forEachLine(fileName, cb, done) {
  var inputFile = fs.createReadStream(fileName);
  var parser = csv();
  var processed = 0;

  parser.on('readable', drainParser);
  parser.on('end', function() {
    done();
  });

  inputFile.pipe(parser);

  // A single `readable` event may announce several buffered records, so keep
  // reading until the parser has nothing more to hand out. Reading only once
  // per event can leave records sitting in the buffer.
  function drainParser() {
    var line;
    while ((line = parser.read())) {
      processLine(line);
    }
  }

  // Validates one parsed record and forwards it to `cb`.
  function processLine(line) {
    var user = line[0];
    var repo = line[1];

    // We want to fix twitter. Normally we should not care about it, but in
    // this case it has to be changed, since it is so popular.
    // NOTE(review): a repo value of 'bootstrap' contains no '/', so this
    // record is rejected by the validity check right below - confirm the
    // intended column layout.
    if (user === 'twitter' && repo === 'bootstrap') {
      user = 'twbs';
    }

    if (!repo || repo.indexOf('/') <= 0) return; // ignore invalid data.

    cb(user, repo);
    processed += 1;
    if (processed % 100000 === 0) console.log('Read ' + processed + ' lines of ' + fileName);
  }
}
19 changes: 19 additions & 0 deletions lib/load-repositories.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* loads set of unique repositories within csv file
*/
module.exports = loadRepositories;
var forEachLine = require('./for-each-line-in-csv.js');

/**
 * Collects the unique `user/repo` keys produced while walking a csv file.
 *
 * @param {string} fileName - csv file handed over to `forEachLine`.
 * @param {function(Set)} doneCb - receives the populated Set once
 *   `forEachLine` reports that it is finished.
 */
function loadRepositories(fileName, doneCb) {
  var uniqueRepos = new Set();

  forEachLine(
    fileName,
    // Every record contributes one key; the Set drops duplicates.
    function rememberRepo(user, repo) {
      var key = user + '/' + repo;
      uniqueRepos.add(key);
    },
    // Hand the accumulated Set to the caller when reading is over.
    function reportResult() {
      doneCb(uniqueRepos);
    }
  );
}
31 changes: 6 additions & 25 deletions lib/load-watchers.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,19 @@
module.exports = loadWatchers;
var forEachLine = require('./for-each-line-in-csv.js');

function loadWatchers(fileName, doneCb) {
var fs = require('fs');
var inputFile = require('fs').createReadStream(fileName),
csv = require('csv-parse'),
parser = csv();

var userRepo = new Map();
var repoUsers = new Map();

var processed = 0;
parser.on('readable', processLine);
parser.on('end', function() {
doneCb(repoUsers, userRepo);
});

inputFile.pipe(parser);
forEachLine(fileName, processLine, done);

function processLine() {
var line = parser.read();
if (!line) return;
var user = line[0];
var repo = line[1];

// we want to fix twitter. Normally we should not care about it
// but in this it has to be changed, since it is so popular
if (user === 'twitter' && repo === 'bootstrap') {
user = 'twbs';
}
function processLine(user, repo) {
addToMap(userRepo, user, repo);
addToMap(repoUsers, repo, user);
}

processed += 1;
if (processed % 100000 === 0) console.log('Loaded: ', processed);
function done() {
doneCb(repoUsers, userRepo);
}

function addToMap(map, key, value) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
},
"dependencies": {
"bluebird": "^2.3.11",
"csv-parse": "0.0.6",
"csv-parse": "^1.0.1",
"hiredis": "^0.4.1",
"redis": "^0.12.1"
}
Expand Down

0 comments on commit 3220301

Please sign in to comment.