Skip to content

Commit

Permalink
Remote function execution with thrift executor
Browse files Browse the repository at this point in the history
  • Loading branch information
rongrong committed Sep 15, 2020
1 parent b1b5e8f commit 3825bdb
Show file tree
Hide file tree
Showing 24 changed files with 641 additions and 55 deletions.
47 changes: 45 additions & 2 deletions presto-function-namespace-managers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
<artifactId>bootstrap</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>concurrent</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>configuration</artifactId>
Expand All @@ -33,18 +38,38 @@
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-api</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-client</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-transport-spi</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<artifactId>presto-common</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-common</artifactId>
<artifactId>presto-spi</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-thrift-api</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand All @@ -66,6 +91,12 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
Expand Down Expand Up @@ -97,6 +128,12 @@
<artifactId>jdbi3-sqlobject</artifactId>
</dependency>

<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<scope>provided</scope>
</dependency>

<!-- for testing -->
<dependency>
<groupId>com.facebook.airlift</groupId>
Expand All @@ -115,5 +152,11 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.drift</groupId>
<artifactId>drift-transport-netty</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.common.CatalogSchemaName;
import com.facebook.presto.common.function.QualifiedFunctionName;
import com.facebook.presto.functionNamespace.execution.SqlFunctionExecutors;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.function.FunctionHandle;
import com.facebook.presto.spi.function.FunctionImplementationType;
Expand Down Expand Up @@ -45,7 +46,6 @@
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static com.facebook.presto.spi.function.FunctionImplementationType.SQL;
import static com.facebook.presto.spi.function.FunctionKind.SCALAR;
import static com.facebook.presto.spi.function.RoutineCharacteristics.Language;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand All @@ -59,15 +59,16 @@ public abstract class AbstractSqlInvokedFunctionNamespaceManager
private final ConcurrentMap<FunctionNamespaceTransactionHandle, FunctionCollection> transactions = new ConcurrentHashMap<>();

private final String catalogName;
private final Map<Language, FunctionImplementationType> supportedFunctionLanguages;
private final SqlFunctionExecutors sqlFunctionExecutors;
private final LoadingCache<QualifiedFunctionName, Collection<SqlInvokedFunction>> functions;
private final LoadingCache<SqlFunctionHandle, FunctionMetadata> metadataByHandle;
private final LoadingCache<SqlFunctionHandle, ScalarFunctionImplementation> implementationByHandle;

public AbstractSqlInvokedFunctionNamespaceManager(String catalogName, SqlInvokedFunctionNamespaceManagerConfig config)
public AbstractSqlInvokedFunctionNamespaceManager(String catalogName, SqlFunctionExecutors sqlFunctionExecutors, SqlInvokedFunctionNamespaceManagerConfig config)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.supportedFunctionLanguages = config.getSupportedFunctionLanguages();
this.sqlFunctionExecutors = requireNonNull(sqlFunctionExecutors, "sqlFunctionExecutors is null");
requireNonNull(config, "config is null");
this.functions = CacheBuilder.newBuilder()
.expireAfterWrite(config.getFunctionCacheExpiration().toMillis(), MILLISECONDS)
.build(new CacheLoader<QualifiedFunctionName, Collection<SqlInvokedFunction>>()
Expand Down Expand Up @@ -210,7 +211,7 @@ protected void refreshFunctionsCache(QualifiedFunctionName functionName)

protected void checkFunctionLanguageSupported(SqlInvokedFunction function)
{
if (!supportedFunctionLanguages.containsKey(function.getRoutineCharacteristics().getLanguage())) {
if (!sqlFunctionExecutors.getSupportedLanguages().contains(function.getRoutineCharacteristics().getLanguage())) {
throw new PrestoException(GENERIC_USER_ERROR, format("Catalog %s does not support functions implemented in language %s", catalogName, function.getRoutineCharacteristics().getLanguage()));
}
}
Expand All @@ -233,7 +234,7 @@ protected FunctionMetadata sqlInvokedFunctionToMetadata(SqlInvokedFunction funct

protected FunctionImplementationType getFunctionImplementationType(SqlInvokedFunction function)
{
return supportedFunctionLanguages.get(function.getRoutineCharacteristics().getLanguage());
return sqlFunctionExecutors.getFunctionImplementationType(function.getRoutineCharacteristics().getLanguage());
}

protected ScalarFunctionImplementation sqlInvokedFunctionToImplementation(SqlInvokedFunction function)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,23 @@
package com.facebook.presto.functionNamespace;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.spi.function.FunctionImplementationType;
import com.google.common.collect.ImmutableMap;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

import java.util.Map;
import java.util.Set;

import static com.facebook.airlift.json.JsonCodec.mapJsonCodec;
import static com.facebook.presto.spi.function.RoutineCharacteristics.Language;
import static com.facebook.presto.spi.function.RoutineCharacteristics.Language.SQL;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES;

public class SqlInvokedFunctionNamespaceManagerConfig
{
private static final JsonCodec<Map<Language, FunctionImplementationType>> FUNCTION_LANGUAGES_CODEC = mapJsonCodec(Language.class, FunctionImplementationType.class);
private static final Splitter SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();

private Duration functionCacheExpiration = new Duration(5, MINUTES);
private Duration functionInstanceCacheExpiration = new Duration(8, HOURS);
private Map<Language, FunctionImplementationType> supportedFunctionLanguages = ImmutableMap.of(SQL, FunctionImplementationType.SQL);
private Set<String> supportedFunctionLanguages = ImmutableSet.of("sql");

@MinDuration("0ns")
public Duration getFunctionCacheExpiration()
Expand Down Expand Up @@ -62,15 +58,15 @@ public SqlInvokedFunctionNamespaceManagerConfig setFunctionInstanceCacheExpirati
return this;
}

public Map<Language, FunctionImplementationType> getSupportedFunctionLanguages()
{
return supportedFunctionLanguages;
}

@Config("supported-function-languages")
public SqlInvokedFunctionNamespaceManagerConfig setSupportedFunctionLanguages(String languages)
{
this.supportedFunctionLanguages = FUNCTION_LANGUAGES_CODEC.fromJson(languages);
this.supportedFunctionLanguages = ImmutableSet.copyOf(SPLITTER.split(languages));
return this;
}

public Set<String> getSupportedFunctionLanguages()
{
return supportedFunctionLanguages;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution;

import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
import com.facebook.drift.client.address.SimpleAddressSelectorConfig;
import com.facebook.presto.functionNamespace.SqlInvokedFunctionNamespaceManagerConfig;
import com.facebook.presto.functionNamespace.execution.thrift.SimpleAddressThriftSqlFunctionExecutionModule;
import com.facebook.presto.spi.function.FunctionImplementationType;
import com.facebook.presto.spi.function.RoutineCharacteristics.Language;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.TypeLiteral;

import java.util.Map;

import static com.facebook.presto.spi.function.FunctionImplementationType.THRIFT;
import static com.google.inject.Scopes.SINGLETON;

public class SimpleAddressSqlFunctionExecutorsModule
extends AbstractConfigurationAwareModule
{
@Override
protected void setup(Binder binder)
{
binder.bind(SqlFunctionExecutors.class).in(SINGLETON);

SqlInvokedFunctionNamespaceManagerConfig config = buildConfigObject(SqlInvokedFunctionNamespaceManagerConfig.class);
ImmutableMap.Builder<Language, SimpleAddressSelectorConfig> thriftConfigs = ImmutableMap.builder();
ImmutableMap.Builder<Language, FunctionImplementationType> languageImplementationTypeMap = ImmutableMap.builder();
for (String languageName : config.getSupportedFunctionLanguages()) {
Language language = new Language(languageName);
FunctionImplementationType implementationType = buildConfigObject(SqlFunctionLanguageConfig.class, languageName).getFunctionImplementationType();
languageImplementationTypeMap.put(language, implementationType);
if (implementationType.equals(THRIFT)) {
thriftConfigs.put(language, buildConfigObject(SimpleAddressSelectorConfig.class, languageName));
}
}
binder.bind(new TypeLiteral<Map<Language, FunctionImplementationType>>() {}).toInstance(languageImplementationTypeMap.build());
binder.install(new SimpleAddressThriftSqlFunctionExecutionModule(thriftConfigs.build()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution;

import com.facebook.presto.functionNamespace.execution.thrift.ThriftSqlFunctionExecutor;
import com.facebook.presto.spi.function.FunctionImplementationType;
import com.facebook.presto.spi.function.RoutineCharacteristics.Language;
import com.google.inject.Inject;

import java.util.Map;
import java.util.Set;

import static java.util.Objects.requireNonNull;

public class SqlFunctionExecutors
{
private final Map<Language, FunctionImplementationType> supportedLanguages;
private final ThriftSqlFunctionExecutor thriftSqlFunctionExecutor;

@Inject
public SqlFunctionExecutors(Map<Language, FunctionImplementationType> supportedLanguages, ThriftSqlFunctionExecutor thriftSqlFunctionExecutor)
{
this.supportedLanguages = requireNonNull(supportedLanguages, "supportedLanguages is null");
this.thriftSqlFunctionExecutor = requireNonNull(thriftSqlFunctionExecutor, "thriftSqlFunctionExecutor is null");
}

public Set<Language> getSupportedLanguages()
{
return supportedLanguages.keySet();
}

public FunctionImplementationType getFunctionImplementationType(Language language)
{
return supportedLanguages.get(language);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution;

import com.facebook.airlift.configuration.Config;
import com.facebook.presto.spi.function.FunctionImplementationType;

import static com.facebook.presto.spi.function.FunctionImplementationType.SQL;

public class SqlFunctionLanguageConfig
{
private FunctionImplementationType functionImplementationType = SQL;

@Config("function-implementation-type")
public SqlFunctionLanguageConfig setFunctionImplementationType(String implementationType)
{
this.functionImplementationType = FunctionImplementationType.valueOf(implementationType.toUpperCase());
return this;
}

public FunctionImplementationType getFunctionImplementationType()
{
return functionImplementationType;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.execution.thrift;

import com.facebook.drift.client.address.AddressSelector;
import com.facebook.drift.transport.client.Address;
import com.google.common.collect.ImmutableSet;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class ContextualAddressSelector<T extends Address>
implements AddressSelector<T>
{
private final Map<String, AddressSelector<T>> delegates;

public ContextualAddressSelector(Map<String, AddressSelector<T>> delegates)
{
this.delegates = requireNonNull(delegates, "delegates is null");
}

@Override
public Optional<T> selectAddress(Optional<String> context)
{
return selectAddress(context, ImmutableSet.of());
}

@Override
public Optional<T> selectAddress(Optional<String> context, Set<T> attempted)
{
checkArgument(context.isPresent(), "context is empty");
return delegates.get(context.get()).selectAddress(Optional.empty(), attempted);
}
}
Loading

0 comments on commit 3825bdb

Please sign in to comment.