diff --git a/plugins/packages/athena/lib/icon.svg b/plugins/packages/athena/lib/icon.svg index d9f7f873c2..b457b94aa9 100644 --- a/plugins/packages/athena/lib/icon.svg +++ b/plugins/packages/athena/lib/icon.svg @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/plugins/packages/athena/lib/index.ts b/plugins/packages/athena/lib/index.ts index 05dbcae73b..9a0a1863e4 100644 --- a/plugins/packages/athena/lib/index.ts +++ b/plugins/packages/athena/lib/index.ts @@ -4,31 +4,17 @@ import { SourceOptions, QueryOptions } from './types'; export default class Athena implements QueryService { async run(sourceOptions: SourceOptions, queryOptions: QueryOptions, dataSourceId: string): Promise { let result = {}; - const AthenaExpress = require('athena-express'), - AWS = require('aws-sdk'), - awsCredentials = { - region: sourceOptions.region, - accessKeyId: sourceOptions.access_key, - secretAccessKey: sourceOptions.secret_key, - }; - - AWS.config.update(awsCredentials); - - const athenaExpressConfig = { - aws: AWS, - getStats: true, - db: 'mydatabase', - }; - - const athenaExpress = new AthenaExpress(athenaExpressConfig); + const athenaClient = await this.getConnection(sourceOptions); const myQuery = { - sql: `CREATE DATABASE clickstreams`, - getStats: true, + sql: queryOptions.query, + db: sourceOptions.database, }; try { - console.log(JSON.stringify(myQuery, (_, v) => (typeof v === 'bigint' ? v.toString() : v))); - result = await athenaExpress.query('SHOW TABLES'); + console.log( + JSON.stringify(myQuery, (_, myQuery) => (typeof myQuery === 'bigint' ? myQuery.toString() : myQuery)) + ); + result = await athenaClient.query(this.toObject(myQuery)); console.log('result ::: ', result); } catch (error) { throw new QueryError('Query could not be completed', error.message, {}); @@ -41,25 +27,9 @@ export default class Athena implements QueryService { } async testConnection(sourceOptions: SourceOptions): Promise { - const AthenaExpress = require('athena-express'), - AWS = require('aws-sdk'), - awsCredentials = { - region: sourceOptions.region, - accessKeyId: sourceOptions.access_key, - secretAccessKey: sourceOptions.secret_key, - }; - - AWS.config.update(awsCredentials); - - const athenaExpressConfig = { - aws: AWS, - getStats: true, - db: sourceOptions.database, - }; - - const athenaExpress = new AthenaExpress(athenaExpressConfig); + const athenaClient = await this.getConnection(sourceOptions); try { - await athenaExpress.query('SHOW TABLES'); + await athenaClient.query('SHOW TABLES'); } catch (error) { throw new Error(error); } @@ -77,22 +47,20 @@ export default class Athena implements QueryService { }; AWS.config.update(awsCredentials); - const athenaExpress = new AthenaExpress(awsCredentials); - const myQuery = { - sql: `SELECT os, COUNT(*) count FROM cloudfront_logs WHERE date BETWEEN date 2014-07-05 AND date 2014-08-05 GROUP BY os`, + + const athenaExpressConfig = { + aws: AWS, + getStats: true, db: sourceOptions.database, + ...(sourceOptions.output_location?.length > 0 && { s3: sourceOptions.output_location }), }; - const client = athenaExpress - .query(myQuery) - .then((results) => { - return results; - }) - .catch((error) => { - throw new Error('Invalid credentials'); - }); - return { client }; + + const athenaExpress = new AthenaExpress(athenaExpressConfig); + return athenaExpress; } - private toJson(data) { - return JSON.stringify(data, (_, v) => (typeof v === 'bigint' ? `${v}n` : v)).replace(/"(-?\d+)n"/g, (_, a) => a); + private toObject(data) { + const val = typeof data === 'bigint' ? data.toString() : data; + const newVal = JSON.parse(JSON.stringify(val)); + return newVal; } }