Skip to content

Commit

Permalink
feat: not be affected by replication-lag when used replication
Browse files Browse the repository at this point in the history
  • Loading branch information
jiho-kr committed Sep 13, 2023
1 parent cd30449 commit 86951f1
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 56 deletions.
2 changes: 2 additions & 0 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ MYSQL_DATABASE_PASSWORD=local
POSTGRESQL_DATABASE_NAME=pg_test
POSTGRESQL_DATABASE_USERNAME=postgres
POSTGRESQL_DATABASE_PASSWORD=local

POSTGRESQL_SLAVE_PORT=5433
10 changes: 10 additions & 0 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,13 @@ services:
POSTGRES_PASSWORD: $POSTGRESQL_DATABASE_PASSWORD
ports:
- '5432:5432'

postgresql-slave:
image: postgres:latest
restart: always
environment:
POSTGRES_DB: $POSTGRESQL_DATABASE_NAME
POSTGRES_USER: $POSTGRESQL_DATABASE_USERNAME
POSTGRES_PASSWORD: $POSTGRESQL_DATABASE_PASSWORD
ports:
- '$POSTGRESQL_SLAVE_PORT:5432'
86 changes: 86 additions & 0 deletions spec/replication/replication.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { HttpStatus, INestApplication } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { TypeOrmModule } from '@nestjs/typeorm';
import request from 'supertest';

import { BaseEntity } from '../base/base.entity';
import { BaseModule } from '../base/base.module';
import { TestHelper } from '../test.helper';

describe('replication', () => {
let app: INestApplication;
beforeAll(async () => {
const [moduleFixture] = await Promise.all([
Test.createTestingModule({
imports: [
BaseModule,
TypeOrmModule.forRoot({
type: 'postgres',
entities: [BaseEntity],
synchronize: true,
logging: true,
logger: 'file',
replication: {
master: {
database: process.env.POSTGRESQL_DATABASE_NAME,
username: process.env.POSTGRESQL_DATABASE_USERNAME,
password: process.env.POSTGRESQL_DATABASE_PASSWORD,
},
slaves: [
{
port: Number(process.env.POSTGRESQL_SLAVE_PORT),
database: process.env.POSTGRESQL_DATABASE_NAME,
username: process.env.POSTGRESQL_DATABASE_USERNAME,
password: process.env.POSTGRESQL_DATABASE_PASSWORD,
},
],
},
}),
],
}).compile(),
await Test.createTestingModule({
imports: [
BaseModule,
TypeOrmModule.forRoot({
type: 'postgres',
entities: [BaseEntity],
port: Number(process.env.POSTGRESQL_SLAVE_PORT),
database: process.env.POSTGRESQL_DATABASE_NAME,
username: process.env.POSTGRESQL_DATABASE_USERNAME,
password: process.env.POSTGRESQL_DATABASE_PASSWORD,
synchronize: true,
logging: true,
logger: 'file',
}),
],
}).compile(),
]);
app = moduleFixture.createNestApplication();
await app.init();
});

afterAll(async () => {
await TestHelper.dropTypeOrmEntityTables();
await app?.close();
});

it('should not be affected by replication-lag', async () => {
const name = 'replication-test';
const { body: created } = await request(app.getHttpServer()).post('/base').send({ name }).expect(HttpStatus.CREATED);

await request(app.getHttpServer()).get(`/base/${created.id}`).expect(HttpStatus.NOT_FOUND);

const { body: updated } = await request(app.getHttpServer())
.patch(`/base/${created.id}`)
.send({ name: 'new Name' })
.expect(HttpStatus.OK);
expect(updated.id).toEqual(created.id);
expect(updated.name).toEqual('new Name');

const { body: deleted } = await request(app.getHttpServer()).delete(`/base/${created.id}`).expect(HttpStatus.OK);
expect(deleted.id).toEqual(created.id);

const { body: recovered } = await request(app.getHttpServer()).post(`/base/${created.id}/recover`).expect(HttpStatus.CREATED);
expect(recovered.id).toEqual(created.id);
});
});
113 changes: 57 additions & 56 deletions src/lib/crud.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ import {
} from './interface';
import { CrudReadManyRequest } from './request';

const SUPPORTED_REPLICATION_TYPES = new Set(['mysql', 'mariadb', 'postgres', 'aurora-postgres', 'aurora-mysql']);

export class CrudService<T extends BaseEntity> {
private primaryKey: string[];
private usableQueryRunner = false;

constructor(public readonly repository: Repository<T>) {
this.usableQueryRunner = SUPPORTED_REPLICATION_TYPES.has(this.repository.metadata.connection?.options.type);
this.primaryKey = this.repository.metadata.primaryColumns?.map((columnMetadata) => columnMetadata.propertyName) ?? [];
}

Expand Down Expand Up @@ -86,79 +90,76 @@ export class CrudService<T extends BaseEntity> {
};

readonly reservedUpsert = async (crudUpsertRequest: CrudUpsertRequest<T>): Promise<T> => {
return this.repository
.findOne({
where: crudUpsertRequest.params as unknown as FindOptionsWhere<T>,
withDeleted: true,
})
.then(async (entity: T | null) => {
const upsertEntity = entity ?? this.repository.create(crudUpsertRequest.params as unknown as DeepPartial<T>);
if ('deletedAt' in upsertEntity && upsertEntity.deletedAt != null) {
throw new ConflictException('it has been deleted');
}
return this.findOne(crudUpsertRequest.params as unknown as FindOptionsWhere<T>, true).then(async (entity: T | null) => {
const upsertEntity = entity ?? this.repository.create(crudUpsertRequest.params as unknown as DeepPartial<T>);
if ('deletedAt' in upsertEntity && upsertEntity.deletedAt != null) {
throw new ConflictException('it has been deleted');
}

if (crudUpsertRequest.author) {
_.merge(upsertEntity, { [crudUpsertRequest.author.property]: crudUpsertRequest.author.value });
}
if (crudUpsertRequest.author) {
_.merge(upsertEntity, { [crudUpsertRequest.author.property]: crudUpsertRequest.author.value });
}

return this.repository.save(_.assign(upsertEntity, crudUpsertRequest.body));
});
return this.repository.save(_.assign(upsertEntity, crudUpsertRequest.body));
});
};

readonly reservedUpdate = async (crudUpdateOneRequest: CrudUpdateOneRequest<T>): Promise<T> => {
return this.repository
.findOne({
where: crudUpdateOneRequest.params as unknown as FindOptionsWhere<T>,
})
.then(async (entity: T | null) => {
if (!entity) {
throw new NotFoundException();
}
return this.findOne(crudUpdateOneRequest.params as unknown as FindOptionsWhere<T>, false).then(async (entity: T | null) => {
if (!entity) {
throw new NotFoundException();
}

if (crudUpdateOneRequest.author) {
_.merge(entity, { [crudUpdateOneRequest.author.property]: crudUpdateOneRequest.author.value });
}
if (crudUpdateOneRequest.author) {
_.merge(entity, { [crudUpdateOneRequest.author.property]: crudUpdateOneRequest.author.value });
}

return this.repository.save(_.assign(entity, crudUpdateOneRequest.body));
});
return this.repository.save(_.assign(entity, crudUpdateOneRequest.body));
});
};

readonly reservedDelete = async (crudDeleteOneRequest: CrudDeleteOneRequest<T>): Promise<T> => {
if (this.primaryKey.length === 0) {
throw new ConflictException('cannot found primary key from entity');
}
return this.findOne(crudDeleteOneRequest.params as unknown as FindOptionsWhere<T>, false).then(async (entity: T | null) => {
if (!entity) {
throw new NotFoundException();
}

return this.repository
.findOne({
where: crudDeleteOneRequest.params as unknown as FindOptionsWhere<T>,
})
.then(async (entity: T | null) => {
if (!entity) {
throw new NotFoundException();
}

if (crudDeleteOneRequest.author) {
_.merge(entity, { [crudDeleteOneRequest.author.property]: crudDeleteOneRequest.author.value });
await this.repository.save(entity);
}
if (crudDeleteOneRequest.author) {
_.merge(entity, { [crudDeleteOneRequest.author.property]: crudDeleteOneRequest.author.value });
await this.repository.save(entity);
}

await (crudDeleteOneRequest.softDeleted ? entity.softRemove() : entity.remove());
return entity;
});
await (crudDeleteOneRequest.softDeleted ? entity.softRemove() : entity.remove());
return entity;
});
};

readonly reservedRecover = async (crudRecoverRequest: CrudRecoverRequest<T>): Promise<T> => {
return this.repository
.findOne({
where: crudRecoverRequest.params as unknown as FindOptionsWhere<T>,
withDeleted: true,
})
.then(async (entity: T | null) => {
if (!entity) {
throw new NotFoundException();
}
await this.repository.recover(entity);
return entity;
});
return this.findOne(crudRecoverRequest.params as unknown as FindOptionsWhere<T>, true).then(async (entity: T | null) => {
if (!entity) {
throw new NotFoundException();
}
await this.repository.recover(entity);
return entity;
});
};

private async findOne(where: FindOptionsWhere<T>, withDeleted: boolean): Promise<T | null> {
if (!this.usableQueryRunner) {
return this.repository.findOne({ where, withDeleted });
}
const queryBuilder = this.repository.createQueryBuilder().where(where);
if (withDeleted) {
queryBuilder.withDeleted();
}
const runner = queryBuilder.connection.createQueryRunner('master');
try {
return await queryBuilder.setQueryRunner(runner).getOne();
} finally {
await runner.release();
}
}
}

0 comments on commit 86951f1

Please sign in to comment.