ITN coverage for SQL engine
AnkitCLI committed Apr 15, 2024
1 parent 3171ef2 commit f2087f7
Showing 20 changed files with 582 additions and 4 deletions.
209 changes: 209 additions & 0 deletions src/e2e-test/features/bigquery/source/BigQuerySqlEngine.feature
@@ -0,0 +1,209 @@
# Copyright © 2024 Cask Data, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

@BigQuery_Sink
Feature: BigQuery sink - Verification of successful BigQuery to BigQuery data transfer with SQL engine Transformation Pushdown

@BQ_SOURCE_JOINER_TEST @BQ_SOURCE_JOINER2_TEST @BQ_DELETE_JOIN @BQ_SINK_TEST @EXISTING_BQ_CONNECTION
Scenario: Validate successful record transfer from BigQuery source to BigQuery sink using Joiner
Given Open Datafusion Project to configure pipeline
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "BigQuery" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Analytics"
When Select plugin: "Joiner" from the plugins list as: "Analytics"
Then Navigate to the properties page of plugin: "BigQuery"
Then Click plugin property: "switch-useConnection"
Then Click on the Browse Connections button
Then Select connection: "bqConnectionName"
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
And Replace input plugin property: "dataset" with value: "dataset"
And Replace input plugin property: "table" with value: "bqSourceTable"
Then Click on the Get Schema button
Then Validate "BigQuery" plugin properties
And Close the Plugin Properties page
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "BigQuery" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "BigQuery" from the plugins list as: "Sink"
Then Connect plugins: "BigQuery" and "Joiner" to establish connection
Then Connect plugins: "BigQuery2" and "Joiner" to establish connection
Then Connect plugins: "Joiner" and "BigQuery3" to establish connection
Then Navigate to the properties page of plugin: "BigQuery2"
Then Click plugin property: "useConnection"
Then Click on the Browse Connections button
Then Select connection: "bqConnectionName"
Then Enter input plugin property: "referenceName" with value: "BQRefName"
Then Enter input plugin property: "dataset" with value: "dataset"
Then Enter input plugin property: "table" with value: "bqSourceTable2"
Then Validate "BigQuery2" plugin properties
And Close the Plugin Properties page
Then Navigate to the properties page of plugin: "Joiner"
Then Select radio button plugin property: "conditionType" with value: "basic"
Then Click on the Get Schema button
Then Validate "Joiner" plugin properties
Then Close the Plugin Properties page
Then Navigate to the properties page of plugin: "BigQuery3"
Then Click plugin property: "useConnection"
Then Click on the Browse Connections button
Then Select connection: "bqConnectionName"
Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
Then Enter input plugin property: "dataset" with value: "dataset"
Then Enter input plugin property: "table" with value: "bqTargetTable"
Then Validate "BigQuery3" plugin properties
Then Close the Plugin Properties page
Then Save the pipeline
Then Preview and run the pipeline
Then Wait till pipeline preview is in running state
Then Open and capture pipeline preview logs
Then Verify the preview run status of pipeline in the logs is "succeeded"
Then Close the pipeline logs
Then Close the preview
Then Deploy the pipeline
Then Click on "Configure" button
Then Click on "Transformation Pushdown" button
Then Click on "Enable Transformation Pushdown" button
Then Enter input plugin property: "dataset" with value: "test_sqlengine"
Then Click on "Advanced" button
Then Click plugin property: "useConnection"
Then Click on the Browse Connections button
Then Select connection: "bqConnectionName"
Then Click on "Save" button
Then Run the Pipeline in Runtime
Then Wait till pipeline is in running state
Then Open and capture logs
Then Close the pipeline logs
Then Verify the pipeline status is "Succeeded"
Then Validate The Data From BQ To BQ With Actual And Expected File for: "bqExpectedFileJoin"

@BQ_SOURCE_SQLENGINE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION
Scenario: Validate successful record transfer from BigQuery source to BigQuery sink using Group By
Given Open Datafusion Project to configure pipeline
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "BigQuery" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "BigQuery" from the plugins list as: "Sink"
When Expand Plugin group in the LHS plugins list: "Analytics"
When Select plugin: "Group By" from the plugins list as: "Analytics"
Then Navigate to the properties page of plugin: "BigQuery"
Then Click plugin property: "switch-useConnection"
Then Click on the Browse Connections button
Then Select connection: "bqConnectionName"
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
And Replace input plugin property: "dataset" with value: "dataset"
And Replace input plugin property: "table" with value: "bqSourceTable"
Then Click on the Get Schema button
Then Validate "BigQuery" plugin properties
And Close the Plugin Properties page
Then Connect plugins: "BigQuery" and "Group By" to establish connection
Then Connect plugins: "Group By" and "BigQuery2" to establish connection
Then Navigate to the properties page of plugin: "Group By"
Then Select dropdown plugin property: "groupByFields" with option value: "groupByValidFirstField"
Then Press Escape Key
Then Select dropdown plugin property: "groupByFields" with option value: "groupByValidSecondField"
Then Press Escape Key
Then Enter GroupBy plugin Fields to be Aggregate "groupByGcsAggregateFields"
Then Click on the Get Schema button
Then Click on the Validate button
Then Close the Plugin Properties page
Then Navigate to the properties page of plugin: "BigQuery2"
Then Click plugin property: "useConnection"
Then Click on the Browse Connections button
Then Select connection: "bqConnectionName"
Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
Then Enter input plugin property: "dataset" with value: "dataset"
Then Enter input plugin property: "table" with value: "bqTargetTable"
Then Validate "BigQuery" plugin properties
And Close the Plugin Properties page
Then Save the pipeline
Then Preview and run the pipeline
Then Wait till pipeline preview is in running state
Then Open and capture pipeline preview logs
Then Verify the preview run status of pipeline in the logs is "succeeded"
Then Close the pipeline logs
Then Close the preview
Then Deploy the pipeline
Then Click on "Configure" button
Then Click on "Transformation Pushdown" button
Then Click on "Enable Transformation Pushdown" button
Then Enter input plugin property: "dataset" with value: "test_sqlengine"
Then Click on "Advanced" button
Then Click plugin property: "useConnection"
Then Click on the Browse Connections button
Then Select connection: "bqConnectionName"
Then Click on "Save" button
Then Run the Pipeline in Runtime
Then Wait till pipeline is in running state
Then Open and capture logs
Then Close the pipeline logs
Then Verify the pipeline status is "Succeeded"
Then Validate The Data From BQ To BQ With Actual And Expected File for: "groupByTestOutputFile"

@BQ_SOURCE_SQLENGINE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION
Scenario: Validate successful record transfer from BigQuery source to BigQuery sink using Deduplicate
Given Open Datafusion Project to configure pipeline
When Expand Plugin group in the LHS plugins list: "Source"
When Select plugin: "BigQuery" from the plugins list as: "Source"
When Expand Plugin group in the LHS plugins list: "Sink"
When Select plugin: "BigQuery" from the plugins list as: "Sink"
When Expand Plugin group in the LHS plugins list: "Analytics"
When Select plugin: "Deduplicate" from the plugins list as: "Analytics"
Then Navigate to the properties page of plugin: "BigQuery"
Then Click plugin property: "switch-useConnection"
Then Click on the Browse Connections button
Then Select connection: "bqConnectionName"
Then Enter input plugin property: "referenceName" with value: "BQReferenceName"
And Replace input plugin property: "dataset" with value: "dataset"
And Replace input plugin property: "table" with value: "bqSourceTable"
Then Click on the Get Schema button
Then Validate "BigQuery" plugin properties
And Close the Plugin Properties page
Then Connect plugins: "BigQuery" and "Deduplicate" to establish connection
Then Connect plugins: "Deduplicate" and "BigQuery2" to establish connection
Then Navigate to the properties page of plugin: "Deduplicate"
Then Select dropdown plugin property: "uniqueFields" with option value: "DeduplicateValidFirstField"
Then Press Escape Key
Then Click on the Validate button
Then Close the Plugin Properties page
Then Navigate to the properties page of plugin: "BigQuery2"
Then Click plugin property: "useConnection"
Then Click on the Browse Connections button
Then Select connection: "bqConnectionName"
Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName"
Then Enter input plugin property: "dataset" with value: "dataset"
Then Enter input plugin property: "table" with value: "bqTargetTable"
Then Validate "BigQuery" plugin properties
And Close the Plugin Properties page
Then Save the pipeline
Then Preview and run the pipeline
Then Wait till pipeline preview is in running state
Then Open and capture pipeline preview logs
Then Verify the preview run status of pipeline in the logs is "succeeded"
Then Close the pipeline logs
Then Close the preview
Then Deploy the pipeline
Then Click on "Configure" button
Then Click on "Transformation Pushdown" button
Then Click on "Enable Transformation Pushdown" button
Then Enter input plugin property: "dataset" with value: "test_sqlengine"
Then Click on "Advanced" button
Then Click plugin property: "useConnection"
Then Click on the Browse Connections button
Then Select connection: "bqConnectionName"
Then Click on "Save" button
Then Run the Pipeline in Runtime
Then Wait till pipeline is in running state
Then Open and capture logs
Then Close the pipeline logs
Then Verify the pipeline status is "Succeeded"
Then Validate The Data From BQ To BQ With Actual And Expected File for: "deduplicateTestOutputFile"
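Each scenario above ends by comparing the sink table with a checked-in expected file, resolved by name from the plugin properties (e.g. "bqExpectedFileJoin"). Per the validation helper added below, such a file is newline-delimited JSON, one record per line, each carrying an "id" field that is used as the match key; the records and field names in this sketch are hypothetical:

{"id":"1","name":"alice","total":10}
{"id":"2","name":"bob","total":20}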
@@ -26,7 +26,8 @@
@CucumberOptions(
features = {"src/e2e-test/features"},
glue = {"io.cdap.plugin.bigquery.stepsdesign", "io.cdap.plugin.gcs.stepsdesign",
"stepsdesign", "io.cdap.plugin.common.stepsdesign"},
"stepsdesign", "io.cdap.plugin.common.stepsdesign", "io.cdap.plugin.groupby.actions",
"io.cdap.plugin.groupby.locators", "io.cdap.plugin.groupby.stepsdesign"},
tags = {"@BigQuery_Sink and not @CDAP-20830"},
//TODO: Enable test once issue is fixed https://cdap.atlassian.net/browse/CDAP-20830
monochrome = true,
@@ -26,7 +26,8 @@
@CucumberOptions(
features = {"src/e2e-test/features"},
glue = {"io.cdap.plugin.bigquery.stepsdesign", "io.cdap.plugin.gcs.stepsdesign",
"stepsdesign", "io.cdap.plugin.common.stepsdesign"},
"stepsdesign", "io.cdap.plugin.common.stepsdesign", "io.cdap.plugin.groupby.actions",
"io.cdap.plugin.groupby.locators", "io.cdap.plugin.groupby.stepsdesign"},
tags = {"@BigQuery_Sink_Required"},
monochrome = true,
//TODO: Enable test once issue is fixed https://cdap.atlassian.net/browse/CDAP-20830
@@ -25,6 +25,7 @@
import stepsdesign.BeforeActions;

import java.io.IOException;
import java.net.URISyntaxException;

/**
* BigQuery Plugin validation common step design.
@@ -44,4 +45,13 @@ public void validateTheValuesOfRecordsTransferredToBQsinkIsEqualToTheValuesFromS
Assert.assertTrue("Value of records transferred to the BQ sink should be equal to the value " +
"of the records in the source table", recordsMatched);
}

@Then("Validate The Data From BQ To BQ With Actual And Expected File for: {string}")
public void validateTheDataFromBQToBQWithActualAndExpectedFileFor(String expectedFile) throws IOException,
InterruptedException, URISyntaxException {
boolean recordsMatched = ValidationHelperSqlEngine.validateActualDataToExpectedData(
PluginPropertyUtils.pluginProp("bqTargetTable"),
PluginPropertyUtils.pluginProp(expectedFile));
Assert.assertTrue("Value of records in actual and expected file should be equal", recordsMatched);
}
}
@@ -0,0 +1,112 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.cdap.plugin.bigquery.stepsdesign;

import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.TableResult;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.cdap.e2e.utils.BigQueryClient;
import io.cdap.e2e.utils.PluginPropertyUtils;
import io.cucumber.core.logging.Logger;
import io.cucumber.core.logging.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

/**
* Validation Helper.
*/
public class ValidationHelperSqlEngine {

private static final Logger LOG = LoggerFactory.getLogger(ValidationHelperSqlEngine.class);
private static final Gson gson = new Gson();

/**
* Validates the actual data from a BigQuery table against the expected data from a file.
*
* @param table The name of the BigQuery table to fetch data from
* @param fileName The name of the file containing the expected data
* @return True if the actual data matches the expected data, otherwise false
*/
public static boolean validateActualDataToExpectedData(String table, String fileName) throws IOException,
InterruptedException, URISyntaxException {
// Initialize maps to store data from BigQuery and file
Map<String, JsonObject> bigQueryMap = new HashMap<>();
Map<String, JsonObject> fileMap = new HashMap<>();
// Get the path of the expected file
Path importExpectedFile = Paths.get(ValidationHelperSqlEngine.class.getResource("/" + fileName).toURI());

getBigQueryTableData(table, bigQueryMap);
getFileData(importExpectedFile.toString(), fileMap);

// Compare the data from BigQuery with the data from the file
boolean isMatched = bigQueryMap.equals(fileMap);

return isMatched;
}

public static void getFileData(String fileName, Map<String, JsonObject> fileMap) {
try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
String line;
while ((line = br.readLine()) != null) {
JsonObject json = gson.fromJson(line, JsonObject.class);
if (json.has("id")) { // Check if the JSON object has the "id" key
JsonElement idElement = json.get("id");
if (idElement.isJsonPrimitive()) {
String idKey = idElement.getAsString();
fileMap.put(idKey, json);
} else {
  LOG.error("ID element in expected file is not a JSON primitive: " + idElement);
}
}
}
} catch (IOException e) {
  LOG.error("Error reading the expected data file: " + e.getMessage());
}
}

private static void getBigQueryTableData(String targetTable, Map<String, JsonObject> bigQueryMap)
throws IOException, InterruptedException {
String dataset = PluginPropertyUtils.pluginProp("dataset");
String projectId = PluginPropertyUtils.pluginProp("projectId");
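    // TO_JSON(t) serializes each row as a single JSON value, so every result row
    // carries the full record in column 0 (for example {"id":"1","name":"alice"};
    // the field names in that example are illustrative only).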
String selectQuery = "SELECT TO_JSON(t) FROM `" + projectId + "." + dataset + "." + targetTable + "` AS t";
TableResult result = BigQueryClient.getQueryResult(selectQuery);

for (FieldValueList row : result.iterateAll()) {
JsonObject json = gson.fromJson(row.get(0).getStringValue(), JsonObject.class);
if (json.has("id")) { // Check if the JSON object has the "id" key
JsonElement idElement = json.get("id");
if (idElement.isJsonPrimitive()) {
String idKey = idElement.getAsString();
bigQueryMap.put(idKey, json);
} else {
LOG.error("Data Mismatched");
}
} else {
LOG.error("ID Key not found in JSON object");
}
}
}
}
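The comparison in validateActualDataToExpectedData is order-insensitive: both sides are reduced to maps keyed by the "id" field, and Map.equals then checks for the same set of records regardless of row order. Below is a minimal, self-contained sketch of that matching strategy (the class name and sample records are hypothetical, not part of this commit):

import com.google.gson.Gson;
import com.google.gson.JsonObject;

import java.util.HashMap;
import java.util.Map;

public class IdKeyedComparisonSketch {

  public static void main(String[] args) {
    Gson gson = new Gson();

    // Hypothetical rows, shaped like the TO_JSON(t) output read from BigQuery.
    String[] actualRows = {
        "{\"id\":\"1\",\"name\":\"alice\",\"total\":10}",
        "{\"id\":\"2\",\"name\":\"bob\",\"total\":20}"
    };
    // The same records in a different order, as an expected file might list them.
    String[] expectedRows = {
        "{\"id\":\"2\",\"name\":\"bob\",\"total\":20}",
        "{\"id\":\"1\",\"name\":\"alice\",\"total\":10}"
    };

    // Keying both sides by "id" makes the comparison independent of row order.
    boolean matched = toIdMap(gson, actualRows).equals(toIdMap(gson, expectedRows));
    System.out.println("Matched: " + matched); // prints "Matched: true"
  }

  // Builds a map from the "id" field to the full record, mirroring the helper above.
  private static Map<String, JsonObject> toIdMap(Gson gson, String[] rows) {
    Map<String, JsonObject> map = new HashMap<>();
    for (String row : rows) {
      JsonObject json = gson.fromJson(row, JsonObject.class);
      map.put(json.get("id").getAsString(), json);
    }
    return map;
  }
}

One design note: this strategy assumes "id" values are unique on both sides; duplicate ids collapse into a single map entry, so two datasets that differ only in duplicated rows would still compare as equal.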