Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Client consolidation] Migrate from legacy elasticsearch client to opensearch-js client in osd-opensearch-archiver package #4142

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fix EUI/OUI type errors ([#3798](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/3798))
- Remove unused Sass in `tile_map` plugin ([#4110](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4110))
- [Table Visualization] Remove custom styling for text-align:center in favor of OUI utility class. ([#4164](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4164))
- Migrate from legacy elasticsearch client to opensearch-js client in `osd-opensearch-archiver` package([#4142](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4142))
- Replace the use of `bluebird` in `saved_objects` plugin ([#4026](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4026))
- [Vis Colors] Replace color maps with OUI color palettes ([#4293](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4293))
- [Vis Colors] [Maps] Replace hardcoded color to OUI color in `maps_legacy` plugin ([#4294](https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4294))
Expand Down
6 changes: 2 additions & 4 deletions packages/osd-opensearch-archiver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
},
"dependencies": {
"@osd/dev-utils": "1.0.0",
"elasticsearch": "^16.7.0"
"@opensearch-project/opensearch": "^2.2.0"
},
"devDependencies": {
"@types/elasticsearch": "^5.0.33"
}
"devDependencies": {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* under the License.
*/

import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import { ToolingLog, OsdClient } from '@osd/dev-utils';

import {
Expand Down
4 changes: 2 additions & 2 deletions packages/osd-opensearch-archiver/src/actions/load.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import { resolve } from 'path';
import { createReadStream } from 'fs';
import { Readable } from 'stream';
import { ToolingLog, OsdClient } from '@osd/dev-utils';
import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';

import { createPromiseFromStreams, concatStreamProviders } from '../lib/streams';

Expand Down Expand Up @@ -114,7 +114,7 @@ export async function loadAction({

await client.indices.refresh({
index: '_all',
allowNoIndices: true,
allow_no_indices: true,
});

// If we affected the OpenSearch Dashboards index, we need to ensure it's migrated...
Expand Down
2 changes: 1 addition & 1 deletion packages/osd-opensearch-archiver/src/actions/save.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import { resolve } from 'path';
import { createWriteStream, mkdirSync } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import { ToolingLog } from '@osd/dev-utils';

import { createListStream, createPromiseFromStreams } from '../lib/streams';
Expand Down
2 changes: 1 addition & 1 deletion packages/osd-opensearch-archiver/src/actions/unload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import { resolve } from 'path';
import { createReadStream } from 'fs';
import { Readable, Writable } from 'stream';
import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import { ToolingLog, OsdClient } from '@osd/dev-utils';

import { createPromiseFromStreams } from '../lib/streams';
Expand Down
13 changes: 7 additions & 6 deletions packages/osd-opensearch-archiver/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import Path from 'path';
import readline from 'readline';

import { RunWithCommands, createFlagError } from '@osd/dev-utils';
import { Client, ClientOptions } from '@opensearch-project/opensearch';
import { readConfigFile } from '@osd/test';
import legacyElasticsearch from 'elasticsearch';

import { OpenSearchArchiver } from './opensearch_archiver';

Expand All @@ -56,7 +56,7 @@ export function runCli() {
default: ${defaultConfigPath}
--opensearch-url url for OpenSearch, prefer the --config flag
--opensearch-dashboards-url url for OpenSearch Dashboards, prefer the --config flag
--dir where arechives are stored, prefer the --config flag
--dir where archives are stored, prefer the --config flag
`,
},
async extendContext({ log, flags, addCleanupTask }) {
Expand Down Expand Up @@ -99,10 +99,11 @@ export function runCli() {
throw createFlagError('--dir or --config must be defined');
}

const client = new legacyElasticsearch.Client({
host: opensearchUrl,
log: flags.verbose ? 'trace' : [],
});
const clientOptions: ClientOptions = {
node: opensearchUrl.toString(),
};

const client = new Client(clientOptions);
addCleanupTask(() => client.close());

const opensearchArchiver = new OpenSearchArchiver({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ describe('opensearchArchiver: createGenerateDocRecordsStream()', () => {
expect(params).to.have.property('index', 'logstash-*');
expect(params).to.have.property('size', 1000);
return {
hits: {
total: 0,
hits: [],
body: {
hits: {
total: 0,
hits: [],
},
},
};
},
Expand All @@ -74,9 +76,11 @@ describe('opensearchArchiver: createGenerateDocRecordsStream()', () => {
expect(params).to.have.property('scroll', '1m');
expect(params).to.have.property('rest_total_hits_as_int', true);
return {
hits: {
total: 0,
hits: [],
body: {
hits: {
total: 0,
hits: [],
},
},
};
},
Expand All @@ -101,25 +105,27 @@ describe('opensearchArchiver: createGenerateDocRecordsStream()', () => {
expect(params).to.have.property('index', 'index1');
await delay(200);
return {
_scroll_id: 'index1ScrollId',
hits: { total: 2, hits: [{ _id: 1, _index: '.opensearch_dashboards_1' }] },
body: {
_scroll_id: 'index1ScrollId',
hits: { total: 2, hits: [{ _id: 1, _index: '.opensearch_dashboards_1' }] },
},
};
},
async (name, params) => {
expect(name).to.be('scroll');
expect(params).to.have.property('scrollId', 'index1ScrollId');
expect(params).to.have.property('scroll_id', 'index1ScrollId');
expect(Date.now() - checkpoint).to.not.be.lessThan(200);
checkpoint = Date.now();
await delay(200);
return { hits: { total: 2, hits: [{ _id: 2, _index: 'foo' }] } };
return { body: { hits: { total: 2, hits: [{ _id: 2, _index: 'foo' }] } } };
},
async (name, params) => {
expect(name).to.be('search');
expect(params).to.have.property('index', 'index2');
expect(Date.now() - checkpoint).to.not.be.lessThan(200);
checkpoint = Date.now();
await delay(200);
return { hits: { total: 0, hits: [] } };
return { body: { hits: { total: 0, hits: [] } } };
},
]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/

import { Transform } from 'stream';
import { Client, SearchParams, SearchResponse } from 'elasticsearch';
import { Client, ApiResponse } from '@opensearch-project/opensearch';
import { Stats } from '../stats';
import { Progress } from '../progress';

Expand All @@ -53,7 +53,7 @@ export function createGenerateDocRecordsStream({
async transform(index, enc, callback) {
try {
let remainingHits = 0;
let resp: SearchResponse<any> | null = null;
let resp: ApiResponse<any> | null = null;

while (!resp || remainingHits > 0) {
if (!resp) {
Expand All @@ -66,17 +66,17 @@ export function createGenerateDocRecordsStream({
query,
},
rest_total_hits_as_int: true, // not declared on SearchParams type
} as SearchParams);
remainingHits = resp.hits.total;
});
remainingHits = resp.body.hits.total;
progress.addToTotal(remainingHits);
} else {
resp = await client.scroll({
scrollId: resp._scroll_id!,
scroll_id: resp.body._scroll_id!,
scroll: SCROLL_TIMEOUT,
});
}

for (const hit of resp.hits.hits) {
for (const hit of resp.body?.hits.hits) {
remainingHits -= 1;
stats.archivedDoc(hit._index);
this.push({
Expand All @@ -94,7 +94,7 @@ export function createGenerateDocRecordsStream({
});
}

progress.addToComplete(resp.hits.hits.length);
progress.addToComplete(resp.body.hits.hits.length);
}

callback(undefined);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ describe('opensearchArchiver: createIndexDocRecordsStream()', () => {
const client = createStubClient([
async (name, params) => {
expect(name).to.be('bulk');
expect(params).to.eql({
body: recordsToBulkBody(records),
requestTimeout: 120000,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are request timeouts handled/specified differently in the new client?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier bulk API would take requestTimeout input as part of BulkRequest param, but with new bulk API, requestTimeout is not part of bulkRequest Param interface (renamed/re-typed to timeout), instead it is used as part of TransportRequestOptions.
https://github.com/opensearch-project/OpenSearch-Dashboards/pull/4142/files/0fb11084c023cf65a27a4cdbdd626c70576e213a#diff-a2f5251433aa1701470ba8eec10215aff6149564b512d7be3e97aa3a1b967190R61

https://github.com/opensearch-project/opensearch-js/blob/e3efb225e5880fe4f5a4403dbc10eaa5e2173e31/api/opensearch_dashboards.d.ts#L91

BulkRequest

});
return { ok: true };
expect(params).to.eql({ body: recordsToBulkBody(records) });
return {
body: {
ok: true,
},
};
},
]);
const stats = createStubStats();
Expand All @@ -88,19 +89,21 @@ describe('opensearchArchiver: createIndexDocRecordsStream()', () => {
const client = createStubClient([
async (name, params) => {
expect(name).to.be('bulk');
expect(params).to.eql({
body: recordsToBulkBody(records.slice(0, 1)),
requestTimeout: 120000,
});
return { ok: true };
expect(params).to.eql({ body: recordsToBulkBody(records.slice(0, 1)) });
return {
body: {
ok: true,
},
};
},
async (name, params) => {
expect(name).to.be('bulk');
expect(params).to.eql({
body: recordsToBulkBody(records.slice(1)),
requestTimeout: 120000,
});
return { ok: true };
expect(params).to.eql({ body: recordsToBulkBody(records.slice(1)) });
return {
body: {
ok: true,
},
};
},
]);
const stats = createStubStats();
Expand All @@ -124,21 +127,23 @@ describe('opensearchArchiver: createIndexDocRecordsStream()', () => {
const client = createStubClient([
async (name, params) => {
expect(name).to.be('bulk');
expect(params).to.eql({
body: recordsToBulkBody(records.slice(0, 1)),
requestTimeout: 120000,
});
expect(params).to.eql({ body: recordsToBulkBody(records.slice(0, 1)) });
await delay(delayMs);
return { ok: true };
return {
body: {
ok: true,
},
};
},
async (name, params) => {
expect(name).to.be('bulk');
expect(params).to.eql({
body: recordsToBulkBody(records.slice(1)),
requestTimeout: 120000,
});
expect(params).to.eql({ body: recordsToBulkBody(records.slice(1)) });
expect(Date.now() - start).to.not.be.lessThan(delayMs);
return { ok: true };
return {
body: {
ok: true,
},
};
},
]);
const progress = new Progress();
Expand All @@ -160,17 +165,29 @@ describe('opensearchArchiver: createIndexDocRecordsStream()', () => {
async (name, params) => {
expect(name).to.be('bulk');
expect(params.body.length).to.eql(1 * 2);
return { ok: true };
return {
body: {
ok: true,
},
};
},
async (name, params) => {
expect(name).to.be('bulk');
expect(params.body.length).to.eql(299 * 2);
return { ok: true };
return {
body: {
ok: true,
},
};
},
async (name, params) => {
expect(name).to.be('bulk');
expect(params.body.length).to.eql(1 * 2);
return { ok: true };
return {
body: {
ok: true,
},
};
},
]);
const progress = new Progress();
Expand All @@ -189,8 +206,8 @@ describe('opensearchArchiver: createIndexDocRecordsStream()', () => {
const records = createPersonDocRecords(2);
const stats = createStubStats();
const client = createStubClient([
async () => ({ ok: true }),
async () => ({ errors: true, forcedError: true }),
async () => ({ body: { ok: true } }),
async () => ({ body: { errors: true, forcedError: true } }),
]);
const progress = new Progress();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* under the License.
*/

import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import { Writable } from 'stream';
import { Stats } from '../stats';
import { Progress } from '../progress';
Expand Down Expand Up @@ -58,8 +58,8 @@ export function createIndexDocRecordsStream(
);
});

const resp = await client.bulk({ requestTimeout: 2 * 60 * 1000, body });
if (resp.errors) {
const resp = await client.bulk({ body }, { requestTimeout: 2 * 60 * 1000 });
if (resp.body.errors) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also throw if resp.body is undefined

throw new Error(`Failed to index all documents: ${JSON.stringify(resp, null, 2)}`);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* under the License.
*/

import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import sinon from 'sinon';
import Chance from 'chance';
import { times } from 'lodash';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ describe('opensearchArchiver: createCreateIndexStream()', () => {
expect((client.indices.getAlias as sinon.SinonSpy).calledOnce).to.be.ok();
expect((client.indices.getAlias as sinon.SinonSpy).args[0][0]).to.eql({
name: 'existing-index',
ignore: [404],
});
expect((client.indices.delete as sinon.SinonSpy).calledOnce).to.be.ok();
expect((client.indices.delete as sinon.SinonSpy).args[0][0]).to.eql({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import { Transform, Readable } from 'stream';
import { inspect } from 'util';

import { Client } from 'elasticsearch';
import { Client } from '@opensearch-project/opensearch';
import { ToolingLog } from '@osd/dev-utils';

import { Stats } from '../stats';
Expand Down
Loading