diff --git a/.github/workflows/build-report.yml b/.github/workflows/build-report.yml new file mode 100644 index 000000000..691236ceb --- /dev/null +++ b/.github/workflows/build-report.yml @@ -0,0 +1,51 @@ +# Copyright © 2024 Cask Data, Inc. +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +# This workflow publishes the unit test report of a completed "Build with unit tests" run +# It downloads the artifacts uploaded by that run and reports the Surefire TEST-*.xml results as a check +# Note: Any changes to this workflow would be used only after merging into develop +name: Build Unit Tests Report + +on: + workflow_run: + workflows: + - Build with unit tests + types: + - completed + +jobs: + build: + runs-on: ubuntu-latest + + if: ${{ github.event.workflow_run.conclusion != 'skipped' }} + + steps: + # Pinned 1.0.0 version + - uses: marocchino/action-workflow_run-status@54b6e87d6cb552fc5f36dbe9a722a6048725917a + + - name: Download artifact + uses: actions/download-artifact@v4 + with: + github-token: ${{ secrets.GITHUB_TOKEN }} + run-id: ${{ github.event.workflow_run.id }} + path: artifacts/ + + - name: Surefire Report + # Pinned 3.5.2 version + uses: mikepenz/action-junit-report@16a9560bd02f11e7e3bf6b3e2ef6bba6c9d07c32 + if: always() + with: + report_paths: '**/target/surefire-reports/TEST-*.xml' + github_token: ${{ secrets.GITHUB_TOKEN }} + detailed_summary: true + commit: ${{ github.event.workflow_run.head_sha }} + check_name: Build Test Report + diff --git a/.github/workflows/build.yml 
b/.github/workflows/build.yml index 939f9a6c2..94dfe5b16 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -15,21 +15,28 @@ name: Build with unit tests on: - workflow_run: - workflows: - - Trigger build - types: - - completed + push: + branches: [ develop, release/** ] + pull_request: + branches: [ develop, release/** ] + types: [opened, synchronize, reopened, labeled] jobs: build: runs-on: k8s-runner-build - if: ${{ github.event.workflow_run.conclusion != 'skipped' }} - + # We allow builds: + # 1) When it's a merge into a branch + # 2) For PRs that are labeled as build and + # - It's a code change + # - A build label was just added + # A bit complex, but prevents builds when other labels are manipulated + if: > + github.event_name == 'push' + || (contains(github.event.pull_request.labels.*.name, 'build') + && (github.event.action != 'labeled' || github.event.label.name == 'build') + ) steps: - # Pinned 1.0.0 version - - uses: haya14busa/action-workflow_run-status@967ed83efa565c257675ed70cfe5231f062ddd94 - uses: actions/checkout@v3 with: ref: ${{ github.event.workflow_run.head_branch }} @@ -43,18 +50,11 @@ jobs: - name: Build with Maven run: mvn clean test -fae -T 2 -B -V -DcloudBuild -Dmaven.wagon.http.retryHandler.count=3 -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 - name: Archive build artifacts - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 if: always() with: - name: Build debug files + name: reports-${{ github.run_id }} path: | **/target/rat.txt **/target/surefire-reports/* - - name: Surefire Report - # Pinned 1.0.5 version - uses: ScaCap/action-surefire-report@ad808943e6bfbd2e6acba7c53fdb5c89534da533 - if: always() - with: - # GITHUB_TOKEN - github_token: ${{ secrets.GITHUB_TOKEN }} - commit: ${{ github.event.workflow_run.head_branch }} + diff --git a/.github/workflows/trigger.yml b/.github/workflows/trigger.yml deleted file mode 100644 index e5693af07..000000000 --- a/.github/workflows/trigger.yml +++ 
/dev/null @@ -1,47 +0,0 @@ -# Copyright © 2021 Cask Data, Inc. -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy of -# the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations under -# the License. - -# This workflow will trigger build.yml only when needed. -# This way we don't flood main workflow run list -# Note that build.yml from develop will be used even for PR builds -# Also it will have access to the proper GITHUB_SECRET - -name: Trigger build - -on: - push: - branches: [ develop, release/** ] - pull_request: - branches: [ develop, release/** ] - types: [opened, synchronize, reopened, labeled] - workflow_dispatch: - -jobs: - trigger: - runs-on: ubuntu-latest - - # We allow builds: - # 1) When triggered manually - # 2) When it's a merge into a branch - # 3) For PRs that are labeled as build and - # - It's a code change - # - A build label was just added - # A bit complex, but prevents builds when other labels are manipulated - if: > - github.event_name == 'workflow_dispatch' - || github.event_name == 'push' - || (contains(github.event.pull_request.labels.*.name, 'build') - && (github.event.action != 'labeled' || github.event.label.name == 'build') - ) - - steps: - - name: Trigger build - run: echo Maven build will be triggered now diff --git a/pom.xml b/pom.xml index 172d11f52..aecfcd54b 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ io.cdap.wrangler wrangler - 4.10.0-SNAPSHOT + 4.10.1 Wrangler pom An interactive tool for data cleansing and transformation. 
@@ -60,17 +60,6 @@ HEAD - - - sonatype.release - https://oss.sonatype.org/service/local/staging/deploy/maven2 - - - sonatype.snapshots - https://oss.sonatype.org/content/repositories/snapshots - - - https://issues.cask.co/browse/CDAP @@ -83,7 +72,7 @@ 1.11.133 0.10.2-hadoop2 1.56 - 6.10.0-SNAPSHOT + 6.10.1 1.1.5 1.6 2.5 @@ -126,19 +115,9 @@ - - sonatype - https://oss.sonatype.org/content/groups/public - - true - - - false - - sonatype-snapshots - https://oss.sonatype.org/content/repositories/snapshots + https://central.sonatype.com/repository/maven-snapshots false @@ -387,26 +366,14 @@ - org.apache.maven.plugins - maven-release-plugin - 2.5.3 - - v${releaseVersion} - v@{project.version} - true - - releases - - - - org.sonatype.plugins - nexus-staging-maven-plugin - 1.6.2 + org.sonatype.central + central-publishing-maven-plugin + 0.8.0 true - https://oss.sonatype.org - sonatype.release + sonatype.release + false + true @@ -547,7 +514,7 @@ io.cdap.tests.e2e cdap-e2e-framework - 0.3.0-SNAPSHOT + 0.3.0 test diff --git a/wrangler-api/pom.xml b/wrangler-api/pom.xml index 383cbccf8..16cb528da 100644 --- a/wrangler-api/pom.xml +++ b/wrangler-api/pom.xml @@ -18,7 +18,7 @@ wrangler io.cdap.wrangler - 4.10.0-SNAPSHOT + 4.10.1 4.0.0 diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/RemoteDirectiveResponse.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/RemoteDirectiveResponse.java new file mode 100644 index 000000000..0627c28f5 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/RemoteDirectiveResponse.java @@ -0,0 +1,50 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.wrangler.api; + +import io.cdap.cdap.api.data.schema.Schema; + +import java.io.Serializable; +import java.util.List; + +/** + * Response after executing directives remotely + * Please make sure all fields are registered with {@link io.cdap.wrangler.utils.KryoSerializer} + */ +public class RemoteDirectiveResponse implements Serializable { + private final List rows; + private final Schema outputSchema; + + /** + * Only used by {@link io.cdap.wrangler.utils.KryoSerializer} + **/ + private RemoteDirectiveResponse() { + this(null, null); + } + + public RemoteDirectiveResponse(List rows, Schema outputSchema) { + this.rows = rows; + this.outputSchema = outputSchema; + } + + public List getRows() { + return rows; + } + + public Schema getOutputSchema() { + return outputSchema; + } +} diff --git a/wrangler-core/pom.xml b/wrangler-core/pom.xml index df8e568f7..de9900dda 100644 --- a/wrangler-core/pom.xml +++ b/wrangler-core/pom.xml @@ -18,7 +18,7 @@ wrangler io.cdap.wrangler - 4.10.0-SNAPSHOT + 4.10.1 4.0.0 @@ -170,6 +170,11 @@ guava-retrying ${guava.retrying.version} + + com.esotericsoftware + kryo + 4.0.2 + org.antlr antlr4 diff --git a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 index c9d923da7..7c517ed6a 100644 --- a/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 +++ b/wrangler-core/src/main/antlr4/io/cdap/wrangler/parser/Directives.g4 @@ -124,7 +124,7 @@ properties ; propertyList - : property (',' property)+ + : 
property (',' property)* ; property diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java b/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java index 99a4e9b5f..80ec25a9e 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Copy.java @@ -118,18 +118,18 @@ public Schema getOutputSchema(SchemaResolutionContext context) { Schema inputSchema = context.getInputSchema(); List outputFields = new ArrayList<>(); Schema sourceSchema = inputSchema.getField(source.value()).getSchema(); + boolean destinationExists = inputSchema.getField(destination.value()) != null; for (Schema.Field field : inputSchema.getFields()) { - if (field.getName().equals(destination.value())) { + if (force && field.getName().equals(destination.value())) { outputFields.add(Schema.Field.of(destination.value(), sourceSchema)); } else { outputFields.add(field); } } - if (!force) { + if (!destinationExists) { outputFields.add(Schema.Field.of(destination.value(), sourceSchema)); } - return Schema.recordOf("outputSchema", outputFields); } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java index 1ecc3b159..e520fa6b1 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java @@ -115,6 +115,9 @@ public Mutation lineage() { @Override public Schema getOutputSchema(SchemaResolutionContext context) { Schema inputSchema = context.getInputSchema(); + if (type.equalsIgnoreCase("decimal") && scale == null) { + return null; + } return Schema.recordOf( "outputSchema", inputSchema.getFields().stream() diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java b/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java index d2a20fbe9..20922d1e8 100644 --- 
a/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/SendToErrorAndContinue.java @@ -125,6 +125,8 @@ public List execute(List rows, ExecutorContext context) context.getTransientStore().increment(TransientVariableScope.LOCAL, "dq_failure", 1); } throw new ReportErrorAndProceed(message, 1); + } else if (context != null && !context.getTransientStore().getVariables().contains("dq_failure")) { + context.getTransientStore().set(TransientVariableScope.LOCAL, "dq_failure", 0L); } } catch (ELException e) { throw new DirectiveExecutionException(NAME, e.getMessage(), e); diff --git a/wrangler-core/src/main/java/io/cdap/directives/xml/XmlToJson.java b/wrangler-core/src/main/java/io/cdap/directives/xml/XmlToJson.java index 8a3ed89ff..513a54d17 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/xml/XmlToJson.java +++ b/wrangler-core/src/main/java/io/cdap/directives/xml/XmlToJson.java @@ -38,6 +38,7 @@ import io.cdap.wrangler.api.parser.Numeric; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import org.apache.commons.lang.StringUtils; import org.json.JSONException; import org.json.XML; @@ -52,9 +53,11 @@ @Description("Parses a XML document to JSON representation.") public class XmlToJson implements Directive, Lineage { public static final String NAME = "parse-xml-to-json"; + public static final String ARG_KEEP_STRING = "keep-string"; // Column within the input row that needs to be parsed as Json private String col; private int depth; + private boolean keepString; private final Gson gson = new Gson(); @Override @@ -62,6 +65,7 @@ public UsageDefinition define() { UsageDefinition.Builder builder = UsageDefinition.builder(NAME); builder.define("column", TokenType.COLUMN_NAME); builder.define("depth", TokenType.NUMERIC, Optional.TRUE); + builder.define(ARG_KEEP_STRING, TokenType.BOOLEAN, Optional.TRUE); return builder.build(); } @@ -73,6 
+77,12 @@ public void initialize(Arguments args) throws DirectiveParseException { } else { this.depth = Integer.MAX_VALUE; } + + if (args.contains(ARG_KEEP_STRING) && + StringUtils.isNotEmpty(args.value(ARG_KEEP_STRING).value().toString())) { + this.keepString = Boolean.parseBoolean(args.value(ARG_KEEP_STRING).value().toString()); + } + } @Override @@ -93,7 +103,7 @@ public List execute(List rows, ExecutorContext context) throws Directi try { if (object instanceof String) { - JsonObject element = gson.fromJson(XML.toJSONObject((String) object).toString(), + JsonObject element = gson.fromJson(XML.toJSONObject((String) object, this.keepString).toString(), JsonElement.class).getAsJsonObject(); JsParser.jsonFlatten(element, col, 1, depth, row); row.remove(idx); diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java b/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java index 86cc271f0..58b0832b6 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/registry/UserDirectiveRegistry.java @@ -46,6 +46,8 @@ import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A User Executor Registry in a collection of user defined directives. The @@ -78,6 +80,7 @@ public final class UserDirectiveRegistry implements DirectiveRegistry { private HttpServiceContext manager; private ArtifactSummary wranglerArtifact; private SystemAppTaskContext systemAppTaskContext; + private static final Logger LOG = LoggerFactory.getLogger(UserDirectiveRegistry.class); /** * This constructor should be used when initializing the registry from Service. @@ -92,6 +95,7 @@ public final class UserDirectiveRegistry implements DirectiveRegistry { * @param manager an instance of {@link ArtifactManager}. 
*/ public UserDirectiveRegistry(HttpServiceContext manager) { + LOG.debug("inside UDR, manager:{}", manager); this.manager = manager; } @@ -100,6 +104,8 @@ public UserDirectiveRegistry(HttpServiceContext manager) { * @param systemAppTaskContext {@link SystemAppTaskContext} */ public UserDirectiveRegistry(SystemAppTaskContext systemAppTaskContext) { + LOG.debug("inside UDR, systemAppTaskContext:{}", systemAppTaskContext); + LOG.debug("inside UDR, systemAppTaskContext.getArtifactManager:{}", systemAppTaskContext.getArtifactManager()); this.systemAppTaskContext = systemAppTaskContext; } @@ -162,9 +168,14 @@ public DirectiveInfo get(String namespace, String name) throws DirectiveLoadExce @Nullable private Class getDirective(String namespace, String name) throws IOException { + LOG.debug("Inside getDirective(), context: {}", context); // diagnostic tracing only, kept off INFO + LOG.debug("name: {}", name); + LOG.debug("namespace: {}", namespace); if (context != null) { + LOG.debug("context: {}", context); return context.loadPluginClass(name); } + LOG.debug("manager: {}", manager); PluginConfigurer configurer = manager != null ? 
manager.createPluginConfigurer(namespace) : systemAppTaskContext.createPluginConfigurer(namespace); return configurer.usePluginClass(Directive.TYPE, name, UUID.randomUUID().toString(), diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/schema/DirectiveOutputSchemaGenerator.java b/wrangler-core/src/main/java/io/cdap/wrangler/schema/DirectiveOutputSchemaGenerator.java index cfac41804..a5e3ddc6f 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/schema/DirectiveOutputSchemaGenerator.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/schema/DirectiveOutputSchemaGenerator.java @@ -103,6 +103,9 @@ private Schema generateDirectiveOutputSchema(Schema inputSchema) outputFields.add(Schema.Field.of(fieldName, Schema.of(Schema.Type.NULL))); } } + if (outputFields.isEmpty()) { + return null; + } return Schema.recordOf("output", outputFields); } diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/schema/TransientStoreKeys.java b/wrangler-core/src/main/java/io/cdap/wrangler/schema/TransientStoreKeys.java index e35ef803f..da89fdb3c 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/schema/TransientStoreKeys.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/schema/TransientStoreKeys.java @@ -18,6 +18,7 @@ /** * TransientStoreKeys for storing Workspace schema in TransientStore + * NOTE: Please add any needed value in {@link io.cdap.wrangler.api.RemoteDirectiveResponse} */ public final class TransientStoreKeys { public static final String INPUT_SCHEMA = "ws_input_schema"; diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java index 29d64a596..1824ef088 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java @@ -317,7 +317,6 @@ public static Schema getSchemaForType(String type, Integer scale) throws Directi type = type.toUpperCase(); if 
(type.equals(ColumnTypeNames.DECIMAL)) { // TODO make set-type support setting decimal precision - scale = scale != null ? scale : 38; typeSchema = Schema.nullableOf(Schema.decimalOf(77, scale)); } else { if (!SCHEMA_TYPE_MAP.containsKey(type)) { diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/KryoSerializer.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/KryoSerializer.java new file mode 100644 index 000000000..0d13a7b08 --- /dev/null +++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/KryoSerializer.java @@ -0,0 +1,100 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package io.cdap.wrangler.utils; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonNull; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.wrangler.api.RemoteDirectiveResponse; +import io.cdap.wrangler.api.Row; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Date; +import java.util.Map; + +/** + * A helper class with allows Serialization and Deserialization using Kryo + * We should register all schema classes present in {@link SchemaConverter} + * and {@link RemoteDirectiveResponse} + **/ +public class KryoSerializer { + + private final Kryo kryo; + private static final Gson GSON = new Gson(); + + public KryoSerializer() { + kryo = new Kryo(); + // Register all classes from RemoteDirectiveResponse + kryo.register(RemoteDirectiveResponse.class); + // Schema does not have no-arg constructor but implements Serializable + kryo.register(Schema.class, new JavaSerializer()); + // Register all classes from SchemaConverter + kryo.register(Row.class); + kryo.register(ArrayList.class); + kryo.register(LocalDate.class); + kryo.register(LocalTime.class); + kryo.register(ZonedDateTime.class); + kryo.register(Map.class); + kryo.register(JsonNull.class); + // JsonPrimitive does not have no-arg constructor hence we need a + // custom serializer as it is not serializable by JavaSerializer + kryo.register(JsonPrimitive.class, new JsonSerializer()); + kryo.register(JsonArray.class); + kryo.register(JsonObject.class); + // Support deprecated 
+ util.date classes + kryo.register(Date.class); + kryo.register(java.sql.Date.class); + kryo.register(Time.class); + kryo.register(Timestamp.class); + } + + public byte[] fromRemoteDirectiveResponse(RemoteDirectiveResponse response) { + Output output = new Output(1024, -1); + kryo.writeClassAndObject(output, response); + return output.toBytes(); // only the bytes written so far, not the whole backing buffer + } + + public RemoteDirectiveResponse toRemoteDirectiveResponse(byte[] bytes) { + Input input = new Input(bytes); + return (RemoteDirectiveResponse) kryo.readClassAndObject(input); + } + + static class JsonSerializer extends Serializer { + + @Override + public void write(Kryo kryo, Output output, JsonElement object) { + output.writeString(GSON.toJson(object)); + } + + @Override + public JsonElement read(Kryo kryo, Input input, Class type) { + return GSON.fromJson(input.readString(), type); + } + } +} diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java index 65ba2ae5d..a658c8115 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/SchemaConverter.java @@ -19,7 +19,6 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.data.schema.Schema.Field; @@ -100,6 +99,7 @@ public Schema getSchema(Object value, String name) throws RecordConvertorExcepti * @param name name of the field * @param recordPrefix prefix to append at the beginning of a custom record * @return the schema of this object + * NOTE: ANY NEWLY SUPPORTED DATATYPE SHOULD ALSO BE REGISTERED IN {@link KryoSerializer} */ @Nullable public Schema getSchema(Object value, String name, @Nullable String recordPrefix) throws RecordConvertorException { diff --git 
a/wrangler-core/src/test/java/io/cdap/directives/column/CopyTest.java b/wrangler-core/src/test/java/io/cdap/directives/column/CopyTest.java index 2e2df4812..ff2e36a3b 100644 --- a/wrangler-core/src/test/java/io/cdap/directives/column/CopyTest.java +++ b/wrangler-core/src/test/java/io/cdap/directives/column/CopyTest.java @@ -103,6 +103,7 @@ public void testForceCopy() throws Exception { public void testGetOutputSchemaForForceCopiedColumn() throws Exception { String[] directives = new String[] { "copy :col_B :col_A true", + "copy :col_B :col_C true", }; List rows = Collections.singletonList( new Row("col_A", 1).add("col_B", new BigDecimal("143235.016")) @@ -115,7 +116,8 @@ public void testGetOutputSchemaForForceCopiedColumn() throws Exception { Schema expectedSchema = Schema.recordOf( "expectedSchema", Schema.Field.of("col_A", Schema.decimalOf(10, 3)), - Schema.Field.of("col_B", Schema.decimalOf(10, 3)) + Schema.Field.of("col_B", Schema.decimalOf(10, 3)), + Schema.Field.of("col_C", Schema.decimalOf(10, 3)) ); Schema outputSchema = TestingRig.executeAndGetSchema(directives, rows, inputSchema); diff --git a/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java b/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java index b5f83c9e3..aabe2f7a9 100644 --- a/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java +++ b/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java @@ -224,7 +224,7 @@ public void testToDecimalScaleIsNull() throws Exception { Schema expectedSchema = Schema.recordOf( "expectedSchema", - Schema.Field.of("scale_2", Schema.decimalOf(77, 38)) + Schema.Field.of("scale_2", Schema.decimalOf(38, 2)) ); List results = TestingRig.execute(directives, rows); diff --git a/wrangler-core/src/test/java/io/cdap/directives/parser/XmlToJsonTest.java b/wrangler-core/src/test/java/io/cdap/directives/parser/XmlToJsonTest.java new file mode 100644 index 000000000..2d08228a8 --- /dev/null +++ 
b/wrangler-core/src/test/java/io/cdap/directives/parser/XmlToJsonTest.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.directives.parser; + +import io.cdap.directives.xml.XmlToJson; +import io.cdap.wrangler.TestingRig; +import io.cdap.wrangler.api.Row; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +/** + * Tests {@link XmlToJson} + */ +public class XmlToJsonTest { + @Test + public void testAutoConversionOfStringField() throws Exception { + String[] directives = new String[] { + "copy body body_1 true", + "copy body body_2 true", + "copy body body_3 true", + "parse-xml-to-json body_1 1", + "parse-xml-to-json body_2 1 false", + "parse-xml-to-json body_3 1 true" + }; + + List rows = Arrays.asList( + new Row("body", + "303246306303E8") + ); + + rows = TestingRig.execute(directives, rows); + Assert.assertEquals(1, rows.size()); + Assert.assertEquals("{\"tagid\":3.03246306303E19}", rows.get(0).getValue("body_1_Data").toString()); + Assert.assertEquals("{\"tagid\":3.03246306303E19}", rows.get(0).getValue("body_2_Data").toString()); + Assert.assertEquals("{\"tagid\":\"303246306303E8\"}", rows.get(0).getValue("body_3_Data").toString()); + } +} diff --git a/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java b/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java 
index f981e14a5..718b2f488 100644 --- a/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java +++ b/wrangler-core/src/test/java/io/cdap/directives/row/SendToErrorAndContinueTest.java @@ -107,4 +107,29 @@ public void testErrorConditionTrueAndContinueWithTransientVariable() throws Exce Assert.assertEquals(2, errors.size()); Assert.assertEquals(3, results.size()); } + + @Test + public void testErrorConditionFalseAndContinueWithTransientVariable() throws Exception { + String[] directives = new String[] { + "parse-as-csv body , true", + "drop body", + "send-to-error-and-continue exp:{body_3 == 'xyzw'} 'invalid value'", + "send-to-error-and-continue exp:{body_4=='1000'} 'junk' ", + "send-to-error exp:{dq_failure >= 1} " + }; + + List rows = Arrays.asList( + new Row("body", "1020134.298,,1,2,2 "), + new Row("body", "1020134.298,,xx,1,3"), + new Row("body", "1020134.298,,4,1,4"), + new Row("body", "1020134.298,,4,2,5"), + new Row("body", "1020134.298,,1,2,1") + ); + + RecipePipeline pipeline = TestingRig.execute(directives); + List results = pipeline.execute(rows); + List errors = pipeline.errors(); + Assert.assertEquals(0, errors.size()); + Assert.assertEquals(5, results.size()); + } } diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java b/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java index aaad5eb2b..b5fa647fc 100644 --- a/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java +++ b/wrangler-core/src/test/java/io/cdap/wrangler/utils/JsonTestData.java @@ -194,4 +194,5 @@ public final class JsonTestData { + " }" + " }"; public static final String EMPTY_OBJECT = "{ \"dividesplitdetails\":{\"type0\":[]}}"; + public static final String NULL_OBJECT = "{ \"dividesplitdetails\":{\"type0\":null, \"type1\":0}}"; } diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/utils/KryoSerializerTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/utils/KryoSerializerTest.java new file 
mode 100644 index 000000000..10477465e --- /dev/null +++ b/wrangler-core/src/test/java/io/cdap/wrangler/utils/KryoSerializerTest.java @@ -0,0 +1,146 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.wrangler.utils; + +import com.google.common.collect.Lists; +import com.google.gson.JsonParser; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.wrangler.TestingRig; +import io.cdap.wrangler.api.RecipePipeline; +import io.cdap.wrangler.api.RemoteDirectiveResponse; +import io.cdap.wrangler.api.Row; +import org.junit.Assert; +import org.junit.Test; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class KryoSerializerTest { + + private static final String[] TESTS = new String[]{ + JsonTestData.BASIC, + JsonTestData.SIMPLE_JSON_OBJECT, + JsonTestData.ARRAY_OF_OBJECTS, + JsonTestData.JSON_ARRAY_WITH_OBJECT, + JsonTestData.COMPLEX_1, + JsonTestData.ARRAY_OF_NUMBERS, + JsonTestData.ARRAY_OF_STRING, + JsonTestData.COMPLEX_2, + JsonTestData.EMPTY_OBJECT, + JsonTestData.NULL_OBJECT, + JsonTestData.FB_JSON + }; + + private static final String[] 
directives = new String[]{ + "set-column body json:Parse(body)" + }; + + @Test + public void testJsonTypes() throws Exception { + SchemaConverter converter = new SchemaConverter(); + RecordConvertor recordConvertor = new RecordConvertor(); + JsonParser parser = new JsonParser(); + RecipePipeline executor = TestingRig.execute(directives); + for (String test : TESTS) { + Row row = new Row("body", test); + + List expectedRows = executor.execute(Lists.newArrayList(row)); + byte[] serializedRows = new KryoSerializer().fromRemoteDirectiveResponse( + new RemoteDirectiveResponse(expectedRows, null)); + List gotRows = new KryoSerializer().toRemoteDirectiveResponse(serializedRows).getRows(); + Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray()); + } + } + + @Test + public void testLogicalTypes() throws Exception { + Row testRow = new Row(); + testRow.add("id", 1); + testRow.add("name", "abc"); + testRow.add("date", LocalDate.of(2018, 11, 11)); + testRow.add("time", LocalTime.of(11, 11, 11)); + testRow.add("timestamp", ZonedDateTime.of(2018, 11, 11, 11, 11, 11, 0, ZoneId.of("UTC"))); + testRow.add("bigdecimal", new BigDecimal(new BigInteger("123456"), 5)); + testRow.add("datetime", LocalDateTime.now()); + List expectedRows = Collections.singletonList(testRow); + byte[] serializedRows = new KryoSerializer().fromRemoteDirectiveResponse( + new RemoteDirectiveResponse(expectedRows, null)); + List gotRows = new KryoSerializer().toRemoteDirectiveResponse(serializedRows).getRows(); + Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray()); + } + + @Test + public void testCollectionTypes() throws Exception { + List list = new ArrayList<>(); + list.add(null); + list.add(1); + list.add(2); + Set set = new HashSet<>(); + set.add(null); + set.add(1); + set.add(2); + Map map = new HashMap<>(); + map.put("null", null); + map.put("1", 1); + map.put("2", 2); + + Row testRow = new Row(); + testRow.add("list", list); + testRow.add("set", set); + testRow.add("map", 
map); + + List expectedRows = Collections.singletonList(testRow); + byte[] serializedRows = new KryoSerializer().fromRemoteDirectiveResponse( + new RemoteDirectiveResponse(expectedRows, null)); + List gotRows = new KryoSerializer().toRemoteDirectiveResponse(serializedRows).getRows(); + Assert.assertArrayEquals(expectedRows.toArray(), gotRows.toArray()); + } + + @Test + public void testWithSchema() throws Exception { + Row testRow = new Row(); + testRow.add("id", 1); + testRow.add("name", "abc"); + testRow.add("date", LocalDate.of(2018, 11, 11)); + testRow.add("time", LocalTime.of(11, 11, 11)); + testRow.add("timestamp", ZonedDateTime.of(2018, 11, 11, 11, 11, 11, 0, ZoneId.of("UTC"))); + testRow.add("bigdecimal", new BigDecimal(new BigInteger("123456"), 5)); + testRow.add("datetime", LocalDateTime.now()); + List expectedRows = Collections.singletonList(testRow); + + SchemaConverter converter = new SchemaConverter(); + Schema expectedSchema = converter.toSchema("myrecord", expectedRows.get(0)); + + byte[] serializedRows = new KryoSerializer().fromRemoteDirectiveResponse( + new RemoteDirectiveResponse(expectedRows, expectedSchema)); + RemoteDirectiveResponse response = new KryoSerializer().toRemoteDirectiveResponse( + serializedRows); + + Assert.assertArrayEquals(expectedRows.toArray(), response.getRows().toArray()); + Assert.assertEquals(expectedSchema, response.getOutputSchema()); + } +} diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/utils/ObjectSerDeTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/utils/ObjectSerDeTest.java index 689668444..f297b23e8 100644 --- a/wrangler-core/src/test/java/io/cdap/wrangler/utils/ObjectSerDeTest.java +++ b/wrangler-core/src/test/java/io/cdap/wrangler/utils/ObjectSerDeTest.java @@ -17,6 +17,8 @@ package io.cdap.wrangler.utils; import com.google.common.base.Charsets; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.wrangler.api.RemoteDirectiveResponse; import io.cdap.wrangler.api.Row; import 
org.junit.Assert; import org.junit.Test; @@ -85,4 +87,20 @@ public void testLogicalTypeSerDe() throws Exception { actualRows = objectSerDe.toObject(bytes); Assert.assertEquals(expectedRows.size(), actualRows.size()); } + @Test + public void testRemoteDirectiveResponseSerDe() throws Exception { + List expectedRows = new ArrayList<>(); + Row firstRow = new Row(); + firstRow.add("id", 1); + expectedRows.add(firstRow); + Schema expectedSchema = Schema.recordOf(Schema.Field.of("id", Schema.of(Schema.Type.INT))); + RemoteDirectiveResponse expectedResponse = new RemoteDirectiveResponse(expectedRows, expectedSchema); + ObjectSerDe objectSerDe = new ObjectSerDe<>(); + + byte[] bytes = objectSerDe.toByteArray(expectedResponse); + RemoteDirectiveResponse actualResponse = objectSerDe.toObject(bytes); + + Assert.assertEquals(expectedResponse.getRows().size(), actualResponse.getRows().size()); + Assert.assertEquals(expectedResponse.getOutputSchema(), actualResponse.getOutputSchema()); + } } diff --git a/wrangler-docs/directives/parse-xml-to-json.md b/wrangler-docs/directives/parse-xml-to-json.md index 031633786..beb136b0c 100644 --- a/wrangler-docs/directives/parse-xml-to-json.md +++ b/wrangler-docs/directives/parse-xml-to-json.md @@ -8,11 +8,13 @@ transforms the XML into a JSON document, simplifying further parsing using the ## Syntax ``` -parse-xml-to-json [] +parse-xml-to-json [] [] ``` * `` is the name of the column in the record that is an XML document. * `` indicates the depth at which the XML document parsing should terminate processing. +* `` An OPTIONAL boolean value that if true, then values will not be coerced into boolean or numeric values and will instead be left as strings. 
(as per `org.json.XML` rules) + The default value is `false` ## Usage Notes diff --git a/wrangler-proto/pom.xml b/wrangler-proto/pom.xml index 9a9cdc8cc..7db666024 100644 --- a/wrangler-proto/pom.xml +++ b/wrangler-proto/pom.xml @@ -18,7 +18,7 @@ wrangler io.cdap.wrangler - 4.10.0-SNAPSHOT + 4.10.1 4.0.0 diff --git a/wrangler-service/pom.xml b/wrangler-service/pom.xml index e6aaf1866..04d616c91 100644 --- a/wrangler-service/pom.xml +++ b/wrangler-service/pom.xml @@ -18,7 +18,7 @@ wrangler io.cdap.wrangler - 4.10.0-SNAPSHOT + 4.10.1 4.0.0 diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteDirectiveRequest.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteDirectiveRequest.java index 9b77f23f3..d3e6d959f 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteDirectiveRequest.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteDirectiveRequest.java @@ -15,6 +15,7 @@ */ package io.cdap.wrangler.service.directive; +import io.cdap.cdap.api.data.schema.Schema; import io.cdap.wrangler.parser.DirectiveClass; import java.util.HashMap; @@ -29,13 +30,15 @@ public class RemoteDirectiveRequest { private final Map systemDirectives; private final String pluginNameSpace; private final byte[] data; + private final Schema inputSchema; RemoteDirectiveRequest(String recipe, Map systemDirectives, - String pluginNameSpace, byte[] data) { + String pluginNameSpace, byte[] data, Schema inputSchema) { this.recipe = recipe; this.systemDirectives = new HashMap<>(systemDirectives); this.pluginNameSpace = pluginNameSpace; this.data = data; + this.inputSchema = inputSchema; } public String getRecipe() { @@ -53,4 +56,8 @@ public byte[] getData() { public String getPluginNameSpace() { return pluginNameSpace; } + + public Schema getInputSchema() { + return inputSchema; + } } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java 
b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java index 27216247f..8e5e4cbef 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/RemoteExecutionTask.java @@ -16,9 +16,13 @@ package io.cdap.wrangler.service.directive; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.service.worker.RunnableTask; import io.cdap.cdap.api.service.worker.RunnableTaskContext; import io.cdap.cdap.api.service.worker.SystemAppTaskContext; +import io.cdap.cdap.features.Feature; +import io.cdap.cdap.internal.io.SchemaTypeAdapter; import io.cdap.directives.aggregates.DefaultTransientStore; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.CompileException; @@ -29,7 +33,10 @@ import io.cdap.wrangler.api.ErrorRecordBase; import io.cdap.wrangler.api.ExecutorContext; import io.cdap.wrangler.api.RecipeException; +import io.cdap.wrangler.api.RemoteDirectiveResponse; import io.cdap.wrangler.api.Row; +import io.cdap.wrangler.api.TransientStore; +import io.cdap.wrangler.api.TransientVariableScope; import io.cdap.wrangler.api.parser.UsageDefinition; import io.cdap.wrangler.executor.RecipePipelineExecutor; import io.cdap.wrangler.expression.EL; @@ -42,27 +49,44 @@ import io.cdap.wrangler.proto.ErrorRecordsException; import io.cdap.wrangler.registry.DirectiveInfo; import io.cdap.wrangler.registry.UserDirectiveRegistry; +import io.cdap.wrangler.utils.KryoSerializer; import io.cdap.wrangler.utils.ObjectSerDe; - import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.cdap.wrangler.schema.TransientStoreKeys.INPUT_SCHEMA; +import static 
io.cdap.wrangler.schema.TransientStoreKeys.OUTPUT_SCHEMA; /** * Task for remote execution of directives */ public class RemoteExecutionTask implements RunnableTask { + private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutionTask.class); - private static final Gson GSON = new Gson(); + private static final Gson GSON = new GsonBuilder() + .registerTypeAdapter(Schema.class, new SchemaTypeAdapter()) + .create(); @Override public void run(RunnableTaskContext runnableTaskContext) throws Exception { RemoteDirectiveRequest directiveRequest = GSON.fromJson(runnableTaskContext.getParam(), RemoteDirectiveRequest.class); + LOG.info("Inside run method"); + LOG.info("directiveRequest: {}", directiveRequest); + LOG.info("directiveRequest getData :{}", directiveRequest.getData()); + LOG.info("directiveRequest getRecipe :{}", directiveRequest.getRecipe()); + LOG.info("directiveRequest getInputSchema: {}", directiveRequest.getInputSchema()); + LOG.info("directiveRequest getSystemDirectives:{}", directiveRequest.getSystemDirectives()); + LOG.info("directiveRequest getPluginNameSpace:{}", directiveRequest.getPluginNameSpace()); + SystemAppTaskContext systemAppContext = runnableTaskContext.getRunnableTaskSystemAppContext(); + LOG.info("systemAppContext: {}", systemAppContext); String namespace = directiveRequest.getPluginNameSpace(); Map systemDirectives = directiveRequest.getSystemDirectives(); AtomicBoolean hasUDD = new AtomicBoolean(); @@ -102,12 +126,18 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception { ObjectSerDe> objectSerDe = new ObjectSerDe<>(); List rows = objectSerDe.toObject(directiveRequest.getData()); + Schema inputSchema = directiveRequest.getInputSchema(); + TransientStore transientStore = new DefaultTransientStore(); + if (inputSchema != null) { + transientStore.set(TransientVariableScope.GLOBAL, INPUT_SCHEMA, inputSchema); + } + try (RecipePipelineExecutor executor = new RecipePipelineExecutor(() -> directives, new 
ServicePipelineContext( namespace, ExecutorContext.Environment.SERVICE, systemAppContext, - new DefaultTransientStore()))) { + transientStore))) { rows = executor.execute(rows); List errors = executor.errors().stream() .filter(ErrorRecordBase::isShownInWrangler) @@ -120,8 +150,17 @@ public void run(RunnableTaskContext runnableTaskContext) throws Exception { throw new BadRequestException(e.getMessage(), e); } + Schema outputSchema = transientStore.get(OUTPUT_SCHEMA); + RemoteDirectiveResponse response = new RemoteDirectiveResponse(rows, outputSchema); + ObjectSerDe responseSerDe = new ObjectSerDe<>(); + runnableTaskContext.setTerminateOnComplete(hasUDD.get() || EL.isUsed()); - runnableTaskContext.writeResult(objectSerDe.toByteArray(rows)); + + if (Feature.WRANGLER_KRYO_SERIALIZATION.isEnabled(systemAppContext)) { + runnableTaskContext.writeResult(new KryoSerializer().fromRemoteDirectiveResponse(response)); + } else { + runnableTaskContext.writeResult(responseSerDe.toByteArray(response)); + } } catch (DirectiveParseException | ClassNotFoundException | CompileException e) { throw new BadRequestException(e.getMessage(), e); } diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java index 20d4d3b2f..9a01df5f6 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/WorkspaceHandler.java @@ -39,6 +39,7 @@ import io.cdap.cdap.etl.proto.ArtifactSelectorConfig; import io.cdap.cdap.etl.proto.connection.ConnectorDetail; import io.cdap.cdap.etl.proto.connection.SampleResponse; +import io.cdap.cdap.features.Feature; import io.cdap.cdap.internal.io.SchemaTypeAdapter; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.wrangler.PropertyIds; @@ -48,8 +49,10 @@ import io.cdap.wrangler.api.DirectiveParseException; import 
io.cdap.wrangler.api.GrammarMigrator; import io.cdap.wrangler.api.RecipeException; +import io.cdap.wrangler.api.RemoteDirectiveResponse; import io.cdap.wrangler.api.Row; import io.cdap.wrangler.api.TransientVariableScope; +import io.cdap.wrangler.lineage.LineageOperations; import io.cdap.wrangler.parser.ConfigDirectiveContext; import io.cdap.wrangler.parser.DirectiveClass; import io.cdap.wrangler.parser.GrammarWalker; @@ -77,6 +80,7 @@ import io.cdap.wrangler.schema.TransientStoreKeys; import io.cdap.wrangler.store.recipe.RecipeStore; import io.cdap.wrangler.store.workspace.WorkspaceStore; +import io.cdap.wrangler.utils.KryoSerializer; import io.cdap.wrangler.utils.ObjectSerDe; import io.cdap.wrangler.utils.RowHelper; import io.cdap.wrangler.utils.SchemaConverter; @@ -100,12 +104,17 @@ import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.cdap.wrangler.schema.TransientStoreKeys.INPUT_SCHEMA; +import static io.cdap.wrangler.schema.TransientStoreKeys.OUTPUT_SCHEMA; /** * V2 endpoints for workspace */ public class WorkspaceHandler extends AbstractDirectiveHandler { - + private static final Logger LOG = LoggerFactory.getLogger(WorkspaceHandler.class); private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create(); private static final Pattern PRAGMA_PATTERN = Pattern.compile("^\\s*#pragma\\s+load-directives\\s+"); @@ -464,14 +473,16 @@ private DirectiveExecutionResponse execute(NamespaceSummary ns, HttpServiceReque DirectiveExecutionRequest executionRequest = GSON.fromJson(StandardCharsets.UTF_8.decode(request.getContent()).toString(), DirectiveExecutionRequest.class);
-
+    // DEBUG-only request trace; the workspace detail and the freshly constructed collector are not logged.
+    LOG.debug("DirectiveExecutionRequest: directives: {} limit: {}",
+      executionRequest.getDirectives(), executionRequest.getLimit());
 List directives = new ArrayList<>(executionRequest.getDirectives()); if (recipeDirectives != null) { directives.addAll(recipeDirectives); } WorkspaceDetail detail = wsStore.getWorkspaceDetail(workspaceId); UserDirectivesCollector userDirectivesCollector = new UserDirectivesCollector(); List result = executeDirectives(ns.getName(), directives, detail, userDirectivesCollector); DirectiveExecutionResponse response = generateExecutionResponse(result, @@ -557,6 +568,8 @@ private List executeDirectives(String namespace, TRANSIENT_STORE.reset(TransientVariableScope.GLOBAL); TRANSIENT_STORE.set(TransientVariableScope.GLOBAL, TransientStoreKeys.INPUT_SCHEMA, inputSchema); }
+    LOG.debug("SampleSpec: {}", detail.getWorkspace().getSampleSpec());
+
 return getContext().isRemoteTaskEnabled() ? executeRemotely(namespace, directives, detail, grammarVisitor) : @@ -598,6 +611,7 @@ private List executeRemotely(String namespace, List systemDirectives = new HashMap<>(); // Gather system directives and call additional visitor.
@@ -616,14 +630,32 @@ private List executeRemotely(String namespace, List>().toObject(bytes); + RemoteDirectiveResponse response; + if (Feature.WRANGLER_KRYO_SERIALIZATION.isEnabled(getContext())) { + response = new KryoSerializer().toRemoteDirectiveResponse(bytes); + } else { + response = new ObjectSerDe().toObject(bytes); + } + if (response.getOutputSchema() != null) { + TRANSIENT_STORE.set(TransientVariableScope.GLOBAL, OUTPUT_SCHEMA, response.getOutputSchema()); + } + LOG.info("response: {}", response); + return response.getRows(); } private List getSample(SampleResponse sampleResponse) { diff --git a/wrangler-storage/pom.xml b/wrangler-storage/pom.xml index a7549b03f..a8e339631 100644 --- a/wrangler-storage/pom.xml +++ b/wrangler-storage/pom.xml @@ -18,7 +18,7 @@ wrangler io.cdap.wrangler - 4.10.0-SNAPSHOT + 4.10.1 4.0.0 diff --git a/wrangler-test/pom.xml b/wrangler-test/pom.xml index 95be5b354..d9cf77b4c 100644 --- a/wrangler-test/pom.xml +++ b/wrangler-test/pom.xml @@ -19,7 +19,7 @@ wrangler io.cdap.wrangler - 4.10.0-SNAPSHOT + 4.10.1 4.0.0 diff --git a/wrangler-transform/pom.xml b/wrangler-transform/pom.xml index b09712c0d..e464a24d5 100644 --- a/wrangler-transform/pom.xml +++ b/wrangler-transform/pom.xml @@ -3,7 +3,7 @@ wrangler io.cdap.wrangler - 4.10.0-SNAPSHOT + 4.10.1 4.0.0 @@ -122,8 +122,8 @@ 1.1.0 - system:cdap-data-pipeline[6.4.0,7.0.0-SNAPSHOT) - system:cdap-data-streams[6.4.0,7.0.0-SNAPSHOT) + system:cdap-data-pipeline[6.10.0,7.0.0-SNAPSHOT) + system:cdap-data-streams[6.10.0,7.0.0-SNAPSHOT)