Skip to content

Commit

Permalink
predicate-pushdown changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Trianz-Akshay authored and hackett123 committed May 16, 2023
1 parent 778ee89 commit 1835354
Show file tree
Hide file tree
Showing 74 changed files with 1,317 additions and 137 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*-
* #%L
* athena-cloudera-hive
* %%
* Copyright (C) 2019 - 2022 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.athena.connectors.cloudera;

import com.amazonaws.athena.connectors.jdbc.manager.JdbcFederationExpressionParser;
import com.google.common.base.Joiner;
import org.apache.arrow.vector.types.pojo.ArrowType;

import java.util.List;

/**
 * Federation expression parser used when building Hive queries. It reuses the JDBC
 * parser and only customises how array constructor clauses are written.
 */
public class HiveFederationExpressionParser extends JdbcFederationExpressionParser
{
    /**
     * Creates the parser.
     *
     * @param quoteChar quote character, forwarded unchanged to the JDBC base parser
     */
    public HiveFederationExpressionParser(String quoteChar)
    {
        super(quoteChar);
    }

    /**
     * Writes the array constructor clause as the plain list of its arguments.
     *
     * @param type Arrow type of the array; not consulted by this implementation
     * @param arguments argument strings to place in the clause
     * @return the arguments joined with {@code ", "}
     */
    @Override
    public String writeArrayConstructorClause(ArrowType type, List<String> arguments)
    {
        final Joiner commaSeparated = Joiner.on(", ");
        return commaSeparated.join(arguments);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* #L%
*/
package com.amazonaws.athena.connectors.cloudera;

import com.amazonaws.athena.connector.lambda.QueryStatusChecker;
import com.amazonaws.athena.connector.lambda.data.Block;
import com.amazonaws.athena.connector.lambda.data.BlockAllocator;
Expand All @@ -27,12 +28,21 @@
import com.amazonaws.athena.connector.lambda.data.SupportedTypes;
import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connector.lambda.domain.predicate.functions.StandardFunctions;
import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation;
import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetTableRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetTableResponse;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.DataSourceOptimizations;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.ComplexExpressionPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory;
Expand All @@ -42,6 +52,8 @@
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
Expand All @@ -56,13 +68,18 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static com.amazonaws.athena.connector.lambda.domain.predicate.functions.StandardFunctions.IS_DISTINCT_FROM_OPERATOR_FUNCTION_NAME;
import static com.amazonaws.athena.connector.lambda.domain.predicate.functions.StandardFunctions.NULLIF_FUNCTION_NAME;

public class HiveMetadataHandler extends JdbcMetadataHandler
{
static final Logger LOGGER = LoggerFactory.getLogger(HiveMetadataHandler.class);
Expand Down Expand Up @@ -225,6 +242,38 @@ public GetSplitsResponse doGetSplits(BlockAllocator blockAllocator, GetSplitsReq
}
return new GetSplitsResponse(getSplitsRequest.getCatalogName(), splits, null);
}

/**
 * Describes the optimizations this handler advertises for the data source: filter
 * pushdown, complex expression pushdown, TOP N pushdown and LIMIT pushdown.
 *
 * @param allocator Tool for creating and managing Apache Arrow Blocks; not used here.
 * @param request Provides details about the catalog being used.
 * @return A GetDataSourceCapabilitiesResponse carrying the request's catalog name and the map of supported capabilities
 */
@Override
public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAllocator allocator, GetDataSourceCapabilitiesRequest request)
{
    // Standard functions that are left out of the advertised complex expression pushdown list.
    Set<StandardFunctions> excludedFunctions = ImmutableSet.of(NULLIF_FUNCTION_NAME, IS_DISTINCT_FROM_OPERATOR_FUNCTION_NAME);

    // Names of every remaining standard function, in enum declaration order.
    String[] pushdownFunctionNames = Arrays.stream(StandardFunctions.values())
            .filter(function -> !excludedFunctions.contains(function))
            .map(function -> function.getFunctionName().getFunctionName())
            .toArray(String[]::new);

    ImmutableMap.Builder<String, List<OptimizationSubType>> supported = ImmutableMap.builder();
    supported.put(DataSourceOptimizations.SUPPORTS_FILTER_PUSHDOWN.withSupportedSubTypes(
            FilterPushdownSubType.SORTED_RANGE_SET,
            FilterPushdownSubType.NULLABLE_COMPARISON));
    supported.put(DataSourceOptimizations.SUPPORTS_COMPLEX_EXPRESSION_PUSHDOWN.withSupportedSubTypes(
            ComplexExpressionPushdownSubType.SUPPORTED_FUNCTION_EXPRESSION_TYPES
                    .withSubTypeProperties(pushdownFunctionNames)));
    supported.put(DataSourceOptimizations.SUPPORTS_TOP_N_PUSHDOWN.withSupportedSubTypes(
            TopNPushdownSubType.SUPPORTS_ORDER_BY));
    supported.put(DataSourceOptimizations.SUPPORTS_LIMIT_PUSHDOWN.withSupportedSubTypes(
            LimitPushdownSubType.INTEGER_CONSTANT));

    return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), supported.build());
}

private int decodeContinuationToken(GetSplitsRequest request)
{
if (request.hasContinuationToken()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.amazonaws.athena.connectors.cloudera;

import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connectors.jdbc.manager.FederationExpressionParser;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcSplitQueryBuilder;
import com.google.common.base.Strings;

Expand All @@ -28,9 +29,9 @@

public class HiveQueryStringBuilder extends JdbcSplitQueryBuilder
{
public HiveQueryStringBuilder(String quoteCharacters)
public HiveQueryStringBuilder(final String quoteCharacters, final FederationExpressionParser federationExpressionParser)
{
super(quoteCharacters);
super(quoteCharacters, federationExpressionParser);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public HiveRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, java
public HiveRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, JdbcConnectionFactory jdbcConnectionFactory, java.util.Map<String, String> configOptions)
{
this(databaseConnectionConfig, AmazonS3ClientBuilder.defaultClient(), AWSSecretsManagerClientBuilder.defaultClient(), AmazonAthenaClientBuilder.defaultClient(),
jdbcConnectionFactory, new HiveQueryStringBuilder(HIVE_QUOTE_CHARACTER), configOptions);
jdbcConnectionFactory, new HiveQueryStringBuilder(HIVE_QUOTE_CHARACTER, new HiveFederationExpressionParser(HIVE_QUOTE_CHARACTER)), configOptions);
}
@VisibleForTesting
HiveRecordHandler(DatabaseConnectionConfig databaseConnectionConfig, AmazonS3 amazonS3, AWSSecretsManager secretsManager, AmazonAthena athena, JdbcConnectionFactory jdbcConnectionFactory, JdbcSplitQueryBuilder jdbcSplitQueryBuilder, java.util.Map<String, String> configOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import static com.amazonaws.athena.connectors.cloudera.HiveConstants.HIVE_QUOTE_CHARACTER;

@SuppressWarnings("deprecation")
@RunWith(MockitoJUnitRunner.class)
public class HiveQueryStringBuilderTest
Expand All @@ -39,7 +41,7 @@ public void testQueryBuilder()
{
String expectedFrom1 = " FROM default.schema.table ";
String expectedFrom2 = " FROM default.table ";
HiveQueryStringBuilder builder = new HiveQueryStringBuilder("");
HiveQueryStringBuilder builder = new HiveQueryStringBuilder(HIVE_QUOTE_CHARACTER, new HiveFederationExpressionParser(HIVE_QUOTE_CHARACTER));
String fromResult1 = builder.getFromClauseWithSplit("default", "schema", "table", split);
String fromResult2 = builder.getFromClauseWithSplit("default", "", "table", split);
Assert.assertEquals(expectedFrom1, fromResult1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,6 @@

package com.amazonaws.athena.connectors.cloudera;

import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcCredentialProvider;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcSplitQueryBuilder;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import com.amazonaws.athena.connector.lambda.data.FieldBuilder;
import com.amazonaws.athena.connector.lambda.data.SchemaBuilder;
import com.amazonaws.athena.connector.lambda.domain.Split;
Expand All @@ -46,15 +28,32 @@
import com.amazonaws.athena.connector.lambda.domain.predicate.Range;
import com.amazonaws.athena.connector.lambda.domain.predicate.SortedRangeSet;
import com.amazonaws.athena.connector.lambda.domain.predicate.ValueSet;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcCredentialProvider;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcSplitQueryBuilder;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.amazonaws.services.secretsmanager.model.GetSecretValueRequest;
import com.amazonaws.services.secretsmanager.model.GetSecretValueResult;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.mockito.ArgumentMatchers.any;
import static com.amazonaws.athena.connectors.cloudera.HiveConstants.HIVE_QUOTE_CHARACTER;
import static org.mockito.ArgumentMatchers.nullable;

public class HiveRecordHandlerTest
Expand All @@ -78,7 +77,7 @@ public void setup()
this.connection = Mockito.mock(Connection.class);
this.jdbcConnectionFactory = Mockito.mock(JdbcConnectionFactory.class);
Mockito.when(this.jdbcConnectionFactory.getConnection(nullable(JdbcCredentialProvider.class))).thenReturn(this.connection);
jdbcSplitQueryBuilder = new HiveQueryStringBuilder("`");
jdbcSplitQueryBuilder = new HiveQueryStringBuilder(HIVE_QUOTE_CHARACTER, new HiveFederationExpressionParser(HIVE_QUOTE_CHARACTER));
final DatabaseConnectionConfig databaseConnectionConfig = new DatabaseConnectionConfig("testCatalog", HiveConstants.HIVE_NAME,
"hive2://jdbc:hive2://54.89.6.2:10000/authena;AuthMech=3;UID=hive;PWD=hive");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*-
* #%L
* athena-cloudera-impala
* %%
* Copyright (C) 2019 - 2022 Amazon Web Services
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package com.amazonaws.athena.connectors.hortonworks;

import com.amazonaws.athena.connectors.jdbc.manager.JdbcFederationExpressionParser;
import com.google.common.base.Joiner;
import org.apache.arrow.vector.types.pojo.ArrowType;

import java.util.List;

/**
 * Federation expression parser used when building Impala queries. It reuses the JDBC
 * parser and only customises how array constructor clauses are written.
 */
public class ImpalaFederationExpressionParser extends JdbcFederationExpressionParser
{
    /**
     * Creates the parser.
     *
     * @param quoteChar quote character, forwarded unchanged to the JDBC base parser
     */
    public ImpalaFederationExpressionParser(String quoteChar)
    {
        super(quoteChar);
    }

    /**
     * Writes the array constructor clause as the plain list of its arguments.
     *
     * @param type Arrow type of the array; not consulted by this implementation
     * @param arguments argument strings to place in the clause
     * @return the arguments joined with {@code ", "}
     */
    @Override
    public String writeArrayConstructorClause(ArrowType type, List<String> arguments)
    {
        final Joiner argumentJoiner = Joiner.on(", ");
        final String joinedArguments = argumentJoiner.join(arguments);
        return joinedArguments;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,21 @@
import com.amazonaws.athena.connector.lambda.data.SupportedTypes;
import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connector.lambda.domain.TableName;
import com.amazonaws.athena.connector.lambda.domain.predicate.functions.StandardFunctions;
import com.amazonaws.athena.connector.lambda.domain.spill.SpillLocation;
import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetDataSourceCapabilitiesResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetSplitsResponse;
import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetTableRequest;
import com.amazonaws.athena.connector.lambda.metadata.GetTableResponse;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.DataSourceOptimizations;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.ComplexExpressionPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType;
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo;
import com.amazonaws.athena.connectors.jdbc.connection.JdbcConnectionFactory;
Expand All @@ -42,6 +51,7 @@
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
Expand All @@ -56,9 +66,11 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -86,6 +98,30 @@ protected ImpalaMetadataHandler(
{
super(databaseConnectionConfiguration, secretManager, athena, jdbcConnectionFactory, configOptions);
}

/**
 * Describes the optimizations this handler advertises for the data source: filter
 * pushdown, LIMIT pushdown, complex expression pushdown and TOP N pushdown.
 * Every value of {@link StandardFunctions} is listed as a supported function for
 * complex expression pushdown.
 *
 * @param allocator Tool for creating and managing Apache Arrow Blocks; not used here.
 * @param request Provides details about the catalog being used.
 * @return A GetDataSourceCapabilitiesResponse carrying the request's catalog name and the map of supported capabilities
 */
@Override
public GetDataSourceCapabilitiesResponse doGetDataSourceCapabilities(BlockAllocator allocator, GetDataSourceCapabilitiesRequest request)
{
    ImmutableMap.Builder<String, List<OptimizationSubType>> capabilities = ImmutableMap.builder();
    capabilities.put(DataSourceOptimizations.SUPPORTS_FILTER_PUSHDOWN.withSupportedSubTypes(
            FilterPushdownSubType.SORTED_RANGE_SET, FilterPushdownSubType.NULLABLE_COMPARISON
    ));
    capabilities.put(DataSourceOptimizations.SUPPORTS_LIMIT_PUSHDOWN.withSupportedSubTypes(
            LimitPushdownSubType.INTEGER_CONSTANT
    ));
    capabilities.put(DataSourceOptimizations.SUPPORTS_COMPLEX_EXPRESSION_PUSHDOWN.withSupportedSubTypes(
            ComplexExpressionPushdownSubType.SUPPORTED_FUNCTION_EXPRESSION_TYPES
                    .withSubTypeProperties(Arrays.stream(StandardFunctions.values())
                            // No functions are filtered out here: all standard functions are advertised.
                            .map(standardFunction -> standardFunction.getFunctionName().getFunctionName())
                            .toArray(String[]::new))
    ));
    capabilities.put(DataSourceOptimizations.SUPPORTS_TOP_N_PUSHDOWN.withSupportedSubTypes(
            TopNPushdownSubType.SUPPORTS_ORDER_BY
    ));

    return new GetDataSourceCapabilitiesResponse(request.getCatalogName(), capabilities.build());
}

/**
* Delegates creation of partition schema to database type implementation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.amazonaws.athena.connectors.cloudera;

import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connectors.jdbc.manager.FederationExpressionParser;
import com.amazonaws.athena.connectors.jdbc.manager.JdbcSplitQueryBuilder;
import com.google.common.base.Strings;

Expand All @@ -28,9 +29,9 @@

public class ImpalaQueryStringBuilder extends JdbcSplitQueryBuilder
{
public ImpalaQueryStringBuilder(String quoteCharacters)
public ImpalaQueryStringBuilder(String quoteCharacters, final FederationExpressionParser federationExpressionParser)
{
super(quoteCharacters);
super(quoteCharacters, federationExpressionParser);
}

@Override
Expand Down
Loading

0 comments on commit 1835354

Please sign in to comment.