7 changes: 7 additions & 0 deletions README.md
@@ -50,6 +50,13 @@ Database:
- `DB_PASS` : password if the db uses auth (undefined by default)
- `DB_CERT` : local file path to certificate (cert) file if the db uses ssl (undefined by default)

Downloads:

- `BULK_DOWNLOADS_PATH` : relative path to bulk downloads
- `BIOPAX_DOWNLOADS_PATH` : relative path to biopax downloads
- `BIOPAX_IDMAP_DOWNLOADS_PATH` : relative path to id-mapped biopax downloads
- `EXPORT_BULK_DELAY_HOURS` : delay, in hours, used to batch export tasks

Services:

- `DEFAULT_CACHE_SIZE` : default max number of entries in each cache
6 changes: 6 additions & 0 deletions src/config.js
@@ -56,6 +56,12 @@ export const DB_USER = env('DB_USER', undefined); // username if db uses auth
export const DB_PASS = env('DB_PASS', undefined); // password if db uses auth
export const DB_CERT = env('DB_CERT', undefined); // path to a certificate (cert) file if db uses ssl

// Downloads
export const BULK_DOWNLOADS_PATH = env('BULK_DOWNLOADS_PATH', 'download/factoid_bulk.zip');
export const BIOPAX_DOWNLOADS_PATH = env('BIOPAX_DOWNLOADS_PATH', 'download/factoid_biopax.zip');
export const BIOPAX_IDMAP_DOWNLOADS_PATH = env('BIOPAX_IDMAP_DOWNLOADS_PATH', 'download/factoid_biopax_with_id_mapping.zip');
export const EXPORT_BULK_DELAY_HOURS = env('EXPORT_BULK_DELAY_HOURS', 0.25);

// Services
export const PC_URL = env('PC_URL', 'https://apps.pathwaycommons.org/');
export const REACH_URL = env('REACH_URL', 'http://reach.baderlab.org/api/uploadFile');
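As a usage note (editor's addition, not part of this PR): these constants are resolved from `process.env` via the `env()` helper, so a deployment can point the export archives elsewhere or change the batching window without code changes. A minimal sketch, assuming the module is imported from the repository root and that `env()` simply falls back to the given default when the variable is unset:

```js
// Editor's sketch: overriding the bulk-export settings at launch, e.g.
//   BULK_DOWNLOADS_PATH=/data/factoid_bulk.zip EXPORT_BULK_DELAY_HOURS=1 node .
import { BULK_DOWNLOADS_PATH, EXPORT_BULK_DELAY_HOURS } from './src/config';

console.log( BULK_DOWNLOADS_PATH );     // 'download/factoid_bulk.zip' unless overridden
console.log( EXPORT_BULK_DELAY_HOURS ); // 0.25 by default, i.e. a 15 minute batching window
```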
42 changes: 0 additions & 42 deletions src/server/document-util.js

This file was deleted.

11 changes: 8 additions & 3 deletions src/server/index.js
@@ -20,6 +20,7 @@ import cron from 'node-cron';
import updateCron from './update-cron';
import { Appsignal } from '@appsignal/nodejs';
import { expressMiddleware as asExpressMiddleware, expressErrorHandler as asExpressErrorHandler } from '@appsignal/express';
import { initExportTasks } from './routes/api/document/export';

let app = express();
let server = http.createServer(app);
@@ -151,13 +152,17 @@ tryPromise( () => {

const tables = ['element', 'document'];
return tables.reduce( ( p, name ) => p.then( () => setup( name ) ), Promise.resolve() );
} ).then( () => {
} )
.then( () => {
cron.schedule( config.CRON_SCHEDULE, () => {
updateCron();
});
} ).then( () => {
} )
.then( () => {
server.listen(port);
} );
} )
.then( initExportTasks )
;

function normalizePort(val) {
let port = parseInt(val, 10);
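For readability (editor's addition, not part of this PR), the reworked startup chain above is roughly equivalent to the following async/await form; the point of the change is that `initExportTasks` runs only after the tables are set up, the cron job is registered, and the HTTP server is listening:

```js
// Editor's sketch of the startup ordering introduced by this diff; any code
// preceding the table setup in the elided portion of the chain is omitted.
const start = async () => {
  for( const name of [ 'element', 'document' ] ){
    await setup( name );
  }
  cron.schedule( config.CRON_SCHEDULE, () => updateCron() );
  server.listen( port );
  await initExportTasks();
};
```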
163 changes: 139 additions & 24 deletions src/server/routes/api/document/export.js
@@ -1,16 +1,21 @@
import fetch from 'node-fetch';
import JSZip from 'jszip';
import _ from 'lodash';
import fs from 'fs';
import logger from '../../../logger';
import util from 'util';
import addMilliseconds from 'date-fns/addMilliseconds';
import formatDistance from 'date-fns/formatDistance';

import logger from '../../../logger';
import db from '../../../db';
import Document from '../../../../model/document';

import { convertDocumentToBiopax,
convertDocumentToJson,
convertDocumentToSbgn } from '../../../document-util';

import { checkHTTPStatus } from '../../../../util';
import { getBioPAX, getSBGN, getDocuments, getDocumentJson } from './index';
import {
BULK_DOWNLOADS_PATH,
BIOPAX_DOWNLOADS_PATH,
EXPORT_BULK_DELAY_HOURS,
BIOPAX_IDMAP_DOWNLOADS_PATH,
NODE_ENV
} from '../../../../config';

const DOCUMENT_STATUS_FIELDS = Document.statusFields();
const CHUNK_SIZE = 20;
@@ -20,20 +25,21 @@ const EXPORT_TYPES = Object.freeze({
'SBGN': 'sbgn'
});

const exportToZip = (baseUrl, zipPath, types, biopaxIdMapping) => {
const exportToZip = (zipPath, types, biopaxIdMapping) => {
let offset = 0;
let zip = new JSZip();

const processNext = () => {
return fetch(`${baseUrl}/api/document?limit=${CHUNK_SIZE}&offset=${offset}`)
.then( checkHTTPStatus )
.then( res => res.json() )
.then( res => {
offset += res.length;
if ( res.length > 0 ) {
const includedStatuses = [DOCUMENT_STATUS_FIELDS.PUBLIC];
const shouldInclude = doc => _.includes(includedStatuses, doc.status);
let ids = res.filter( shouldInclude ).map( doc => doc.id );

return getDocuments({
limit: CHUNK_SIZE,
offset,
status: [DOCUMENT_STATUS_FIELDS.PUBLIC]
})
.then( ({ results }) => {
offset += results.length;
if ( results.length > 0 ) {
let ids = results.map( doc => doc.id );
return addToZip( ids, biopaxIdMapping ).then( processNext );
}

@@ -50,11 +56,12 @@ const exportToZip = (baseUrl, zipPath, types, biopaxIdMapping) => {
};

const addToZip = ( ids, biopaxIdMapping ) => {
let _convertDocumentToBiopax = ( id, baseUrl ) => convertDocumentToBiopax( id, baseUrl, biopaxIdMapping );
let _getBioPAX = id => getBioPAX( id, biopaxIdMapping );
let _getDocumentJson = id => getDocumentJson( id ).then( r => JSON.stringify( r ) );
let typeToConverter = {
[EXPORT_TYPES.JSON]: convertDocumentToJson,
[EXPORT_TYPES.BP]: _convertDocumentToBiopax,
[EXPORT_TYPES.SBGN]: convertDocumentToSbgn
[EXPORT_TYPES.JSON]: _getDocumentJson,
[EXPORT_TYPES.BP]: _getBioPAX,
[EXPORT_TYPES.SBGN]: getSBGN
};

let typeToExt = {
@@ -69,7 +76,7 @@ const exportToZip = (baseUrl, zipPath, types, biopaxIdMapping) => {
}

const idToFiles = id => types.map( t => {
return typeToConverter[ t ]( id, baseUrl )
return typeToConverter[ t ]( id )
.catch( () => {
logger.error(`Error in export: cannot convert the document ${id} into ${t}`);
return null;
@@ -99,4 +106,112 @@ const exportToZip = (baseUrl, zipPath, types, biopaxIdMapping) => {
return processNext();
};

export { exportToZip, EXPORT_TYPES };
/**
 * taskScheduler
 * Create a scheduler that runs 'task' after 'delay' ms. Additional requests
 * received while a task is pending are logged and ignored.
 *
 * @param {function} task The task to execute
 * @param {number} delay ms delay for task execution (default 0)
 * @param {function} next The callback to run after the task resolves
 * @returns {function} A function that schedules the task when invoked
 */
const taskScheduler = ( task, delay = 0, next = () => {} ) => {
let taskScheduled = false;
let taskTime = null;
const resetTaskSchedule = () => { taskScheduled = false; taskTime = null; };

return (() => {
const setTimeoutPromise = util.promisify( setTimeout );
let now = new Date();
logger.info( `A task request has been received` );

if( taskScheduled ){
logger.info( `A task has already been scheduled for ${taskTime} (${formatDistance( now, taskTime )})` );

} else {
taskTime = addMilliseconds( new Date(), delay );
logger.info( `A task was scheduled for ${taskTime} (${formatDistance( now, taskTime )})` );
taskScheduled = true;

setTimeoutPromise( delay )
.then( task )
.then( next )
.catch( () => {} ) // swallow
.finally( resetTaskSchedule ); // allow another backup request
}

return Promise.resolve();
});
};
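
// Editor's sketch (illustrative only, not part of this PR): taskScheduler returns
// a function that coalesces requests -- calls made while a task is pending are
// logged and ignored, so at most one task fires per delay window.
const exampleCoalescedExport = taskScheduler(
  () => exportToZip( BULK_DOWNLOADS_PATH ),        // the task to run
  5000,                                            // 5 s batching window
  () => logger.info( 'Example export finished' )   // runs after the task resolves
);
// Calling exampleCoalescedExport() twice in quick succession performs one export;
// the second call only logs that a task is already scheduled.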

// Configure Changefeeds for the document table
const setupChangefeeds = async ({ rethink: r, conn, table }) => {
const docOpts = {
includeTypes: true,
squash: true
};

// Database restore of doc with public status
const toPublicStatusFromNull = r.row( 'new_val' )( 'status' ).eq( 'public' )
.and( r.row( 'old_val' ).eq( null ) );
// Status changed to 'public'
const toPublicStatusFromOtherStatus = r.row( 'new_val' )( 'status' ).eq( 'public' )
.and( r.row( 'old_val' )( 'status' ).ne( 'public' ) );
// Status is changed from 'public'
const toOtherStatusFromPublicStatus = r.row( 'new_val' )( 'status' ).ne( 'public' )
.and( r.row( 'old_val' )( 'status' ).eq( 'public' ) );

const docFilter = toPublicStatusFromNull.or( toPublicStatusFromOtherStatus ).or( toOtherStatusFromPublicStatus );
const cursor = await table.changes( docOpts )
.filter( docFilter )
.run( conn );
return cursor;
};
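
// Editor's sketch (illustrative only, not part of this PR): for non-deletion
// events of the shape { old_val, new_val }, the ReQL filter above behaves
// roughly like this plain predicate -- it passes documents that enter or
// leave 'public' status and drops everything else.
const examplePublicTransition = ({ old_val, new_val }) => {
  const becamePublic = new_val.status === 'public' &&
    ( old_val == null || old_val.status !== 'public' );
  const leftPublic = new_val.status !== 'public' &&
    old_val != null && old_val.status === 'public';
  return becamePublic || leftPublic;
};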

/**
* initExportTasks
* Initialize the export tasks
*/
const initExportTasks = async () => {
const MS_PER_SEC = 1000;
const SEC_PER_MIN = 60;
const MIN_PER_HOUR = 60;

const loadTable = name => db.accessTable( name );
const dbTable = await loadTable( 'document' );
const cursor = await setupChangefeeds( dbTable );

let export_delay = MS_PER_SEC * SEC_PER_MIN * MIN_PER_HOUR * EXPORT_BULK_DELAY_HOURS;

const exportTask = () => exportToZip( BULK_DOWNLOADS_PATH );
const doExport = taskScheduler( exportTask, export_delay );

const exportBiopaxTask = () => exportToZip( BIOPAX_DOWNLOADS_PATH, [ EXPORT_TYPES.BP ], false );
const doExportBiopax = taskScheduler( exportBiopaxTask, export_delay );
const exportBiopaxIdMapTask = () => exportToZip( BIOPAX_IDMAP_DOWNLOADS_PATH, [ EXPORT_TYPES.BP ], true );
const doExportBiopaxIdMap = taskScheduler( exportBiopaxIdMapTask, export_delay );

let taskList = [ exportTask, exportBiopaxTask, exportBiopaxIdMapTask ];
let scheduledTaskList = [ doExport, doExportBiopax, doExportBiopaxIdMap ];

const doTasks = tasks => Promise.all( tasks.map( t => t() ) );

cursor.each( async err => {
if( err ){
logger.error( `Error in Changefeed: ${err}` );
return;
}
await doTasks( scheduledTaskList );
});

const isProduction = NODE_ENV == 'production';
const exportIsMissing = [
BULK_DOWNLOADS_PATH,
BIOPAX_DOWNLOADS_PATH,
BIOPAX_IDMAP_DOWNLOADS_PATH
].some( p => !fs.existsSync( p ) );

if ( isProduction || exportIsMissing ) await doTasks( taskList );
};


export { exportToZip, EXPORT_TYPES, initExportTasks };
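
As a final usage note (editor's addition, not part of this diff): the archives written by `exportToZip` can be inspected with the same JSZip dependency the module already imports. A minimal sketch, assuming the default `BULK_DOWNLOADS_PATH`; the per-document file names depend on `typeToExt`, which is elided in the hunks above:

```js
import fs from 'fs';
import JSZip from 'jszip';

// Read the archive produced by the bulk export task and list its entries
// (one file per document per exported format).
const data = fs.readFileSync( 'download/factoid_bulk.zip' );

JSZip.loadAsync( data ).then( zip => {
  Object.keys( zip.files ).forEach( name => console.log( name ) );
});
```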