Skip to content

Commit

Permalink
fix(core): Update DefaultSearchPlugin indexer controller to avoid Typ…
Browse files Browse the repository at this point in the history
…eORM memory leak (#2883)
  • Loading branch information
carathorys authored Jul 12, 2024
1 parent 1c46a88 commit ee2c177
Showing 1 changed file with 99 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ID } from '@vendure/common/lib/shared-types';
import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';
import { unique } from '@vendure/common/lib/unique';
import { Observable } from 'rxjs';
import { FindManyOptions, In } from 'typeorm';
import { Equal, FindManyOptions, FindOptionsWhere, In, IsNull } from 'typeorm';

import { RequestContext } from '../../../api/common/request-context';
import { RequestContextCacheService } from '../../../cache/request-context-cache.service';
Expand Down Expand Up @@ -39,17 +39,31 @@ import {
import { MutableRequestContext } from './mutable-request-context';

export const BATCH_SIZE = 1000;
export const productRelations = ['featuredAsset', 'facetValues', 'facetValues.facet', 'channels'];
export const variantRelations = [
export const productRelations = [
'translations',
'featuredAsset',
'facetValues',
'facetValues.facet',
'collections',
'variants',
'channels',
];
export const variantRelations = [
'translations',
'taxCategory',
'featuredAsset',
'productVariantPrices',
'channels',
'channels.defaultTaxZone',
'facetValues',
'facetValues.facet',
'product',
'product.translations',
'product.channels',
'product.facetValues',
'product.facetValues.facet',
'collections',
'collections.translations',
];

export const workerLoggerCtx = 'DefaultSearchPlugin Worker';

@Injectable()
Expand All @@ -72,8 +86,7 @@ export class IndexerController {
return asyncObservable(async observer => {
const timeStart = Date.now();
const channel = ctx.channel ?? (await this.loadChannel(ctx, ctx.channelId));
const qb = this.getSearchIndexQueryBuilder(ctx, channel);
const count = await qb.getCount();
const { count } = await this.getSearchIndexQueryBuilder(ctx, { channels: [channel] });
Logger.verbose(`Reindexing ${count} variants for channel ${ctx.channel.code}`, workerLoggerCtx);
const batches = Math.ceil(count / BATCH_SIZE);

Expand All @@ -85,10 +98,11 @@ export class IndexerController {
throw new Error('reindex job was cancelled');
}
Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);
const variants = await qb
.take(BATCH_SIZE)
.skip(i * BATCH_SIZE)
.getMany();
const { variants } = await this.getSearchIndexQueryBuilder(ctx, {
channels: [channel],
take: BATCH_SIZE,
skip: i * BATCH_SIZE,
});
await this.saveVariants(ctx, variants);
observer.next({
total: count,
Expand Down Expand Up @@ -124,12 +138,10 @@ export class IndexerController {
const end = begin + BATCH_SIZE;
Logger.verbose(`Updating ids from index ${begin} to ${end}`);
const batchIds = ids.slice(begin, end);
const batch = await this.getSearchIndexQueryBuilder(
ctx,
...(await this.getAllChannels(ctx)),
)
.where('variants.deletedAt IS NULL AND variants.id IN (:...ids)', { ids: batchIds })
.getMany();
const { variants: batch } = await this.getSearchIndexQueryBuilder(ctx, {
channels: await this.getAllChannels(ctx),
productVariantIds: batchIds,
});

await this.saveVariants(ctx, batch);
observer.next({
Expand Down Expand Up @@ -245,19 +257,18 @@ export class IndexerController {
): Promise<boolean> {
const channel = await this.loadChannel(ctx, channelId);
ctx.setChannel(channel);
const product = await this.getProductInChannelQueryBuilder(ctx, productId, channel).getOneOrFail();
const product = await this.getProductInChannelQueryBuilder(ctx, productId, channel);

if (product) {
const affectedChannels = await this.getAllChannels(ctx, {
where: {
availableLanguageCodes: In(product.translations.map(t => t.languageCode)),
},
});
const updatedVariants = await this.getSearchIndexQueryBuilder(
ctx,
...unique(affectedChannels.concat(channel)),
)
.andWhere('variants.productId = :productId', { productId })
.getMany();
const { variants: updatedVariants } = await this.getSearchIndexQueryBuilder(ctx, {
channels: unique(affectedChannels.concat(channel)),
productId,
});
if (updatedVariants.length === 0) {
const clone = new Product({ id: product.id });
await this.entityHydrator.hydrate(ctx, clone, { relations: ['translations' as never] });
Expand Down Expand Up @@ -287,9 +298,10 @@ export class IndexerController {
): Promise<boolean> {
const channel = await this.loadChannel(ctx, channelId);
ctx.setChannel(channel);
const variants = await this.getSearchIndexQueryBuilder(ctx, channel)
.andWhere('variants.deletedAt IS NULL AND variants.id IN (:...variantIds)', { variantIds })
.getMany();
const { variants } = await this.getSearchIndexQueryBuilder(ctx, {
channels: [channel],
productVariantIds: variantIds,
});

if (variants) {
Logger.verbose(`Updating ${variants.length} variants`, workerLoggerCtx);
Expand All @@ -304,10 +316,7 @@ export class IndexerController {
channelIds: ID[],
): Promise<boolean> {
const channels = await Promise.all(channelIds.map(channelId => this.loadChannel(ctx, channelId)));
const product = await this.getProductInChannelQueryBuilder(ctx, productId, ...channels)
.leftJoinAndSelect('product.variants', 'variants')
.leftJoinAndSelect('variants.translations', 'variants_translations')
.getOne();
const product = await this.getProductInChannelQueryBuilder(ctx, productId, ...channels);

if (product) {
const removedVariantIds = product.variants.map(v => v.id);
Expand All @@ -326,125 +335,68 @@ export class IndexerController {
ctx: RequestContext,
options?: FindManyOptions<Channel> | undefined,
): Promise<Channel[]> {
return await this.connection.getRepository(ctx, Channel).find(options);
return await this.connection
.getRepository(ctx, Channel)
.find({ ...options, relationLoadStrategy: 'query' });
}

private getSearchIndexQueryBuilder(ctx: RequestContext, ...channels: Channel[]) {
const channelLanguages = unique(
channels.flatMap(c => c.availableLanguageCodes).concat(this.configService.defaultLanguageCode),
);
const qb = this.connection
.getRepository(ctx, ProductVariant)
.createQueryBuilder('variants')
.setFindOptions({
loadEagerRelations: false,
})
.leftJoinAndSelect(
'variants.channels',
'variant_channels',
'variant_channels.id IN (:...channelId)',
{
channelId: channels.map(x => x.id),
},
)
.leftJoinAndSelect('variant_channels.defaultTaxZone', 'variant_channel_tax_zone')
.leftJoinAndSelect('variants.taxCategory', 'variant_tax_category')
.leftJoinAndSelect(
'variants.productVariantPrices',
'product_variant_prices',
'product_variant_prices.channelId IN (:...channelId)',
{ channelId: channels.map(x => x.id) },
)
.leftJoinAndSelect(
'variants.translations',
'product_variant_translation',
'product_variant_translation.baseId = variants.id AND product_variant_translation.languageCode IN (:...channelLanguages)',
{
channelLanguages,
},
)
.leftJoin('variants.product', 'product')
.leftJoinAndSelect('variants.facetValues', 'variant_facet_values')
.leftJoinAndSelect(
'variant_facet_values.translations',
'variant_facet_value_translations',
'variant_facet_value_translations.languageCode IN (:...channelLanguages)',
{
channelLanguages,
},
)
.leftJoinAndSelect('variant_facet_values.facet', 'facet_values_facet')
.leftJoinAndSelect(
'facet_values_facet.translations',
'facet_values_facet_translations',
'facet_values_facet_translations.languageCode IN (:...channelLanguages)',
{
channelLanguages,
},
)
.leftJoinAndSelect('variants.collections', 'collections')
.leftJoinAndSelect(
'collections.channels',
'collection_channels',
'collection_channels.id IN (:...channelId)',
{ channelId: channels.map(x => x.id) },
)
.leftJoinAndSelect(
'collections.translations',
'collection_translations',
'collection_translations.languageCode IN (:...channelLanguages)',
{
channelLanguages,
},
)
.leftJoin('product.channels', 'channel')
.where('channel.id IN (:...channelId)', { channelId: channels.map(x => x.id) })
.andWhere('product.deletedAt IS NULL')
.andWhere('variants.deletedAt IS NULL');
return qb;
private async getSearchIndexQueryBuilder(
ctx: RequestContext,
options?: {
productId?: ID;
productVariantIds?: ID[];
channels?: Channel[];
take?: number;
skip?: number;
},
): Promise<{ variants: ProductVariant[]; count: number }> {
const {
productId = undefined,
productVariantIds = undefined,
channels = [],
take = 50,
skip = 0,
} = options ?? {};
const where: FindOptionsWhere<ProductVariant> = {
deletedAt: IsNull(),
product: {
deletedAt: IsNull(),
},
};
if (productId) {
where.productId = productId;
}
if (productVariantIds && productVariantIds.length > 0) {
where.id = In(productVariantIds);
}
where.channels = { id: In(channels.map(c => c.id)) };
const [variants, count] = await this.connection.getRepository(ctx, ProductVariant).findAndCount({
loadEagerRelations: false,
relations: variantRelations,
where,
take,
skip,
relationLoadStrategy: 'query',
});
return { variants, count };
}

private getProductInChannelQueryBuilder(ctx: RequestContext, productId: ID, ...channels: Channel[]) {
private async getProductInChannelQueryBuilder(
ctx: RequestContext,
productId: ID,
...channels: Channel[]
): Promise<Product | undefined> {
const channelLanguages = unique(
channels.flatMap(c => c.availableLanguageCodes).concat(this.configService.defaultLanguageCode),
);
return this.connection
.getRepository(ctx, Product)
.createQueryBuilder('product')
.setFindOptions({
loadEagerRelations: false,
})
.leftJoinAndSelect(
'product.translations',
'translations',
'translations.languageCode IN (:...channelLanguages)',
{
channelLanguages,
},
)
.leftJoinAndSelect('product.featuredAsset', 'product_featured_asset')
.leftJoinAndSelect('product.facetValues', 'product_facet_values')
.leftJoinAndSelect(
'product_facet_values.translations',
'product_facet_value_translations',
'product_facet_value_translations.languageCode IN (:...channelLanguages)',
{
channelLanguages,
},
)
.leftJoinAndSelect('product_facet_values.facet', 'product_facet')
.leftJoinAndSelect(
'product_facet.translations',
'product_facet_translations',
'product_facet_translations.languageCode IN (:...channelLanguages)',
{
channelLanguages,
},
)
.leftJoinAndSelect('product.channels', 'channel', 'channel.id IN (:...channelId)', {
channelId: channels.map(x => x.id),
})
.where('product.id = :productId', { productId });

const product = await this.connection.getRepository(ctx, Product).findOne({
loadEagerRelations: false,
relations: productRelations,
relationLoadStrategy: 'query',
where: { id: Equal(productId), channels: { id: In(channels.map(x => x.id)) } },
});
return product ?? undefined;
}

private async saveVariants(ctx: MutableRequestContext, variants: ProductVariant[]) {
Expand All @@ -456,11 +408,10 @@ export class IndexerController {
for (const variant of variants) {
let product = productMap.get(variant.productId);
if (!product) {
product = await this.getProductInChannelQueryBuilder(
ctx,
variant.productId,
ctx.channel,
).getOneOrFail();
product = await this.getProductInChannelQueryBuilder(ctx, variant.productId, ctx.channel);
if (!product) {
throw new Error('Product not found for variant!');
}
productMap.set(variant.productId, product);
}
const availableLanguageCodes = unique(ctx.channel.availableLanguageCodes);
Expand Down

0 comments on commit ee2c177

Please sign in to comment.