diff --git a/.github/workflows/build-report.yml b/.github/workflows/build-report.yml
new file mode 100644
index 000000000..691236ceb
--- /dev/null
+++ b/.github/workflows/build-report.yml
@@ -0,0 +1,51 @@
+# Copyright © 2024 Cask Data, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+
+# This workflow will build a Java project with Maven
+# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
+# Note: Any changes to this workflow would be used only after merging into develop
+name: Build Unit Tests Report
+
+on:
+ workflow_run:
+ workflows:
+ - Build with unit tests
+ types:
+ - completed
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+
+ if: ${{ github.event.workflow_run.conclusion != 'skipped' }}
+
+ steps:
+ # Pinned 1.0.0 version
+ - uses: marocchino/action-workflow_run-status@54b6e87d6cb552fc5f36dbe9a722a6048725917a
+
+ - name: Download artifact
+ uses: actions/download-artifact@v4
+ with:
+ github-token: ${{ secrets.GITHUB_TOKEN }}
+ run-id: ${{ github.event.workflow_run.id }}
+ path: artifacts/
+
+ - name: Surefire Report
+ # Pinned 3.5.2 version
+ uses: mikepenz/action-junit-report@16a9560bd02f11e7e3bf6b3e2ef6bba6c9d07c32
+ if: always()
+ with:
+ report_paths: '**/target/surefire-reports/TEST-*.xml'
+ github_token: ${{ secrets.GITHUB_TOKEN }}
+ detailed_summary: true
+ commit: ${{ github.event.workflow_run.head_sha }}
+ check_name: Build Test Report
+
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 939f9a6c2..94dfe5b16 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -15,21 +15,28 @@
name: Build with unit tests
on:
- workflow_run:
- workflows:
- - Trigger build
- types:
- - completed
+ push:
+ branches: [ develop, release/** ]
+ pull_request:
+ branches: [ develop, release/** ]
+ types: [opened, synchronize, reopened, labeled]
jobs:
build:
runs-on: k8s-runner-build
- if: ${{ github.event.workflow_run.conclusion != 'skipped' }}
-
+ # We allow builds:
+ # 1) When it's a merge into a branch
+ # 2) For PRs that are labeled as build and
+ # - It's a code change
+ # - A build label was just added
+ # A bit complex, but prevents builds when other labels are manipulated
+ if: >
+ github.event_name == 'push'
+ || (contains(github.event.pull_request.labels.*.name, 'build')
+ && (github.event.action != 'labeled' || github.event.label.name == 'build')
+ )
steps:
- # Pinned 1.0.0 version
- - uses: haya14busa/action-workflow_run-status@967ed83efa565c257675ed70cfe5231f062ddd94
- uses: actions/checkout@v3
with:
ref: ${{ github.event.workflow_run.head_branch }}
@@ -43,18 +50,11 @@ jobs:
- name: Build with Maven
run: mvn clean test -fae -T 2 -B -V -DcloudBuild -Dmaven.wagon.http.retryHandler.count=3 -Dmaven.wagon.httpconnectionManager.ttlSeconds=25
- name: Archive build artifacts
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@v4
if: always()
with:
- name: Build debug files
+ name: reports-${{ github.run_id }}
path: |
**/target/rat.txt
**/target/surefire-reports/*
- - name: Surefire Report
- # Pinned 1.0.5 version
- uses: ScaCap/action-surefire-report@ad808943e6bfbd2e6acba7c53fdb5c89534da533
- if: always()
- with:
- # GITHUB_TOKEN
- github_token: ${{ secrets.GITHUB_TOKEN }}
- commit: ${{ github.event.workflow_run.head_branch }}
+
diff --git a/.github/workflows/trigger.yml b/.github/workflows/trigger.yml
deleted file mode 100644
index e5693af07..000000000
--- a/.github/workflows/trigger.yml
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright © 2021 Cask Data, Inc.
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not
-# use this file except in compliance with the License. You may obtain a copy of
-# the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations under
-# the License.
-
-# This workflow will trigger build.yml only when needed.
-# This way we don't flood main workflow run list
-# Note that build.yml from develop will be used even for PR builds
-# Also it will have access to the proper GITHUB_SECRET
-
-name: Trigger build
-
-on:
- push:
- branches: [ develop, release/** ]
- pull_request:
- branches: [ develop, release/** ]
- types: [opened, synchronize, reopened, labeled]
- workflow_dispatch:
-
-jobs:
- trigger:
- runs-on: ubuntu-latest
-
- # We allow builds:
- # 1) When triggered manually
- # 2) When it's a merge into a branch
- # 3) For PRs that are labeled as build and
- # - It's a code change
- # - A build label was just added
- # A bit complex, but prevents builds when other labels are manipulated
- if: >
- github.event_name == 'workflow_dispatch'
- || github.event_name == 'push'
- || (contains(github.event.pull_request.labels.*.name, 'build')
- && (github.event.action != 'labeled' || github.event.label.name == 'build')
- )
-
- steps:
- - name: Trigger build
- run: echo Maven build will be triggered now
diff --git a/pom.xml b/pom.xml
index 172d11f52..aecfcd54b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@
io.cdap.wrangler
wrangler
- 4.10.0-SNAPSHOT
+ 4.10.1
Wrangler
pom
An interactive tool for data cleansing and transformation.
@@ -60,17 +60,6 @@
HEAD
-
-
- sonatype.release
- https://oss.sonatype.org/service/local/staging/deploy/maven2
-
-
- sonatype.snapshots
- https://oss.sonatype.org/content/repositories/snapshots
-
-
-
https://issues.cask.co/browse/CDAP
@@ -83,7 +72,7 @@
1.11.133
0.10.2-hadoop2
1.56
- 6.10.0-SNAPSHOT
+ 6.10.1
1.1.5
1.6
2.5
@@ -126,19 +115,9 @@
-
- sonatype
- https://oss.sonatype.org/content/groups/public
-
- true
-
-
- false
-
-
sonatype-snapshots
- https://oss.sonatype.org/content/repositories/snapshots
+ https://central.sonatype.com/repository/maven-snapshots
false
@@ -387,26 +366,14 @@
- org.apache.maven.plugins
- maven-release-plugin
- 2.5.3
-
- v${releaseVersion}
- v@{project.version}
- true
-
- releases
-
-
-
- org.sonatype.plugins
- nexus-staging-maven-plugin
- 1.6.2
+ org.sonatype.central
+ central-publishing-maven-plugin
+ 0.8.0
true
- https://oss.sonatype.org
- sonatype.release
+ sonatype.release
+ false
+ true
@@ -547,7 +514,7 @@
io.cdap.tests.e2e
cdap-e2e-framework
- 0.3.0-SNAPSHOT
+ 0.3.0
test
diff --git a/wrangler-api/pom.xml b/wrangler-api/pom.xml
index 383cbccf8..16cb528da 100644
--- a/wrangler-api/pom.xml
+++ b/wrangler-api/pom.xml
@@ -18,7 +18,7 @@
wrangler
io.cdap.wrangler
- 4.10.0-SNAPSHOT
+ 4.10.1
4.0.0
diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/RemoteDirectiveResponse.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/RemoteDirectiveResponse.java
new file mode 100644
index 000000000..0627c28f5
--- /dev/null
+++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/RemoteDirectiveResponse.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.wrangler.api;
+
+import io.cdap.cdap.api.data.schema.Schema;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Response after executing directives remotely
+ * Please make sure all fields are registered with {@link io.cdap.wrangler.utils.KryoSerializer}
+ */
+public class RemoteDirectiveResponse implements Serializable {
+ private final List rows;
+ private final Schema outputSchema;
+
+ /**
+ * Only used by {@link io.cdap.wrangler.utils.KryoSerializer}
+ **/
+ private RemoteDirectiveResponse() {
+ this(null, null);
+ }
+
+ public RemoteDirectiveResponse(List rows, Schema outputSchema) {
+ this.rows = rows;
+ this.outputSchema = outputSchema;
+ }
+
+ public List getRows() {
+ return rows;
+ }
+
+ public Schema getOutputSchema() {
+ return outputSchema;
+ }
+}
diff --git a/wrangler-core/pom.xml b/wrangler-core/pom.xml
index df8e568f7..de9900dda 100644
--- a/wrangler-core/pom.xml
+++ b/wrangler-core/pom.xml
@@ -18,7 +18,7 @@
wrangler
io.cdap.wrangler
- 4.10.0-SNAPSHOT
+ 4.10.1
4.0.0
@@ -170,6 +170,11 @@
guava-retrying
${guava.retrying.version}
+
+ com.esotericsoftware
+ kryo
+ 4.0.2
+
org.antlr
antlr4
diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
index c9d923da7..7c517ed6a 100644
--- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
+++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4
@@ -124,7 +124,7 @@ properties
;
propertyList
- : property (',' property)+
+ : property (',' property)*
;
property
diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java b/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java
index 99a4e9b5f..80ec25a9e 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java
@@ -118,18 +118,18 @@ public Schema getOutputSchema(SchemaResolutionContext context) {
Schema inputSchema = context.getInputSchema();
List outputFields = new ArrayList<>();
Schema sourceSchema = inputSchema.getField(source.value()).getSchema();
+ boolean destinationExists = inputSchema.getField(destination.value()) != null;
for (Schema.Field field : inputSchema.getFields()) {
- if (field.getName().equals(destination.value())) {
+ if (force && field.getName().equals(destination.value())) {
outputFields.add(Schema.Field.of(destination.value(), sourceSchema));
} else {
outputFields.add(field);
}
}
- if (!force) {
+ if (!destinationExists) {
outputFields.add(Schema.Field.of(destination.value(), sourceSchema));
}
-
return Schema.recordOf("outputSchema", outputFields);
}
}
diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java
index 1ecc3b159..e520fa6b1 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java
@@ -115,6 +115,9 @@ public Mutation lineage() {
@Override
public Schema getOutputSchema(SchemaResolutionContext context) {
Schema inputSchema = context.getInputSchema();
+ if (type.equalsIgnoreCase("decimal") && scale == null) {
+ return null;
+ }
return Schema.recordOf(
"outputSchema",
inputSchema.getFields().stream()
diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java b/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java
index d2a20fbe9..20922d1e8 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java
@@ -125,6 +125,8 @@ public List execute(List rows, ExecutorContext context)
context.getTransientStore().increment(TransientVariableScope.LOCAL, "dq_failure", 1);
}
throw new ReportErrorAndProceed(message, 1);
+ } else if (context != null && !context.getTransientStore().getVariables().contains("dq_failure")) {
+ context.getTransientStore().set(TransientVariableScope.LOCAL, "dq_failure", 0L);
}
} catch (ELException e) {
throw new DirectiveExecutionException(NAME, e.getMessage(), e);
diff --git a/wrangler-core/src/main/java/io/cdap/directives/xml/XmlToJson.java b/wrangler-core/src/main/java/io/cdap/directives/xml/XmlToJson.java
index 8a3ed89ff..513a54d17 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/xml/XmlToJson.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/xml/XmlToJson.java
@@ -38,6 +38,7 @@
import io.cdap.wrangler.api.parser.Numeric;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
+import org.apache.commons.lang.StringUtils;
import org.json.JSONException;
import org.json.XML;
@@ -52,9 +53,11 @@
@Description("Parses a XML document to JSON representation.")
public class XmlToJson implements Directive, Lineage {
public static final String NAME = "parse-xml-to-json";
+ public static final String ARG_KEEP_STRING = "keep-string";
// Column within the input row that needs to be parsed as Json
private String col;
private int depth;
+ private boolean keepString;
private final Gson gson = new Gson();
@Override
@@ -62,6 +65,7 @@ public UsageDefinition define() {
UsageDefinition.Builder builder = UsageDefinition.builder(NAME);
builder.define("column", TokenType.COLUMN_NAME);
builder.define("depth", TokenType.NUMERIC, Optional.TRUE);
+ builder.define(ARG_KEEP_STRING, TokenType.BOOLEAN, Optional.TRUE);
return builder.build();
}
@@ -73,6 +77,12 @@ public void initialize(Arguments args) throws DirectiveParseException {
} else {
this.depth = Integer.MAX_VALUE;
}
+
+ if (args.contains(ARG_KEEP_STRING) &&
+ StringUtils.isNotEmpty(args.value(ARG_KEEP_STRING).value().toString())) {
+ this.keepString = Boolean.parseBoolean(args.value(ARG_KEEP_STRING).value().toString());
+ }
+
}
@Override
@@ -93,7 +103,7 @@ public List execute(List rows, ExecutorContext context) throws Directi
try {
if (object instanceof String) {
- JsonObject element = gson.fromJson(XML.toJSONObject((String) object).toString(),
+ JsonObject element = gson.fromJson(XML.toJSONObject((String) object, this.keepString).toString(),
JsonElement.class).getAsJsonObject();
JsParser.jsonFlatten(element, col, 1, depth, row);
row.remove(idx);
diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java b/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java
index 86cc271f0..58b0832b6 100644
--- a/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java
+++ b/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java
@@ -46,6 +46,8 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A User Executor Registry in a collection of user defined directives. The
@@ -78,6 +80,7 @@ public final class UserDirectiveRegistry implements DirectiveRegistry {
private HttpServiceContext manager;
private ArtifactSummary wranglerArtifact;
private SystemAppTaskContext systemAppTaskContext;
+ private static final Logger LOG = LoggerFactory.getLogger(UserDirectiveRegistry.class);
/**
* This constructor should be used when initializing the registry from Service.
@@ -92,6 +95,7 @@ public final class UserDirectiveRegistry implements DirectiveRegistry {
* @param manager an instance of {@link ArtifactManager}.
*/
public UserDirectiveRegistry(HttpServiceContext manager) {
+ LOG.info("inside UDR, manager:{}", manager );
this.manager = manager;
}
@@ -100,6 +104,8 @@ public UserDirectiveRegistry(HttpServiceContext manager) {
* @param systemAppTaskContext {@link SystemAppTaskContext}
*/
public UserDirectiveRegistry(SystemAppTaskContext systemAppTaskContext) {
+ LOG.info("inside UDR, systemAppTaskContext:{}", systemAppTaskContext );
+ LOG.info("inside UDR, systemAppTaskContext.getArtifactManager:{}", systemAppTaskContext.getArtifactManager() );
this.systemAppTaskContext = systemAppTaskContext;
}
@@ -162,9 +168,14 @@ public DirectiveInfo get(String namespace, String name) throws DirectiveLoadExce
@Nullable
private Class extends Directive> getDirective(String namespace, String name) throws IOException {
+ LOG.info("Inside getDirective(), context: {}", context);
+ LOG.info("name: {}", name);
+ LOG.info("namespace: {}", namespace);
if (context != null) {
+ LOG.info("context: {}", context);
return context.loadPluginClass(name);
}
+ LOG.info("manager: {}", manager);
PluginConfigurer configurer = manager != null ?
manager.createPluginConfigurer(namespace) : systemAppTaskContext.createPluginConfigurer(namespace);
return configurer.usePluginClass(Directive.TYPE, name, UUID.randomUUID().toString(),
diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/schema/DirectiveOutputSchemaGenerator.java b/wrangler-core/src/main/java/io/cdap/wrangler/schema/DirectiveOutputSchemaGenerator.java
index cfac41804..a5e3ddc6f 100644
--- a/wrangler-core/src/main/java/io/cdap/wrangler/schema/DirectiveOutputSchemaGenerator.java
+++ b/wrangler-core/src/main/java/io/cdap/wrangler/schema/DirectiveOutputSchemaGenerator.java
@@ -103,6 +103,9 @@ private Schema generateDirectiveOutputSchema(Schema inputSchema)
outputFields.add(Schema.Field.of(fieldName, Schema.of(Schema.Type.NULL)));
}
}
+ if (outputFields.isEmpty()) {
+ return null;
+ }
return Schema.recordOf("output", outputFields);
}
diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/schema/TransientStoreKeys.java b/wrangler-core/src/main/java/io/cdap/wrangler/schema/TransientStoreKeys.java
index e35ef803f..da89fdb3c 100644
--- a/wrangler-core/src/main/java/io/cdap/wrangler/schema/TransientStoreKeys.java
+++ b/wrangler-core/src/main/java/io/cdap/wrangler/schema/TransientStoreKeys.java
@@ -18,6 +18,7 @@
/**
* TransientStoreKeys for storing Workspace schema in TransientStore
+ * NOTE: Please add any needed value in {@link io.cdap.wrangler.api.RemoteDirectiveResponse}
*/
public final class TransientStoreKeys {
public static final String INPUT_SCHEMA = "ws_input_schema";
diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java
index 29d64a596..1824ef088 100644
--- a/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java
+++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java
@@ -317,7 +317,6 @@ public static Schema getSchemaForType(String type, Integer scale) throws Directi
type = type.toUpperCase();
if (type.equals(ColumnTypeNames.DECIMAL)) {
// TODO make set-type support setting decimal precision
- scale = scale != null ? scale : 38;
typeSchema = Schema.nullableOf(Schema.decimalOf(77, scale));
} else {
if (!SCHEMA_TYPE_MAP.containsKey(type)) {
diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/KryoSerializer.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/KryoSerializer.java
new file mode 100644
index 000000000..0d13a7b08
--- /dev/null
+++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/KryoSerializer.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.wrangler.utils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonNull;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.wrangler.api.RemoteDirectiveResponse;
+import io.cdap.wrangler.api.Row;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * A helper class with allows Serialization and Deserialization using Kryo
+ * We should register all schema classes present in {@link SchemaConverter}
+ * and {@link RemoteDirectiveResponse}
+ **/
+public class KryoSerializer {
+
+ private final Kryo kryo;
+ private static final Gson GSON = new Gson();
+
+ public KryoSerializer() {
+ kryo = new Kryo();
+ // Register all classes from RemoteDirectiveResponse
+ kryo.register(RemoteDirectiveResponse.class);
+ // Schema does not have no-arg constructor but implements Serializable
+ kryo.register(Schema.class, new JavaSerializer());
+ // Register all classes from SchemaConverter
+ kryo.register(Row.class);
+ kryo.register(ArrayList.class);
+ kryo.register(LocalDate.class);
+ kryo.register(LocalTime.class);
+ kryo.register(ZonedDateTime.class);
+ kryo.register(Map.class);
+ kryo.register(JsonNull.class);
+ // JsonPrimitive does not have no-arg constructor hence we need a
+ // custom serializer as it is not serializable by JavaSerializer
+ kryo.register(JsonPrimitive.class, new JsonSerializer());
+ kryo.register(JsonArray.class);
+ kryo.register(JsonObject.class);
+ // Support deprecated util.date classes
+ kryo.register(Date.class);
+ kryo.register(java.sql.Date.class);
+ kryo.register(Time.class);
+ kryo.register(Timestamp.class);
+ }
+
+ public byte[] fromRemoteDirectiveResponse(RemoteDirectiveResponse response) {
+ Output output = new Output(1024, -1);
+ kryo.writeClassAndObject(output, response);
+ return output.getBuffer();
+ }
+
+ public RemoteDirectiveResponse toRemoteDirectiveResponse(byte[] bytes) {
+ Input input = new Input(bytes);
+ return (RemoteDirectiveResponse) kryo.readClassAndObject(input);
+ }
+
+ static class JsonSerializer extends Serializer {
+
+ @Override
+ public void write(Kryo kryo, Output output, JsonElement object) {
+ output.writeString(GSON.toJson(object));
+ }
+
+ @Override
+ public JsonElement read(Kryo kryo, Input input, Class type) {
+ return GSON.fromJson(input.readString(), type);
+ }
+ }
+}
diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java
index 65ba2ae5d..a658c8115 100644
--- a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java
+++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java
@@ -19,7 +19,6 @@
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.data.schema.Schema.Field;
@@ -100,6 +99,7 @@ public Schema getSchema(Object value, String name) throws RecordConvertorExcepti
* @param name name of the field
* @param recordPrefix prefix to append at the beginning of a custom record
* @return the schema of this object
+ * NOTE: ANY NEWLY SUPPORTED DATATYPE SHOULD ALSO BE REGISTERED IN {@link KryoSerializer}
*/
@Nullable
public Schema getSchema(Object value, String name, @Nullable String recordPrefix) throws RecordConvertorException {
diff --git a/wrangler-core/src/test/java/io/cdap/directives/column/CopyTest.java b/wrangler-core/src/test/java/io/cdap/directives/column/CopyTest.java
index 2e2df4812..ff2e36a3b 100644
--- a/wrangler-core/src/test/java/io/cdap/directives/column/CopyTest.java
+++ b/wrangler-core/src/test/java/io/cdap/directives/column/CopyTest.java
@@ -103,6 +103,7 @@ public void testForceCopy() throws Exception {
public void testGetOutputSchemaForForceCopiedColumn() throws Exception {
String[] directives = new String[] {
"copy :col_B :col_A true",
+ "copy :col_B :col_C true",
};
List rows = Collections.singletonList(
new Row("col_A", 1).add("col_B", new BigDecimal("143235.016"))
@@ -115,7 +116,8 @@ public void testGetOutputSchemaForForceCopiedColumn() throws Exception {
Schema expectedSchema = Schema.recordOf(
"expectedSchema",
Schema.Field.of("col_A", Schema.decimalOf(10, 3)),
- Schema.Field.of("col_B", Schema.decimalOf(10, 3))
+ Schema.Field.of("col_B", Schema.decimalOf(10, 3)),
+ Schema.Field.of("col_C", Schema.decimalOf(10, 3))
);
Schema outputSchema = TestingRig.executeAndGetSchema(directives, rows, inputSchema);
diff --git a/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java b/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java
index b5f83c9e3..aabe2f7a9 100644
--- a/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java
+++ b/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java
@@ -224,7 +224,7 @@ public void testToDecimalScaleIsNull() throws Exception {
Schema expectedSchema = Schema.recordOf(
"expectedSchema",
- Schema.Field.of("scale_2", Schema.decimalOf(77, 38))
+ Schema.Field.of("scale_2", Schema.decimalOf(38, 2))
);
List results = TestingRig.execute(directives, rows);
diff --git a/wrangler-core/src/test/java/io/cdap/directives/parser/XmlToJsonTest.java b/wrangler-core/src/test/java/io/cdap/directives/parser/XmlToJsonTest.java
new file mode 100644
index 000000000..2d08228a8
--- /dev/null
+++ b/wrangler-core/src/test/java/io/cdap/directives/parser/XmlToJsonTest.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.directives.parser;
+
+import io.cdap.directives.xml.XmlToJson;
+import io.cdap.wrangler.TestingRig;
+import io.cdap.wrangler.api.Row;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Tests {@link XmlToJson}
+ */
+public class XmlToJsonTest {
+ @Test
+ public void testAutoConversionOfStringField() throws Exception {
+ String[] directives = new String[] {
+ "copy body body_1 true",
+ "copy body body_2 true",
+ "copy body body_3 true",
+ "parse-xml-to-json body_1 1",
+ "parse-xml-to-json body_2 1 false",
+ "parse-xml-to-json body_3 1 true"
+ };
+
+ List rows = Arrays.asList(
+ new Row("body",
+ "303246306303E8")
+ );
+
+ rows = TestingRig.execute(directives, rows);
+ Assert.assertEquals(1, rows.size());
+ Assert.assertEquals("{\"tagid\":3.03246306303E19}", rows.get(0).getValue("body_1_Data").toString());
+ Assert.assertEquals("{\"tagid\":3.03246306303E19}", rows.get(0).getValue("body_2_Data").toString());
+ Assert.assertEquals("{\"tagid\":\"303246306303E8\"}", rows.get(0).getValue("body_3_Data").toString());
+ }
+}
diff --git a/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java b/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java
index f981e14a5..718b2f488 100644
--- a/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java
+++ b/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java
@@ -107,4 +107,29 @@ public void testErrorConditionTrueAndContinueWithTransientVariable() throws Exce
Assert.assertEquals(2, errors.size());
Assert.assertEquals(3, results.size());
}
+
+ @Test
+ public void testErrorConditionFalseAndContinueWithTransientVariable() throws Exception {
+ String[] directives = new String[] {
+ "parse-as-csv body , true",
+ "drop body",
+ "send-to-error-and-continue exp:{body_3 == 'xyzw'} 'invalid value'",
+ "send-to-error-and-continue exp:{body_4=='1000'} 'junk' ",
+ "send-to-error exp:{dq_failure >= 1} "
+ };
+
+ List rows = Arrays.asList(
+ new Row("body", "1020134.298,,1,2,2 "),
+ new Row("body", "1020134.298,,xx,1,3"),
+ new Row("body", "1020134.298,,4,1,4"),
+ new Row("body", "1020134.298,,4,2,5"),
+ new Row("body", "1020134.298,,1,2,1")
+ );
+
+ RecipePipeline pipeline = TestingRig.execute(directives);
+ List results = pipeline.execute(rows);
+ List errors = pipeline.errors();
+ Assert.assertEquals(0, errors.size());
+ Assert.assertEquals(5, results.size());
+ }
}
diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java b/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java
index aaad5eb2b..b5fa647fc 100644
--- a/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java
+++ b/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java
@@ -194,4 +194,5 @@ public final class JsonTestData {
+ " }"
+ " }";
public static final String EMPTY_OBJECT = "{ \"dividesplitdetails\":{\"type0\":[]}}";
+ public static final String NULL_OBJECT = "{ \"dividesplitdetails\":{\"type0\":null, \"type1\":0}}";
}
diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/utils/KryoSerializerTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/utils/KryoSerializerTest.java
new file mode 100644
index 000000000..10477465e
--- /dev/null
+++ b/wrangler-core/src/test/java/io/cdap/wrangler/utils/KryoSerializerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright © 2024 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package io.cdap.wrangler.utils;
+
+import com.google.common.collect.Lists;
+import com.google.gson.JsonParser;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.wrangler.TestingRig;
+import io.cdap.wrangler.api.RecipePipeline;
+import io.cdap.wrangler.api.RemoteDirectiveResponse;
+import io.cdap.wrangler.api.Row;
+import org.junit.Assert;
+import org.junit.Test;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class KryoSerializerTest {
+
+ private static final String[] TESTS = new String[]{
+ JsonTestData.BASIC,
+ JsonTestData.SIMPLE_JSON_OBJECT,
+ JsonTestData.ARRAY_OF_OBJECTS,
+ JsonTestData.JSON_ARRAY_WITH_OBJECT,
+ JsonTestData.COMPLEX_1,
+ JsonTestData.ARRAY_OF_NUMBERS,
+ JsonTestData.ARRAY_OF_STRING,
+ JsonTestData.COMPLEX_2,
+ JsonTestData.EMPTY_OBJECT,
+ JsonTestData.NULL_OBJECT,
+ JsonTestData.FB_JSON
+ };
+
+ private static final String[] directives = new String[]{
+ "set-column body json:Parse(body)"
+ };
+
+ @Test
+ public void testJsonTypes() throws Exception {
+ SchemaConverter converter = new SchemaConverter();
+ RecordConvertor recordConvertor = new RecordConvertor();
+ JsonParser parser = new JsonParser();
+ RecipePipeline executor = TestingRig.execute(directives);
+ for (String test : TESTS) {
+ Row row = new Row("body", test);
+
+ List expectedRows = executor.execute(Lists.newArrayList(row));
+ byte[] serializedRows = new KryoSerializer().fromRemoteDirectiveResponse(
+ new RemoteDirectiveResponse(expectedRows, null));
+ List gotRows = new KryoSerializer().toRemoteDirectiveResponse(serializedRows).getRows();
+ Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray());
+ }
+ }
+
+ @Test
+ public void testLogicalTypes() throws Exception {
+ Row testRow = new Row();
+ testRow.add("id", 1);
+ testRow.add("name", "abc");
+ testRow.add("date", LocalDate.of(2018, 11, 11));
+ testRow.add("time", LocalTime.of(11, 11, 11));
+ testRow.add("timestamp", ZonedDateTime.of(2018, 11, 11, 11, 11, 11, 0, ZoneId.of("UTC")));
+ testRow.add("bigdecimal", new BigDecimal(new BigInteger("123456"), 5));
+ testRow.add("datetime", LocalDateTime.now());
+ List expectedRows = Collections.singletonList(testRow);
+ byte[] serializedRows = new KryoSerializer().fromRemoteDirectiveResponse(
+ new RemoteDirectiveResponse(expectedRows, null));
+ List gotRows = new KryoSerializer().toRemoteDirectiveResponse(serializedRows).getRows();
+ Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray());
+ }
+
+ @Test
+ public void testCollectionTypes() throws Exception {
+ List list = new ArrayList<>();
+ list.add(null);
+ list.add(1);
+ list.add(2);
+ Set set = new HashSet<>();
+ set.add(null);
+ set.add(1);
+ set.add(2);
+ Map map = new HashMap<>();
+ map.put("null", null);
+ map.put("1", 1);
+ map.put("2", 2);
+
+ Row testRow = new Row();
+ testRow.add("list", list);
+ testRow.add("set", set);
+ testRow.add("map", map);
+
+ List expectedRows = Collections.singletonList(testRow);
+ byte[] serializedRows = new KryoSerializer().fromRemoteDirectiveResponse(
+ new RemoteDirectiveResponse(expectedRows, null));
+ List gotRows = new KryoSerializer().toRemoteDirectiveResponse(serializedRows).getRows();
+ Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray());
+ }
+
+ @Test
+ public void testWithSchema() throws Exception {
+ Row testRow = new Row();
+ testRow.add("id", 1);
+ testRow.add("name", "abc");
+ testRow.add("date", LocalDate.of(2018, 11, 11));
+ testRow.add("time", LocalTime.of(11, 11, 11));
+ testRow.add("timestamp", ZonedDateTime.of(2018, 11, 11, 11, 11, 11, 0, ZoneId.of("UTC")));
+ testRow.add("bigdecimal", new BigDecimal(new BigInteger("123456"), 5));
+ testRow.add("datetime", LocalDateTime.now());
+ List expectedRows = Collections.singletonList(testRow);
+
+ SchemaConverter converter = new SchemaConverter();
+ Schema expectedSchema = converter.toSchema("myrecord", expectedRows.get(0));
+
+ byte[] serializedRows = new KryoSerializer().fromRemoteDirectiveResponse(
+ new RemoteDirectiveResponse(expectedRows, expectedSchema));
+ RemoteDirectiveResponse response = new KryoSerializer().toRemoteDirectiveResponse(
+ serializedRows);
+
+ Assert.assertArrayEquals(expectedRows.toArray(), response.getRows().toArray());
+ Assert.assertEquals(expectedSchema, response.getOutputSchema());
+ }
+}
diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/utils/ObjectSerDeTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/utils/ObjectSerDeTest.java
index 689668444..f297b23e8 100644
--- a/wrangler-core/src/test/java/io/cdap/wrangler/utils/ObjectSerDeTest.java
+++ b/wrangler-core/src/test/java/io/cdap/wrangler/utils/ObjectSerDeTest.java
@@ -17,6 +17,8 @@
package io.cdap.wrangler.utils;
import com.google.common.base.Charsets;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.wrangler.api.RemoteDirectiveResponse;
import io.cdap.wrangler.api.Row;
import org.junit.Assert;
import org.junit.Test;
@@ -85,4 +87,20 @@ public void testLogicalTypeSerDe() throws Exception {
actualRows = objectSerDe.toObject(bytes);
Assert.assertEquals(expectedRows.size(), actualRows.size());
}
+ @Test
+ public void testRemoteDirectiveResponseSerDe() throws Exception {
+ List expectedRows = new ArrayList<>();
+ Row firstRow = new Row();
+ firstRow.add("id", 1);
+ expectedRows.add(firstRow);
+ Schema expectedSchema = Schema.recordOf(Schema.Field.of("id", Schema.of(Schema.Type.INT)));
+ RemoteDirectiveResponse expectedResponse = new RemoteDirectiveResponse(expectedRows, expectedSchema);
+ ObjectSerDe objectSerDe = new ObjectSerDe<>();
+
+ byte[] bytes = objectSerDe.toByteArray(expectedResponse);
+ RemoteDirectiveResponse actualResponse = objectSerDe.toObject(bytes);
+
+ Assert.assertEquals(expectedResponse.getRows().size(), actualResponse.getRows().size());
+ Assert.assertEquals(expectedResponse.getOutputSchema(), actualResponse.getOutputSchema());
+ }
}
diff --git a/wrangler-docs/directives/parse-xml-to-json.md b/wrangler-docs/directives/parse-xml-to-json.md
index 031633786..beb136b0c 100644
--- a/wrangler-docs/directives/parse-xml-to-json.md
+++ b/wrangler-docs/directives/parse-xml-to-json.md
@@ -8,11 +8,13 @@ transforms the XML into a JSON document, simplifying further parsing using the
## Syntax
```
-parse-xml-to-json []
+parse-xml-to-json [] []
```
* `` is the name of the column in the record that is an XML document.
* `` indicates the depth at which the XML document parsing should terminate processing.
+* `` An OPTIONAL boolean value that if true, then values will not be coerced into boolean or numeric values and will instead be left as strings. (as per `org.json.XML` rules)
+ The default value is `false`
## Usage Notes
diff --git a/wrangler-proto/pom.xml b/wrangler-proto/pom.xml
index 9a9cdc8cc..7db666024 100644
--- a/wrangler-proto/pom.xml
+++ b/wrangler-proto/pom.xml
@@ -18,7 +18,7 @@
wrangler
io.cdap.wrangler
- 4.10.0-SNAPSHOT
+ 4.10.1
4.0.0
diff --git a/wrangler-service/pom.xml b/wrangler-service/pom.xml
index e6aaf1866..04d616c91 100644
--- a/wrangler-service/pom.xml
+++ b/wrangler-service/pom.xml
@@ -18,7 +18,7 @@
wrangler
io.cdap.wrangler
- 4.10.0-SNAPSHOT
+ 4.10.1
4.0.0
diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteDirectiveRequest.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteDirectiveRequest.java
index 9b77f23f3..d3e6d959f 100644
--- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteDirectiveRequest.java
+++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteDirectiveRequest.java
@@ -15,6 +15,7 @@
*/
package io.cdap.wrangler.service.directive;
+import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.wrangler.parser.DirectiveClass;
import java.util.HashMap;
@@ -29,13 +30,15 @@ public class RemoteDirectiveRequest {
private final Map systemDirectives;
private final String pluginNameSpace;
private final byte[] data;
+ private final Schema inputSchema;
RemoteDirectiveRequest(String recipe, Map systemDirectives,
- String pluginNameSpace, byte[] data) {
+ String pluginNameSpace, byte[] data, Schema inputSchema) {
this.recipe = recipe;
this.systemDirectives = new HashMap<>(systemDirectives);
this.pluginNameSpace = pluginNameSpace;
this.data = data;
+ this.inputSchema = inputSchema;
}
public String getRecipe() {
@@ -53,4 +56,8 @@ public byte[] getData() {
public String getPluginNameSpace() {
return pluginNameSpace;
}
+
+ public Schema getInputSchema() {
+ return inputSchema;
+ }
}
diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java
index 27216247f..8e5e4cbef 100644
--- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java
+++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java
@@ -16,9 +16,13 @@
package io.cdap.wrangler.service.directive;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.service.worker.RunnableTask;
import io.cdap.cdap.api.service.worker.RunnableTaskContext;
import io.cdap.cdap.api.service.worker.SystemAppTaskContext;
+import io.cdap.cdap.features.Feature;
+import io.cdap.cdap.internal.io.SchemaTypeAdapter;
import io.cdap.directives.aggregates.DefaultTransientStore;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.CompileException;
@@ -29,7 +33,10 @@
import io.cdap.wrangler.api.ErrorRecordBase;
import io.cdap.wrangler.api.ExecutorContext;
import io.cdap.wrangler.api.RecipeException;
+import io.cdap.wrangler.api.RemoteDirectiveResponse;
import io.cdap.wrangler.api.Row;
+import io.cdap.wrangler.api.TransientStore;
+import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.executor.RecipePipelineExecutor;
import io.cdap.wrangler.expression.EL;
@@ -42,27 +49,44 @@
import io.cdap.wrangler.proto.ErrorRecordsException;
import io.cdap.wrangler.registry.DirectiveInfo;
import io.cdap.wrangler.registry.UserDirectiveRegistry;
+import io.cdap.wrangler.utils.KryoSerializer;
import io.cdap.wrangler.utils.ObjectSerDe;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.cdap.wrangler.schema.TransientStoreKeys.INPUT_SCHEMA;
+import static io.cdap.wrangler.schema.TransientStoreKeys.OUTPUT_SCHEMA;
/**
* Task for remote execution of directives
*/
public class RemoteExecutionTask implements RunnableTask {
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutionTask.class);
- private static final Gson GSON = new Gson();
+ private static final Gson GSON = new GsonBuilder()
+ .registerTypeAdapter(Schema.class, new SchemaTypeAdapter())
+ .create();
@Override
public void run(RunnableTaskContext runnableTaskContext) throws Exception {
RemoteDirectiveRequest directiveRequest = GSON.fromJson(runnableTaskContext.getParam(),
RemoteDirectiveRequest.class);
+ LOG.info("Inside run method");
+ LOG.info("directiveRequest: {}", directiveRequest);
+ LOG.info("directiveRequest getData :{}", directiveRequest.getData());
+ LOG.info("directiveRequest getRecipe :{}", directiveRequest.getRecipe());
+ LOG.info("directiveRequest getInputSchema: {}", directiveRequest.getInputSchema());
+ LOG.info("directiveRequest getSystemDirectives:{}", directiveRequest.getSystemDirectives());
+ LOG.info("directiveRequest getPluginNameSpace:{}", directiveRequest.getPluginNameSpace());
+
SystemAppTaskContext systemAppContext = runnableTaskContext.getRunnableTaskSystemAppContext();
+ LOG.info("systemAppContext: {}", systemAppContext);
String namespace = directiveRequest.getPluginNameSpace();
Map systemDirectives = directiveRequest.getSystemDirectives();
AtomicBoolean hasUDD = new AtomicBoolean();
@@ -102,12 +126,18 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception {
ObjectSerDe> objectSerDe = new ObjectSerDe<>();
List rows = objectSerDe.toObject(directiveRequest.getData());
+ Schema inputSchema = directiveRequest.getInputSchema();
+ TransientStore transientStore = new DefaultTransientStore();
+ if (inputSchema != null) {
+ transientStore.set(TransientVariableScope.GLOBAL, INPUT_SCHEMA, inputSchema);
+ }
+
try (RecipePipelineExecutor executor = new RecipePipelineExecutor(() -> directives,
new ServicePipelineContext(
namespace,
ExecutorContext.Environment.SERVICE,
systemAppContext,
- new DefaultTransientStore()))) {
+ transientStore))) {
rows = executor.execute(rows);
List errors = executor.errors().stream()
.filter(ErrorRecordBase::isShownInWrangler)
@@ -120,8 +150,17 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception {
throw new BadRequestException(e.getMessage(), e);
}
+ Schema outputSchema = transientStore.get(OUTPUT_SCHEMA);
+ RemoteDirectiveResponse response = new RemoteDirectiveResponse(rows, outputSchema);
+ ObjectSerDe responseSerDe = new ObjectSerDe<>();
+
runnableTaskContext.setTerminateOnComplete(hasUDD.get() || EL.isUsed());
- runnableTaskContext.writeResult(objectSerDe.toByteArray(rows));
+
+ if (Feature.WRANGLER_KRYO_SERIALIZATION.isEnabled(systemAppContext)) {
+ runnableTaskContext.writeResult(new KryoSerializer().fromRemoteDirectiveResponse(response));
+ } else {
+ runnableTaskContext.writeResult(responseSerDe.toByteArray(response));
+ }
} catch (DirectiveParseException | ClassNotFoundException | CompileException e) {
throw new BadRequestException(e.getMessage(), e);
}
diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java
index 20d4d3b2f..9a01df5f6 100644
--- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java
+++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java
@@ -39,6 +39,7 @@
import io.cdap.cdap.etl.proto.ArtifactSelectorConfig;
import io.cdap.cdap.etl.proto.connection.ConnectorDetail;
import io.cdap.cdap.etl.proto.connection.SampleResponse;
+import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.io.SchemaTypeAdapter;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.wrangler.PropertyIds;
@@ -48,8 +49,10 @@
import io.cdap.wrangler.api.DirectiveParseException;
import io.cdap.wrangler.api.GrammarMigrator;
import io.cdap.wrangler.api.RecipeException;
+import io.cdap.wrangler.api.RemoteDirectiveResponse;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TransientVariableScope;
+import io.cdap.wrangler.lineage.LineageOperations;
import io.cdap.wrangler.parser.ConfigDirectiveContext;
import io.cdap.wrangler.parser.DirectiveClass;
import io.cdap.wrangler.parser.GrammarWalker;
@@ -77,6 +80,7 @@
import io.cdap.wrangler.schema.TransientStoreKeys;
import io.cdap.wrangler.store.recipe.RecipeStore;
import io.cdap.wrangler.store.workspace.WorkspaceStore;
+import io.cdap.wrangler.utils.KryoSerializer;
import io.cdap.wrangler.utils.ObjectSerDe;
import io.cdap.wrangler.utils.RowHelper;
import io.cdap.wrangler.utils.SchemaConverter;
@@ -100,12 +104,17 @@
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.cdap.wrangler.schema.TransientStoreKeys.INPUT_SCHEMA;
+import static io.cdap.wrangler.schema.TransientStoreKeys.OUTPUT_SCHEMA;
/**
* V2 endpoints for workspace
*/
public class WorkspaceHandler extends AbstractDirectiveHandler {
-
+ private static final Logger LOG = LoggerFactory.getLogger(WorkspaceHandler.class);
private static final Gson GSON =
new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
private static final Pattern PRAGMA_PATTERN = Pattern.compile("^\\s*#pragma\\s+load-directives\\s+");
@@ -464,14 +473,16 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque
DirectiveExecutionRequest executionRequest =
GSON.fromJson(StandardCharsets.UTF_8.decode(request.getContent()).toString(),
DirectiveExecutionRequest.class);
-
+ LOG.info("DirectiveExecutionRequest: directives: {} limit: {}", executionRequest.getDirectives(), executionRequest.getLimit());
List directives = new ArrayList<>(executionRequest.getDirectives());
if (recipeDirectives != null) {
directives.addAll(recipeDirectives);
}
WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId);
+ LOG.info("WorkspaceDetail: {}", detail);
UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector();
+ LOG.info("UserDirectivesCollector: {}", userDirectivesCollector);
List result = executeDirectives(ns.getName(), directives, detail,
userDirectivesCollector);
DirectiveExecutionResponse response = generateExecutionResponse(result,
@@ -557,6 +568,8 @@ private List executeDirectives(String namespace,
TRANSIENT_STORE.reset(TransientVariableScope.GLOBAL);
TRANSIENT_STORE.set(TransientVariableScope.GLOBAL, TransientStoreKeys.INPUT_SCHEMA, inputSchema);
}
+ LOG.info("SampleSpec: {}",detail.getWorkspace().getSampleSpec());
+
return getContext().isRemoteTaskEnabled() ?
executeRemotely(namespace, directives, detail, grammarVisitor) :
@@ -598,6 +611,7 @@ private List executeRemotely(String namespace, List systemDirectives = new HashMap<>();
// Gather system directives and call additional visitor.
@@ -616,14 +630,32 @@ private List executeRemotely(String namespace, List>().toObject(bytes);
+ RemoteDirectiveResponse response;
+ if (Feature.WRANGLER_KRYO_SERIALIZATION.isEnabled(getContext())) {
+ response = new KryoSerializer().toRemoteDirectiveResponse(bytes);
+ } else {
+ response = new ObjectSerDe().toObject(bytes);
+ }
+ if (response.getOutputSchema() != null) {
+ TRANSIENT_STORE.set(TransientVariableScope.GLOBAL, OUTPUT_SCHEMA, response.getOutputSchema());
+ }
+ LOG.info("response: {}", response);
+ return response.getRows();
}
private List getSample(SampleResponse sampleResponse) {
diff --git a/wrangler-storage/pom.xml b/wrangler-storage/pom.xml
index a7549b03f..a8e339631 100644
--- a/wrangler-storage/pom.xml
+++ b/wrangler-storage/pom.xml
@@ -18,7 +18,7 @@
wrangler
io.cdap.wrangler
- 4.10.0-SNAPSHOT
+ 4.10.1
4.0.0
diff --git a/wrangler-test/pom.xml b/wrangler-test/pom.xml
index 95be5b354..d9cf77b4c 100644
--- a/wrangler-test/pom.xml
+++ b/wrangler-test/pom.xml
@@ -19,7 +19,7 @@
wrangler
io.cdap.wrangler
- 4.10.0-SNAPSHOT
+ 4.10.1
4.0.0
diff --git a/wrangler-transform/pom.xml b/wrangler-transform/pom.xml
index b09712c0d..e464a24d5 100644
--- a/wrangler-transform/pom.xml
+++ b/wrangler-transform/pom.xml
@@ -3,7 +3,7 @@
wrangler
io.cdap.wrangler
- 4.10.0-SNAPSHOT
+ 4.10.1
4.0.0
@@ -122,8 +122,8 @@
1.1.0
- system:cdap-data-pipeline[6.4.0,7.0.0-SNAPSHOT)
- system:cdap-data-streams[6.4.0,7.0.0-SNAPSHOT)
+ system:cdap-data-pipeline[6.10.0,7.0.0-SNAPSHOT)
+ system:cdap-data-streams[6.10.0,7.0.0-SNAPSHOT)