Skip to content

Commit

Permalink
Merged in EPOC-394-update-oh-orchestrator-to-compa (pull request #2)
Browse files Browse the repository at this point in the history
EPOC-394 update oh orchestrator to compa

Approved-by: Abdullah Abdulqader
Approved-by: Roman Teyibov
Approved-by: Metin Barut
  • Loading branch information
kevintoormimik authored and metinbarutmimik committed Mar 16, 2021
2 parents f9fd7ed + 5575a1c commit bcd6776
Show file tree
Hide file tree
Showing 21 changed files with 555 additions and 128 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ lib-cov
.lock-wscript

# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release
build
coverage

# Dependency directories
Expand Down
2 changes: 2 additions & 0 deletions local/start-example.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@
"HZN_EXCHANGE_URL": "http://localhost:3090/v1/",
"HZN_CSS_URL": "http://localhost:9443",
"HZN_EXCHANGE_USER_AUTH": "--horizon-exchange-user-auth-(username:password)--",
"HZN_ESS_TRACKED_OBJECT_TYPES": "--horizon-ess-object-types-to-sync--",
"NODE_TLS_REJECT_UNAUTHORIZED": "0", // In order to use self signed ess certificates
"NODE_ENV": "local"
}
21 changes: 19 additions & 2 deletions src/configuration/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ const pack = require('../../package.json');
* | HZN_EXCHANGE_USER_AUTH | Hzn exchange user auth to be used for registering anax nodes | | example: admin:password
* | HZN_DEFAULT_NODE_TOKEN | Hzn node token to use to register anax node with exchange | nodeToken | default nodeId is first 6 chars of edge nodeId. So node auth will be nodeId:nodeToken
* | HZN_CLI_CONFIG_FILE | File location where hzn config is stored | /etc/default/horizon | example file content: HZN_EXCHANGE_URL=http://192.168.1.77:3090/v1/\nHZN_FSS_CSSURL=http://192.168.1.77:9443\n
* | HZN_ESS_TRACKED_OBJECT_TYPES | HZN Object types to fetch from ESS and serve using mCDN | | example: ml_model,reco_model
* | DOCKER_SOCKET_PATH | Path to the docker daemon socket | /var/run/docker.sock |
* | NODE_POLICIES_DIR | Directory to temporarily store node policies in | /var/tmp/oh/policies |
* | ANAX_STORAGE_BASE_PATH_DIR | Directory to store anax data for container in | /var/tmp/oh/storage |
* | ANAX_CONTAINERS_STORAGE_DIR | Directory to store anax data for container in | /var/tmp/oh/storage |
* | EDGE_NODES_SYNC_JOB_INTERVAL | Job interval to sync edge nodes using super (gateway) mdeploy | 60 |
* | GATEWAY_NODE_SYNC_JOB_INTERVAL | Job interval to sync gateway node using super (gateway) mdeploy | 120 |
* | ANAX_CONTAINERS_PORT_NUM_START | Port range starting point to use for anax containers | 8200 |
Expand All @@ -37,8 +38,15 @@ const pack = require('../../package.json');
module.exports = (() => {
const edgeEngineUrl = process.env.EDGE_ENGINE_URL || 'http://localhost:8083';
const edgeEngineProjectId = process.env.EDGE_ENGINE_PROJECT_ID;

const edgeEngineMdeployEndpoint = process.env.EDGE_ENGINE_MDEPLOY_ENDPOINT || '/mdeploy/v1';

const trackedObjectTypesStr = process.env.HZN_ESS_TRACKED_OBJECT_TYPES;
let trackedObjectTypes;
if (trackedObjectTypesStr && trackedObjectTypesStr !== '') {
trackedObjectTypes = trackedObjectTypesStr.split(',');
}

const configuration = setConfig(pack, {
dependencies: {
MDEPLOY: {
Expand All @@ -58,15 +66,24 @@ module.exports = (() => {
exchangeUserAuth: process.env.HZN_EXCHANGE_USER_AUTH,
defaultNodeToken: process.env.HZN_DEFAULT_NODE_TOKEN || 'nodeToken',
cliConfigFile: process.env.HZN_CLI_CONFIG_FILE || '/etc/default/horizon',
ess: {
trackedObjectTypes,
gatewayDeploymentPropertyType: process.env.HZN_ESS_GATEWAY_DEPLOYMENT_PROPERTY_TYPE || 'deployment',
gatewayDeploymentPropertyName: process.env.HZN_ESS_GATEWAY_DEPLOYMENT_PROPERTY_NAME || 'location',
gatewayDeploymentPropertyValue: process.env.HZN_ESS_GATEWAY_DEPLOYMENT_PROPERTY_VALUE || 'gatewayNode',
},
},
edgeEngine: {
url: edgeEngineUrl,
projectId: edgeEngineProjectId,
mdeployEndpoint: edgeEngineMdeployEndpoint,
},
mcdnAuthToken: process.env.MCDN_AUTH_TOKEN || '1234',
dockerSocketPath: process.env.DOCKER_SOCKET_PATH || '/var/run/docker.sock',
nodePoliciesDir: process.env.NODE_POLICIES_DIR || '/var/tmp/oh/policies',
anaxStorageBasePathDir: process.env.ANAX_STORAGE_BASE_PATH_DIR || '/var/tmp/oh/storage',
essObjectsStorageDir: process.env.ESS_OBJECTS_STORAGE_DIR || '/var/tmp/oh/essStorage',
anaxContainersStorageDir: process.env.ANAX_CONTAINERS_STORAGE_DIR || '/var/tmp/oh/anaxStorage',
essObjectsPollingInterval: parseInt(process.env.ESS_OBJECTS_POLLING_INTERVAL, 10) || 30000,
edgeNodesSyncJobInterval: parseInt(process.env.EDGE_NODES_SYNC_JOB_INTERVAL, 10) || 60,
gatewayNodeSyncJobInterval: parseInt(process.env.GATEWAY_NODE_SYNC_JOB_INTERVAL, 10) || 120,
anaxContainersPortNumStart: parseInt(process.env.ANAX_CONTAINERS_PORT_NUM_START, 10) || 8200,
Expand Down
24 changes: 24 additions & 0 deletions src/external/anaxRequests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
const rp = require('request-promise');

const fetchActiveAgreements = (containerPort, correlationId) => rp({
uri: `http://localhost:${containerPort}/agreement`,
})
.then((response) => {
let parsedResponse = {};

try {
parsedResponse = JSON.parse(response);
}
catch (e) {
throw new Error('Error occured while parsing response from Anax');
}
if (!parsedResponse.agreements) return [];
return parsedResponse.agreements.active;
})
.catch((error) => {
console.log('===> Error occured while fetching agreements', error);
});

module.exports = {
fetchActiveAgreements,
};
8 changes: 4 additions & 4 deletions src/external/edgedaemonRequests.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ const getNodes = (correlationId) => {
.then((res) => res.data);
};

const findNode = (nodeId, correlationId) => getNodes(correlationId)
.then((nodes) => nodes.find((node) => node.id === nodeId));

const createNode = (id, dockerSocketPath, correlationId) => {
const rpOptions = {
method: 'POST',
Expand All @@ -36,9 +39,6 @@ const createNode = (id, dockerSocketPath, correlationId) => {
json: true,
};
return rpAuth('EDGEDAEMON', rpOptions)
.catch((err) => {
throw getRichError('System', 'Failed to create node in edgedaemon', { id, dockerSocketPath }, err, 'error', correlationId);
})
.then((res) => res.data);
};

Expand All @@ -59,7 +59,7 @@ const deleteNode = (id, correlationId) => {
};

module.exports = {
getNodes,
findNode,
createNode,
deleteNode,
};
64 changes: 64 additions & 0 deletions src/external/essRequests/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
const { getRequestData } = require('./requestDataHelper');

const {
dataRequest,
fileDownloadRequest,
} = require('./socketHelper');

const ESS_REQUEST_BASE_PATH = 'https://localhost/api/v1';

const getObjectsByType = (nodeId, agreementId, objectType, correlationId) => getRequestData(nodeId, agreementId, correlationId)
.then((requestData) => {
const request = {
method: 'GET',
path: `${ESS_REQUEST_BASE_PATH}/objects/${objectType}`,
};

const completeRequest = { ...request, ...requestData };
return dataRequest(nodeId, completeRequest, correlationId);
});

const markObjectReceived = (nodeId, agreementId, objectType, objectId, correlationId) => getRequestData(nodeId, agreementId, correlationId)
.then((requestData) => {
const request = {
method: 'PUT',
path: `${ESS_REQUEST_BASE_PATH}/objects/${objectType}/${objectId}/received`,
};

const completeRequest = { ...request, ...requestData };
return dataRequest(nodeId, completeRequest, correlationId);
});

const downloadObjectFile = (nodeId, agreementId, objectType, objectId, outputFilePath, correlationId) => getRequestData(nodeId, agreementId, correlationId)
.then((requestData) => {
const request = {
method: 'GET',
path: `${ESS_REQUEST_BASE_PATH}/objects/${objectType}/${objectId}/data`,
};

const completeRequest = { ...request, ...requestData };

return fileDownloadRequest(nodeId, outputFilePath, completeRequest, correlationId);
});

const establishObectTypeWebhook = (nodeId, agreementId, objectType, receiverUrl, correlationId) => getRequestData(nodeId, agreementId, correlationId)
.then((requestData) => {
const request = {
method: 'PUT',
path: `${ESS_REQUEST_BASE_PATH}/objects/${objectType}`,
body: JSON.stringify({
action: 'register',
url: receiverUrl,
}),
};

const completeRequest = { ...request, ...requestData };
return dataRequest(nodeId, completeRequest, correlationId);
});

module.exports = {
getObjectsByType,
downloadObjectFile,
markObjectReceived,
establishObectTypeWebhook,
};
66 changes: 66 additions & 0 deletions src/external/essRequests/requestDataHelper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
const Promise = require('bluebird');
const fs = require('fs-extra');

const { anaxContainersStorageDir } = require('../../configuration/config');
const { shortenNodeId } = require('../../util/nodeUtil');

const getRequestData = (nodeId, agreementId, correlationId) => Promise.resolve()
.then(() => {
const authDataPromises = [];

const shortenedNodeId = shortenNodeId(nodeId);
const certFilePath = `${anaxContainersStorageDir}/${shortenedNodeId}/ess-auth/SSL/cert/cert.pem`;
const essSocketFilePath = `${anaxContainersStorageDir}/${shortenedNodeId}/fss-domain-socket/essapi.sock`;
const authKeyFilePath = `${anaxContainersStorageDir}/${shortenedNodeId}/ess-auth/${agreementId}/auth.json`;

console.log('===> files', {
certFilePath,
essSocketFilePath,
authKeyFilePath,
});

authDataPromises.push(fs.access(essSocketFilePath)
.then(() => essSocketFilePath)
.catch((err) => {
throw new Error(`Failed to find/access to socketPath for nodeId, error: ${err}`);
}));

authDataPromises.push(fs.readJSON(authKeyFilePath)
.catch((err) => {
throw new Error(`ESS Auth key file cannot be read, error: ${err}`);
})
.then(({ id, token }) => Buffer.from(`${id}:${token}`).toString('base64')));

authDataPromises.push(fs.readFile(certFilePath)
.then((content) => {
if (!content) {
throw new Error('ESS Cert file does not contain anything');
}

let fetchedCert;
try {
fetchedCert = Buffer.from(content).toString();
return fetchedCert;
}
catch (err) {
throw new Error(`ESS Cert file content cannot be converted to string, error: ${err}`);
}
}));

return Promise.all(authDataPromises)
.then(([socketPath, authToken, cert]) => {
const requestData = {
cert,
headers: {
Authorization: `Basic ${authToken}`,
},
socketPath,
};

return requestData;
});
});

module.exports = {
getRequestData,
};
70 changes: 70 additions & 0 deletions src/external/essRequests/socketHelper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
const Promise = require('bluebird');
const https = require('https');
const fs = require('fs-extra');

const dataRequest = (nodeId, request, correlationId) => new Promise((resolve, reject) => {
console.log('===> in dataRequest');
console.log('===> nodeId', nodeId);

const callback = (res) => {
let allData = '';
res.setEncoding('utf8');

res.on('data', (data) => {
allData += data;
});

res.on('error', (error) => {
reject(new Error(`Received error from ESS socket, error: ${error}`));
});

res.on('close', () => {
let result;
try {
result = JSON.parse(allData);
}
catch (e) {
result = allData;
}

const response = {};
response.headers = res.headers;
response.status = {
code: res.statusCode,
message: res.statusMessage,
};

resolve(result);
});
};

const clientRequest = https.request(request, callback);
if (request.body) clientRequest.write(request.body);
clientRequest.end();
});

const fileDownloadRequest = (nodeId, outputFilePath, request, correlationId) => new Promise((resolve, reject) => {
const dest = fs.createWriteStream(outputFilePath);

const callback = (res) => {
res.on('data', (data) => {
dest.write(data);
});

res.on('error', (error) => {
reject(new Error(`Received error from ESS socket, error: ${error}`));
});

res.on('close', () => {
resolve();
});
};
const clientRequest = https.request(request, callback);
if (request.body) clientRequest.write(request.body);
clientRequest.end();
});

module.exports = {
dataRequest,
fileDownloadRequest,
};
42 changes: 42 additions & 0 deletions src/external/mCDNRequests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
const rp = require('request-promise');
const fs = require('fs-extra');

const {
mcdnAuthToken,
edgeEngine: {
projectId,
},
} = require('../configuration/config');

const mCDNURL = `http://${gatewayNodeIpAddress}:8083/${projectId}/mcdn/v1`;
const MCDN_FILES_ENDPOINT = `${mCDNURL}/files`;

const postFile = (nodeId, pathName, fileName, localFilePath, correlationId) => {
const mCDNFilePath = `${MCDN_FILES_ENDPOINT}/${pathName}/${fileName}`;

return rp({
uri: mCDNFilePath,
method: 'POST',
headers: {
Authorization: `Bearer ${mcdnAuthToken}`,
'Content-Type': 'multipart/form-data',
},
formData: {
file: {
value: fs.createReadStream(localFilePath),
options: {
filename: fileName,
contentType: 'application/zip',
},
},
metadata: JSON.stringify({
mimeType: 'application/zip',
}),
},
})
.then(() => ({ mCDNURL, pathName, fileName }));
};

module.exports = {
postFile,
};
Loading

0 comments on commit bcd6776

Please sign in to comment.