From 0107b413845c89f09a6cbc2303614971176d4ef2 Mon Sep 17 00:00:00 2001 From: Jeff McAffer Date: Sun, 5 Mar 2017 21:39:12 -0800 Subject: [PATCH 1/2] add an event retro fit command --- bin/cc | 71 ++++++++++++++++++++++++++++++++++++++++++++++++---- package.json | 3 ++- 2 files changed, 68 insertions(+), 6 deletions(-) diff --git a/bin/cc b/bin/cc index 1b02024..0d459b2 100644 --- a/bin/cc +++ b/bin/cc @@ -2,12 +2,15 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +const argSplit = require('argv-split'); const commander = require('commander'); const CrawlerClient = require('../lib/crawlerClient'); const readline = require('readline'); -const split = require('argv-split'); const Q = require('q'); const qlimit = require('qlimit'); +const request = require('request'); +const split = require('split'); +const throttle = require('throttle'); const crawlerClient = new CrawlerClient(); let promise = Q(); @@ -39,7 +42,7 @@ function getCommands() { .description('Stop all processing in the crawler') .action(() => configureCount(0)); commands - .command('queue ') + .command('queue ') .description('Queue the given list of orgs and/or repos to be processed.') .action(requests => queueRequests(requests)); commands @@ -66,6 +69,11 @@ function getCommands() { .option('-d, --delete ', 'Delete the deadletter with the given urn') .description('Manage deadletters') .action(options => deadletters(options)); + commands + .command('events ') + .option('-z, --test', 'Test mode. Process but do not actually queue the events.') + .description('Backfill the events in the resource at the URL. The resource is assumed to be one JSON object per line') + .action((url, options) => queueEvents(url, options)); commands .command('exit') .description('Exit this tool') @@ -83,7 +91,7 @@ function startReplLoop(commands) { rl.prompt(); rl.on('line', (line) => { - const command = split(line); + const command = argSplit(line); // pad up the command line to keep commander happy command.unshift('node', 'cc'); commands.parse(command); @@ -121,8 +129,8 @@ function setTokens(tokens) { } function queueRequests(specs) { - let requests = JSON.parse(specs); - requests = Array.isArray(requests) ? requests : [requests]; + let requests = Array.isArray(specs) ? specs : [specs]; + requests = requests.map(request => request.trim().startsWith('{') ? JSON.parse(request) : request); promise = crawlerClient.queueRequests(requests).then(() => console.log(`Queued ${requests.length} requests`)); } @@ -175,3 +183,56 @@ function requeueDeadletter(urn) { function deleteDeadletter(urn) { promise = crawlerClient.deleteDeadletter(urn).then(() => console.log(`Deleted ${urn}`)); } + +function queueEvents(url, options) { + let requests = []; + let count = 0; + const response = request(url); + response + .pipe(split()) + .on('data', (line) => { + const request = createEventRequest(line); + if (!request) { + return; + } + requests.push(request); + if (requests.length === 10 && !options.test) { + const toQueue = requests.slice(); + requests = []; + response.pause(); + count += 10; + crawlerClient.queueRequests(toQueue, 'later').then( + () => { + console.log(`Queued ${count} events so far...`); + response.resume(); + }, + error => + console.log(error) + ); + } + }) + .on('error', error => { + console.log(error); + }) + .on('end', () => { + console.log('ended'); + }); +} + +function createEventRequest(line) { + try { + const event = JSON.parse(line); + delete event._id; + const eventUrlBase = event.repo ? event.repo.url : event.org.url; + const request = { type: event.type, url: `${eventUrlBase}/events/${event.id}` }; + request.payload = { body: event, etag: 1 }; + if (event.created_at) { + request.payload.fetchedAt = event.created_at; + } + request.policy = 'default:self'; + return request; + } catch (error) { + console.log(error); + return null; + } +} \ No newline at end of file diff --git a/package.json b/package.json index 5917838..98cd805 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,8 @@ "q": "1.4.1", "qlimit": "^0.1.1", "readline": "^1.3.0", - "request": "^2.79.0" + "request": "^2.79.0", + "split": "^1.0.0" }, "devDependencies": {} } From bfa002d92194f5e2bfd9e20e081af44692f776b3 Mon Sep 17 00:00:00 2001 From: Jeff McAffer Date: Sun, 5 Mar 2017 22:37:39 -0800 Subject: [PATCH 2/2] add byte count for event upload --- bin/cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bin/cc b/bin/cc index 0d459b2..d7deab3 100644 --- a/bin/cc +++ b/bin/cc @@ -187,6 +187,7 @@ function deleteDeadletter(urn) { function queueEvents(url, options) { let requests = []; let count = 0; + let bytes = 0; const response = request(url); response .pipe(split()) @@ -195,6 +196,7 @@ function queueEvents(url, options) { if (!request) { return; } + bytes += line.length; requests.push(request); if (requests.length === 10 && !options.test) { const toQueue = requests.slice(); @@ -203,7 +205,7 @@ function queueEvents(url, options) { count += 10; crawlerClient.queueRequests(toQueue, 'later').then( () => { - console.log(`Queued ${count} events so far...`); + console.log(`Queued ${count} events and ${bytes} bytes so far...`); response.resume(); }, error =>