Skip to content

Commit 67608df

Browse files
author
pellse
committed
Refactor merge function utilities for simplified usage
Streamlined `MergeFunctionFactory`, extracted `MergeFunctionContext`, and refactored related utilities to reduce complexity and improve readability. Removed redundant `MergeFunctionUtils` and added flexible new helper methods to `MergeFunctions`.
1 parent 88adf10 commit 67608df

File tree

7 files changed

+97
-63
lines changed

7 files changed

+97
-63
lines changed

assembler/src/main/java/io/github/pellse/assembler/caching/factory/CacheFactory.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import static io.github.pellse.assembler.caching.OneToManyCache.oneToManyCache;
4444
import static io.github.pellse.assembler.caching.OneToOneCache.oneToOneCache;
4545
import static io.github.pellse.assembler.caching.factory.SerializeCacheFactory.serialize;
46+
import static io.github.pellse.assembler.caching.merge.MergeFunctionContext.mergeFunctionContext;
4647
import static io.github.pellse.util.ObjectUtils.*;
4748
import static io.github.pellse.util.collection.CollectionUtils.*;
4849
import static java.util.Arrays.stream;
@@ -180,7 +181,7 @@ static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyC
180181

181182
@SafeVarargs
182183
static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> cachedMany(
183-
MergeFunctionFactory<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> mergeFunctionFactory,
184+
MergeFunctionFactory<ID, EID, R> mergeFunctionFactory,
184185
Function<CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>, CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>>... delegateCacheFactories) {
185186

186187
return cachedMany(cache(), mergeFunctionFactory, delegateCacheFactories);
@@ -197,7 +198,7 @@ static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyC
197198
@SafeVarargs
198199
static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> cachedMany(
199200
CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>> cacheFactory,
200-
MergeFunctionFactory<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> mergeFunctionFactory,
201+
MergeFunctionFactory<ID, EID, R> mergeFunctionFactory,
201202
Function<CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>, CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>>... delegateCacheFactories) {
202203

203204
return cachedMany(emptySource(), cacheFactory, mergeFunctionFactory, delegateCacheFactories);
@@ -214,7 +215,7 @@ static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyC
214215
@SafeVarargs
215216
static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> cachedMany(
216217
Function<List<T>, Publisher<R>> queryFunction,
217-
MergeFunctionFactory<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> mergeFunctionFactory,
218+
MergeFunctionFactory<ID, EID, R> mergeFunctionFactory,
218219
Function<CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>, CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>>... delegateCacheFactories) {
219220

220221
return cachedMany(from(queryFunction), mergeFunctionFactory, delegateCacheFactories);
@@ -231,7 +232,7 @@ static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyC
231232
@SafeVarargs
232233
static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> cachedMany(
233234
RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> ruleMapperSource,
234-
MergeFunctionFactory<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> mergeFunctionFactory,
235+
MergeFunctionFactory<ID, EID, R> mergeFunctionFactory,
235236
Function<CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>, CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>>... delegateCacheFactories) {
236237

237238
return cachedMany(ruleMapperSource, cache(), mergeFunctionFactory, delegateCacheFactories);
@@ -250,7 +251,7 @@ static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyC
250251
static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> cachedMany(
251252
Function<List<T>, Publisher<R>> queryFunction,
252253
CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>> cacheFactory,
253-
MergeFunctionFactory<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> mergeFunctionFactory,
254+
MergeFunctionFactory<ID, EID, R> mergeFunctionFactory,
254255
Function<CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>, CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>>... delegateCacheFactories) {
255256

256257
return cachedMany(from(queryFunction), cacheFactory, mergeFunctionFactory, delegateCacheFactories);
@@ -269,13 +270,13 @@ static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyC
269270
static <T, K, ID, EID, R> RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> cachedMany(
270271
RuleMapperSource<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> ruleMapperSource,
271272
CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>> cacheFactory,
272-
MergeFunctionFactory<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> mergeFunctionFactory,
273+
MergeFunctionFactory<ID, EID, R> mergeFunctionFactory,
273274
Function<CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>, CacheFactory<ID, R, List<R>, OneToManyCacheContext<ID, EID, R>>>... delegateCacheFactories) {
274275

275276
final var wrappedCacheFactory = wrap(cacheFactory);
276277

277278
return cached(
278-
ctx -> oneToManyCacheContext(ctx, mergeFunctionFactory != null ? mergeFunctionFactory.create(ctx) : null),
279+
ctx -> oneToManyCacheContext(ctx, mergeFunctionFactory != null ? mergeFunctionFactory.create(mergeFunctionContext(ctx)) : null),
279280
ruleMapperSource,
280281
cacheCtx -> oneToManyCache(cacheCtx, wrappedCacheFactory.create(cacheCtx)),
281282
delegateCacheFactories);
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.github.pellse.assembler.caching.merge;
2+
3+
import io.github.pellse.assembler.RuleMapperContext.OneToManyContext;
4+
5+
import java.util.Comparator;
6+
import java.util.function.Function;
7+
8+
public interface MergeFunctionContext<EID, R> {
9+
Function<R, EID> idResolver();
10+
11+
Comparator<R> idComparator();
12+
13+
static <T, K, ID, EID, R> MergeFunctionContext<EID, R> mergeFunctionContext(OneToManyContext<T, K, ID, EID, R> ctx) {
14+
return new MergeFunctionContext<>() {
15+
16+
@Override
17+
public Function<R, EID> idResolver() {
18+
return ctx.idResolver();
19+
}
20+
21+
@Override
22+
public Comparator<R> idComparator() {
23+
return ctx.idComparator();
24+
}
25+
};
26+
}
27+
}

assembler/src/main/java/io/github/pellse/assembler/caching/merge/MergeFunctionFactory.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,25 @@
11
package io.github.pellse.assembler.caching.merge;
22

3-
import io.github.pellse.assembler.RuleMapperContext;
4-
import io.github.pellse.assembler.RuleMapperContext.OneToOneContext;
5-
3+
import java.util.List;
64
import java.util.function.Function;
75
import java.util.stream.Stream;
86

97
import static java.util.function.Function.identity;
108

11-
public interface MergeFunctionFactory <T, K, ID, EID, R, RRC, CTX extends RuleMapperContext<T, K, ID, EID, R, RRC>> extends Function<CTX, MergeFunction<ID, RRC>> {
9+
public interface MergeFunctionFactory <ID, EID, R> extends Function<MergeFunctionContext<EID, R>, MergeFunction<ID, List<R>>> {
1210

13-
MergeFunction<ID, RRC> create(CTX ctx);
11+
MergeFunction<ID, List<R>> create(MergeFunctionContext<EID, R> ctx);
1412

1513
@Override
16-
default MergeFunction<ID, RRC> apply(CTX ctx) {
14+
default MergeFunction<ID, List<R>> apply(MergeFunctionContext<EID, R> ctx) {
1715
return create(ctx);
1816
}
1917

20-
default MergeFunctionFactory<T, K, ID, EID, R, RRC, CTX> pipe(Function<RRC, RRC> finisher) {
18+
default MergeFunctionFactory<ID, EID, R> pipe(Function<List<R>, List<R>> finisher) {
2119
return pipeWith(__ -> finisher);
2220
}
2321

24-
default MergeFunctionFactory<T, K, ID, EID, R, RRC, CTX> pipeWith(Function<CTX, Function<RRC, RRC>> finisherFactory) {
22+
default MergeFunctionFactory<ID, EID, R> pipeWith(Function<MergeFunctionContext<EID, R>, Function<List<R>, List<R>>> finisherFactory) {
2523
return ctx -> {
2624
final var mergeFunction = create(ctx);
2725
final var finisher = finisherFactory.apply(ctx);
@@ -30,22 +28,26 @@ default MergeFunctionFactory<T, K, ID, EID, R, RRC, CTX> pipeWith(Function<CTX,
3028
};
3129
}
3230

33-
static <T, K, ID, EID, R, RRC, CTX extends RuleMapperContext<T, K, ID, EID, R, RRC>> MergeFunctionFactory<T, K, ID, EID, R, RRC, CTX> with(MergeFunctionFactory<T, K, ID, EID, R, RRC, CTX> mergeFunctionFactory) {
31+
static <ID, EID, R> MergeFunctionFactory<ID, EID, R> from(MergeFunctionFactory<ID, EID, R> mergeFunctionFactory) {
3432
return mergeFunctionFactory;
3533
}
3634

37-
static <T, K, ID, EID, R, RRC, CTX extends RuleMapperContext<T, K, ID, EID, R, RRC>> MergeFunctionFactory<T, K, ID, EID, R, RRC, CTX> with(MergeFunction<ID, RRC> mergeFunction) {
35+
static <ID, EID, R> MergeFunctionFactory<ID, EID, R> from(MergeFunction<ID, List<R>> mergeFunction) {
3836
return __ -> mergeFunction;
3937
}
4038

41-
static <T, K, ID, R> MergeFunctionFactory<T, K, ID, ID, R, R, OneToOneContext<T, K, ID, R>> withOneToOne(MergeFunction<ID, R> mergeFunction) {
42-
return __ -> mergeFunction;
39+
@SafeVarargs
40+
static <ID, EID, R> MergeFunctionFactory<ID, EID, R> pipe(
41+
MergeFunction<ID, List<R>> mergeFunction,
42+
Function<List<R>, List<R>>... finishers) {
43+
44+
return pipe(from(mergeFunction), finishers);
4345
}
4446

4547
@SafeVarargs
46-
static <T, K, ID, EID, R, RRC, CTX extends RuleMapperContext<T, K, ID, EID, R, RRC>> MergeFunctionFactory<T, K, ID, EID, R, RRC, CTX> pipe(
47-
MergeFunctionFactory<T, K, ID, EID, R, RRC, CTX> mergeFunctionFactory,
48-
Function<RRC, RRC>... finishers) {
48+
static <ID, EID, R> MergeFunctionFactory<ID, EID, R> pipe(
49+
MergeFunctionFactory<ID, EID, R> mergeFunctionFactory,
50+
Function<List<R>, List<R>>... finishers) {
4951

5052
final var finisher = Stream.of(finishers)
5153
.reduce(identity(), (f, acc) -> acc.andThen(f));
@@ -54,9 +56,17 @@ static <T, K, ID, EID, R, RRC, CTX extends RuleMapperContext<T, K, ID, EID, R, R
5456
}
5557

5658
@SafeVarargs
57-
static <T, K, ID, EID, R, RRC, CTX extends RuleMapperContext<T, K, ID, EID, R, RRC>> MergeFunctionFactory<T, K, ID, EID, R, RRC, CTX> pipeWith(
58-
MergeFunctionFactory<T, K, ID, EID, R, RRC, CTX> mergeFunctionFactory,
59-
Function<CTX, Function<RRC, RRC>>... finisherFactories) {
59+
static <ID, EID, R> MergeFunctionFactory<ID, EID, R> pipeWith(
60+
MergeFunction<ID, List<R>> mergeFunction,
61+
Function<MergeFunctionContext<EID, R>, Function<List<R>, List<R>>>... finisherFactories) {
62+
63+
return pipeWith(from(mergeFunction), finisherFactories);
64+
}
65+
66+
@SafeVarargs
67+
static <ID, EID, R> MergeFunctionFactory<ID, EID, R> pipeWith(
68+
MergeFunctionFactory<ID, EID, R> mergeFunctionFactory,
69+
Function<MergeFunctionContext<EID, R>, Function<List<R>, List<R>>>... finisherFactories) {
6070

6171
return ctx -> {
6272
final var finisher = Stream.of(finisherFactories)

assembler/src/main/java/io/github/pellse/assembler/caching/merge/MergeFunctionUtils.java

Lines changed: 0 additions & 33 deletions
This file was deleted.

assembler/src/main/java/io/github/pellse/assembler/caching/merge/MergeFunctions.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,30 @@
11
package io.github.pellse.assembler.caching.merge;
22

3-
import io.github.pellse.assembler.RuleMapperContext.OneToManyContext;
43
import io.github.pellse.util.collection.CollectionUtils;
54

65
import java.util.List;
76
import java.util.function.Function;
87

9-
import static io.github.pellse.util.collection.CollectionUtils.*;
8+
import static io.github.pellse.util.collection.CollectionUtils.asList;
109
import static io.github.pellse.util.collection.CollectionUtils.concat;
10+
import static java.lang.Math.max;
1111
import static java.lang.Math.min;
12+
import static java.util.List.copyOf;
1213

1314
public interface MergeFunctions {
1415

15-
static <T, K, ID, EID, R> MergeFunctionFactory<T, K, ID, EID, R, List<R>, OneToManyContext<T, K, ID, EID, R>> removeDuplicates() {
16+
static <ID, EID, R> MergeFunctionFactory<ID, EID, R> removeDuplicates() {
1617
return ctx -> removeDuplicates(ctx.idResolver());
1718
}
1819

1920
static <ID, EID, R> MergeFunction<ID, List<R>> removeDuplicates(Function<R, EID> idResolver) {
2021
return (id, existingList, newList) -> CollectionUtils.removeDuplicates(concat(existingList, newList), idResolver);
2122
}
2223

24+
static <EID, R> Function<MergeFunctionContext<EID, R>, Function<List<R>, List<R>>> removeAllDuplicates() {
25+
return ctx -> list -> CollectionUtils.removeDuplicates(list, ctx.idResolver());
26+
}
27+
2328
static <ID, R> MergeFunction<ID, List<R>> keepFirst(int nbElements) {
2429
return (id, existingList, newList) -> concat(existingList, newList)
2530
.limit(min(nbElements, asList(existingList).size() + asList(newList).size()))
@@ -36,7 +41,29 @@ static <ID, R> MergeFunction<ID, List<R>> keepLast(int nbElements) {
3641
};
3742
}
3843

44+
static <R> Function<List<R>, List<R>> keepFirstN(int nbElements) {
45+
return keep(nbElements, list -> list.subList(0, nbElements));
46+
}
47+
48+
static <R> Function<List<R>, List<R>> keepLastN(int nbElements) {
49+
return keep(nbElements, list -> list.subList(max(0, list.size() - nbElements), list.size()));
50+
}
51+
3952
static <ID, R> MergeFunction<ID, R> replace() {
4053
return (k, r1, r2) -> r2 != null ? r2 : r1;
4154
}
55+
56+
private static <R> Function<List<R>, List<R>> keep(int nbElements, Function<List<R>, List<R>> subListFunction) {
57+
return list -> {
58+
final var nonNullList = asList(list);
59+
60+
if (nbElements <= 0) {
61+
return List.of();
62+
} else if (nonNullList.size() <= nbElements) {
63+
return nonNullList;
64+
}
65+
66+
return copyOf(subListFunction.apply(nonNullList));
67+
};
68+
}
4269
}

assembler/src/main/java/io/github/pellse/util/collection/CollectionUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ static <K, V, V1, M extends Map<K, V1>> Map<K, V1> readAll(Iterable<? extends K>
166166
return newMap(null, mapSupplier, map -> keys.forEach(key -> ifNotNull(sourceMap.get(key), value -> map.put(key, mappingFunction.apply(value)))));
167167
}
168168

169-
static <K, V> Collection<V> removeDuplicates(
169+
static <K, V> List<V> removeDuplicates(
170170
Collection<V> coll,
171171
Function<? super V, K> keyExtractor) {
172172

assembler/src/test/java/io/github/pellse/assembler/test/CacheTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import static io.github.pellse.assembler.caching.factory.StreamTableFactoryBuilder.streamTableBuilder;
5252
import static io.github.pellse.assembler.caching.factory.CacheFactory.*;
5353
import static io.github.pellse.assembler.caching.factory.ConcurrentCacheFactory.concurrent;
54+
import static io.github.pellse.assembler.caching.merge.MergeFunctionFactory.*;
5455
import static io.github.pellse.assembler.caching.merge.MergeFunctions.*;
5556
import static io.github.pellse.assembler.test.CDCAdd.cdcAdd;
5657
import static io.github.pellse.assembler.test.CDCDelete.cdcDelete;
@@ -272,7 +273,8 @@ public void testReusableAssemblerBuilderWithMergeFunctions() {
272273
.withCorrelationIdResolver(Customer::customerId)
273274
.withRules(
274275
rule(BillingInfo::customerId, oneToOne(cached(call(getBillingInfo), replace()), BillingInfo::new)),
275-
rule(OrderItem::customerId, oneToMany(OrderItem::id, cachedMany(this::getAllOrders, removeDuplicates()))),
276+
rule(OrderItem::customerId, oneToMany(OrderItem::id, cachedMany(this::getAllOrders, pipe(removeDuplicates(), keepLastN(20))))),
277+
// rule(OrderItem::customerId, oneToMany(OrderItem::id, cachedMany(this::getAllOrders, pipeWith(keepLast(20), removeAllDuplicates())))),
276278
Transaction::new)
277279
.build();
278280

0 commit comments

Comments
 (0)