Skip to content

Commit 23817b5

Browse files
feat(rds/dataloader): Add support for using RDS as a dataloader (#110)
1 parent 392c04d commit 23817b5

File tree

6 files changed

+536
-37
lines changed

6 files changed

+536
-37
lines changed

README.md

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,12 @@ Put options under `custom.appsync-simulator` in your `serverless.yml` file
6767
| dynamoDb.secretAccessKey | DEFAULT_SECRET | AWS Secret Key to access DynamoDB |
6868
| dynamoDb.sessionToken | DEFAULT_ACCESS_TOKEEN | AWS Session Token to access DynamoDB, only if you have temporary security credentials configured on AWS |
6969
| dynamoDb.\* | | You can add every configuration accepted by [DynamoDB SDK](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB.html#constructor-property) |
70+
| rds.dbName | | Name of the database |
71+
| rds.dbHost | | Database host |
72+
| rds.dbDialect | | Database dialect. Possible values (mysql|postgres) |
73+
| rds.dbUsername | | Database username |
74+
| rds.dbPassword | | Database password |
75+
| rds.dbPort | | Database port |
7076
| watch | - \*.graphql<br/> - \*.vtl | Array of glob patterns to watch for hot-reloading. |
7177

7278
Example:
@@ -250,10 +256,115 @@ This plugin supports resolvers implemented by `amplify-appsync-simulator`, as we
250256

251257
- AMAZON_ELASTIC_SEARCH
252258
- HTTP
259+
- RELATIONAL_DATABASE
253260

254-
**Not Supported / TODO**
261+
## Relational Database
262+
### Sample VTL for a create mutation
255263

256-
- RELATIONAL_DATABASE
264+
```
265+
#set( $cols = [] )
266+
#set( $vals = [] )
267+
#foreach( $entry in $ctx.args.input.keySet() )
268+
#set( $regex = "([a-z])([A-Z]+)")
269+
#set( $replacement = "$1_$2")
270+
#set( $toSnake = $entry.replaceAll($regex, $replacement).toLowerCase() )
271+
#set( $discard = $cols.add("$toSnake") )
272+
#if( $util.isBoolean($ctx.args.input[$entry]) )
273+
#if( $ctx.args.input[$entry] )
274+
#set( $discard = $vals.add("1") )
275+
#else
276+
#set( $discard = $vals.add("0") )
277+
#end
278+
#else
279+
#set( $discard = $vals.add("'$ctx.args.input[$entry]'") )
280+
#end
281+
#end
282+
#set( $valStr = $vals.toString().replace("[","(").replace("]",")") )
283+
#set( $colStr = $cols.toString().replace("[","(").replace("]",")") )
284+
#if ( $valStr.substring(0, 1) != '(' )
285+
#set( $valStr = "($valStr)" )
286+
#end
287+
#if ( $colStr.substring(0, 1) != '(' )
288+
#set( $colStr = "($colStr)" )
289+
#end
290+
{
291+
"version": "2018-05-29",
292+
"statements": ["INSERT INTO <name-of-table> $colStr VALUES $valStr", "SELECT * FROM <name-of-table> ORDER BY id DESC LIMIT 1"]
293+
}
294+
```
295+
296+
### Sample VTL for an update mutation
297+
```
298+
#set( $update = "" )
299+
#set( $equals = "=" )
300+
#foreach( $entry in $ctx.args.input.keySet() )
301+
#set( $cur = $ctx.args.input[$entry] )
302+
#set( $regex = "([a-z])([A-Z]+)")
303+
#set( $replacement = "$1_$2")
304+
#set( $toSnake = $entry.replaceAll($regex, $replacement).toLowerCase() )
305+
#if( $util.isBoolean($cur) )
306+
#if( $cur )
307+
#set ( $cur = "1" )
308+
#else
309+
#set ( $cur = "0" )
310+
#end
311+
#end
312+
#if ( $util.isNullOrEmpty($update) )
313+
#set($update = "$toSnake$equals'$cur'" )
314+
#else
315+
#set($update = "$update,$toSnake$equals'$cur'" )
316+
#end
317+
#end
318+
{
319+
"version": "2018-05-29",
320+
"statements": ["UPDATE <name-of-table> SET $update WHERE id=$ctx.args.input.id", "SELECT * FROM <name-of-table> WHERE id=$ctx.args.input.id"]
321+
}
322+
```
323+
324+
325+
### Sample resolver for delete mutation
326+
```
327+
{
328+
"version": "2018-05-29",
329+
"statements": ["UPDATE <name-of-table> set deleted_at=NOW() WHERE id=$ctx.args.id", "SELECT * FROM <name-of-table> WHERE id=$ctx.args.id"]
330+
}
331+
```
332+
333+
### Sample mutation response VTL with support for handling AWSDateTime
334+
```
335+
#set ( $index = -1)
336+
#set ( $result = $util.parseJson($ctx.result) )
337+
#set ( $meta = $result.sqlStatementResults[1].columnMetadata)
338+
#foreach ($column in $meta)
339+
#set ($index = $index + 1)
340+
#if ( $column["typeName"] == "timestamptz" )
341+
#set ($time = $result["sqlStatementResults"][1]["records"][0][$index]["stringValue"] )
342+
#set ( $nowEpochMillis = $util.time.parseFormattedToEpochMilliSeconds("$time.substring(0,19)+0000", "yyyy-MM-dd HH:mm:ssZ") )
343+
#set ( $isoDateTime = $util.time.epochMilliSecondsToISO8601($nowEpochMillis) )
344+
$util.qr( $result["sqlStatementResults"][1]["records"][0][$index].put("stringValue", "$isoDateTime") )
345+
#end
346+
#end
347+
#set ( $res = $util.parseJson($util.rds.toJsonString($util.toJson($result)))[1][0] )
348+
#set ( $response = {} )
349+
#foreach($mapKey in $res.keySet())
350+
#set ( $s = $mapKey.split("_") )
351+
#set ( $camelCase="" )
352+
#set ( $isFirst=true )
353+
#foreach($entry in $s)
354+
#if ( $isFirst )
355+
#set ( $first = $entry.substring(0,1) )
356+
#else
357+
#set ( $first = $entry.substring(0,1).toUpperCase() )
358+
#end
359+
#set ( $isFirst=false )
360+
#set ( $stringLength = $entry.length() )
361+
#set ( $remaining = $entry.substring(1, $stringLength) )
362+
#set ( $camelCase = "$camelCase$first$remaining" )
363+
#end
364+
$util.qr( $response.put("$camelCase", $res[$mapKey]) )
365+
#end
366+
$utils.toJson($response)
367+
```
257368
258369
## Contributors ✨
259370

package.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,21 @@
2121
"/lib"
2222
],
2323
"dependencies": {
24-
"amplify-appsync-simulator": "^1.23.9",
24+
"amplify-appsync-simulator": "^1.27.4",
2525
"amplify-nodejs-function-runtime-provider": "^1.1.6",
2626
"aws-sdk": "^2.792.0",
2727
"axios": "^0.21.0",
2828
"babel-jest": "^26.6.3",
29+
"bluebird": "^3.7.2",
2930
"cfn-resolver-lib": "^1.1.7",
3031
"dataloader": "^2.0.0",
3132
"fb-watchman": "^2.0.1",
3233
"globby": "^11.0.3",
3334
"jest": "^26.6.3",
3435
"lodash": "^4.17.20",
35-
"merge-graphql-schemas": "^1.5.8"
36+
"merge-graphql-schemas": "^1.5.8",
37+
"mysql2": "^2.2.5",
38+
"pg": "^8.6.0"
3639
},
3740
"devDependencies": {
3841
"@babel/cli": "^7.12.1",
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import Promise from 'bluebird';
2+
import { Client, types as pgTypes } from 'pg';
3+
import mysql from 'mysql2/promise';
4+
import { Types } from 'mysql2';
5+
const FLAGS = {
6+
NOT_NULL: 1,
7+
PRI_KEY: 2,
8+
UNIQUE_KEY: 4,
9+
MULTIPLE_KEY: 8,
10+
BLOB: 16,
11+
UNSIGNED: 32,
12+
ZEROFILL: 64,
13+
BINARY: 128,
14+
ENUM: 256,
15+
AUTO_INCREMENT: 512,
16+
TIMESTAMP: 1024,
17+
SET: 2048,
18+
NO_DEFAULT_VALUE: 4096,
19+
ON_UPDATE_NOW: 8192,
20+
NUM: 32768,
21+
};
22+
23+
const decToBin = (dec) => parseInt((dec >>> 0).toString(2), 2);
24+
25+
const convertMySQLResponseToColumnMetaData = (rows) => {
26+
return rows.map((row) => {
27+
// @TODO: Add for the following fields
28+
// arrayBaseColumnType,
29+
// isCaseSensitive,
30+
// isCurrency,
31+
// currency,
32+
// precision,
33+
// scale,
34+
// schemaName,
35+
return {
36+
isAutoIncrement:
37+
decToBin(row.flags & FLAGS.AUTO_INCREMENT) === FLAGS.AUTO_INCREMENT,
38+
label: row.name,
39+
name: row.name,
40+
nullable: decToBin(row.flags && FLAGS.NOT_NULL) !== FLAGS.NOT_NULL,
41+
type: row.columnType,
42+
typeName: Object.keys(Types)
43+
.find((key) => Types[key] === row.columnType)
44+
.toUpperCase(),
45+
isSigned: decToBin(row.flags & FLAGS.UNSIGNED) !== FLAGS.UNSIGNED,
46+
autoIncrement:
47+
decToBin(row.flags & FLAGS.AUTO_INCREMENT) === FLAGS.AUTO_INCREMENT,
48+
tableName: row._buf
49+
.slice(row._tableStart, row._tableStart + row._tableLength)
50+
.toString(),
51+
};
52+
});
53+
};
54+
const convertSQLResponseToRDSRecords = (rows) => {
55+
const records = [];
56+
57+
rows.forEach((dbObject) => {
58+
const record = [];
59+
Object.keys(dbObject).forEach((key) => {
60+
record.push(
61+
dbObject[key] === null
62+
? { isNull: true, null: true }
63+
: typeof dbObject[key] === 'string'
64+
? { stringValue: dbObject[key] }
65+
: typeof dbObject[key] === 'number'
66+
? { longValue: dbObject[key] }
67+
: { stringValue: dbObject[key] },
68+
);
69+
});
70+
records.push(record);
71+
});
72+
return records;
73+
};
74+
75+
const convertPostgresSQLResponseToColumnMetaData = (rows) => {
76+
return rows.map((row) => {
77+
const typeName = Object.keys(pgTypes.builtins)
78+
.find((d) => pgTypes.builtins[d] === row.dataTypeID)
79+
.toUpperCase();
80+
// @TODO: Add support for the following fields
81+
// isAutoIncrement,
82+
// nullable,
83+
// isSigned,
84+
// autoIncrement,
85+
// tableName,
86+
// arrayBaseColumnType,
87+
// isCaseSensitive,
88+
// isCurrency,
89+
// currency,
90+
// precision,
91+
// scale,
92+
// schemaName,
93+
return {
94+
label: row.name,
95+
name: row.name,
96+
type: row.dataTypeID,
97+
typeName,
98+
};
99+
});
100+
};
101+
const executeSqlStatements = async (client, req) =>
102+
Promise.mapSeries(req.statements, (statement) => client.query(statement));
103+
export default class RelationalDataLoader {
104+
constructor(config) {
105+
this.config = config;
106+
}
107+
108+
async load(req) {
109+
try {
110+
const requiredKeys = [
111+
'dbDialect',
112+
'dbUsername',
113+
'dbPassword',
114+
'dbHost',
115+
'dbName',
116+
'dbPort',
117+
];
118+
if (!this.config.rds) {
119+
throw new Error('RDS configuration not passed');
120+
}
121+
const missingKey = requiredKeys.find((key) => {
122+
return !this.config.rds[key];
123+
});
124+
if (missingKey) {
125+
throw new Error(`${missingKey} is required.`);
126+
}
127+
128+
const dbConfig = {
129+
host: this.config.rds.dbHost,
130+
user: this.config.rds.dbUsername,
131+
password: this.config.rds.dbPassword,
132+
database: this.config.rds.dbName,
133+
port: this.config.rds.dbPort,
134+
};
135+
136+
const res = {};
137+
if (this.config.rds.dbDialect === 'mysql') {
138+
const client = await mysql.createConnection(dbConfig);
139+
const results = await executeSqlStatements(client, req);
140+
141+
res.sqlStatementResults = results.map((result) => {
142+
if (result.length < 2) {
143+
return {};
144+
}
145+
if (!result[1]) {
146+
// not a select query
147+
return {
148+
numberOfRecordsUpdated: result[0].affectedRows,
149+
generatedFields: [],
150+
};
151+
}
152+
return {
153+
numberOfRecordsUpdated: result[0].length,
154+
records: convertSQLResponseToRDSRecords(result[0]),
155+
columnMetadata: convertMySQLResponseToColumnMetaData(result[1]),
156+
};
157+
});
158+
} else if (this.config.rds.dbDialect === 'postgres') {
159+
const client = new Client(dbConfig);
160+
await client.connect();
161+
const results = await executeSqlStatements(client, req);
162+
res.sqlStatementResults = results.map((result) => {
163+
return {
164+
numberOfRecordsUpdated: result.rowCount,
165+
records: convertSQLResponseToRDSRecords(result.rows),
166+
columnMetadata: convertPostgresSQLResponseToColumnMetaData(
167+
result.fields,
168+
),
169+
generatedFields: [],
170+
};
171+
});
172+
}
173+
return JSON.stringify(res);
174+
} catch (e) {
175+
console.log(e);
176+
return e;
177+
}
178+
}
179+
}

src/getAppSyncConfig.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ export default function getAppSyncConfig(context, appSyncConfig) {
8282
},
8383
};
8484
}
85+
case 'RELATIONAL_DATABASE': {
86+
return {
87+
...dataSource,
88+
rds: context.options.rds,
89+
};
90+
}
8591
case 'AWS_LAMBDA': {
8692
const { functionName } = source.config;
8793
if (functionName === undefined) {

src/index.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import getAppSyncConfig from './getAppSyncConfig';
99
import NotImplementedDataLoader from './data-loaders/NotImplementedDataLoader';
1010
import ElasticDataLoader from './data-loaders/ElasticDataLoader';
1111
import HttpDataLoader from './data-loaders/HttpDataLoader';
12+
import RelationalDataLoader from './data-loaders/RelationalDataLoader';
1213
import watchman from 'fb-watchman';
1314

1415
const resolverPathMap = {
@@ -27,7 +28,7 @@ class ServerlessAppSyncSimulator {
2728

2829
addDataLoader('HTTP', HttpDataLoader);
2930
addDataLoader('AMAZON_ELASTICSEARCH', ElasticDataLoader);
30-
addDataLoader('RELATIONAL_DATABASE', NotImplementedDataLoader);
31+
addDataLoader('RELATIONAL_DATABASE', RelationalDataLoader);
3132

3233
this.hooks = {
3334
'before:offline:start:init': this.startServers.bind(this),
@@ -268,6 +269,7 @@ class ServerlessAppSyncSimulator {
268269
refMap: {},
269270
getAttMap: {},
270271
importValueMap: {},
272+
rds: {},
271273
dynamoDb: {
272274
endpoint: `http://localhost:${get(
273275
this.serverless.service,

0 commit comments

Comments
 (0)