From 317334cce8928565a6e52b9549f06096e68eae6e Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Fri, 10 Feb 2023 10:24:47 -0500 Subject: [PATCH] Issue 582 - MongoDBtoBigQuery with UDF: ScriptObjectMirror cannot be cast to bson.Document Signed-off-by: Jeffrey Kinard --- .../JavascriptDocumentTransformer.java | 6 ++++-- .../v2/mongodb/templates/MongoDbUtils.java | 20 +++++++++++++++++-- .../templates/MongoDbToBigQueryIT.java | 15 +++++++++++++- 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/JavascriptDocumentTransformer.java b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/JavascriptDocumentTransformer.java index a95016ed58..85460f1ae6 100644 --- a/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/JavascriptDocumentTransformer.java +++ b/v2/common/src/main/java/com/google/cloud/teleport/v2/transforms/JavascriptDocumentTransformer.java @@ -147,15 +147,17 @@ public Document invoke(Document data) throw new RuntimeException("No udf was loaded"); } - Object result = getInvocable().invokeFunction(functionName(), data); + Object result = getInvocable().invokeFunction(functionName(), data.toJson()); if (result == null || ScriptObjectMirror.isUndefined(result)) { return null; } else if (result instanceof Document) { return (Document) result; + } else if (result instanceof String) { + return Document.parse(result.toString()); } else { String className = result.getClass().getName(); throw new RuntimeException( - "UDF Function did not return a String. Instead got: " + className); + "UDF Function did not return a valid Mongo Document. Instead got: " + className); } } diff --git a/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbUtils.java b/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbUtils.java index 0b7258e0c3..9e242f8704 100644 --- a/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbUtils.java +++ b/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbUtils.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.io.fs.MatchResult.Status; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams; import org.bson.Document; +import org.openjdk.nashorn.api.scripting.ScriptObjectMirror; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,8 +164,23 @@ public static TableSchema getTableFieldSchemaForUDF( throw new RuntimeException("No udf was loaded"); } - Object result = invocable.invokeFunction(udfFunctionName, document); - Document doc = (Document) result; + Document doc; + Object result = invocable.invokeFunction(udfFunctionName, document.toJson()); + if (result == null || ScriptObjectMirror.isUndefined(result)) { + return null; + } else if (result instanceof Document) { + doc = (Document) result; + } else if (result instanceof String) { + doc = Document.parse(result.toString()); + } else { + String className = result.getClass().getName(); + throw new RuntimeException( + "UDF Function did not return a valid Mongo Document. Instead got: " + + className + + ": " + + result); + } + if (userOption.equals("FLATTEN")) { doc.forEach( (key, value) -> { diff --git a/v2/mongodb-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQueryIT.java b/v2/mongodb-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQueryIT.java index 6def54297e..03ccb5a357 100644 --- a/v2/mongodb-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQueryIT.java +++ b/v2/mongodb-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQueryIT.java @@ -132,6 +132,14 @@ public void testMongoDbToBigQuery() throws IOException { mongoDbClient.insertDocuments(collectionName, mongoDocuments); String bqTable = testName; + String udfFileName = "transform.js"; + artifactClient.createArtifact( + "input/" + udfFileName, + "function transform(inJson) {\n" + + " var outJson = JSON.parse(inJson);\n" + + " outJson.udf = \"out\";\n" + + " return JSON.stringify(outJson);\n" + + "}"); List bqSchemaFields = new ArrayList<>(); bqSchemaFields.add(Field.of("timestamp", StandardSQLTypeName.TIMESTAMP)); mongoDocuments @@ -148,7 +156,9 @@ public void testMongoDbToBigQuery() throws IOException { .addParameter(MONGO_DB, mongoDbClient.getDatabaseName()) .addParameter(MONGO_COLLECTION, collectionName) .addParameter(BIGQUERY_TABLE, toTableSpec(table)) - .addParameter(USER_OPTION, "FLATTEN"); + .addParameter(USER_OPTION, "FLATTEN") + .addParameter("javascriptDocumentTransformGcsPath", getGcsPath("input/" + udfFileName)) + .addParameter("javascriptDocumentTransformFunctionName", "transform"); // Act LaunchInfo info = launchTemplate(options); @@ -168,6 +178,7 @@ public void testMongoDbToBigQuery() throws IOException { mongoDocument -> { JSONObject mongoDbJson = new JSONObject(mongoDocument.toJson()); String mongoId = mongoDbJson.getJSONObject(MONGO_DB_ID).getString("$oid"); + mongoDbJson.put("udf", "out"); mongoDbJson.put(MONGO_DB_ID, mongoId); mongoMap.put(mongoId, mongoDbJson); }); @@ -212,6 +223,7 @@ private static List generateDocuments() { } mongoDocumentKeys.add(randomFieldName.toLowerCase()); } + mongoDocumentKeys.add("udf"); for (int i = 0; i < numDocuments; i++) { Document randomDocument = new Document().append(MONGO_DB_ID, new ObjectId()); @@ -220,6 +232,7 @@ private static List generateDocuments() { randomDocument.append( mongoDocumentKeys.get(j), RandomStringUtils.randomAlphanumeric(0, 20)); } + randomDocument.append("udf", "in"); mongoDocuments.add(randomDocument); }