Merged

V0.19.0 #1038

24 commits
5771228
- semantic search endpoint /search
jvwong Jun 18, 2021
d324770
Merge pull request #1028 from PathwayCommons/v0.18.0
jvwong Dec 14, 2021
811173b
Merge branch 'unstable' into iss990_semantic-search-api
jvwong Dec 14, 2021
041920f
Use a development version of semantic search now.
jvwong Dec 16, 2021
4a0d0d9
Accommodate the new schema for semantic search.
jvwong Dec 16, 2021
3e5c230
Merge pull request #1026 from PathwayCommons/iss990_semantic-search-api
jvwong Dec 16, 2021
cfdc112
Listen to db changes to create bulk downloads file.
jvwong Jan 5, 2022
c062eac
Config for task delay; encapsulating the logic for task scheduling.
jvwong Jan 6, 2022
7fd7600
Serve the biopax zip file; Use Changefeeds and task scheduler to recr…
jvwong Jan 6, 2022
d8e3c00
Include env variables for export scheduler
jvwong Jan 6, 2022
4fcdd8b
Typo in env variable description for exports
jvwong Jan 6, 2022
a053cd4
- Wait for server to run before scheduling exports
jvwong Jan 6, 2022
fc7923c
db capability to arbitrarily index table
jvwong Jan 18, 2022
3eb38c1
Trigger index creation at server start
jvwong Jan 18, 2022
e40f808
Use the createdDate index in web api.
jvwong Jan 18, 2022
a3bc9e6
Merge pull request #1035 from PathwayCommons/iss0131_db-index
jvwong Jan 20, 2022
3a8b7f7
Log errors in handler for zip downloads.
jvwong Jan 20, 2022
5ba402b
Merge 'unstable' to update route handlers as independent functions.
jvwong Jan 20, 2022
920d5f6
Create exports when in production OR there is a missing file (i.e. so…
jvwong Jan 20, 2022
ab0077a
Extract the logic from the endpoints for json, biopax and sbgn. Remov…
jvwong Jan 21, 2022
6b97a18
Strip out unused parameters (baseUrl, PORT).
jvwong Jan 21, 2022
a125ddb
Remove any delay in export of different types.
jvwong Jan 26, 2022
ba5fb2d
Merge pull request #1034 from PathwayCommons/iss1032_export-on-change
jvwong Jan 26, 2022
07dec8c
0.19.0
jvwong Jan 26, 2022
7 changes: 7 additions & 0 deletions README.md
@@ -50,6 +50,13 @@ Database:
- `DB_PASS` : password if the db uses auth (undefined by default)
- `DB_CERT` : local file path to certificate (cert) file if the db uses ssl (undefined by default)

Downloads:

- `BULK_DOWNLOADS_PATH` : relative path to bulk downloads
- `BIOPAX_DOWNLOADS_PATH` : relative path to biopax downloads
- `BIOPAX_IDMAP_DOWNLOADS_PATH` : relative path to id-mapped biopax downloads
- `EXPORT_BULK_DELAY_HOURS` : period to delay (batch) export tasks

Services:

- `DEFAULT_CACHE_SIZE` : default max number of entries in each cache
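For reference, `EXPORT_BULK_DELAY_HOURS` is converted to a millisecond delay in the export route (src/server/routes/api/document/export.js, shown later in this diff); the default of 0.25 batches export work over a fifteen-minute window:

```js
// From initExportTasks in the export route:
const MS_PER_SEC = 1000;
const SEC_PER_MIN = 60;
const MIN_PER_HOUR = 60;

// default 0.25 h -> 1000 * 60 * 60 * 0.25 = 900000 ms (15 minutes)
let export_delay = MS_PER_SEC * SEC_PER_MIN * MIN_PER_HOUR * EXPORT_BULK_DELAY_HOURS;
```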
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -133,5 +133,5 @@
"engines": {
"node": ">=10.0.0"
},
"version": "0.18.0"
"version": "0.19.0"
}
8 changes: 7 additions & 1 deletion src/config.js
@@ -56,6 +56,12 @@ export const DB_USER = env('DB_USER', undefined); // username if db uses auth
export const DB_PASS = env('DB_PASS', undefined); // password if db uses auth
export const DB_CERT = env('DB_CERT', undefined); // path to a certificate (cert) file if db uses ssl

// Downloads
export const BULK_DOWNLOADS_PATH = env('BULK_DOWNLOADS_PATH', 'download/factoid_bulk.zip');
export const BIOPAX_DOWNLOADS_PATH = env('BIOPAX_DOWNLOADS_PATH', 'download/factoid_biopax.zip');
export const BIOPAX_IDMAP_DOWNLOADS_PATH = env('BIOPAX_IDMAP_DOWNLOADS_PATH', 'download/factoid_biopax_with_id_mapping.zip');
export const EXPORT_BULK_DELAY_HOURS = env('EXPORT_BULK_DELAY_HOURS', 0.25);

// Services
export const PC_URL = env('PC_URL', 'https://apps.pathwaycommons.org/');
export const REACH_URL = env('REACH_URL', 'http://reach.baderlab.org/api/uploadFile');
@@ -65,7 +71,7 @@ export const NCBI_EUTILS_BASE_URL = env('NCBI_EUTILS_BASE_URL', 'https://eutils.
export const NCBI_EUTILS_API_KEY = env('NCBI_EUTILS_API_KEY', 'b99e10ebe0f90d815a7a99f18403aab08008');
export const INDRA_DB_BASE_URL = env('INDRA_DB_BASE_URL', 'https://db.indra.bio/latest/');
export const INDRA_ENGLISH_ASSEMBLER_URL = env('INDRA_ENGLISH_ASSEMBLER_URL', 'http://api.indra.bio:8000/assemblers/english');
export const SEMANTIC_SEARCH_BASE_URL = env('SEMANTIC_SEARCH_BASE_URL', 'https://semanticsearch.baderlab.org/');
export const SEMANTIC_SEARCH_BASE_URL = env('SEMANTIC_SEARCH_BASE_URL', 'https://main.semanticsearch.baderlab.org/');

// Links
export const UNIPROT_LINK_BASE_URL = env('UNIPROT_LINK_BASE_URL', 'http://www.uniprot.org/uniprot/');
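The `env` helper used throughout this file is defined elsewhere in src/config.js and is not part of this diff; a minimal sketch of the behavior it presumably implements (an assumption, not the actual code):

```js
// Hypothetical sketch: read a setting from process.env, falling back to the
// given default when the variable is unset. Note that environment values
// arrive as strings, so numeric settings such as EXPORT_BULK_DELAY_HOURS may
// need coercion (e.g. parseFloat) somewhere before use.
const env = ( key, defaultValue ) => {
  const value = process.env[ key ];
  return value === undefined ? defaultValue : value;
};
```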
19 changes: 19 additions & 0 deletions src/server/db.js
@@ -86,7 +86,26 @@ let db = {
table: table
};
} );
},

guaranteeIndex: function( tableName, secondaryKey ){
let t, c, r;
return this.accessTable( tableName )
.then( ({ table, conn, rethink }) => {
t = table;
c = conn;
r = rethink;
})
.then( () => {
return r.branch(
t.indexList().contains( secondaryKey ).not(),
t.indexCreate( secondaryKey ),
null
).run( c );
})
.then( () => t.indexWait( secondaryKey ).run( c ) );
}

};

export default db;
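`guaranteeIndex` is idempotent: `r.branch` calls `indexCreate` only when the secondary index is missing, and `indexWait` resolves once the index is ready for reads. It is invoked at server start (see the src/server/index.js diff below); the query shown here is a hypothetical consumer, not part of this PR:

```js
// Ensure the 'createdDate' secondary index exists on the document table.
db.guaranteeIndex( 'document', 'createdDate' );

// Hypothetical consumer: newest documents first, served from the index.
// table.orderBy( { index: r.desc( 'createdDate' ) } ).limit( 10 ).run( conn );
```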
42 changes: 0 additions & 42 deletions src/server/document-util.js

This file was deleted.

13 changes: 10 additions & 3 deletions src/server/index.js
@@ -20,6 +20,7 @@ import cron from 'node-cron';
import updateCron from './update-cron';
import { Appsignal } from '@appsignal/nodejs';
import { expressMiddleware as asExpressMiddleware, expressErrorHandler as asExpressErrorHandler } from '@appsignal/express';
import { initExportTasks } from './routes/api/document/export';

let app = express();
let server = http.createServer(app);
@@ -144,18 +145,24 @@ tryPromise( () => {
.then( log('Accessed table "%s"', name) )
.then( t => synch( t, name ) )
.then( log('Set up synching for "%s"', name) )
.then( () => db.guaranteeIndex( 'document', 'createdDate' ) )
.then( log('Set up index for document') )
;
};

const tables = ['element', 'document'];
return tables.reduce( ( p, name ) => p.then( () => setup( name ) ), Promise.resolve() );
} ).then( () => {
} )
.then( () => {
cron.schedule( config.CRON_SCHEDULE, () => {
updateCron();
});
} ).then( () => {
} )
.then( () => {
server.listen(port);
} );
} )
.then( initExportTasks )
;

function normalizePort(val) {
let port = parseInt(val, 10);
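The `tables.reduce(...)` line above runs `setup` for each table strictly in sequence rather than in parallel; a minimal standalone sketch of the idiom:

```js
// Each .then() waits for the previous setup to resolve before starting the
// next, so 'element' is fully set up before 'document' begins.
const setup = name => Promise.resolve().then( () => console.log( `set up ${name}` ) );
const tables = [ 'element', 'document' ];

tables.reduce( ( p, name ) => p.then( () => setup( name ) ), Promise.resolve() );
```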
163 changes: 139 additions & 24 deletions src/server/routes/api/document/export.js
@@ -1,16 +1,21 @@
import fetch from 'node-fetch';
import JSZip from 'jszip';
import _ from 'lodash';
import fs from 'fs';
import logger from '../../../logger';
import util from 'util';
import addMilliseconds from 'date-fns/addMilliseconds';
import formatDistance from 'date-fns/formatDistance';

import logger from '../../../logger';
import db from '../../../db';
import Document from '../../../../model/document';

import { convertDocumentToBiopax,
convertDocumentToJson,
convertDocumentToSbgn } from '../../../document-util';

import { checkHTTPStatus } from '../../../../util';
import { getBioPAX, getSBGN, getDocuments, getDocumentJson } from './index';
import {
BULK_DOWNLOADS_PATH,
BIOPAX_DOWNLOADS_PATH,
EXPORT_BULK_DELAY_HOURS,
BIOPAX_IDMAP_DOWNLOADS_PATH,
NODE_ENV
} from '../../../../config';

const DOCUMENT_STATUS_FIELDS = Document.statusFields();
const CHUNK_SIZE = 20;
@@ -20,20 +25,21 @@ const EXPORT_TYPES = Object.freeze({
'SBGN': 'sbgn'
});

const exportToZip = (baseUrl, zipPath, types, biopaxIdMapping) => {
const exportToZip = (zipPath, types, biopaxIdMapping) => {
let offset = 0;
let zip = new JSZip();

const processNext = () => {
return fetch(`${baseUrl}/api/document?limit=${CHUNK_SIZE}&offset=${offset}`)
.then( checkHTTPStatus )
.then( res => res.json() )
.then( res => {
offset += res.length;
if ( res.length > 0 ) {
const includedStatuses = [DOCUMENT_STATUS_FIELDS.PUBLIC];
const shouldInclude = doc => _.includes(includedStatuses, doc.status);
let ids = res.filter( shouldInclude ).map( doc => doc.id );

return getDocuments({
limit: CHUNK_SIZE,
offset,
status: [DOCUMENT_STATUS_FIELDS.PUBLIC]
})
.then( ({ results }) => {
offset += results.length;
if ( results.length > 0 ) {
let ids = results.map( doc => doc.id );
return addToZip( ids, biopaxIdMapping ).then( processNext );
}

@@ -50,11 +56,12 @@ const exportToZip = (baseUrl, zipPath, types, biopaxIdMapping) => {
};

const addToZip = ( ids, biopaxIdMapping ) => {
let _convertDocumentToBiopax = ( id, baseUrl ) => convertDocumentToBiopax( id, baseUrl, biopaxIdMapping );
let _getBioPAX = id => getBioPAX( id, biopaxIdMapping );
let _getDocumentJson = id => getDocumentJson( id ).then( r => JSON.stringify( r ) );
let typeToConverter = {
[EXPORT_TYPES.JSON]: convertDocumentToJson,
[EXPORT_TYPES.BP]: _convertDocumentToBiopax,
[EXPORT_TYPES.SBGN]: convertDocumentToSbgn
[EXPORT_TYPES.JSON]: _getDocumentJson,
[EXPORT_TYPES.BP]: _getBioPAX,
[EXPORT_TYPES.SBGN]: getSBGN
};

let typeToExt = {
@@ -69,7 +76,7 @@ const exportToZip = (baseUrl, zipPath, types, biopaxIdMapping) => {
}

const idToFiles = id => types.map( t => {
return typeToConverter[ t ]( id, baseUrl )
return typeToConverter[ t ]( id )
.catch( () => {
logger.error(`Error in export: cannot convert the document ${id} into ${t}`);
return null;
@@ -99,4 +106,112 @@ const exportToZip = (baseUrl, zipPath, types, biopaxIdMapping) => {
return processNext();
};

export { exportToZip, EXPORT_TYPES };
/**
* taskScheduler
* Create a function that schedules 'task' to run in 'delay' ms. Additional
* requests received while a task is already scheduled are ignored.
*
* @param {function} task The task to execute
* @param {number} delay ms delay for task execution (default 0)
* @param {function} next The callback to run after a task is initiated
*/
const taskScheduler = ( task, delay = 0, next = () => {} ) => {
let taskScheduled = false;
let taskTime = null;
const resetTaskSchedule = () => { taskScheduled = false; taskTime = null; };

return (() => {
const setTimeoutPromise = util.promisify( setTimeout );
let now = new Date();
logger.info( `A task request has been received` );

if( taskScheduled ){
logger.info( `A task has already been scheduled for ${taskTime} (${formatDistance( now, taskTime )})` );

} else {
taskTime = addMilliseconds( new Date(), delay );
logger.info( `A task was scheduled for ${taskTime} (${formatDistance( now, taskTime )})` );
taskScheduled = true;

setTimeoutPromise( delay )
.then( task )
.then( next )
.catch( () => {} ) // swallow
.finally( resetTaskSchedule ); // allow another backup request
}

return Promise.resolve();
});
};
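To illustrate the coalescing behavior, a hedged usage sketch (the logging task and five-second delay are stand-ins):

```js
// Hypothetical illustration: calls made while a task is pending are ignored,
// so a burst of requests yields a single scheduled run.
const runExport = taskScheduler( () => console.log( 'exporting' ), 5000 );

runExport(); // schedules the task for ~5 s from now
runExport(); // no-op: a task is already scheduled
runExport(); // no-op as well

// Once the task settles, resetTaskSchedule runs (via finally), and the next
// call schedules a fresh run.
```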

// Configure Changefeeds for the document table
const setupChangefeeds = async ({ rethink: r, conn, table }) => {
const docOpts = {
includeTypes: true,
squash: true
};

// Database restore of doc with public status
const toPublicStatusFromNull = r.row( 'new_val' )( 'status' ).eq( 'public' )
.and( r.row( 'old_val' ).eq( null ) );
// Status changed to 'public'
const toPublicStatusFromOtherStatus = r.row( 'new_val' )( 'status' ).eq( 'public' )
.and( r.row( 'old_val' )( 'status' ).ne( 'public' ) );
// Status is changed from 'public'
const toOtherStatusFromPublicStatus = r.row( 'new_val' )( 'status' ).ne( 'public' )
.and( r.row( 'old_val' )( 'status' ).eq( 'public' ) );

const docFilter = toPublicStatusFromNull.or( toPublicStatusFromOtherStatus ).or( toOtherStatusFromPublicStatus );
const cursor = await table.changes( docOpts )
.filter( docFilter )
.run( conn );
return cursor;
};
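The three filter branches match three shapes of change event on the document table; illustrative examples (status values other than 'public' are hypothetical, and real documents carry many more fields):

```js
// Change events that pass docFilter:
const examples = [
  { old_val: null, new_val: { status: 'public' } },                    // restore/insert of a public doc
  { old_val: { status: 'submitted' }, new_val: { status: 'public' } }, // doc becomes public
  { old_val: { status: 'public' }, new_val: { status: 'private' } }    // doc loses public status
];
```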

/**
* initExportTasks
* Initialize the export tasks: schedule (re)creation of the bulk download
* archives when public documents change; run an immediate export in
* production or when an archive file is missing.
*/
const initExportTasks = async () => {
const MS_PER_SEC = 1000;
const SEC_PER_MIN = 60;
const MIN_PER_HOUR = 60;

const loadTable = name => db.accessTable( name );
const dbTable = await loadTable( 'document' );
const cursor = await setupChangefeeds( dbTable );

let export_delay = MS_PER_SEC * SEC_PER_MIN * MIN_PER_HOUR * EXPORT_BULK_DELAY_HOURS;

const exportTask = () => exportToZip( BULK_DOWNLOADS_PATH );
const doExport = taskScheduler( exportTask, export_delay );

const exportBiopaxTask = () => exportToZip( BIOPAX_DOWNLOADS_PATH, [ EXPORT_TYPES.BP ], false );
const doExportBiopax = taskScheduler( exportBiopaxTask, export_delay );
const exportBiopaxIdMapTask = () => exportToZip( BIOPAX_IDMAP_DOWNLOADS_PATH, [ EXPORT_TYPES.BP ], true );
const doExportBiopaxIdMap = taskScheduler( exportBiopaxIdMapTask, export_delay );

let taskList = [ exportTask, exportBiopaxTask, exportBiopaxIdMapTask ];
let scheduledTaskList = [ doExport, doExportBiopax, doExportBiopaxIdMap ];

const doTasks = tasks => Promise.all( tasks.map( t => t() ) );

cursor.each( async err => {
if( err ){
logger.error( `Error in Changefeed: ${err}` );
return;
}
await doTasks( scheduledTaskList );
});

const isProduction = NODE_ENV == 'production';
const exportIsMissing = [
BULK_DOWNLOADS_PATH,
BIOPAX_DOWNLOADS_PATH,
BIOPAX_IDMAP_DOWNLOADS_PATH
].some( p => !fs.existsSync( p ) );

if ( isProduction || exportIsMissing ) await doTasks( taskList );
};


export { exportToZip, EXPORT_TYPES, initExportTasks };
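Taken together, a hedged sketch of triggering one of these exports manually (import paths are assumptions based on the file layout in this PR):

```js
import { exportToZip, EXPORT_TYPES } from './src/server/routes/api/document/export';
import { BIOPAX_DOWNLOADS_PATH } from './src/config';

// Write a zip containing the BioPAX rendition of every public document,
// with id mapping (third argument) turned off.
exportToZip( BIOPAX_DOWNLOADS_PATH, [ EXPORT_TYPES.BP ], false )
  .then( () => console.log( `wrote ${BIOPAX_DOWNLOADS_PATH}` ) );
```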