
Commit 1d19be9

MLE-26918 Dropping eval implementation for incremental write
Also renamed the other two classes to make their distinction clear: fromLexicons and fromView.
1 parent 2e84d2d commit 1d19be9

File tree

7 files changed: +45 -190 lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/FilterException.java

Lines changed: 2 additions & 0 deletions
@@ -9,6 +9,8 @@
  * Any exception thrown by execution of a {@code DocumentWriteSetFilter} will be wrapped in this exception and
  * rethrown by the {@code WriteBatcher}, allowing failure listeners to distinguish filter exceptions from other
  * exceptions that may occur during batch processing.
+ *
+ * @since 8.1.0
  */
 public class FilterException extends DataMovementException {

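The Javadoc addition above documents the contract that matters to callers: a failure listener can test for FilterException to tell filter errors apart from other batch failures. A minimal, hedged sketch of that pattern follows; the `dataMovementManager` and `logger` variables are assumed setup, and none of this code is part of the commit:

```java
// Hedged sketch, not from this commit: distinguishing filter failures in a
// WriteBatcher failure listener, per the FilterException Javadoc above.
WriteBatcher batcher = dataMovementManager.newWriteBatcher()
    .onBatchFailure((batch, failure) -> {
        if (failure instanceof FilterException) {
            // The DocumentWriteSetFilter threw; the original exception is the cause.
            logger.error("Filter failed for batch {}: {}",
                batch.getJobBatchNumber(), failure.getCause().getMessage());
        } else {
            logger.error("Batch {} failed: {}",
                batch.getJobBatchNumber(), failure.getMessage());
        }
    });
```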
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteConfig.java

Lines changed: 6 additions & 41 deletions
@@ -14,17 +14,10 @@
  *
  * @since 8.1.0
  */
-public class IncrementalWriteConfig {
-
-	private final String hashKeyName;
-	private final String timestampKeyName;
-	private final boolean canonicalizeJson;
-	private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
-	private final String[] jsonExclusions;
-	private final String[] xmlExclusions;
-	private final Map<String, String> xmlNamespaces;
-	private final String schemaName;
-	private final String viewName;
+public record IncrementalWriteConfig(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
+                                     Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer,
+                                     String[] jsonExclusions, String[] xmlExclusions, Map<String, String> xmlNamespaces,
+                                     String schemaName, String viewName) {
 
 	public IncrementalWriteConfig(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
 	                              Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer,
@@ -41,39 +34,11 @@ public IncrementalWriteConfig(String hashKeyName, String timestampKeyName, boole
 		this.viewName = viewName;
 	}
 
-	public String getHashKeyName() {
-		return hashKeyName;
-	}
-
-	public String getTimestampKeyName() {
-		return timestampKeyName;
-	}
-
-	public boolean isCanonicalizeJson() {
-		return canonicalizeJson;
-	}
-
-	public Consumer<DocumentWriteOperation[]> getSkippedDocumentsConsumer() {
-		return skippedDocumentsConsumer;
-	}
-
-	public String[] getJsonExclusions() {
-		return jsonExclusions;
-	}
 
-	public String[] getXmlExclusions() {
-		return xmlExclusions;
-	}
-
-	public Map<String, String> getXmlNamespaces() {
+	@Override
+	public Map<String, String> xmlNamespaces() {
 		return xmlNamespaces != null ? xmlNamespaces : Collections.emptyMap();
 	}
 
-	public String getSchemaName() {
-		return schemaName;
-	}
 
-	public String getViewName() {
-		return viewName;
-	}
 }

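With IncrementalWriteConfig converted to a record, callers move from JavaBean getters to the generated accessors; only xmlNamespaces() is overridden so the old null-safe default is preserved. A small illustrative sketch (argument values are made up; only the accessor names come from the diff):

```java
// Illustration only: constructing the record and reading the new accessors.
IncrementalWriteConfig config = new IncrementalWriteConfig(
    "incrementalWriteHash", "incrementalWriteTimestamp", true,
    null, null, null, null, null, null);

String hashKey = config.hashKeyName();                    // previously getHashKeyName()
boolean canonical = config.canonicalizeJson();            // previously isCanonicalizeJson()
Map<String, String> namespaces = config.xmlNamespaces();  // overridden: never null
```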
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java

Lines changed: 0 additions & 66 deletions
This file was deleted.

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java

Lines changed: 18 additions & 24 deletions
@@ -48,7 +48,6 @@ public static class Builder {
 		private String hashKeyName = "incrementalWriteHash";
 		private String timestampKeyName = "incrementalWriteTimestamp";
 		private boolean canonicalizeJson = true;
-		private boolean useEvalQuery = false;
 		private Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
 		private String[] jsonExclusions;
 		private String[] xmlExclusions;
@@ -87,14 +86,6 @@ public Builder canonicalizeJson(boolean canonicalizeJson) {
 			return this;
 		}
 
-		/**
-		 * @param useEvalQuery if true, evaluate server-side JavaScript instead of an Optic query for retrieving hash values; defaults to false.
-		 */
-		public Builder useEvalQuery(boolean useEvalQuery) {
-			this.useEvalQuery = useEvalQuery;
-			return this;
-		}
-
 		/**
 		 * @param skippedDocumentsConsumer a consumer that will be called with any documents in a batch that were skipped because their content had not changed.
 		 */
@@ -161,12 +152,9 @@ public IncrementalWriteFilter build() {
 				skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces, schemaName, viewName);
 
 			if (schemaName != null && viewName != null) {
-				return new IncrementalWriteViewFilter(config);
-			}
-			if (useEvalQuery) {
-				return new IncrementalWriteEvalFilter(config);
+				return new IncrementalWriteFromViewFilter(config);
 			}
-			return new IncrementalWriteOpticFilter(config);
+			return new IncrementalWriteFromLexiconsFilter(config);
 		}
 
 		private void validateJsonExclusions() {
@@ -254,19 +242,19 @@ protected final DocumentWriteSet filterDocuments(Context context, Function<Strin
 
 			if (existingHash != null) {
 				if (!existingHash.equals(contentHash)) {
-					newWriteSet.add(addHashToMetadata(doc, config.getHashKeyName(), contentHash, config.getTimestampKeyName(), timestamp));
-				} else if (config.getSkippedDocumentsConsumer() != null) {
+					newWriteSet.add(addHashToMetadata(doc, config.hashKeyName(), contentHash, config.timestampKeyName(), timestamp));
+				} else if (config.skippedDocumentsConsumer() != null) {
 					skippedDocuments.add(doc);
 				} else {
 					// No consumer, so skip the document silently.
 				}
 			} else {
-				newWriteSet.add(addHashToMetadata(doc, config.getHashKeyName(), contentHash, config.getTimestampKeyName(), timestamp));
+				newWriteSet.add(addHashToMetadata(doc, config.hashKeyName(), contentHash, config.timestampKeyName(), timestamp));
 			}
 		}
 
-		if (!skippedDocuments.isEmpty() && config.getSkippedDocumentsConsumer() != null) {
-			config.getSkippedDocumentsConsumer().accept(skippedDocuments.toArray(new DocumentWriteOperation[0]));
+		if (!skippedDocuments.isEmpty() && config.skippedDocumentsConsumer() != null) {
+			config.skippedDocumentsConsumer().accept(skippedDocuments.toArray(new DocumentWriteOperation[0]));
 		}
 
 		return newWriteSet;
@@ -283,11 +271,11 @@ private String serializeContent(DocumentWriteOperation doc) {
 			format = baseHandle.getFormat();
 		}
 
-		if (config.isCanonicalizeJson() && (Format.JSON.equals(format) || isPossiblyJsonContent(content))) {
+		if (config.canonicalizeJson() && (Format.JSON.equals(format) || isPossiblyJsonContent(content))) {
 			JsonCanonicalizer jc;
 			try {
-				if (config.getJsonExclusions() != null && config.getJsonExclusions().length > 0) {
-					content = ContentExclusionUtil.applyJsonExclusions(doc.getUri(), content, config.getJsonExclusions());
+				if (config.jsonExclusions() != null && config.jsonExclusions().length > 0) {
+					content = ContentExclusionUtil.applyJsonExclusions(doc.getUri(), content, config.jsonExclusions());
 				}
 				jc = new JsonCanonicalizer(content);
 				return jc.getEncodedString();
@@ -298,9 +286,9 @@ private String serializeContent(DocumentWriteOperation doc) {
 				logger.warn("Unable to canonicalize JSON content for URI {}, using original content for hashing; cause: {}",
 					doc.getUri(), e.getMessage());
 			}
-		} else if (config.getXmlExclusions() != null && config.getXmlExclusions().length > 0) {
+		} else if (config.xmlExclusions() != null && config.xmlExclusions().length > 0) {
 			try {
-				content = ContentExclusionUtil.applyXmlExclusions(doc.getUri(), content, config.getXmlNamespaces(), config.getXmlExclusions());
+				content = ContentExclusionUtil.applyXmlExclusions(doc.getUri(), content, config.xmlNamespaces(), config.xmlExclusions());
 			} catch (Exception e) {
 				logger.warn("Unable to apply XML exclusions for URI {}, using original content for hashing; cause: {}",
 					doc.getUri(), e.getMessage());
@@ -341,4 +329,10 @@ protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation
 	}
 
 
+	protected static String[] getUrisInBatch(DocumentWriteSet writeSet) {
+		return writeSet.stream()
+			.filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
+			.map(DocumentWriteOperation::getUri)
+			.toArray(String[]::new);
+	}
 }

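With useEvalQuery gone, build() has a single decision left: both schemaName and viewName configured yields the view-backed filter, anything else yields the lexicon-backed filter. A rough usage sketch; the Builder setter names below (schemaName, viewName, hashKeyName) are assumed, since the diff shows the backing fields and build() but not those methods:

```java
// Assumed builder method names; only the selection logic in build() is from this commit.
IncrementalWriteFilter viewBacked = new IncrementalWriteFilter.Builder()
    .schemaName("incremental")   // hypothetical TDE schema name
    .viewName("writeHashes")     // hypothetical TDE view name
    .build();                    // both set -> IncrementalWriteFromViewFilter

IncrementalWriteFilter lexiconBacked = new IncrementalWriteFilter.Builder()
    .hashKeyName("incrementalWriteHash") // default value; needs a matching field range index
    .build();                            // no view configured -> IncrementalWriteFromLexiconsFilter
```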
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java renamed to marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFromLexiconsFilter.java

Lines changed: 7 additions & 13 deletions
@@ -12,32 +12,26 @@
 import java.util.Map;
 
 /**
- * Uses an Optic query to get the existing hash values for a set of URIs.
+ * Uses an Optic fromLexicons query that depends on a field range index to retrieve URIs and
+ * hash values.
  *
  * @since 8.1.0
  */
-class IncrementalWriteOpticFilter extends IncrementalWriteFilter {
+class IncrementalWriteFromLexiconsFilter extends IncrementalWriteFilter {
 
-	IncrementalWriteOpticFilter(IncrementalWriteConfig config) {
+	IncrementalWriteFromLexiconsFilter(IncrementalWriteConfig config) {
 		super(config);
 	}
 
 	@Override
 	public DocumentWriteSet apply(Context context) {
-		final String[] uris = context.getDocumentWriteSet().stream()
-			.filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
-			.map(DocumentWriteOperation::getUri)
-			.toArray(String[]::new);
-
-		// It doesn't seem possible yet to use a DSL query and bind an array of strings to a "uris" param, so using
-		// a serialized query instead. That doesn't allow a user to override the query though.
-		RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());
+		final String[] uris = getUrisInBatch(context.getDocumentWriteSet());
 
 		try {
-			Map<String, Long> existingHashes = rowTemplate.query(op ->
+			Map<String, Long> existingHashes = new RowTemplate(context.getDatabaseClient()).query(op ->
 				op.fromLexicons(Map.of(
 					"uri", op.cts.uriReference(),
-					"hash", op.cts.fieldReference(getConfig().getHashKeyName())
+					"hash", op.cts.fieldReference(getConfig().hashKeyName())
 				)).where(
 					op.cts.documentQuery(op.xs.stringSeq(uris))
 				),

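For context, the same lookup the filter performs can be expressed as a standalone Optic query through the public RowManager/PlanBuilder API. This is a hedged sketch rather than code from the commit: `client` (a DatabaseClient) and `uris` are assumed, as is the default `incrementalWriteHash` field name backed by a field range index:

```java
// Hedged standalone sketch of the fromLexicons lookup (not from this commit).
RowManager rowManager = client.newRowManager();
PlanBuilder op = rowManager.newPlanBuilder();
PlanBuilder.ModifyPlan plan = op
    .fromLexicons(Map.of(
        "uri", op.cts.uriReference(),
        "hash", op.cts.fieldReference("incrementalWriteHash")))
    .where(op.cts.documentQuery(op.xs.stringSeq(uris)));
rowManager.resultRows(plan).forEach(row ->
    System.out.println(row.getString("uri") + " -> " + row.getString("hash")));
```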
marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteViewFilter.java renamed to marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFromViewFilter.java

Lines changed: 7 additions & 15 deletions
@@ -4,7 +4,6 @@
 package com.marklogic.client.datamovement.filter;
 
 import com.marklogic.client.FailedRequestException;
-import com.marklogic.client.document.DocumentWriteOperation;
 import com.marklogic.client.document.DocumentWriteSet;
 import com.marklogic.client.row.RowTemplate;
 
@@ -15,36 +14,29 @@
  * Uses an Optic query with fromView to get the existing hash values for a set of URIs from a TDE view.
  * This implementation requires a TDE template to be deployed that contains at minimum a "uri" column
  * and a column matching the configured hash key name, plus any other columns desired.
- * The query uses a {@code where} with a {@code cts.documentQuery} to filter rows by URI, which is
- * significantly faster than filtering via {@code op.in}.
 *
 * @since 8.1.0
 */
-class IncrementalWriteViewFilter extends IncrementalWriteFilter {
+class IncrementalWriteFromViewFilter extends IncrementalWriteFilter {
 
-	IncrementalWriteViewFilter(IncrementalWriteConfig config) {
+	IncrementalWriteFromViewFilter(IncrementalWriteConfig config) {
 		super(config);
 	}
 
 	@Override
 	public DocumentWriteSet apply(Context context) {
-		final String[] uris = context.getDocumentWriteSet().stream()
-			.filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
-			.map(DocumentWriteOperation::getUri)
-			.toArray(String[]::new);
-
-		RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());
+		final String[] uris = getUrisInBatch(context.getDocumentWriteSet());
 
 		try {
-			Map<String, Long> existingHashes = rowTemplate.query(op ->
-				op.fromView(getConfig().getSchemaName(), getConfig().getViewName(), "")
+			Map<String, Long> existingHashes = new RowTemplate(context.getDatabaseClient()).query(op ->
+				op.fromView(getConfig().schemaName(), getConfig().viewName(), "")
 					.where(op.cts.documentQuery(op.xs.stringSeq(uris)))
 				,
 				rows -> {
 					Map<String, Long> map = new HashMap<>();
 					rows.forEach(row -> {
 						String uri = row.getString("uri");
-						String hashString = row.getString(getConfig().getHashKeyName());
+						String hashString = row.getString(getConfig().hashKeyName());
 						if (hashString != null && !hashString.isEmpty()) {
 							long existingHash = Long.parseUnsignedLong(hashString);
 							map.put(uri, existingHash);
@@ -59,7 +51,7 @@ public DocumentWriteSet apply(Context context) {
 
 			return filterDocuments(context, uri -> existingHashes.get(uri));
 		} catch (FailedRequestException e) {
-			String message = "Unable to query for existing incremental write hashes from view " + getConfig().getSchemaName() + "." + getConfig().getViewName() + "; cause: " + e.getMessage();
+			String message = "Unable to query for existing incremental write hashes from view " + getConfig().schemaName() + "." + getConfig().viewName() + "; cause: " + e.getMessage();
 			throw new FailedRequestException(message, e.getFailedRequest());
 		}
 	}

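One detail worth calling out in the row consumer above: the hash column is read back as a string and parsed with Long.parseUnsignedLong, so 64-bit hash values that would overflow a signed long still round-trip. A tiny self-contained illustration of plain JDK behavior (not code from this commit):

```java
// The 64-bit hash may exceed Long.MAX_VALUE when treated as unsigned, so the
// string form round-trips via toUnsignedString/parseUnsignedLong.
String stored = Long.toUnsignedString(-1L);         // "18446744073709551615"
long roundTripped = Long.parseUnsignedLong(stored); // the same 64 bits, -1L as signed
System.out.println(roundTripped == -1L);            // true
```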