Skip to content

DATA-4054 Data pipelines Typescript support #551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 244 additions & 0 deletions src/app/data-client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,22 @@ import {
UploadMetadata,
} from '../gen/app/datasync/v1/data_sync_pb';
import { DataClient, type FilterOptions } from './data-client';
import {
DataPipeline,
ListDataPipelinesRequest,
ListDataPipelinesResponse,
GetDataPipelineRequest,
GetDataPipelineResponse,
CreateDataPipelineRequest,
CreateDataPipelineResponse,
DeleteDataPipelineRequest,
DeleteDataPipelineResponse,
DataPipelineRun,
DataPipelineRunStatus,
ListDataPipelineRunsRequest,
ListDataPipelineRunsResponse,
} from '../gen/app/datapipelines/v1/data_pipelines_pb';
import { DataPipelinesService } from '../gen/app/datapipelines/v1/data_pipelines_connect';
vi.mock('../gen/app/data/v1/data_pb_service');

let mockTransport: Transport;
Expand Down Expand Up @@ -1404,3 +1420,231 @@ describe('DataSyncClient tests', () => {
});
});
});

describe('DataPipelineClient tests', () => {
const organizationId = 'testOrgId';
const pipelineId = 'testPipelineId';
const pipelineName = 'testPipeline';
const mqlQuery = [{ $match: { component_name: 'sensor-1' } }];
const schedule = '0 0 * * *';

describe('listDataPipelines tests', () => {
const pipeline1 = new DataPipeline({
id: 'pipeline1',
name: 'pipeline1',
organizationId: 'org1',
});
const pipeline2 = new DataPipeline({
id: 'pipeline2',
name: 'pipeline2',
organizationId: 'org2',
});
const pipelines = [pipeline1, pipeline2];

let capReq: ListDataPipelinesRequest;
beforeEach(() => {
mockTransport = createRouterTransport(({ service }) => {
service(DataPipelinesService, {
listDataPipelines: (req: ListDataPipelinesRequest) => {
capReq = req;
return new ListDataPipelinesResponse({
dataPipelines: pipelines,
});
},
});
});
});

it('list data pipelines', async () => {
const expectedRequest = new ListDataPipelinesRequest({
organizationId,
});

const response = await subject().listDataPipelines(organizationId);
expect(capReq).toStrictEqual(expectedRequest);
expect(response).toEqual(pipelines);
});
});

describe('getPipeline tests', () => {
const pipeline = new DataPipeline({
id: pipelineId,
name: pipelineName,
organizationId,
});

let capReq: GetDataPipelineRequest;
beforeEach(() => {
mockTransport = createRouterTransport(({ service }) => {
service(DataPipelinesService, {
getDataPipeline: (req: GetDataPipelineRequest) => {
capReq = req;
return new GetDataPipelineResponse({
dataPipeline: pipeline,
});
},
});
});
});

it('get pipeline', async () => {
const expectedRequest = new GetDataPipelineRequest({
id: pipelineId,
});

const response = await subject().getDataPipeline(pipelineId);
expect(capReq).toStrictEqual(expectedRequest);
expect(response).toEqual(pipeline);
});

it('returns null when pipeline does not exist', async () => {
mockTransport = createRouterTransport(({ service }) => {
service(DataPipelinesService, {
getDataPipeline: () => {
return new GetDataPipelineResponse({});
},
});
});

const response = await subject().getDataPipeline(pipelineId);
expect(response).toBeNull();
});
});

describe('createDataPipeline tests', () => {
let capReq: CreateDataPipelineRequest;
beforeEach(() => {
mockTransport = createRouterTransport(({ service }) => {
service(DataPipelinesService, {
createDataPipeline: (req: CreateDataPipelineRequest) => {
capReq = req;
return new CreateDataPipelineResponse({
id: pipelineId,
});
},
});
});
});

it('create data pipeline', async () => {
const expectedRequest = new CreateDataPipelineRequest({
organizationId,
name: pipelineName,
mqlBinary: mqlQuery.map((value) => BSON.serialize(value)),
schedule,
});

const response = await subject().createDataPipeline(
organizationId,
pipelineName,
mqlQuery,
schedule
);
expect(capReq).toStrictEqual(expectedRequest);
expect(response).toEqual(pipelineId);
});
});

describe('deleteDataPipeline tests', () => {
let capReq: DeleteDataPipelineRequest;
beforeEach(() => {
mockTransport = createRouterTransport(({ service }) => {
service(DataPipelinesService, {
deleteDataPipeline: (req: DeleteDataPipelineRequest) => {
capReq = req;
return new DeleteDataPipelineResponse();
},
});
});
});

it('delete data pipeline', async () => {
const expectedRequest = new DeleteDataPipelineRequest({
id: pipelineId,
});

await subject().deleteDataPipeline(pipelineId);
expect(capReq).toStrictEqual(expectedRequest);
});
});

describe('listDataPipelineRuns tests', () => {
const run1 = new DataPipelineRun({
id: 'run1',
status: DataPipelineRunStatus.STARTED,
});
const run2 = new DataPipelineRun({
id: 'run2',
status: DataPipelineRunStatus.COMPLETED,
});
const runs = [run1, run2];
const pageSize = 10;
const nextPageToken = 'nextPageToken';

let capReq: ListDataPipelineRunsRequest;
beforeEach(() => {
mockTransport = createRouterTransport(({ service }) => {
service(DataPipelinesService, {
listDataPipelineRuns: (req: ListDataPipelineRunsRequest) => {
capReq = req;
return new ListDataPipelineRunsResponse({
runs,
nextPageToken,
});
},
});
});
});

it('list data pipeline runs', async () => {
const expectedRequest = new ListDataPipelineRunsRequest({
id: pipelineId,
pageSize,
});

const page = await subject().listDataPipelineRuns(pipelineId, pageSize);
expect(capReq).toStrictEqual(expectedRequest);
expect(page.runs).toEqual(runs);
const nextPage = await page.nextPage();
expect(nextPage.runs).toEqual(runs);
});

it('get next page of runs', async () => {
const nextPageRuns = [run2];
mockTransport = createRouterTransport(({ service }) => {
service(DataPipelinesService, {
listDataPipelineRuns: (req: ListDataPipelineRunsRequest) => {
capReq = req;
return new ListDataPipelineRunsResponse({
runs: nextPageRuns,
nextPageToken: 'some-token',
});
},
});
});

const page = await subject().listDataPipelineRuns(pipelineId, pageSize);
const nextPage = await page.nextPage();
expect(nextPage.runs).toEqual(nextPageRuns);
});

it('returns empty page when no more runs', async () => {
const someRuns = [run1];
mockTransport = createRouterTransport(({ service }) => {
service(DataPipelinesService, {
listDataPipelineRuns: (req: ListDataPipelineRunsRequest) => {
capReq = req;
return new ListDataPipelineRunsResponse({
runs: someRuns,
nextPageToken: '',
});
},
});
});

const page = await subject().listDataPipelineRuns(pipelineId, pageSize);
const nextPage = await page.nextPage();
expect(nextPage.runs).toEqual([]);
});
});
});
Loading