Skip to content

Commit 1ffae11

Browse files
Safe match inserts (#67)
## What is the goal of this PR? We validate that match-inserts only match 1 answer before doing inserts. This check can be disabled with a new flag, `--allowMultiInsert`. ## What are the changes implemented in this PR? * Fix a bunch of tests that relied on `toString()` for equality checks on TypeQL insert queries * Verify that match-inserts only receive 1 `match` answer before doing inserts. We rewrite `match-insert` with constraints into a `match` using IIDs or Labels to avoid doing the complex `match` multiple times
1 parent ce50107 commit 1ffae11

19 files changed

+550
-441
lines changed

.DS_Store

6 KB
Binary file not shown.

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ plugins {
55
}
66

77
group 'com.vaticle.typedb-osi'
8-
version '1.4.2'
8+
version '1.5.0'
99

1010
repositories {
1111
mavenCentral()

src/main/java/com/vaticle/typedb/osi/loader/cli/LoadOptions.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public class LoadOptions {
3838
@CommandLine.Option(names = {"-ls", "--loadSchema"}, description = "optional - reload schema when continuing a migration (ignored when clean migration)", defaultValue = "false")
3939
public boolean loadSchema;
4040

41+
@CommandLine.Option(names = {"-mi", "--allowMultiInsert"}, description = "Allow match-inserts to match multiple answers and insert for each.", defaultValue = "false")
42+
public boolean multiInsert;
43+
4144
public static LoadOptions parse(String[] args) {
4245
CommandLine commandLine = new CommandLine(new TypeDBLoaderCLI())
4346
.addSubcommand("load", new LoadOptions());

src/main/java/com/vaticle/typedb/osi/loader/generator/AppendAttributeGenerator.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
package com.vaticle.typedb.osi.loader.generator;
1818

1919
import com.vaticle.typedb.client.api.TypeDBTransaction;
20+
import com.vaticle.typedb.client.api.answer.ConceptMap;
2021
import com.vaticle.typedb.client.common.exception.TypeDBClientException;
2122
import com.vaticle.typedb.osi.loader.config.Configuration;
2223
import com.vaticle.typedb.osi.loader.io.FileLogger;
2324
import com.vaticle.typedb.osi.loader.util.GeneratorUtil;
25+
import com.vaticle.typedb.osi.loader.util.TypeDBUtil;
2426
import com.vaticle.typedb.osi.loader.util.Util;
2527
import com.vaticle.typeql.lang.TypeQL;
2628
import com.vaticle.typeql.lang.pattern.constraint.ThingConstraint;
@@ -33,6 +35,9 @@
3335

3436
import java.io.IOException;
3537
import java.util.ArrayList;
38+
import java.util.Iterator;
39+
40+
import static com.vaticle.typedb.osi.loader.util.TypeDBUtil.safeInsert;
3641

3742
public class AppendAttributeGenerator implements Generator {
3843
private static final Logger dataLogger = LogManager.getLogger("com.vaticle.typedb.osi.loader.error");
@@ -48,8 +53,8 @@ public AppendAttributeGenerator(String filePath, Configuration.Generator.AppendA
4853
this.fileSeparator = fileSeparator;
4954
}
5055

51-
public void write(TypeDBTransaction tx,
52-
String[] row) {
56+
@Override
57+
public void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert) {
5358
String fileName = FilenameUtils.getName(filePath);
5459
String fileNoExtension = FilenameUtils.removeExtension(fileName);
5560
String originalRow = String.join(Character.toString(fileSeparator), row);
@@ -59,18 +64,24 @@ public void write(TypeDBTransaction tx,
5964
dataLogger.error("Malformed Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_malformed.log" + ">");
6065
}
6166

62-
TypeQLInsert statement = generateMatchInsertStatement(row);
67+
TypeQLInsert query = generateMatchInsertStatement(row);
6368

64-
if (appendAttributeInsertStatementValid(statement)) {
69+
if (appendAttributeInsertStatementValid(query)) {
6570
try {
66-
tx.query().insert(statement);
71+
Iterator<ConceptMap> answers = TypeDBUtil.executeMatch(tx, query);
72+
if (!answers.hasNext()) {
73+
FileLogger.getLogger().logNoMatches(fileName, originalRow);
74+
dataLogger.error("Match-insert failed - File <" + filePath + "> row <" + originalRow + "> generates query <" + query + "> which matched no answers.");
75+
} else {
76+
safeInsert(tx, query, answers, allowMultiInsert, filePath, originalRow, dataLogger);
77+
}
6778
} catch (TypeDBClientException typeDBClientException) {
6879
FileLogger.getLogger().logUnavailable(fileName, originalRow);
6980
dataLogger.error("TypeDB Unavailable - Row in <" + filePath + "> not inserted - written to <" + fileNoExtension + "_unavailable.log" + ">");
7081
}
7182
} else {
7283
FileLogger.getLogger().logInvalid(fileName, originalRow);
73-
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + statement.toString().replace("\n", " ") + ">");
84+
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + query.toString().replace("\n", " ") + ">");
7485
}
7586
}
7687

src/main/java/com/vaticle/typedb/osi/loader/generator/AppendAttributeOrInsertThingGenerator.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.vaticle.typedb.osi.loader.config.Configuration;
2323
import com.vaticle.typedb.osi.loader.io.FileLogger;
2424
import com.vaticle.typedb.osi.loader.util.GeneratorUtil;
25+
import com.vaticle.typedb.osi.loader.util.TypeDBUtil;
2526
import com.vaticle.typedb.osi.loader.util.Util;
2627
import com.vaticle.typeql.lang.TypeQL;
2728
import com.vaticle.typeql.lang.pattern.constraint.ThingConstraint;
@@ -34,8 +35,11 @@
3435

3536
import java.io.IOException;
3637
import java.util.ArrayList;
38+
import java.util.Iterator;
3739
import java.util.stream.Stream;
3840

41+
import static com.vaticle.typedb.osi.loader.util.TypeDBUtil.safeInsert;
42+
3943
public class AppendAttributeOrInsertThingGenerator implements Generator {
4044
private static final Logger dataLogger = LogManager.getLogger("com.vaticle.typedb.osi.loader.error");
4145
private final String filePath;
@@ -50,8 +54,8 @@ public AppendAttributeOrInsertThingGenerator(String filePath, Configuration.Gene
5054
this.fileSeparator = fileSeparator;
5155
}
5256

53-
public void write(TypeDBTransaction tx,
54-
String[] row) {
57+
@Override
58+
public void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert) {
5559
String fileName = FilenameUtils.getName(filePath);
5660
String fileNoExtension = FilenameUtils.removeExtension(fileName);
5761
String originalRow = String.join(Character.toString(fileSeparator), row);
@@ -61,30 +65,32 @@ public void write(TypeDBTransaction tx,
6165
dataLogger.error("Malformed Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_malformed.log" + ">");
6266
}
6367

64-
TypeQLInsert appendStatement = generateMatchInsertStatement(row);
65-
TypeQLInsert insertStatement = generateThingInsertStatement(row);
68+
TypeQLInsert appendQuery = generateMatchInsertStatement(row);
69+
TypeQLInsert insertQuery = generateThingInsertStatement(row);
6670

67-
if (appendAttributeInsertStatementValid(appendStatement)) {
71+
if (appendAttributeInsertStatementValid(appendQuery)) {
6872
try {
69-
final Stream<ConceptMap> insertedStream = tx.query().insert(appendStatement);
70-
if (insertedStream.count() == 0) {
71-
if (thingInsertStatementValid(insertStatement)) {
72-
tx.query().insert(insertStatement);
73+
Iterator<ConceptMap> answers = TypeDBUtil.executeMatch(tx, appendQuery);
74+
if (!answers.hasNext()) {
75+
if (thingInsertStatementValid(insertQuery)) {
76+
tx.query().insert(insertQuery);
7377
} else {
7478
FileLogger.getLogger().logInvalid(fileName, originalRow);
75-
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + insertStatement.toString().replace("\n", " ") + ">");
79+
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + insertQuery.toString().replace("\n", " ") + ">");
7680
}
81+
} else {
82+
safeInsert(tx, appendQuery, answers, allowMultiInsert, filePath, originalRow, dataLogger);
7783
}
7884
} catch (TypeDBClientException typeDBClientException) {
7985
FileLogger.getLogger().logUnavailable(fileName, originalRow);
8086
dataLogger.error("TypeDB Unavailable - Row in <" + filePath + "> not inserted - written to <" + fileNoExtension + "_unavailable.log" + ">");
8187
}
8288
} else {
83-
if (thingInsertStatementValid(insertStatement)) {
84-
tx.query().insert(insertStatement);
89+
if (thingInsertStatementValid(insertQuery)) {
90+
tx.query().insert(insertQuery);
8591
} else {
8692
FileLogger.getLogger().logInvalid(fileName, originalRow);
87-
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statements: <" + appendStatement.toString().replace("\n", " ") + "> and <" + insertStatement.toString().replace("\n", " ") + ">");
93+
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statements: <" + appendQuery.toString().replace("\n", " ") + "> and <" + insertQuery.toString().replace("\n", " ") + ">");
8894
}
8995
}
9096
}
@@ -149,7 +155,7 @@ public boolean appendAttributeInsertStatementValid(TypeQLInsert insert) {
149155
if (insert == null) return false;
150156
if (!insert.toString().contains("isa " + appendOrInsertConfiguration.getMatch().getType())) return false;
151157
for (Configuration.Definition.Attribute ownershipThingGetter : appendOrInsertConfiguration.getMatch().getOwnerships()) {
152-
if (!insert.toString().contains(", has " + ownershipThingGetter.getAttribute())) return false;
158+
if (!insert.toString().contains("has " + ownershipThingGetter.getAttribute())) return false;
153159
}
154160
if (appendOrInsertConfiguration.getInsert().getRequiredOwnerships() != null) {
155161
for (Configuration.Definition.Attribute attribute : appendOrInsertConfiguration.getInsert().getRequiredOwnerships()) {

src/main/java/com/vaticle/typedb/osi/loader/generator/AttributeGenerator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ public AttributeGenerator(String filePath, Configuration.Generator.Attribute att
4747
this.fileSeparator = fileSeparator;
4848
}
4949

50-
public void write(TypeDBTransaction tx, String[] row) {
50+
@Override
51+
public void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert) {
5152

5253
String fileName = FilenameUtils.getName(filePath);
5354
String fileNoExtension = FilenameUtils.removeExtension(fileName);
@@ -62,7 +63,7 @@ public void write(TypeDBTransaction tx, String[] row) {
6263
if (isValid(statement)) {
6364
try {
6465
tx.query().insert(statement);
65-
} catch (TypeDBClientException graknClientException) {
66+
} catch (TypeDBClientException clientException) {
6667
FileLogger.getLogger().logUnavailable(fileName, originalRow);
6768
dataLogger.error("TypeDB Unavailable - Row in <" + filePath + "> not inserted - written to <" + fileNoExtension + "_unavailable.log" + ">");
6869
}

src/main/java/com/vaticle/typedb/osi/loader/generator/EntityGenerator.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.vaticle.typedb.osi.loader.config.Configuration;
2222
import com.vaticle.typedb.osi.loader.io.FileLogger;
2323
import com.vaticle.typedb.osi.loader.util.GeneratorUtil;
24+
import com.vaticle.typedb.osi.loader.util.TypeDBUtil;
2425
import com.vaticle.typedb.osi.loader.util.Util;
2526
import com.vaticle.typeql.lang.TypeQL;
2627
import com.vaticle.typeql.lang.pattern.variable.ThingVariable;
@@ -45,8 +46,8 @@ public EntityGenerator(String filePath, Configuration.Generator.Entity entityCon
4546
this.fileSeparator = fileSeparator;
4647
}
4748

48-
public void write(TypeDBTransaction tx,
49-
String[] row) {
49+
@Override
50+
public void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert) {
5051
String fileName = FilenameUtils.getName(filePath);
5152
String fileNoExtension = FilenameUtils.removeExtension(fileName);
5253
String originalRow = String.join(Character.toString(fileSeparator), row);
@@ -56,17 +57,17 @@ public void write(TypeDBTransaction tx,
5657
dataLogger.error("Malformed Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_malformed.log" + ">");
5758
}
5859

59-
TypeQLInsert statement = generateThingInsertStatement(row);
60-
if (valid(statement)) {
60+
TypeQLInsert query = generateThingInsertStatement(row);
61+
if (valid(query)) {
6162
try {
62-
tx.query().insert(statement);
63+
tx.query().insert(query);
6364
} catch (TypeDBClientException typeDBClientException) {
6465
FileLogger.getLogger().logUnavailable(fileName, originalRow);
6566
dataLogger.error("TypeDB Unavailable - Row in <" + filePath + "> not inserted - written to <" + fileNoExtension + "_unavailable.log" + ">");
6667
}
6768
} else {
6869
FileLogger.getLogger().logInvalid(fileName, originalRow);
69-
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + statement.toString().replace("\n", " ") + ">");
70+
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + query.toString().replace("\n", " ") + ">");
7071
}
7172
}
7273

src/main/java/com/vaticle/typedb/osi/loader/generator/Generator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,6 @@
1919
import com.vaticle.typedb.client.api.TypeDBTransaction;
2020

2121
public interface Generator {
22-
void write(TypeDBTransaction tx, String[] row);
22+
void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert);
2323
char getFileSeparator();
2424
}

src/main/java/com/vaticle/typedb/osi/loader/generator/RelationGenerator.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
package com.vaticle.typedb.osi.loader.generator;
1818

1919
import com.vaticle.typedb.client.api.TypeDBTransaction;
20+
import com.vaticle.typedb.client.api.answer.ConceptMap;
2021
import com.vaticle.typedb.client.common.exception.TypeDBClientException;
2122
import com.vaticle.typedb.osi.loader.config.Configuration;
2223
import com.vaticle.typedb.osi.loader.io.FileLogger;
2324
import com.vaticle.typedb.osi.loader.util.GeneratorUtil;
25+
import com.vaticle.typedb.osi.loader.util.TypeDBUtil;
2426
import com.vaticle.typedb.osi.loader.util.Util;
2527
import com.vaticle.typeql.lang.TypeQL;
2628
import com.vaticle.typeql.lang.pattern.constraint.ThingConstraint;
@@ -33,8 +35,10 @@
3335

3436
import java.io.IOException;
3537
import java.util.ArrayList;
38+
import java.util.Iterator;
3639

3740
import static com.vaticle.typedb.osi.loader.util.GeneratorUtil.constrainThingWithHasAttributes;
41+
import static com.vaticle.typedb.osi.loader.util.TypeDBUtil.safeInsert;
3842

3943
public class RelationGenerator implements Generator {
4044
private static final Logger dataLogger = LogManager.getLogger("com.vaticle.typedb.osi.loader.error");
@@ -50,8 +54,8 @@ public RelationGenerator(String filePath, Configuration.Generator.Relation relat
5054
this.fileSeparator = fileSeparator;
5155
}
5256

53-
public void write(TypeDBTransaction tx,
54-
String[] row) {
57+
@Override
58+
public void write(TypeDBTransaction tx, String[] row, boolean allowMultiInsert) {
5559
String fileName = FilenameUtils.getName(filePath);
5660
String fileNoExtension = FilenameUtils.removeExtension(fileName);
5761
String originalRow = String.join(Character.toString(fileSeparator), row);
@@ -61,18 +65,24 @@ public void write(TypeDBTransaction tx,
6165
dataLogger.error("Malformed Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_malformed.log" + ">");
6266
}
6367

64-
TypeQLInsert statement = generateMatchInsertStatement(row);
68+
TypeQLInsert query = generateMatchInsertStatement(row);
6569

66-
if (relationInsertStatementValid(statement)) {
70+
if (relationInsertStatementValid(query)) {
6771
try {
68-
tx.query().insert(statement);
72+
Iterator<ConceptMap> answers = TypeDBUtil.executeMatch(tx, query);
73+
if (!answers.hasNext()) {
74+
FileLogger.getLogger().logNoMatches(fileName, originalRow);
75+
dataLogger.error("Match-insert failed - File <" + filePath + "> row <" + originalRow + "> generates query <" + query + "> which matched no answers.");
76+
} else {
77+
safeInsert(tx, query, answers, allowMultiInsert, filePath, originalRow, dataLogger);
78+
}
6979
} catch (TypeDBClientException typeDBClientException) {
7080
FileLogger.getLogger().logUnavailable(fileName, originalRow);
7181
dataLogger.error("TypeDB Unavailable - Row in <" + filePath + "> not inserted - written to <" + fileNoExtension + "_unavailable.log" + ">");
7282
}
7383
} else {
7484
FileLogger.getLogger().logInvalid(fileName, originalRow);
75-
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + statement.toString().replace("\n", " ") + ">");
85+
dataLogger.error("Invalid Row detected in <" + filePath + "> - written to <" + fileNoExtension + "_invalid.log" + "> - invalid Statement: <" + query.toString().replace("\n", " ") + ">");
7686
}
7787
}
7888

src/main/java/com/vaticle/typedb/osi/loader/io/FileLogger.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,30 @@ public synchronized void logUnavailable(String sourceFile, String errorString) {
8282
}
8383
}
8484

85+
public void logNoMatches(String sourceFile, String row) {
86+
try {
87+
FileWriter fw = new FileWriter(directoryString + "/" + FilenameUtils.removeExtension(sourceFile) + "_no_matches.log", true);
88+
fw.append(row.replace("null", ""));
89+
fw.append("\n");
90+
fw.flush();
91+
fw.close();
92+
} catch (IOException ioException) {
93+
ioException.printStackTrace();
94+
}
95+
}
96+
97+
public void logTooManyMatches(String sourceFile, String row) {
98+
try {
99+
FileWriter fw = new FileWriter(directoryString + "/" + FilenameUtils.removeExtension(sourceFile) + "_too_many_matches.log", true);
100+
fw.append(row.replace("null", ""));
101+
fw.append("\n");
102+
fw.flush();
103+
fw.close();
104+
} catch (IOException ioException) {
105+
ioException.printStackTrace();
106+
}
107+
}
108+
85109
public synchronized void logColumnWarnings(String sourceFile, String errorString) {
86110
try {
87111
FileWriter fw = new FileWriter(directoryString + "/" + FilenameUtils.removeExtension(sourceFile) + "_column_type.log", true);

src/main/java/com/vaticle/typedb/osi/loader/loader/AsyncLoaderWorker.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.vaticle.typedb.client.api.TypeDBTransaction;
2222
import com.vaticle.typedb.common.collection.Either;
2323
import com.vaticle.typedb.common.concurrent.NamedThreadFactory;
24+
import com.vaticle.typedb.osi.loader.cli.LoadOptions;
2425
import com.vaticle.typedb.osi.loader.config.Configuration;
2526
import com.vaticle.typedb.osi.loader.generator.AppendAttributeGenerator;
2627
import com.vaticle.typedb.osi.loader.generator.AppendAttributeOrInsertThingGenerator;
@@ -52,23 +53,25 @@ public class AsyncLoaderWorker {
5253

5354
private static final DecimalFormat countFormat = new DecimalFormat("#,###");
5455
private static final DecimalFormat decimalFormat = new DecimalFormat("#,###.00");
56+
private final Configuration dc;
57+
private final LoadOptions loadOptions;
5558
private final ExecutorService executor;
5659
private final int threads;
5760
private final String databaseName;
5861
private final AtomicBoolean hasError;
5962
private final int batchGroup;
60-
private final Configuration dc;
6163
private Status status;
6264

6365
private enum Status {OK, ERROR}
6466

65-
public AsyncLoaderWorker(Configuration dc, String databaseName) {
67+
public AsyncLoaderWorker(Configuration dc, LoadOptions loadOptions) {
6668
this.dc = dc;
69+
this.loadOptions = loadOptions;
6770
this.threads = dc.getGlobalConfig().getParallelisation();
68-
this.databaseName = databaseName;
71+
this.databaseName = loadOptions.databaseName;
6972
this.hasError = new AtomicBoolean(false);
7073
this.batchGroup = 1;
71-
this.executor = Executors.newFixedThreadPool(threads, new NamedThreadFactory(databaseName));
74+
this.executor = Executors.newFixedThreadPool(threads, new NamedThreadFactory(this.databaseName));
7275
this.status = Status.OK;
7376
}
7477

@@ -379,7 +382,7 @@ private CompletableFuture<Void> asyncWrite(int id,
379382
try (TypeDBTransaction tx = session.transaction(TypeDBTransaction.Type.WRITE)) {
380383
rows.forEach(csv -> {
381384
Util.debug("async-writer-{}: {}", id, csv);
382-
gen.write(tx, csv);
385+
gen.write(tx, csv, loadOptions.multiInsert);
383386
});
384387
tx.commit();
385388
}

0 commit comments

Comments
 (0)