Skip to content

Commit

Permalink
refactor: wrap sensitive write operations in transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
getlarge committed Dec 13, 2023
1 parent 87addd3 commit f42d480
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 79 deletions.
33 changes: 20 additions & 13 deletions apps/auth/src/app/users/users.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { OryService } from '@ticketing/microservices/ory-client';
import { transactionManager } from '@ticketing/microservices/shared/mongo';
import { Model } from 'mongoose';

import { User, UserCredentials } from './models';
Expand All @@ -19,7 +20,7 @@ export class UsersService {

constructor(
@InjectModel(UserSchema.name) private userModel: Model<UserDocument>,
@Inject(OryService) private readonly oryService: OryService
@Inject(OryService) private readonly oryService: OryService,
) {}

/**
Expand All @@ -30,27 +31,33 @@ export class UsersService {
* @see https://www.ory.sh/docs/guides/integrate-with-ory-cloud-through-webhooks#modify-identities
**/
async onSignUp(body: OnOrySignUpDto): Promise<OnOrySignUpDto> {
const { identity } = body;
this.logger.debug(`onSignUp`, body);
const email = identity.traits.email;
const email = body.identity.traits.email;
const existingUser = await this.userModel.findOne({
email,
});
if (existingUser) {
await this.oryService.deleteIdentity(identity.id).catch((error) => {
await this.oryService.deleteIdentity(body.identity.id).catch((error) => {
this.logger.error(error);
});
throw new HttpException('email already used', HttpStatus.BAD_REQUEST);
}
const newUser = await this.userModel.create({
identityId: identity.id,
email,
await using manager = await transactionManager(this.userModel);
const { identity } = await manager.wrap(async () => {
const doc: Omit<User, 'id'> = {
identityId: identity.id,
email,
};
const [user] = await this.userModel.create([doc], {
session: manager.session,
});
const updatedIdentity = await this.oryService.updateIdentityMetadata(
identity.id,
{ id: user.id },
);
return { user, identity: updatedIdentity };
});
const updatedIdentity = await this.oryService.updateIdentityMetadata(
identity.id,
{ id: newUser.id }
);
return { identity: updatedIdentity };
return { identity };
}

/**
Expand All @@ -71,7 +78,7 @@ export class UsersService {
});
const updatedIdentity = await this.oryService.updateIdentityMetadata(
identity.id,
{ id: newUser.id }
{ id: newUser.id },
);
identity.metadata_public = updatedIdentity.metadata_public;
}
Expand Down
84 changes: 51 additions & 33 deletions apps/orders/src/app/orders/orders.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import {
Patterns,
PaymentCreatedEvent,
} from '@ticketing/microservices/shared/events';
import { transactionManager } from '@ticketing/microservices/shared/mongo';
import { User } from '@ticketing/shared/models';
import { isEmpty } from 'lodash';
import { Model } from 'mongoose';
import { Observable, zip } from 'rxjs';
import { lastValueFrom, Observable, zip } from 'rxjs';

import { AppConfigService } from '../env';
import {
Expand Down Expand Up @@ -75,19 +76,22 @@ export class OrdersService {
// 3. Calclate expiration date
const expiresAt = new Date();
expiresAt.setSeconds(expiresAt.getSeconds() + this.expirationWindow);
// 4. Build the order and save it to DB
// TODO: use a transaction here
const newOrder = await this.orderModel.create({
ticket,
userId: currentUser.id,
expiresAt,
status: OrderStatus.Created,

await using manager = await transactionManager(this.ticketModel);
return manager.wrap(async () => {
// 4. Build the order and save it to DB
const newOrder = await this.orderModel.create({
ticket,
userId: currentUser.id,
expiresAt,
status: OrderStatus.Created,
});
await newOrder.populate('ticket');
const result = newOrder.toJSON<Order>();
// 5. Publish an event
await lastValueFrom(this.emitEvent(Patterns.OrderCreated, result).pipe());
return result;
});
await newOrder.populate('ticket');
const result = newOrder.toJSON<Order>();
// 5. Publish an event
this.emitEvent(Patterns.OrderCreated, result);
return result;
}

async find(currentUser: User): Promise<Order[]> {
Expand Down Expand Up @@ -124,29 +128,43 @@ export class OrdersService {
}

async cancelById(id: string, currentUser: User): Promise<Order> {
// TODO: use a transaction here
const order = await this.orderModel.findOne({ _id: id }).populate('ticket');
this.orderExists(id, order);
this.userIsOrderOwner(currentUser, order);
order.set({ status: OrderStatus.Cancelled });
await order.save();
const result = order.toJSON<Order>();
this.emitEvent(Patterns.OrderCancelled, result);
return result;
await using manager = await transactionManager(this.orderModel);
return manager.wrap(async () => {
const order = await this.orderModel
.findOne({ _id: id })
.populate('ticket')
.session(manager.session);
this.orderExists(id, order);
this.userIsOrderOwner(currentUser, order);
order.set({ status: OrderStatus.Cancelled });
await order.save({ session: manager.session });
const result = order.toJSON<Order>();
await lastValueFrom(
this.emitEvent(Patterns.OrderCancelled, result).pipe(),
);
return result;
});
}

async expireById(id: string): Promise<Order> {
const order = await this.orderModel.findOne({ _id: id }).populate('ticket');
this.orderExists(id, order);
if (order.status === OrderStatus.Complete) {
return order.toJSON<Order>();
}
// TODO: use a transaction here
order.set({ status: OrderStatus.Cancelled });
await order.save();
const result = order.toJSON<Order>();
this.emitEvent(Patterns.OrderCancelled, result);
return result;
await using manager = await transactionManager(this.orderModel);
return manager.wrap(async () => {
const order = await this.orderModel
.findOne({ _id: id })
.populate('ticket')
.session(manager.session);
this.orderExists(id, order);
if (order.status === OrderStatus.Complete) {
return order.toJSON<Order>();
}
order.set({ status: OrderStatus.Cancelled });
await order.save({ session: manager.session });
const result = order.toJSON<Order>();
await lastValueFrom(
this.emitEvent(Patterns.OrderCancelled, result).pipe(),
);
return result;
});
}

async complete(data: PaymentCreatedEvent['data']): Promise<Order> {
Expand Down
34 changes: 22 additions & 12 deletions apps/payments/src/app/payments/payments.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import {
Patterns,
PaymentCreatedEvent,
} from '@ticketing/microservices/shared/events';
import { transactionManager } from '@ticketing/microservices/shared/mongo';
import { OrderStatus, User } from '@ticketing/shared/models';
import { Model } from 'mongoose';
import { firstValueFrom } from 'rxjs';

import { Order as OrderSchema, OrderDocument } from '../orders/schemas';
import { ORDERS_CLIENT } from '../shared/constants';
Expand Down Expand Up @@ -55,24 +57,32 @@ export class PaymentsService {
`Order ${orderId} has been cancelled and can't be paid for`,
);
}
// TODO: use MongoDB transaction

// 4. make sure the payment amount match the order price and create payment with Stripe
const charge = await this.stripeService.charges.create({
amount: order.price * 100,
currency: 'eur',
source: token,
});
// 6. Create charge instance in Mongo
const payment = await this.paymentModel.create({
orderId,
stripeId: charge.id,

await using manager = await transactionManager(this.paymentModel);
return manager.wrap(async () => {
// 5. Create charge instance in Mongo
const payment = await this.paymentModel.create({
orderId,
stripeId: charge.id,
});
const result = payment.toJSON<Payment>();
// 6. emit payment:create event
await firstValueFrom(
this.client
.emit<PaymentCreatedEvent['name'], PaymentCreatedEvent['data']>(
Patterns.PaymentCreated,
result,
)
.pipe(),
);
return result;
});
const result = payment.toJSON<Payment>();
// 7. emit payment:create event
this.client.emit<PaymentCreatedEvent['name'], PaymentCreatedEvent['data']>(
Patterns.PaymentCreated,
result,
);
return result;
}
}
73 changes: 52 additions & 21 deletions apps/tickets/src/app/tickets/tickets.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
NextPaginationDto,
PaginateDto,
} from '@ticketing/microservices/shared/models';
import { transactionManager } from '@ticketing/microservices/shared/mongo';
import { User } from '@ticketing/shared/models';
import { isEmpty } from 'lodash';
import { Model } from 'mongoose';
Expand Down Expand Up @@ -46,13 +47,27 @@ export class TicketsService {
}

async create(ticket: CreateTicket, currentUser: User): Promise<Ticket> {
const newTicket = await this.ticketModel.create({
...ticket,
userId: currentUser.id,
await using manager = await transactionManager(this.ticketModel);
return manager.wrap(async () => {
const doc: CreateTicket & { userId: string } = {
...ticket,
userId: currentUser.id,
};
const [newTicket] = await this.ticketModel.create([doc], {
session: manager.session,
});
// await this.oryPermissionService.createRelation(
// parseRelationTuple(
// `${PermissionNamespaces[Resources.TICKETS]}:${newTicket.id}#owners@${PermissionNamespaces[Resources.USERS]}:${currentUser.id}#edit`,
// ).unwrapOrThrow(),
// );

const result = newTicket.toJSON<Ticket>();
await lastValueFrom(
this.emitEvent(Patterns.TicketCreated, result).pipe(),
);
return result;
});
const result = newTicket.toJSON<Ticket>();
this.emitEvent(Patterns.TicketCreated, result);
return result;
}

paginate(params: PaginateDto = {}): Promise<{
Expand Down Expand Up @@ -109,11 +124,17 @@ export class TicketsService {
} else if (ticket.orderId) {
throw new BadRequestException(`Ticket ${id} is currently reserved`);
}
ticket.set(update);
await ticket.save();
const result = ticket.toJSON<Ticket>();
this.emitEvent(Patterns.TicketUpdated, result);
return result;

await using manager = await transactionManager(this.ticketModel);
return manager.wrap(async () => {
ticket.set(update);
await ticket.save({ session: manager.session });
const result = ticket.toJSON<Ticket>();
await lastValueFrom(
this.emitEvent(Patterns.TicketUpdated, result).pipe(),
);
return result;
});
}

async createOrder(event: OrderCreatedEvent['data']): Promise<Ticket> {
Expand All @@ -123,11 +144,16 @@ export class TicketsService {
if (isEmpty(ticket)) {
throw new NotFoundException(`Ticket ${ticketId} not found`);
}
ticket.set({ orderId });
await ticket.save();
const result = ticket.toJSON<Ticket>();
await lastValueFrom(this.emitEvent(Patterns.TicketUpdated, result).pipe());
return result;
await using manager = await transactionManager(this.ticketModel);
return manager.wrap(async () => {
ticket.set({ orderId });
await ticket.save({ session: manager.session });
const result = ticket.toJSON<Ticket>();
await lastValueFrom(
this.emitEvent(Patterns.TicketUpdated, result).pipe(),
);
return result;
});
}

async cancelOrder(event: OrderCancelledEvent['data']): Promise<Ticket> {
Expand All @@ -136,10 +162,15 @@ export class TicketsService {
if (isEmpty(ticket)) {
throw new NotFoundException(`Ticket ${ticketId} not found`);
}
ticket.set({ orderId: undefined });
await ticket.save();
const result = ticket.toJSON<Ticket>();
await lastValueFrom(this.emitEvent(Patterns.TicketUpdated, result).pipe());
return result;
await using manager = await transactionManager(this.ticketModel);
return manager.wrap(async () => {
ticket.set({ orderId: undefined });
await ticket.save({ session: manager.session });
const result = ticket.toJSON<Ticket>();
await lastValueFrom(
this.emitEvent(Patterns.TicketUpdated, result).pipe(),
);
return result;
});
}
}

0 comments on commit f42d480

Please sign in to comment.