Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ build
coverage
.nyc_output
data/member-a/blobs
data/member-b/blobs
data/member-a/destinations
data/member-b/blobs
data/member-b/destinations
4 changes: 2 additions & 2 deletions src/handlers/blobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ export const deliverBlob = async ({ blobPath, recipientID, recipientURL, request
let sender = peerID;
if(senderDestination !== undefined) {
formData.append('senderDestination', senderDestination);
sender += utils.constants.ID_SEGMENT_SEPARATOR + senderDestination
sender += '/' + senderDestination
}
let recipient = recipientID;
if(recipientDestination !== undefined) {
formData.append('recipientDestination', recipientDestination);
recipient += utils.constants.ID_SEGMENT_SEPARATOR + recipientDestination;
recipient += '/' + recipientDestination;
}
formData.append('blob', stream);
const httpsAgent = new https.Agent({ cert, key, ca });
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ export const deliverMessage = async ({ message, recipientID, recipientURL, reque
let sender = peerID;
if(senderDestination !== undefined) {
formData.append('senderDestination', senderDestination);
sender += utils.constants.ID_SEGMENT_SEPARATOR + senderDestination
sender += '/' + senderDestination
}
let recipient = recipientID;
if(recipientDestination !== undefined) {
formData.append('recipientDestination', recipientDestination);
recipient += utils.constants.ID_SEGMENT_SEPARATOR + recipientDestination;
recipient += '/' + recipientDestination;
}
formData.append('message', message);
log.trace(`Delivering message to ${recipient} at ${recipientURL}`);
Expand Down
35 changes: 26 additions & 9 deletions src/lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const ajv = new Ajv();
const validateConfig = ajv.compile(configSchema);
const configFilePath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.CONFIG_FILE_NAME);
const peersFilePath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.PEERS_FILE_NAME);
const destinationsFilePath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.DESTINATIONS_FILE_NAME);

export let config: IConfig;

Expand All @@ -39,6 +40,18 @@ const loadConfig = async () => {
try {
log.debug(`Reading config file ${configFilePath}`);
const data = JSON.parse(await fs.readFile(configFilePath, 'utf8'));
try {
log.debug(`Reading destinations file ${destinationsFilePath}`);
data.destinations = JSON.parse(await fs.readFile(destinationsFilePath, 'utf8'));
} catch (err: any) {
// if file does not exist, just set destinations to an empty array
log.debug(`Error code when reading destinations file ${err.code}`);
if (err.code === 'ENOENT') {
data.destinations = data.destinations;
} else {
throw err;
}
}
try {
log.debug(`Reading peers file ${peersFilePath}`);
data.peers = JSON.parse(await fs.readFile(peersFilePath, 'utf8'));
Expand Down Expand Up @@ -68,28 +81,32 @@ const loadConfig = async () => {
};

export const persistPeers = async () => {
await ensurePeersDirectoryExists();
await ensureDirectoryExists(peersFilePath);
await fs.writeFile(peersFilePath, JSON.stringify(config.peers, null, 2));
};

export const persistDestinations = async () => {
await ensureDirectoryExists(destinationsFilePath);
await fs.writeFile(destinationsFilePath, JSON.stringify(config.destinations, null, 2));
};

const ensurePeersDirectoryExists = async () => {
const ensureDirectoryExists = async (directory: string) => {
try {
await fs.access(peersFilePath);
await fs.access(directory);
} catch(err: any) {
if(err.code === 'ENOENT') {
await createPeersDirectory();
await createDirectory(directory);
} else {
log.warn(`Could not check for existence of peers subdirectory ${err.code}`);
log.warn(`Could not check for existence of ${err.code}`);
}
}
};

const createPeersDirectory = async () => {
const createDirectory = async (directory: string) => {
try {
await fs.mkdir(path.parse(peersFilePath).dir, { recursive: true });
log.info('Peers subdirectory created');
await fs.mkdir(path.parse(directory).dir, { recursive: true });
log.info(`Directory ${directory} created`);
} catch(err: any) {
log.error(`Failed to create peers subdirectory ${err.code}`);
log.error(`Failed to create directory ${directory} ${err.code}`);
}
};
2 changes: 2 additions & 0 deletions src/lib/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ export interface IConfig {
endpoint?: string
}
apiKey?: string
destinations?: string[]
peers: {
id: string
endpoint: string
destinations?: string[]
}[]
jsonParserLimit?: string
}
Expand Down
6 changes: 3 additions & 3 deletions src/lib/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const constants = {
CERT_FILE: 'cert.pem',
KEY_FILE: 'key.pem',
CA_FILE: 'ca.pem',
DESTINATIONS_FILE_NAME: 'destinations/data.json',
TRANSFER_HASH_ALGORITHM: 'sha256',
REST_API_CALL_MAX_ATTEMPTS: 5,
REST_API_CALL_RETRY_DELAY_MS: 500,
Expand All @@ -46,8 +47,7 @@ export const constants = {
SIZE_HEADER_NAME: 'dx-size',
LAST_UPDATE_HEADER_NAME: 'dx-last-update',
DEFAULT_JSON_PARSER_LIMIT: '1mb',
DEFAULT_MAX_INFLIGHT: 100,
ID_SEGMENT_SEPARATOR: '/'
DEFAULT_MAX_INFLIGHT: 100
};
const log = new Logger('utils.ts');
axios.defaults.timeout = constants.REST_API_CALL_REQUEST_TIMEOUT;
Expand Down Expand Up @@ -140,7 +140,7 @@ export const axiosWithRetry = async (config: AxiosRequestConfig) => {
const data = err.response?.data;
log.error(`${config.method} ${config.url} attempt ${attempts} [${err.response?.status}]`, (data && !data.on) ? data : err.stack);
if (err.response?.status === 404) {
throw err;
throw data.error ? new Error(data.error) : err;
} else {
currentError = err;
attempts++;
Expand Down
142 changes: 86 additions & 56 deletions src/routers/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import * as blobsHandler from '../handlers/blobs';
import * as eventsHandler from '../handlers/events';
import * as messagesHandler from '../handlers/messages';
import { ca, cert, certBundle, key, peerID } from '../lib/cert';
import { config, persistPeers } from '../lib/config';
import { config, persistDestinations, persistPeers } from '../lib/config';
import { IStatus } from '../lib/interfaces';
import RequestError from '../lib/request-error';
import * as utils from '../lib/utils';
Expand All @@ -41,7 +41,8 @@ router.get('/id', async (_req, res, next) => {
res.send({
id: peerID,
endpoint: config.p2p.endpoint ?? `https://${config.p2p.hostname}:${config.p2p.port}`,
cert: certBundle
cert: certBundle,
destinations: config.destinations
});
} catch (err) {
next(err);
Expand Down Expand Up @@ -82,24 +83,42 @@ router.get('/peers', (_req, res) => {
res.send(config.peers);
});

router.put('/peers/:id', async (req, res, next) => {
router.put('/peers/:id/:destination?', async (req, res, next) => {
try {
if (req.body.endpoint === undefined) {
throw new RequestError('Missing endpoint', 400);
}
if (req.body.cert !== undefined) {
await fs.writeFile(path.join(utils.constants.DATA_DIRECTORY, utils.constants.PEER_CERTS_SUBDIRECTORY, `${req.params.id}.pem`), req.body.cert);
}
let peer = config.peers.find(peer => peer.id === req.params.id);
if (peer === undefined) {
peer = {
id: req.params.id,
endpoint: req.body.endpoint
};
config.peers.push(peer);
if (req.params.id === peerID) {
if (req.params.destination !== undefined) {
if (config.destinations === undefined) {
config.destinations = [req.params.destination]
} else if (!config.destinations.includes(req.params.destination)) {
config.destinations.push(req.params.destination);
}
await persistDestinations();
}
} else {
let peer = config.peers.find(peer => peer.id === req.params.id);
if (peer === undefined) {
if (req.body.endpoint === undefined) {
throw new RequestError('Missing endpoint', 400);
}
peer = {
id: req.params.id,
endpoint: req.body.endpoint
};
config.peers.push(peer);
}
if (req.params.destination !== undefined) {
if (peer.destinations === undefined) {
peer.destinations = [req.params.destination];
} else if (!peer.destinations.includes(req.params.destination)) {
peer.destinations.push(req.params.destination);
}
}
await persistPeers();
if (req.body.cert !== undefined) {
await fs.writeFile(path.join(utils.constants.DATA_DIRECTORY, utils.constants.PEER_CERTS_SUBDIRECTORY, `${req.params.id}.pem`), req.body.cert);
await refreshCACerts();
}
}
await persistPeers();
await refreshCACerts();
res.send({ status: 'added' });
} catch (err) {
next(err);
Expand Down Expand Up @@ -133,40 +152,46 @@ router.post('/messages', async (req, res, next) => {
}
let senderDestination: string | undefined = undefined;
if (typeof req.body.sender === 'string') {
if (!req.body.sender.startsWith(peerID)) {
throw new RequestError('Invalid sender');
} else {
const destination = req.body.sender.substring(peerID.length + 1);
if(destination.length > 0) {
senderDestination = destination;
const segments = req.body.sender.split('/');
if (segments[0] !== peerID) {
throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${segments[0]}`, 400);
}
if (segments.length > 1) {
if (!config.destinations?.includes(segments[1])) {
throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|') ?? 'none'} recieved=${segments[1]}`, 400);
}
senderDestination = segments[1];
}
}
let recipientID: string;
let recipientDestination: string | undefined = undefined;
if (typeof req.body.recipient === 'string') {
const index = req.body.recipient.indexOf(utils.constants.ID_SEGMENT_SEPARATOR);
if (index !== -1) {
recipientID = req.body.recipient.substring(0, index);
const destination = req.body.recipient.substring(index + 1);
if(destination.length > 0) {
recipientDestination = destination;
}
} else {
recipientID = req.body.recipient;
const segments = req.body.recipient.split('/');
recipientID = segments[0];
if (segments.length > 1) {
recipientDestination = segments[1];
}
} else {
throw new RequestError('Missing recipient', 400);
}
let recipientURL = config.peers.find(peer => peer.id === recipientID)?.endpoint;
if (recipientURL === undefined) {
throw new RequestError(`Unknown recipient`, 400);
let recipientEndpoint: string;
if (recipientID === peerID) {
recipientEndpoint = config.p2p.endpoint ?? `https://${config.p2p.hostname}:${config.p2p.port}`;
} else {
let recipientPeer = config.peers.find(peer => peer.id === recipientID);
if (recipientPeer === undefined) {
throw new RequestError(`Unknown recipient ${recipientID}`, 400);
}
recipientEndpoint = recipientPeer.endpoint;
if (recipientDestination !== undefined && !recipientPeer.destinations?.includes(recipientDestination)) {
throw new RequestError(`Unknown recipient destination expected=${recipientPeer.destinations?.join('|') ?? 'none'} recieved=${recipientDestination}`, 400);
}
}
let requestId = uuidV4();
if (typeof req.body.requestId === 'string') {
requestId = req.body.requestId;
}
messagesHandler.sendMessage(req.body.message, recipientID, recipientURL, requestId, senderDestination, recipientDestination);
messagesHandler.sendMessage(req.body.message, recipientID, recipientEndpoint, requestId, senderDestination, recipientDestination);
res.send({ requestId });
} catch (err) {
next(err);
Expand Down Expand Up @@ -231,40 +256,45 @@ router.post('/transfers', async (req, res, next) => {
await blobsHandler.retrieveMetadata(req.body.path);
let senderDestination: string | undefined = undefined;
if (typeof req.body.sender === 'string') {
if (!req.body.sender.startsWith(peerID)) {
throw new RequestError('Invalid sender');
} else {
const destination = req.body.sender.substring(peerID.length + 1);
if(destination.length > 0) {
senderDestination = destination;
const segments = req.body.sender.split('/');
if (segments[0] !== peerID) {
throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${segments[0]}`, 400);
}
if (segments.length > 1) {
if (!config.destinations?.includes(segments[1])) {
throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|')} recieved=${segments[1]}`, 400);
}
senderDestination = segments[1];
}
}
let recipientID: string;
let recipientDestination: string | undefined = undefined;
if (typeof req.body.recipient === 'string') {
const index = req.body.recipient.indexOf(utils.constants.ID_SEGMENT_SEPARATOR);
if (index !== -1) {
recipientID = req.body.recipient.substring(0, index);
const destination = req.body.recipient.substring(index + 1);
if(destination.length > 0) {
recipientDestination = destination;
}
} else {
recipientID = req.body.recipient;
const segments = req.body.recipient.split('/');
recipientID = segments[0];
if (segments.length > 1) {
recipientDestination = segments[1];
}
} else {
throw new RequestError('Missing recipient', 400);
}
let recipientURL = config.peers.find(peer => peer.id === recipientID)?.endpoint;
if (recipientURL === undefined) {
throw new RequestError(`Unknown recipient`, 400);
let recipientEndpoint: string;
recipientEndpoint = config.p2p.endpoint ?? `https://${config.p2p.hostname}:${config.p2p.port}`;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks wrong... maybe this was supposed to be inside the if below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Precisely

if (recipientID === peerID) {
} else {
let recipientPeer = config.peers.find(peer => peer.id === recipientID);
if (recipientPeer === undefined) {
throw new RequestError(`Unknown recipient`, 400);
}
if (recipientDestination !== undefined && !recipientPeer.destinations?.includes(recipientDestination)) {
throw new RequestError(`Unknown recipient destination expected=${recipientPeer.destinations?.join('|')} recieved=${recipientDestination}`, 400);
}
}
let requestId = uuidV4();
if (typeof req.body.requestId === 'string') {
requestId = req.body.requestId;
}
blobsHandler.sendBlob(req.body.path, recipientID, recipientURL, requestId, senderDestination, recipientDestination);
blobsHandler.sendBlob(req.body.path, recipientID, recipientEndpoint, requestId, senderDestination, recipientDestination);
res.send({ requestId });
} catch (err) {
next(err);
Expand Down
Loading