@@ -23,7 +23,12 @@ import { In, IsNull } from 'typeorm';
23
23
24
24
import { RequestContext , SerializedRequestContext } from '../../api/common/request-context' ;
25
25
import { RelationPaths } from '../../api/decorators/relations.decorator' ;
26
- import { ForbiddenError , IllegalOperationError , UserInputError } from '../../common/error/errors' ;
26
+ import {
27
+ ForbiddenError ,
28
+ IllegalOperationError ,
29
+ InternalServerError ,
30
+ UserInputError ,
31
+ } from '../../common/error/errors' ;
27
32
import { ListQueryOptions } from '../../common/types/common-types' ;
28
33
import { Translated } from '../../common/types/locale-types' ;
29
34
import { assertFound , idsAreEqual } from '../../common/utils' ;
@@ -101,11 +106,14 @@ export class CollectionService implements OnModuleInit {
101
106
. createQueryBuilder ( 'collection' )
102
107
. select ( 'collection.id' , 'id' )
103
108
. getRawMany ( ) ;
104
- await this . applyFiltersQueue . add ( {
105
- ctx : event . ctx . serialize ( ) ,
106
- collectionIds : collections . map ( c => c . id ) ,
107
- } ,
108
- { ctx : event . ctx } ) ;
109
+ await this . applyFiltersQueue . add (
110
+ {
111
+ ctx : event . ctx . serialize ( ) ,
112
+ collectionIds : collections . map ( c => c . id ) ,
113
+ applyToChangedVariantsOnly : true ,
114
+ } ,
115
+ { ctx : event . ctx } ,
116
+ ) ;
109
117
} ) ;
110
118
111
119
this . applyFiltersQueue = await this . jobQueueService . createQueue ( {
@@ -129,7 +137,7 @@ export class CollectionService implements OnModuleInit {
129
137
Logger . warn ( `Could not find Collection with id ${ collectionId } , skipping` ) ;
130
138
}
131
139
completed ++ ;
132
- if ( collection ) {
140
+ if ( collection !== undefined ) {
133
141
let affectedVariantIds : ID [ ] = [ ] ;
134
142
try {
135
143
affectedVariantIds = await this . applyCollectionFiltersInternal (
@@ -147,8 +155,11 @@ export class CollectionService implements OnModuleInit {
147
155
}
148
156
job . setProgress ( Math . ceil ( ( completed / job . data . collectionIds . length ) * 100 ) ) ;
149
157
if ( affectedVariantIds . length ) {
150
- await this . eventBus . publish (
151
- new CollectionModificationEvent ( ctx , collection , affectedVariantIds ) ,
158
+ // To avoid performance issues on huge collections we first split the affected variant ids into chunks
159
+ this . chunkArray ( affectedVariantIds , 50000 ) . map ( chunk =>
160
+ this . eventBus . publish (
161
+ new CollectionModificationEvent ( ctx , collection as Collection , chunk ) ,
162
+ ) ,
152
163
) ;
153
164
}
154
165
}
@@ -469,11 +480,13 @@ export class CollectionService implements OnModuleInit {
469
480
input ,
470
481
collection ,
471
482
) ;
472
- await this . applyFiltersQueue . add ( {
473
- ctx : ctx . serialize ( ) ,
474
- collectionIds : [ collection . id ] ,
475
- } ,
476
- { ctx } ) ;
483
+ await this . applyFiltersQueue . add (
484
+ {
485
+ ctx : ctx . serialize ( ) ,
486
+ collectionIds : [ collection . id ] ,
487
+ } ,
488
+ { ctx } ,
489
+ ) ;
477
490
await this . eventBus . publish ( new CollectionEvent ( ctx , collectionWithRelations , 'created' , input ) ) ;
478
491
return assertFound ( this . findOne ( ctx , collection . id ) ) ;
479
492
}
@@ -495,12 +508,14 @@ export class CollectionService implements OnModuleInit {
495
508
} ) ;
496
509
await this . customFieldRelationService . updateRelations ( ctx , Collection , input , collection ) ;
497
510
if ( input . filters ) {
498
- await this . applyFiltersQueue . add ( {
499
- ctx : ctx . serialize ( ) ,
500
- collectionIds : [ collection . id ] ,
501
- applyToChangedVariantsOnly : false ,
502
- } ,
503
- { ctx } ) ;
511
+ await this . applyFiltersQueue . add (
512
+ {
513
+ ctx : ctx . serialize ( ) ,
514
+ collectionIds : [ collection . id ] ,
515
+ applyToChangedVariantsOnly : false ,
516
+ } ,
517
+ { ctx } ,
518
+ ) ;
504
519
} else {
505
520
const affectedVariantIds = await this . getCollectionProductVariantIds ( collection ) ;
506
521
await this . eventBus . publish ( new CollectionModificationEvent ( ctx , collection , affectedVariantIds ) ) ;
@@ -571,11 +586,13 @@ export class CollectionService implements OnModuleInit {
571
586
siblings = moveToIndex ( input . index , target , siblings ) ;
572
587
573
588
await this . connection . getRepository ( ctx , Collection ) . save ( siblings ) ;
574
- await this . applyFiltersQueue . add ( {
575
- ctx : ctx . serialize ( ) ,
576
- collectionIds : [ target . id ] ,
577
- } ,
578
- { ctx } ) ;
589
+ await this . applyFiltersQueue . add (
590
+ {
591
+ ctx : ctx . serialize ( ) ,
592
+ collectionIds : [ target . id ] ,
593
+ } ,
594
+ { ctx } ,
595
+ ) ;
579
596
return assertFound ( this . findOne ( ctx , input . collectionId ) ) ;
580
597
}
581
598
@@ -601,61 +618,117 @@ export class CollectionService implements OnModuleInit {
601
618
} ;
602
619
603
620
/**
604
- * Applies the CollectionFilters
605
- *
606
- * If applyToChangedVariantsOnly (default: true) is true, then apply collection job will process only changed variants
607
- * If applyToChangedVariantsOnly (default: true) is false, then apply collection job will process all variants
608
- * This param is used when we update collection and collection filters are changed to update all
609
- * variants (because other attributes of collection can be changed https://github.com/vendure-ecommerce/vendure/issues/1015)
621
+ * Applies the CollectionFilters and returns the IDs of ProductVariants that need to be added or removed.
610
622
*/
611
623
private async applyCollectionFiltersInternal (
612
624
collection : Collection ,
613
625
applyToChangedVariantsOnly = true ,
614
626
) : Promise < ID [ ] > {
627
+ const masterConnection = this . connection . rawConnection . createQueryRunner ( 'master' ) . connection ;
615
628
const ancestorFilters = await this . getAncestorFilters ( collection ) ;
616
- const preIds = await this . getCollectionProductVariantIds ( collection ) ;
617
- const filteredVariantIds = await this . getFilteredProductVariantIds ( [
618
- ...ancestorFilters ,
619
- ...( collection . filters || [ ] ) ,
620
- ] ) ;
621
- const postIds = filteredVariantIds . map ( v => v . id ) ;
622
- const preIdsSet = new Set ( preIds ) ;
623
- const postIdsSet = new Set ( postIds ) ;
629
+ const filters = [ ...ancestorFilters , ...( collection . filters || [ ] ) ] ;
624
630
625
- const toDeleteIds = preIds . filter ( id => ! postIdsSet . has ( id ) ) ;
626
- const toAddIds = postIds . filter ( id => ! preIdsSet . has ( id ) ) ;
631
+ const { collectionFilters } = this . configService . catalogOptions ;
627
632
628
- try {
629
- // First we remove variants that are no longer in the collection
630
- const chunkedDeleteIds = this . chunkArray ( toDeleteIds , 500 ) ;
633
+ // Create a basic query to retrieve the IDs of product variants that match the collection filters
634
+ let filteredQb = masterConnection
635
+ . getRepository ( ProductVariant )
636
+ . createQueryBuilder ( 'productVariant' )
637
+ . select ( 'productVariant.id' , 'id' )
638
+ . setFindOptions ( { loadEagerRelations : false } ) ;
631
639
632
- for ( const chunkedDeleteId of chunkedDeleteIds ) {
633
- await this . connection . rawConnection
634
- . createQueryBuilder ( )
635
- . relation ( Collection , 'productVariants' )
636
- . of ( collection )
637
- . remove ( chunkedDeleteId ) ;
640
+ // If there are no filters, we need to ensure that the query returns no results
641
+ if ( filters . length === 0 ) {
642
+ filteredQb . andWhere ( '1 = 0' ) ;
643
+ }
644
+
645
+ // Applies the CollectionFilters and returns an array of ProductVariant entities which match
646
+ for ( const filterType of collectionFilters ) {
647
+ const filtersOfType = filters . filter ( f => f . code === filterType . code ) ;
648
+ if ( filtersOfType . length ) {
649
+ for ( const filter of filtersOfType ) {
650
+ filteredQb = filterType . apply ( filteredQb , filter . args ) ;
651
+ }
638
652
}
653
+ }
639
654
640
- // Then we add variants have been added
641
- const chunkedAddIds = this . chunkArray ( toAddIds , 500 ) ;
655
+ // Subquery for existing variants in the collection
656
+ const existingVariantsQb = masterConnection
657
+ . getRepository ( ProductVariant )
658
+ . createQueryBuilder ( 'variant' )
659
+ . select ( 'variant.id' , 'id' )
660
+ . setFindOptions ( { loadEagerRelations : false } )
661
+ . innerJoin ( 'variant.collections' , 'collection' , 'collection.id = :id' , { id : collection . id } ) ;
662
+
663
+ // Using CTE to find variants to add
664
+ const addQb = masterConnection
665
+ . createQueryBuilder ( )
666
+ . addCommonTableExpression ( filteredQb , '_filtered_variants' )
667
+ . addCommonTableExpression ( existingVariantsQb , '_existing_variants' )
668
+ . select ( 'filtered_variants.id' )
669
+ . from ( '_filtered_variants' , 'filtered_variants' )
670
+ . leftJoin (
671
+ '_existing_variants' ,
672
+ 'existing_variants' ,
673
+ 'filtered_variants.id = existing_variants.id' ,
674
+ )
675
+ . where ( 'existing_variants.id IS NULL' ) ;
676
+
677
+ // Using CTE to find the variants to be deleted
678
+ const removeQb = masterConnection
679
+ . createQueryBuilder ( )
680
+ . addCommonTableExpression ( filteredQb , '_filtered_variants' )
681
+ . addCommonTableExpression ( existingVariantsQb , '_existing_variants' )
682
+ . select ( 'existing_variants.id' )
683
+ . from ( '_existing_variants' , 'existing_variants' )
684
+ . leftJoin (
685
+ '_filtered_variants' ,
686
+ 'filtered_variants' ,
687
+ 'existing_variants.id = filtered_variants.id' ,
688
+ )
689
+ . where ( 'filtered_variants.id IS NULL' )
690
+ . setParameters ( { id : collection . id } ) ;
642
691
643
- for ( const chunkedAddId of chunkedAddIds ) {
644
- await this . connection . rawConnection
645
- . createQueryBuilder ( )
646
- . relation ( Collection , 'productVariants' )
647
- . of ( collection )
648
- . add ( chunkedAddId ) ;
649
- }
692
+ const [ toAddIds , toRemoveIds ] = await Promise . all ( [
693
+ addQb . getRawMany ( ) . then ( results => results . map ( result => result . id ) ) ,
694
+ removeQb . getRawMany ( ) . then ( results => results . map ( result => result . id ) ) ,
695
+ ] ) ;
696
+
697
+ try {
698
+ await this . connection . rawConnection . transaction ( async transactionalEntityManager => {
699
+ const chunkedDeleteIds = this . chunkArray ( toRemoveIds , 5000 ) ;
700
+ const chunkedAddIds = this . chunkArray ( toAddIds , 5000 ) ;
701
+ await Promise . all ( [
702
+ // Delete variants that should no longer be in the collection
703
+ ...chunkedDeleteIds . map ( chunk =>
704
+ transactionalEntityManager
705
+ . createQueryBuilder ( )
706
+ . relation ( Collection , 'productVariants' )
707
+ . of ( collection )
708
+ . remove ( chunk ) ,
709
+ ) ,
710
+ // Adding options that should be in the collection
711
+ ...chunkedAddIds . map ( chunk =>
712
+ transactionalEntityManager
713
+ . createQueryBuilder ( )
714
+ . relation ( Collection , 'productVariants' )
715
+ . of ( collection )
716
+ . add ( chunk ) ,
717
+ ) ,
718
+ ] ) ;
719
+ } ) ;
650
720
} catch ( e : any ) {
651
721
Logger . error ( e ) ;
652
722
}
653
723
654
724
if ( applyToChangedVariantsOnly ) {
655
- return [ ...preIds . filter ( id => ! postIdsSet . has ( id ) ) , ...postIds . filter ( id => ! preIdsSet . has ( id ) ) ] ;
656
- } else {
657
- return [ ...preIds . filter ( id => ! postIdsSet . has ( id ) ) , ...postIds ] ;
725
+ return [ ...toAddIds , ...toRemoveIds ] ;
658
726
}
727
+
728
+ return [
729
+ ...( await existingVariantsQb . getRawMany ( ) . then ( results => results . map ( result => result . id ) ) ) ,
730
+ ...toRemoveIds ,
731
+ ] ;
659
732
}
660
733
661
734
/**
@@ -676,32 +749,6 @@ export class CollectionService implements OnModuleInit {
676
749
return ancestorFilters ;
677
750
}
678
751
679
- /**
680
- * Applies the CollectionFilters and returns an array of ProductVariant entities which match.
681
- */
682
- private async getFilteredProductVariantIds ( filters : ConfigurableOperation [ ] ) : Promise < Array < { id : ID } > > {
683
- if ( filters . length === 0 ) {
684
- return [ ] ;
685
- }
686
- const { collectionFilters } = this . configService . catalogOptions ;
687
- let qb = this . connection . rawConnection
688
- . getRepository ( ProductVariant )
689
- . createQueryBuilder ( 'productVariant' ) ;
690
-
691
- for ( const filterType of collectionFilters ) {
692
- const filtersOfType = filters . filter ( f => f . code === filterType . code ) ;
693
- if ( filtersOfType . length ) {
694
- for ( const filter of filtersOfType ) {
695
- qb = filterType . apply ( qb , filter . args ) ;
696
- }
697
- }
698
- }
699
-
700
- // This is the most performant (time & memory) way to get
701
- // just the variant IDs, which is all we need.
702
- return qb . select ( 'productVariant.id' , 'id' ) . getRawMany ( ) ;
703
- }
704
-
705
752
/**
706
753
* Returns the IDs of the Collection's ProductVariants.
707
754
*/
@@ -830,11 +877,13 @@ export class CollectionService implements OnModuleInit {
830
877
) ;
831
878
await this . assetService . assignToChannel ( ctx , { channelId : input . channelId , assetIds } ) ;
832
879
833
- await this . applyFiltersQueue . add ( {
834
- ctx : ctx . serialize ( ) ,
835
- collectionIds : collectionsToAssign . map ( collection => collection . id ) ,
836
- } ,
837
- { ctx } ) ;
880
+ await this . applyFiltersQueue . add (
881
+ {
882
+ ctx : ctx . serialize ( ) ,
883
+ collectionIds : collectionsToAssign . map ( collection => collection . id ) ,
884
+ } ,
885
+ { ctx } ,
886
+ ) ;
838
887
839
888
return this . connection
840
889
. findByIdsInChannel (
0 commit comments