Skip to content

Commit

Permalink
fix(microservices-shared-mongo): improve transaction manager impl
Browse files Browse the repository at this point in the history
  • Loading branch information
getlarge committed Dec 15, 2023
1 parent fb59a67 commit deb70cc
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 102 deletions.
66 changes: 43 additions & 23 deletions apps/orders/src/app/orders/orders.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,30 @@ export class OrdersService {
expiresAt.setSeconds(expiresAt.getSeconds() + this.expirationWindow);

await using manager = await transactionManager(this.ticketModel);
return manager.wrap(async () => {
const result = await manager.wrap(async (session) => {
// 4. Build the order and save it to DB
const newOrder = await this.orderModel.create({
ticket,
userId: currentUser.id,
expiresAt,
status: OrderStatus.Created,
});
await newOrder.populate('ticket');
const result = newOrder.toJSON<Order>();
const res = await this.orderModel.create(
[
{
ticket,
userId: currentUser.id,
expiresAt,
status: OrderStatus.Created,
},
],
{ session },
);
await res[0].populate('ticket');
const order = res[0].toJSON<Order>();
// 5. Publish an event
await lastValueFrom(this.emitEvent(Patterns.OrderCreated, result).pipe());
return result;
await lastValueFrom(this.emitEvent(Patterns.OrderCreated, order));
return order;
});
if (result.error) {
this.logger.error(result.error);
throw result.error;
}
return result.value;
}

async find(currentUser: User): Promise<Order[]> {
Expand Down Expand Up @@ -129,42 +139,52 @@ export class OrdersService {

async cancelById(id: string, currentUser: User): Promise<Order> {
await using manager = await transactionManager(this.orderModel);
return manager.wrap(async () => {
const result = await manager.wrap(async (session) => {
const order = await this.orderModel
.findOne({ _id: id })
.populate('ticket')
.session(manager.session);
.session(session);
this.orderExists(id, order);
this.userIsOrderOwner(currentUser, order);
order.set({ status: OrderStatus.Cancelled });
await order.save({ session: manager.session });
const result = order.toJSON<Order>();
await order.save({ session });
const updatedOrder = order.toJSON<Order>();
await lastValueFrom(
this.emitEvent(Patterns.OrderCancelled, result).pipe(),
this.emitEvent(Patterns.OrderCancelled, updatedOrder),
);
return result;
return updatedOrder;
});
if (result.error) {
this.logger.error(result.error);
throw result.error;
}
return result.value;
}

async expireById(id: string): Promise<Order> {
await using manager = await transactionManager(this.orderModel);
return manager.wrap(async () => {
const result = await manager.wrap(async (session) => {
const order = await this.orderModel
.findOne({ _id: id })
.populate('ticket')
.session(manager.session);
.session(session);
this.orderExists(id, order);
if (order.status === OrderStatus.Complete) {
return order.toJSON<Order>();
}
order.set({ status: OrderStatus.Cancelled });
await order.save({ session: manager.session });
const result = order.toJSON<Order>();
await order.save({ session });
const updatedOrder = order.toJSON<Order>();
await lastValueFrom(
this.emitEvent(Patterns.OrderCancelled, result).pipe(),
this.emitEvent(Patterns.OrderCancelled, updatedOrder),
);
return result;
return updatedOrder;
});
if (result.error) {
this.logger.error(result.error);
throw result.error;
}
return result.value;
}

async complete(data: PaymentCreatedEvent['data']): Promise<Order> {
Expand Down
49 changes: 31 additions & 18 deletions apps/payments/src/app/payments/payments.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export class PaymentsService {
@Inject(ORDERS_CLIENT) private client: ClientProxy,
) {}

// TODO: add safe guard to avoid double payment
async create(
paymentRequest: CreatePayment,
currentUser: User,
Expand All @@ -59,30 +60,42 @@ export class PaymentsService {
}

// 4. make sure the payment amount match the order price and create payment with Stripe
const charge = await this.stripeService.charges.create({
amount: order.price * 100,
currency: 'eur',
source: token,
});
const charge = await this.stripeService.charges.create(
{
amount: order.price * 100,
currency: 'eur',
source: token,
},
{
// idempotencyKey: orderId,
},
);

await using manager = await transactionManager(this.paymentModel);
return manager.wrap(async () => {
const result = await manager.wrap(async (session) => {
// 5. Create charge instance in Mongo
const payment = await this.paymentModel.create({
orderId,
stripeId: charge.id,
});
const result = payment.toJSON<Payment>();
const res = await this.paymentModel.create(
[
{
orderId,
stripeId: charge.id,
},
],
{ session },
);
const payment = res[0].toJSON<Payment>();
// 6. emit payment:create event
await firstValueFrom(
this.client
.emit<PaymentCreatedEvent['name'], PaymentCreatedEvent['data']>(
Patterns.PaymentCreated,
result,
)
.pipe(),
this.client.emit<
PaymentCreatedEvent['name'],
PaymentCreatedEvent['data']
>(Patterns.PaymentCreated, payment),
);
return result;
return payment;
});
if (result.error) {
throw result.error;
}
return result.value;
}
}
122 changes: 73 additions & 49 deletions apps/tickets/src/app/tickets/tickets.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
} from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { InjectModel } from '@nestjs/mongoose';
import { OryPermissionsService } from '@ticketing/microservices/ory-client';
import {
OrderCancelledEvent,
OrderCreatedEvent,
Expand All @@ -23,7 +24,7 @@ import { transactionManager } from '@ticketing/microservices/shared/mongo';
import { User } from '@ticketing/shared/models';
import { isEmpty } from 'lodash-es';
import { Model } from 'mongoose';
import Paginator from 'nestjs-keyset-paginator';
import { Paginator } from 'nestjs-keyset-paginator';
import { lastValueFrom, Observable } from 'rxjs';

import { ORDERS_CLIENT } from '../shared/constants';
Expand All @@ -35,39 +36,38 @@ export class TicketsService {
readonly logger = new Logger(TicketsService.name);

constructor(
@InjectModel(TicketSchema.name) private ticketModel: Model<TicketDocument>,
@Inject(ORDERS_CLIENT) private client: ClientProxy,
@InjectModel(TicketSchema.name)
private readonly ticketModel: Model<TicketDocument>,
@Inject(ORDERS_CLIENT) private readonly client: ClientProxy,
) {}

emitEvent(
pattern: Patterns.TicketCreated | Patterns.TicketUpdated,
event: TicketCreatedEvent['data'] | TicketUpdatedEvent['data'],
): Observable<string> {
return this.client.emit<string, typeof event>(pattern, event);
return this.client.emit(pattern, event);
}

async create(ticket: CreateTicket, currentUser: User): Promise<Ticket> {
await using manager = await transactionManager(this.ticketModel);
return manager.wrap(async () => {
const res = await manager.wrap<Ticket>(async (session) => {
const doc: CreateTicket & { userId: string } = {
...ticket,
userId: currentUser.id,
};
const [newTicket] = await this.ticketModel.create([doc], {
session: manager.session,
const docs = await this.ticketModel.create([doc], {
session,
});
// await this.oryPermissionService.createRelation(
// parseRelationTuple(
// `${PermissionNamespaces[Resources.TICKETS]}:${newTicket.id}#owners@${PermissionNamespaces[Resources.USERS]}:${currentUser.id}#edit`,
// ).unwrapOrThrow(),
// );

const result = newTicket.toJSON<Ticket>();
await lastValueFrom(
this.emitEvent(Patterns.TicketCreated, result).pipe(),
);
return result;
const newTicket = docs[0].toJSON<Ticket>();
this.logger.debug(`Created ticket ${newTicket.id}`);
await lastValueFrom(this.emitEvent(Patterns.TicketCreated, newTicket));
this.logger.debug(`Sent event ${Patterns.TicketCreated}`);
return newTicket;
});
if (res.error) {
throw res.error;
}
return res.value;
}

paginate(params: PaginateDto = {}): Promise<{
Expand Down Expand Up @@ -116,61 +116,85 @@ export class TicketsService {
update: UpdateTicket,
currenUser: User,
): Promise<Ticket> {
const ticket = await this.ticketModel.findOne({ _id: id });
if (isEmpty(ticket)) {
throw new NotFoundException(`Ticket ${id} not found`);
} else if (ticket.userId !== currenUser.id) {
throw new ForbiddenException();
} else if (ticket.orderId) {
throw new BadRequestException(`Ticket ${id} is currently reserved`);
}

await using manager = await transactionManager(this.ticketModel);
return manager.wrap(async () => {
const result = await manager.wrap(async (session) => {
const ticket = await this.ticketModel
.findOne({ _id: id })
.session(session);
if (isEmpty(ticket)) {
throw new NotFoundException(`Ticket ${id} not found`);
} else if (ticket.userId !== currenUser.id) {
// TODO: should be handled by Ory permissions only
throw new ForbiddenException();
} else if (ticket.orderId) {
throw new BadRequestException(`Ticket ${id} is currently reserved`);
}
ticket.set(update);
await ticket.save({ session: manager.session });
const result = ticket.toJSON<Ticket>();
await ticket.save({ session });
const updatedTicket = ticket.toJSON<Ticket>();
await lastValueFrom(
this.emitEvent(Patterns.TicketUpdated, result).pipe(),
this.emitEvent(Patterns.TicketUpdated, updatedTicket),
);
return result;
return updatedTicket;
});
if (result.error) {
this.logger.error(result.error);
throw result.error;
}
return result.value;
}

async createOrder(event: OrderCreatedEvent['data']): Promise<Ticket> {
const ticketId = event.ticket.id;
const orderId = event.id;
const ticket = await this.ticketModel.findOne({ _id: ticketId });
if (isEmpty(ticket)) {
throw new NotFoundException(`Ticket ${ticketId} not found`);
}
await using manager = await transactionManager(this.ticketModel);
return manager.wrap(async () => {
const result = await manager.wrap(async (session) => {
const ticket = await this.ticketModel
.findOne({ _id: ticketId })
.session(session);
if (isEmpty(ticket)) {
throw new NotFoundException(`Ticket ${ticketId} not found`);
}
ticket.set({ orderId });
await ticket.save({ session: manager.session });
const result = ticket.toJSON<Ticket>();
await ticket.save({ session });
// TODO: create relation between ticket and order
const updatedTicket = ticket.toJSON<Ticket>();
await lastValueFrom(
this.emitEvent(Patterns.TicketUpdated, result).pipe(),
this.emitEvent(Patterns.TicketUpdated, updatedTicket),
);
return result;
return updatedTicket;
});
if (result.error) {
this.logger.error(result.error);
throw result.error;
}
return result.value;
}

async cancelOrder(event: OrderCancelledEvent['data']): Promise<Ticket> {
const ticketId = event.ticket.id;
const ticket = await this.ticketModel.findOne({ _id: ticketId });
if (isEmpty(ticket)) {
throw new NotFoundException(`Ticket ${ticketId} not found`);
}

await using manager = await transactionManager(this.ticketModel);
return manager.wrap(async () => {
const result = await manager.wrap(async (session) => {
const ticket = await this.ticketModel
.findOne({ _id: ticketId })
.session(session);
if (isEmpty(ticket)) {
throw new NotFoundException(`Ticket ${ticketId} not found`);
}
ticket.set({ orderId: undefined });
await ticket.save({ session: manager.session });
const result = ticket.toJSON<Ticket>();
// TODO: delete relation between ticket and order
const updatedTicket = ticket.toJSON<Ticket>();
await lastValueFrom(
this.emitEvent(Patterns.TicketUpdated, result).pipe(),
this.emitEvent(Patterns.TicketUpdated, updatedTicket).pipe(),
);
return result;
return updatedTicket;
});
if (result.error) {
this.logger.error(result.error);
throw result.error;
}
return result.value;
}
}
Loading

0 comments on commit deb70cc

Please sign in to comment.