Skip to content

Commit

Permalink
feat(etl): comply with the expected output json - EUBFR-22 (#8)
Browse files Browse the repository at this point in the history
* feat(etl): comply with the expected output json - EUBFR-22

* Small fixes

* Reshape the dynamo helpers

* Clean up code
  • Loading branch information
yhuard authored and degliwe committed Sep 21, 2017
1 parent 9464842 commit 09e4afb
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 140 deletions.
39 changes: 39 additions & 0 deletions lib/dynamodb/actions/save.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { normalizeProject } from '../tables/projects';

/**
 * Write a single item to a table through the given client.
 * @param {Object} options
 * @param {Object} options.dynamo Client exposing `putItem(params, callback)`.
 * @param {string} options.table Name sent as `TableName`.
 * @param {Object} options.data Payload sent as `Item`.
 * @param {Function} [cb] Node-style callback: called with the error on failure,
 * or with `(null, 'All fine')` on success. Defaults to a no-op.
 */
export const save = ({ dynamo, table, data }, cb = () => {}) => {
  // Forward the outcome of the write to the caller's callback.
  const onPutDone = err => (err ? cb(err) : cb(null, 'All fine'));

  dynamo.putItem({ TableName: table, Item: data }, onPutDone);
};

/**
 * Normalize a project record and persist it with `save`.
 *
 * The record is seeded with metadata read from the triggering event
 * (creation date and source); any property of `data` with the same name
 * overrides the seeded value because `data` is merged last.
 * @param {Object} options
 * @param {Object} options.dynamo Client forwarded to `save`.
 * @param {string} options.table Table name forwarded to `save`.
 * @param {Object} options.event Event providing `eventTime`,
 * `userIdentity.principalId` and `object.key`.
 * @param {Object} options.data Project fields to merge over the event metadata.
 * @param {Function} [cb] Callback forwarded to `save`. Defaults to a no-op.
 * @return {*} Whatever `save` returns.
 */
export const saveProject = ({ dynamo, table, event, data }, cb = () => {}) => {
  const eventMetadata = {
    creation_date: event.eventTime, // already ISO-8601
    source: {
      producer: event.userIdentity.principalId,
      object_key: event.object.key,
    },
  };

  const normalizedRecord = normalizeProject(Object.assign(eventMetadata, data));

  return save({ dynamo, table, data: normalizedRecord }, cb);
};

export default save;
74 changes: 74 additions & 0 deletions lib/dynamodb/helpers/normalize.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
 * Tell whether a value is a string holding at least one non-whitespace character.
 * @param {*} str The value to inspect.
 * @return {boolean} True when the value is a string whose trimmed form is not empty.
 */
export const isStringNotEmpty = str => {
  // The tag comparison accepts String wrapper objects as well as primitives.
  if (Object.prototype.toString.call(str) !== '[object String]') {
    return false;
  }

  return str.trim().length > 0;
};

/**
 * Build the DynamoDB string ("S") attribute for a value.
 * @param {string} source The string to convert.
 * @return {Object} `{ S: <trimmed string> }`, or null when the source is not a
 * string or contains only whitespace.
 */
export const normalizeString = source => {
  if (!isStringNotEmpty(source)) {
    return null;
  }

  return { S: source.trim() };
};

/**
 * Build the DynamoDB number ("N") attribute for a value.
 * @param {number} source The number to convert.
 * @return {Object} `{ N: <number as string> }`, or null when the source is not
 * a finite number (numeric strings are not accepted).
 */
export const normalizeNumber = source => {
  if (!Number.isFinite(source)) {
    return null;
  }

  return { N: String(source) };
};

/**
 * Normalize string set for DynamoDB.
 *
 * Entries that are not non-empty strings are dropped, the remaining ones are
 * trimmed, and repeated values are collapsed so the result is a real set.
 * @param {string[]} source The string set to normalize.
 * @return {Object} `{ SS: [...] }` with unique trimmed strings in first-seen
 * order, or null if the source is not an array or holds no usable string.
 */
export const normalizeStringSet = source => {
  if (!Array.isArray(source)) {
    return null;
  }

  // DynamoDB refuses a set containing repeated members, and trimming can turn
  // distinct inputs ('a', ' a ') into the same value, so de-duplicate here.
  const uniqueValues = new Set(
    source.filter(item => isStringNotEmpty(item)).map(item => item.trim())
  );

  return uniqueValues.size > 0 ? { SS: Array.from(uniqueValues) } : null;
};

/**
 * Wrap an object in the DynamoDB map ("M") attribute shape.
 * @param {Object} source The object to wrap; it is stored as-is, without copying
 * or validation.
 * @return {Object} An object whose `M` property is the given source.
 */
export const normalizeMap = source => {
  return { M: source };
};

/**
 * Wrap an array in the DynamoDB list ("L") attribute shape.
 * @param {Array} source The array to wrap; its items are not inspected.
 * @return {Object} `{ L: source }`, or null when the source is not an array or
 * is an empty array.
 */
export const normalizeList = source => {
  const hasItems = Array.isArray(source) && source.length > 0;

  if (!hasItems) {
    return null;
  }

  return { L: source };
};
1 change: 1 addition & 0 deletions lib/dynamodb/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { save, saveProject } from './actions/save';
77 changes: 0 additions & 77 deletions lib/dynamodb/normalize.js

This file was deleted.

1 change: 1 addition & 0 deletions lib/dynamodb/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"private": true,
"name": "@eubfr/dynamodb-helpers",
"main": "index.js",
"version": "0.0.1",
"dependencies": {
"uuid": "^3.1.0"
Expand Down
16 changes: 0 additions & 16 deletions lib/dynamodb/save.js

This file was deleted.

72 changes: 72 additions & 0 deletions lib/dynamodb/tables/projects.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import uuid from 'uuid';

import {
normalizeList,
normalizeMap,
normalizeNumber,
normalizeString,
normalizeStringSet,
} from '../helpers/normalize';

// Read `key` from an optional parent: yields the parent itself when it is falsy.
const readNested = (parent, key) => parent && parent[key];

// True for an array holding at least one element.
const isFilledArray = value => Array.isArray(value) && value.length > 0;

// Convert one project location into a map of name + geolocation.
const toLocationAttribute = location =>
  normalizeMap({
    name: normalizeString(location.name),
    geolocation: normalizeMap({
      lat: normalizeString(readNested(location.geolocation, 'lat')),
      long: normalizeString(readNested(location.geolocation, 'long')),
    }),
  });

// Convert one related link into a map of label + url.
const toLinkAttribute = link =>
  normalizeMap({
    label: normalizeString(link.label),
    url: normalizeString(link.url),
  });

/**
 * Convert a project record into the attribute layout stored in the projects table.
 *
 * A fresh `uuid.v1()` identifier is generated on every call. `project_locations`
 * and `related_links` are only present in the result when the record provides
 * them as non-empty arrays; all other attributes are always present and hold
 * whatever the matching normalize helper returns.
 * @param {Object} record The project record to convert.
 * @return {Object} The normalized project item.
 */
export const normalizeProject = record => {
  const project = {
    id: normalizeString(uuid.v1()),
    creation_date: normalizeString(record.creation_date),
    source: normalizeMap({
      producer: normalizeString(readNested(record.source, 'producer')),
      object_key: normalizeString(readNested(record.source, 'object_key')),
    }),
    title: normalizeString(record.title),
    cover_image: normalizeString(record.cover_image),
    programme_name: normalizeString(record.programme_name),
    description: normalizeString(record.description),
    results: normalizeString(record.results),
    ec_priorities: normalizeStringSet(record.ec_priorities),
    coordinators: normalizeStringSet(record.coordinators),
    eu_budget_contribution: normalizeNumber(record.eu_budget_contribution),
    partners: normalizeStringSet(record.partners),
    timeframe: normalizeMap({
      from: normalizeString(readNested(record.timeframe, 'from')),
      to: normalizeString(readNested(record.timeframe, 'to')),
    }),
    project_website: normalizeString(record.project_website),
  };

  // Project locations
  if (isFilledArray(record.project_locations)) {
    const locations = record.project_locations.map(location =>
      toLocationAttribute(location)
    );
    project.project_locations = normalizeList(locations);
  }

  // Related links
  if (isFilledArray(record.related_links)) {
    const links = record.related_links.map(link => toLinkAttribute(link));
    project.related_links = normalizeList(links);
  }

  return project;
};

export default normalizeProject;
81 changes: 35 additions & 46 deletions services/ingestion/etl/budg/csv/src/handler.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
/* eslint-disable import/prefer-default-export, no-console */
import saveToDB from '@eubfr/dynamodb-helpers/save';
import path from 'path';
import AWS from 'aws-sdk'; // eslint-disable-line import/no-extraneous-dependencies
import parse from 'csv-parse';
import { saveProject } from '@eubfr/dynamodb-helpers';
import transform from './transform';

// Parser 'error' listener: report only the error's message on the console.
const onParseError = ({ message }) => {
  console.error(message);
};

// Report a failed save by passing the whole error value to console.error.
const onSaveError = failure => {
  console.error(failure);
};

const path = require('path');
const AWS = require('aws-sdk'); // eslint-disable-line import/no-extraneous-dependencies
const parse = require('csv-parse');
// Parser 'finish' listener: emit an informational message on the console.
const onParseFinish = () => {
  const notice = 'Finished parsing';
  console.info(notice);
};

export const parseCsv = (event, context, callback) => {
/*
Expand Down Expand Up @@ -38,61 +50,38 @@ export const parseCsv = (event, context, callback) => {

parser.on('readable', () => {
let record;

/*
* Extract
*/
// eslint-disable-next-line
while ((record = parser.read())) {
/*
* Transform message
*/
const data = transform(record);

/*
* Map fields
* Load
*/

// Map the fields
const data = {
source: message.object.key,
title: record.Name,
programme_name: record['Programme name'],
description: record['Project description'],
results: record.Results,
ec_priorities: record['EC’s priorities'].split(','),
coordinators: record.Coordinators.split(','),
eu_budget_contribution: record['EU Budget contribution'],
partners: record.Partners.split(','),
project_locations: [
{
name: record['Project country(ies)'],
geolocation: {
lat: record['Project location latitude'],
long: record['Project location longitude'],
},
},
],
timeframe: {
from: record['Timeframe start'],
to: record['Timeframe end'],
saveProject(
{
dynamo,
table: process.env.TABLE,
event: message,
data,
},
};

/*
* Save to DB
*/

saveToDB(dynamo, process.env.TABLE, data, err => {
if (err) {
console.log(err);
err => {
if (err) {
onSaveError(err);
}
}
});
);
}
});

// Catch any error
parser.on('error', err => {
console.log(err.message);
});

// When we are done, test that the parsed output matched what expected
parser.on('finish', () => {});
parser.on('error', onParseError);
parser.on('finish', onParseFinish);

/*
* Start the hard work
Expand Down
Loading

0 comments on commit 09e4afb

Please sign in to comment.