Skip to content

Commit

Permalink
Merge pull request #588 from Polber:jkinard/582
Browse files (browse the repository at this point in the history)
PiperOrigin-RevId: 515538181
  • Loading branch information
cloud-teleport committed Mar 10, 2023
2 parents baf0102 + 317334c commit 8102cd6
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,17 @@ public Document invoke(Document data)
throw new RuntimeException("No udf was loaded");
}

Object result = getInvocable().invokeFunction(functionName(), data);
Object result = getInvocable().invokeFunction(functionName(), data.toJson());
if (result == null || ScriptObjectMirror.isUndefined(result)) {
return null;
} else if (result instanceof Document) {
return (Document) result;
} else if (result instanceof String) {
return Document.parse(result.toString());
} else {
String className = result.getClass().getName();
throw new RuntimeException(
"UDF Function did not return a String. Instead got: " + className);
"UDF Function did not return a valid Mongo Document. Instead got: " + className);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
import org.bson.Document;
import org.openjdk.nashorn.api.scripting.ScriptObjectMirror;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -163,8 +164,23 @@ public static TableSchema getTableFieldSchemaForUDF(
throw new RuntimeException("No udf was loaded");
}

Object result = invocable.invokeFunction(udfFunctionName, document);
Document doc = (Document) result;
Document doc;
Object result = invocable.invokeFunction(udfFunctionName, document.toJson());
if (result == null || ScriptObjectMirror.isUndefined(result)) {
return null;
} else if (result instanceof Document) {
doc = (Document) result;
} else if (result instanceof String) {
doc = Document.parse(result.toString());
} else {
String className = result.getClass().getName();
throw new RuntimeException(
"UDF Function did not return a valid Mongo Document. Instead got: "
+ className
+ ": "
+ result);
}

if (userOption.equals("FLATTEN")) {
doc.forEach(
(key, value) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ public void testMongoDbToBigQuery() throws IOException {
mongoDbClient.insertDocuments(collectionName, mongoDocuments);

String bqTable = testName;
String udfFileName = "transform.js";
artifactClient.createArtifact(
"input/" + udfFileName,
"function transform(inJson) {\n"
+ " var outJson = JSON.parse(inJson);\n"
+ " outJson.udf = \"out\";\n"
+ " return JSON.stringify(outJson);\n"
+ "}");
List<Field> bqSchemaFields = new ArrayList<>();
bqSchemaFields.add(Field.of("timestamp", StandardSQLTypeName.TIMESTAMP));
mongoDocuments
Expand All @@ -148,7 +156,9 @@ public void testMongoDbToBigQuery() throws IOException {
.addParameter(MONGO_DB, mongoDbClient.getDatabaseName())
.addParameter(MONGO_COLLECTION, collectionName)
.addParameter(BIGQUERY_TABLE, toTableSpec(table))
.addParameter(USER_OPTION, "FLATTEN");
.addParameter(USER_OPTION, "FLATTEN")
.addParameter("javascriptDocumentTransformGcsPath", getGcsPath("input/" + udfFileName))
.addParameter("javascriptDocumentTransformFunctionName", "transform");

// Act
LaunchInfo info = launchTemplate(options);
Expand All @@ -168,6 +178,7 @@ public void testMongoDbToBigQuery() throws IOException {
mongoDocument -> {
JSONObject mongoDbJson = new JSONObject(mongoDocument.toJson());
String mongoId = mongoDbJson.getJSONObject(MONGO_DB_ID).getString("$oid");
mongoDbJson.put("udf", "out");
mongoDbJson.put(MONGO_DB_ID, mongoId);
mongoMap.put(mongoId, mongoDbJson);
});
Expand Down Expand Up @@ -212,6 +223,7 @@ private static List<Document> generateDocuments() {
}
mongoDocumentKeys.add(randomFieldName.toLowerCase());
}
mongoDocumentKeys.add("udf");

for (int i = 0; i < numDocuments; i++) {
Document randomDocument = new Document().append(MONGO_DB_ID, new ObjectId());
Expand All @@ -220,6 +232,7 @@ private static List<Document> generateDocuments() {
randomDocument.append(
mongoDocumentKeys.get(j), RandomStringUtils.randomAlphanumeric(0, 20));
}
randomDocument.append("udf", "in");

mongoDocuments.add(randomDocument);
}
Expand Down

0 comments on commit 8102cd6

Please sign in to comment.