Skip to content

Commit

Permalink
code refactor : get and test connection
Browse files Browse the repository at this point in the history
  • Loading branch information
stepinfwd committed Jun 14, 2022
1 parent 2cc2234 commit 9750fe3
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 54 deletions.
2 changes: 1 addition & 1 deletion plugins/packages/athena/lib/icon.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
74 changes: 21 additions & 53 deletions plugins/packages/athena/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,17 @@ import { SourceOptions, QueryOptions } from './types';
export default class Athena implements QueryService {
async run(sourceOptions: SourceOptions, queryOptions: QueryOptions, dataSourceId: string): Promise<QueryResult> {
let result = {};
const AthenaExpress = require('athena-express'),
AWS = require('aws-sdk'),
awsCredentials = {
region: sourceOptions.region,
accessKeyId: sourceOptions.access_key,
secretAccessKey: sourceOptions.secret_key,
};

AWS.config.update(awsCredentials);

const athenaExpressConfig = {
aws: AWS,
getStats: true,
db: 'mydatabase',
};

const athenaExpress = new AthenaExpress(athenaExpressConfig);
const athenaClient = await this.getConnection(sourceOptions);
const myQuery = {
sql: `CREATE DATABASE clickstreams`,
getStats: true,
sql: queryOptions.query,
db: sourceOptions.database,
};

try {
console.log(JSON.stringify(myQuery, (_, v) => (typeof v === 'bigint' ? v.toString() : v)));
result = await athenaExpress.query('SHOW TABLES');
console.log(
JSON.stringify(myQuery, (_, myQuery) => (typeof myQuery === 'bigint' ? myQuery.toString() : myQuery))
);
result = await athenaClient.query(this.toObject(myQuery));
console.log('result ::: ', result);
} catch (error) {
throw new QueryError('Query could not be completed', error.message, {});
Expand All @@ -41,25 +27,9 @@ export default class Athena implements QueryService {
}

async testConnection(sourceOptions: SourceOptions): Promise<ConnectionTestResult> {
const AthenaExpress = require('athena-express'),
AWS = require('aws-sdk'),
awsCredentials = {
region: sourceOptions.region,
accessKeyId: sourceOptions.access_key,
secretAccessKey: sourceOptions.secret_key,
};

AWS.config.update(awsCredentials);

const athenaExpressConfig = {
aws: AWS,
getStats: true,
db: sourceOptions.database,
};

const athenaExpress = new AthenaExpress(athenaExpressConfig);
const athenaClient = await this.getConnection(sourceOptions);
try {
await athenaExpress.query('SHOW TABLES');
await athenaClient.query('SHOW TABLES');
} catch (error) {
throw new Error(error);
}
Expand All @@ -77,22 +47,20 @@ export default class Athena implements QueryService {
};

AWS.config.update(awsCredentials);
const athenaExpress = new AthenaExpress(awsCredentials);
const myQuery = {
sql: `SELECT os, COUNT(*) count FROM cloudfront_logs WHERE date BETWEEN date 2014-07-05 AND date 2014-08-05 GROUP BY os`,

const athenaExpressConfig = {
aws: AWS,
getStats: true,
db: sourceOptions.database,
...(sourceOptions.output_location?.length > 0 && { s3: sourceOptions.output_location }),
};
const client = athenaExpress
.query(myQuery)
.then((results) => {
return results;
})
.catch((error) => {
throw new Error('Invalid credentials');
});
return { client };

const athenaExpress = new AthenaExpress(athenaExpressConfig);
return athenaExpress;
}
private toJson(data) {
return JSON.stringify(data, (_, v) => (typeof v === 'bigint' ? `${v}n` : v)).replace(/"(-?\d+)n"/g, (_, a) => a);
private toObject(data) {
const val = typeof data === 'bigint' ? data.toString() : data;
const newVal = JSON.parse(JSON.stringify(val));
return newVal;
}
}

0 comments on commit 9750fe3

Please sign in to comment.