Skip to content

Commit

Permalink
add pooling impl in data source service
Browse files Browse the repository at this point in the history
Signed-off-by: Zhongnan Su <szhongna@amazon.com>
  • Loading branch information
zhongnansu committed Aug 16, 2022
1 parent 01cce3e commit b61ee94
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 30 deletions.
2 changes: 2 additions & 0 deletions src/plugins/data_source/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@

export const PLUGIN_ID = 'dataSource';
export const PLUGIN_NAME = 'data_source';
export const DATA_SOURCE_SAVED_OBJECT_TYPE = 'data-source';
export const CREDENTIAL_SAVED_OBJECT_TYPE = 'credential';

export { Credential } from './credentials';
3 changes: 3 additions & 0 deletions src/plugins/data_source/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export const configSchema = schema.object({
defaultValue: new Array(32).fill(0),
}),
}),
clientPool: schema.object({
size: schema.number({ defaultValue: 5 }),
}),
});

export type DataSourcePluginConfigType = TypeOf<typeof configSchema>;
125 changes: 121 additions & 4 deletions src/plugins/data_source/server/client/data_source_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,21 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { Logger, OpenSearchClient, SavedObjectsClientContract } from 'src/core/server';
import { Client } from '@opensearch-project/opensearch';
import {
Logger,
OpenSearchClient,
SavedObject,
SavedObjectsClientContract,
SavedObjectsErrorHelpers,
} from 'src/core/server';
import { CREDENTIAL_SAVED_OBJECT_TYPE, DATA_SOURCE_SAVED_OBJECT_TYPE } from '../../common';
import {
CredentialMaterials,
CredentialSavedObjectAttributes,
} from '../../common/credentials/types';
import { DataSourceAttributes } from '../../common/data_sources';
import { DataSourcePluginConfigType } from '../../config';
import { DataSourceService } from '../data_source_service';

/**
Expand All @@ -23,18 +37,121 @@ interface DataSourceClientCtorParams {
dataSourceService: DataSourceService;
logger: Logger;
scopedSavedObjectsClient: SavedObjectsClientContract;
config: DataSourcePluginConfigType;
}
// TODO: This needs further implementation. See https://github.com/opensearch-project/OpenSearch-Dashboards/issues/1981
export class DataSourceClient implements IDataSourceClient {
private dataSourceService: DataSourceService;
private log: Logger;
private scopedSavedObjectClient;
// scoped saved object client to fetch save object on behalf of user
private scopedSavedObjectClient: SavedObjectsClientContract;
private config: DataSourcePluginConfigType;

constructor(ctorParams: DataSourceClientCtorParams) {
this.dataSourceService = ctorParams.dataSourceService;
this.log = ctorParams.logger;
this.scopedSavedObjectClient = ctorParams.scopedSavedObjectsClient;
this.config = ctorParams.config;
}

asDataSource!: (dataSourceId: string) => Promise<OpenSearchClient>;
async asDataSource(dataSourceId: string) {
const dataSource = await this.getDataSource(dataSourceId);
const rootClient = this.getRootClient(dataSource.attributes, this.config);
const credential = await this.getCredential(dataSource.references[0].id); // assuming there is 1 and only 1 credential for each data source

return this.getQueryClient(rootClient, credential.attributes, dataSource!.withAuth);
}

private async getDataSource(dataSourceId: string): Promise<SavedObject<DataSourceAttributes>> {
try {
const dataSource = await this.scopedSavedObjectClient.get<DataSourceAttributes>(
DATA_SOURCE_SAVED_OBJECT_TYPE,
dataSourceId
);
return dataSource;
} catch (error: any) {
// it will cause 500 error when failed to get saved objects, need to handle such error gracefully
throw SavedObjectsErrorHelpers.createBadRequestError(error.message);
}
}

private async getCredential(
credentialId: string
): Promise<SavedObject<CredentialSavedObjectAttributes>> {
try {
const dataSource = await this.scopedSavedObjectClient.get<CredentialSavedObjectAttributes>(
CREDENTIAL_SAVED_OBJECT_TYPE,
credentialId
);
return dataSource;
} catch (error: any) {
// it will cause 500 error when failed to get saved objects, need to handle such error gracefully
throw SavedObjectsErrorHelpers.createBadRequestError(error.message);
}
}

/**
* Create a child client object with given auth info.
*
* @param rootClient root client for the connection with given data source endpoint.
* @param credentialAttr credential saved object attribute.
* @returns child client.
*/
private getQueryClient(
rootClient: Client,
credentialAttr: CredentialSavedObjectAttributes,
withAuth = false
): Client {
if (withAuth) {
return this.getBasicAuthClient(rootClient, credentialAttr.credentialMaterials);
} else {
return rootClient.child();
}
}

/**
* Gets a root client object of the OpenSearch endpoint.
* Will attempt to get from cache, if cache miss, create a new one and load into cache.
*
* @param dataSourceAttr data source saved objects attributes.
* @returns OpenSearch client for the given data source endpoint.
*/
private getRootClient(
dataSourceAttr: DataSourceAttributes,
config: DataSourcePluginConfigType
): Client {
const endpoint = dataSourceAttr.endpoint;
const cachedClient = this.dataSourceService.getCachedClient(endpoint);

if (cachedClient) {
return cachedClient;
} else {
const client = this.configureDataSourceClient(config, endpoint);

this.dataSourceService.addClientToPool(endpoint, client);
return client;
}
}

private getBasicAuthClient(rootClient: Client, credential: CredentialMaterials): Client {
const { username, password } = credential.credentialMaterialsContent;
return rootClient.child({
auth: {
username,
password,
},
});
}

// TODO: will use client configs, that comes from a merge result of user config and defaults
private configureDataSourceClient(config: DataSourcePluginConfigType, endpoint: string) {
const client = new Client({
node: endpoint,
ssl: {
requestCert: true,
rejectUnauthorized: true, // cosnider making this configurable in future
},
});

return client;
}
}
2 changes: 1 addition & 1 deletion src/plugins/data_source/server/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
* SPDX-License-Identifier: Apache-2.0
*/

export { IDataSourceClient, ICustomDataSourceClient, DataSourceClient } from './data_source_client';
export { IDataSourceClient, DataSourceClient } from './data_source_client';
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class OpenSearchDataSourceRouteHandlerContext {
try {
const client = await this.dataSourceClient.asDataSource(dataSourceId);
return client;
} catch (error) {
} catch (error: any) {
// TODO: convert as audit log when integrate with osd auditing
// https://github.com/opensearch-project/OpenSearch-Dashboards/issues/1986
this.logger.error(
Expand Down
58 changes: 44 additions & 14 deletions src/plugins/data_source/server/data_source_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,63 @@
*/

import { Client } from '@opensearch-project/opensearch';
import { Logger, OpenSearchClient, SavedObjectsClientContract } from 'src/core/server';
import LRUCache from 'lru-cache';
import { Logger, SavedObjectsClientContract } from 'src/core/server';
import { DataSourcePluginConfigType } from '../config';
import { DataSourceClient } from './client';
import { IDataSourceService } from './types';

export class DataSourceService implements IDataSourceService {
private openSearchClientsPool: Map<string, Client>;
constructor() {
this.openSearchClientsPool = new Map<string, Client>();
private openSearchClientPool?: LRUCache<string, Client>;
private isClosed = false;

constructor(private logger: Logger, private config: DataSourcePluginConfigType) {}

public setup() {
const logger = this.logger;
const { size } = this.config.clientPool;

this.openSearchClientPool = new LRUCache({
max: size,
maxAge: 15 * 60 * 1000, // by default, TCP connection times out in 15 minutes

async dispose(endpoint, client) {
try {
await client.close();
} catch (error: any) {
// log and do nothing since we are anyways evicting the client object from cache
logger.warn(
`Error closing OpenSearch client when removing from client pool: ${error.message}`
);
}
},
});
this.logger.info(`Created data source client pool of size ${size}`);
}
// TODO: placeholders, need implement when adding global config
isEnabled(): boolean {
throw new Error('Method not implemented.');

public getCachedClient(endpoint: string) {
return this.openSearchClientPool!.get(endpoint);
}

public addClientToPool(endpoint: string, client: Client) {
this.openSearchClientPool!.set(endpoint, client);
}

getDataSourceClient(logger: Logger, savedObjectClient: SavedObjectsClientContract) {
return new DataSourceClient({
logger,
dataSourceService: this,
scopedSavedObjectsClient: savedObjectClient,
config: this.config,
});
}
// TODO: placeholders, need implement client pooling strategy
addOpenSearchClient() {}
getOpenSearchClient(): OpenSearchClient {
throw new Error('Method not implemented.');
}

// TODO: close all data source clients in the clients pool
stop() {}
// close all data source clients in the pool
async stop() {
if (this.isClosed) {
return;
}
this.isClosed = true;
Promise.all(this.openSearchClientPool!.values().map((client) => client.close()));
}
}
31 changes: 24 additions & 7 deletions src/plugins/data_source/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/

import { first } from 'rxjs/operators';

import { dataSource, credential, CredentialSavedObjectsClientWrapper } from './saved_objects';
import { DataSourcePluginConfigType } from '../config';
import {
Expand All @@ -16,9 +15,7 @@ import {
} from '../../../../src/core/server';
import { DataSourceService } from './data_source_service';
import { createDataSourceRouteHandlerContext } from './data_source_route_handler_context';

import { DataSourcePluginSetup, DataSourcePluginStart } from './types';

import { CryptographyClient } from './cryptography';

export class DataSourcePlugin implements Plugin<DataSourcePluginSetup, DataSourcePluginStart> {
Expand All @@ -38,9 +35,11 @@ export class DataSourcePlugin implements Plugin<DataSourcePluginSetup, DataSourc
// Register data source saved object type
core.savedObjects.registerType(dataSource);

// Fetch configs used to create redential saved objects client wrapper
const { encryption } = await this.initializerContext.config.create().pipe(first()).toPromise();
const { wrappingKeyName, wrappingKeyNamespace, wrappingKey } = encryption;
const config$ = this.initializerContext.config.create<DataSourcePluginConfigType>();
const config: DataSourcePluginConfigType = await config$.pipe(first()).toPromise();

// Fetch configs used to create credential saved objects client wrapper
const { wrappingKeyName, wrappingKeyNamespace, wrappingKey } = config.encryption;

// Create credential saved objects client wrapper
const credentialSavedObjectsClientWrapper = new CredentialSavedObjectsClientWrapper(
Expand All @@ -53,14 +52,32 @@ export class DataSourcePlugin implements Plugin<DataSourcePluginSetup, DataSourc
'credential',
credentialSavedObjectsClientWrapper.wrapperFactory
);
this.dataSourceService = new DataSourceService();

this.dataSourceService = new DataSourceService(this.logger, config);
this.dataSourceService.setup();

// Register plugin context to route handler context
core.http.registerRouteHandlerContext(
'data_source',
createDataSourceRouteHandlerContext(this.dataSourceService, this.logger)
);

/**
* TODO: Test purpose ,need removal
*/
const router = core.http.createRouter();
router.get(
{
path: '/data-source/test',
validate: false,
},
async (context, request, response) => {
// const client = await context.dataSources.getOpenSearchClient('37df1970-b6b0-11ec-a339-c18008b701cd');
const client = await context.data_source.opensearch.getClient('aaa');
return response.ok();
}
);

return {};
}

Expand Down
3 changes: 0 additions & 3 deletions src/plugins/data_source/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@ import { Logger, OpenSearchClient, SavedObjectsClientContract } from 'src/core/s
import { DataSourceClient } from './client';

export interface IDataSourceService {
isEnabled(): boolean;
getDataSourceClient(
logger: Logger,
savedObjectClient: SavedObjectsClientContract
): DataSourceClient;
addOpenSearchClient(): void;
getOpenSearchClient(): OpenSearchClient;
stop(): void;
}
export interface DataSourcePluginRequestContext {
Expand Down

0 comments on commit b61ee94

Please sign in to comment.