Skip to content

Commit

Permalink
feat(#107): Adds multi-db watcher support (#113)
Browse files Browse the repository at this point in the history
Adds watcher for continuous doc import. Watcher has a timeout of 5 seconds between tries.
Adds multi database support.
Ignores postgres deadlock errors, and adds retry.

Updates e2e tests to:

not use two additional containers to push data to CouchDb, that used python to push data
not have data in a zipped file, where it's literally invisible to the developer. instead using scalability csv docs, and using cht-conf to generate documents that get uploaded before the test runs.
updates test to wait until DBT processing is complete and checks number of results.
#107
  • Loading branch information
dianabarsan authored Jun 18, 2024
1 parent c5cb57f commit 279d8f2
Show file tree
Hide file tree
Showing 34 changed files with 28,229 additions and 2,781 deletions.
28 changes: 9 additions & 19 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ name: Test

on: [push, pull_request]

env:
INTERNAL_CONTRIBUTOR: ${{ secrets.DOCKERHUB_USERNAME && 'true' }}

jobs:
unit-tests:
name: Unit tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-node@v2
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 20.x
- run: npm ci
Expand All @@ -21,30 +24,17 @@ jobs:
name: E2E Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-node@v2
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 20.x
- name: Get Docker Hub username
id: get-docker-hub-username
run: echo '::set-output name=dockerhub_username::${{ secrets.DOCKERHUB_USERNAME }}'
- name: Login to Docker Hub
uses: docker/login-action@v1
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
if: steps.get-docker-hub-username.outputs.dockerhub_username
if: ${{ env.INTERNAL_CONTRIBUTOR }}
- run: npm ci
- name: Start containers
run: >
docker compose
--env-file ./tests/.e2e-env
-f docker-compose.yml
-f docker-compose.couchdb.yml
-f docker-compose.postgres.yml
up -d
- name: Sleep for 60 seconds
run: sleep 60s
- name: Run e2e tests
run: npm run test:e2e

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ node_modules
*/coverage/*
/couch2pg/.nyc_output/
/.eslintcache
/tests/data/json_docs
2 changes: 2 additions & 0 deletions couch2pg/src/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@ export const getCouchDbClient = (dbName) => {
const url = `${COUCHDB_SECURE === 'true' ? 'https' : 'http'}://${COUCHDB_USER}:${COUCHDB_PASSWORD}@${COUCHDB_HOST}:${COUCHDB_PORT}/${dbName}`;
return new PouchDb(url, { skip_setup: true });
};

export const couchDbs = COUCHDB_DBS.split(',').map(db => db.trim());
20 changes: 16 additions & 4 deletions couch2pg/src/importer.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,23 @@ const loadAndStoreDocs = async (couchdb, docsToDownload) => {
const allDocsResult = await couchdb.allDocs({ keys: docIds, include_docs: true });
console.info('Pulled ' + allDocsResult.rows.length + ' results from couchdb');

const { query, values } = buildBulkInsertQuery(allDocsResult);
await storeDocs(allDocsResult);
};

const client = await db.getPgClient();
await client.query(query, values);
await client.end();
const storeDocs = async (allDocsResult) => {
try {
const { query, values } = buildBulkInsertQuery(allDocsResult);

const client = await db.getPgClient();
await client.query(query, values);
await client.end();
} catch (err) {
if (err.code === '40P01') {
// deadlock detected
return storeDocs(allDocsResult);
}
throw err;
}
};

const deleteDocs = async () => {
Expand Down
4 changes: 2 additions & 2 deletions couch2pg/src/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import * as setup from './setup.js';
import watcher from './watcher.js';
import * as db from './db.js';
import importer from './importer.js';

(async() => {
await setup.createDatabase();

await importer(db.getCouchDbClient());
db.couchDbs.forEach(db => watcher(db));
})();
15 changes: 15 additions & 0 deletions couch2pg/src/watcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import importer from './importer.js';
import * as db from './db.js';

const DELAY = 5 * 1000; // 5 seconds

export default async (dbName) => {
const couchDb = db.getCouchDbClient(dbName);
do {
const processedChanges = await importer(couchDb);
if (!processedChanges) {
await new Promise(r => setTimeout(r, DELAY));
}
// eslint-disable-next-line no-constant-condition
} while (true);
};
71 changes: 71 additions & 0 deletions couch2pg/tests/unit/watcher.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import '../common.js';
import sinon from 'sinon';
import esmock from 'esmock';

let clock;
let importer;
let db;
let watcher;

describe('watcher', () => {
beforeEach(async () => {
clock = sinon.useFakeTimers();

db = { getCouchDbClient: sinon.stub() };
importer = sinon.stub();

watcher = await esmock('../../src/watcher', { '../../src/db': db, '../../src/importer': importer });
});

afterEach(() => {
clock.restore();
sinon.restore();
});

it('should watch proposed database', async () => {
importer.onCall(0).resolves(2);
importer.onCall(1).resolves(3);
importer.onCall(2).resolves(0);

watcher('this is the db name');

expect(db.getCouchDbClient.calledOnceWith('this is the db name')).to.equal(true);
expect(importer.calledOnce).to.equal(true);
await Promise.resolve();
expect(importer.calledTwice).to.equal(true);
await Promise.resolve();
expect(importer.calledThrice).to.equal(true);
});

it('should wait for 5 seconds after no results are processed', async () => {
importer.onCall(0).resolves(2);
importer.onCall(1).resolves(3);
importer.onCall(2).resolves(0);
importer.onCall(3).resolves(2);
importer.onCall(4).resolves(0);

watcher('medic-sentinel');

expect(db.getCouchDbClient.calledOnceWith('medic-sentinel')).to.equal(true);
expect(importer.calledOnce).to.equal(true);
await Promise.resolve();
expect(importer.calledTwice).to.equal(true);
await Promise.resolve();
expect(importer.calledThrice).to.equal(true);
await Promise.resolve();
expect(importer.calledThrice).to.equal(true);
clock.tick(5 * 1000);
await Promise.resolve();
expect(importer.callCount).to.equal(4);
await Promise.resolve();
expect(importer.callCount).to.equal(5);
});

it('should stop on errors', async () => {
importer.onCall(0).resolves(2);
importer.onCall(1).resolves(3);
importer.onCall(2).rejects(new Error('boom'));

await expect(watcher('db')).to.eventually.be.rejectedWith('boom');
});
});
1 change: 0 additions & 1 deletion data/.gitignore

This file was deleted.

Empty file removed data/.gitkeep
Empty file.
7 changes: 0 additions & 7 deletions data/Dockerfile

This file was deleted.

33 changes: 0 additions & 33 deletions data/data-generator.py

This file was deleted.

Binary file removed data/json_docs.tar.gz
Binary file not shown.
7 changes: 0 additions & 7 deletions data/start.sh

This file was deleted.

31 changes: 3 additions & 28 deletions docker-compose.couchdb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,10 @@ version: '3.7'

services:
couchdb:
image: couchdb
image: public.ecr.aws/medic/cht-couchdb:4.8.0
restart: always
ports:
- "5984:5984"
environment:
- COUCHDB_USER=${COUCHDB_USER}
- COUCHDB_PASSWORD=${COUCHDB_PASSWORD}

bootstrap:
build: ./data/
depends_on:
- couchdb
restart: on-failure:5
environment:
- COUCHDB_USER=${COUCHDB_USER}
- COUCHDB_PASSWORD=${COUCHDB_PASSWORD}
- COUCHDB_DBS=${COUCHDB_DBS}

generator:
image: python:3
depends_on:
- couchdb
command: >
bash -c "mkdir /data/ -p && tar -xzf /json_docs.tar.gz -C /data/ --strip-components=1 && python3 /code/data-generator.py"
environment:
- COUCHDB_URL=http://couchdb:5984/
- DOCS_PATH=/data/
- COUCHDB_USER=${COUCHDB_USER}
- COUCHDB_PASSWORD=${COUCHDB_PASSWORD}
- COUCHDB_DBS=${COUCHDB_DBS}
volumes:
- ./data/data-generator.py:/code/data-generator.py:z
- ./data/json_docs.tar.gz/:/json_docs.tar.gz:z
restart: always
1 change: 1 addition & 0 deletions env.template
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ POSTGRES_DB=data
POSTGRES_SCHEMA=v1
POSTGRES_TABLE=medic # for dbt use only
POSTGRES_HOST=someip # Your postgres instance IP or endpoint in "prod".
POSTGRES_PORT=5432

# dbt
DBT_POSTGRES_USER=postgres
Expand Down
Loading

0 comments on commit 279d8f2

Please sign in to comment.