Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.util.Collection;
import java.util.List;
Expand All @@ -27,17 +28,19 @@
import java.util.concurrent.TimeUnit;

import org.apache.drill.common.exceptions.DrillConfigurationException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.scanner.ClassPathScanner;
import org.reflections.util.ClasspathHelper;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigRenderOptions;

public final class DrillConfig extends NestedConfig{
public class DrillConfig extends NestedConfig {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);

private final ImmutableList<String> startupArguments;
Expand All @@ -58,6 +61,36 @@ public DrillConfig(Config config, boolean enableServerConfigs) {
logger.debug("DrillConfig object initialized.");
}

/**
* Get an instance of the provided interface using the configuration path provided. Construct the object based on the
* provided constructor arguments.
* @param path
* The configuration path to use.
* @param iface
* The Interface or Superclass of the instance you requested.
* @param constructorArgs
* Any arguments required for constructing the requested type.
* @return The new Object instance that implements the provided Interface
*/
@SuppressWarnings("unchecked")
public <T> T getInstance(String path, Class<T> iface, Object... constructorArgs) {
try{
String className = this.getString(path);
Class<?> clazz = Class.forName(className);
Preconditions.checkArgument(iface.isAssignableFrom(clazz));
Class<?>[] argClasses = new Class[constructorArgs.length];
for (int i = 0; i < constructorArgs.length; i++) {
argClasses[i] = constructorArgs[i].getClass();
}
Constructor<?> constructor = clazz.getConstructor(argClasses);
return (T) constructor.newInstance(constructorArgs);
}catch(Exception e){
throw UserException.unsupportedError(e)
.message("Failure while attempting to load instance of the class of type %s requested at path %s.",
iface.getName(), path).build(logger);
}
}

public List<String> getStartupArguments() {
return startupArguments;
}
Expand All @@ -81,6 +114,7 @@ public static DrillConfig forClient() {
return create(null, false);
}


/**
* <p>
* DrillConfig loads up Drill configuration information. It does this utilizing a combination of classpath scanning
Expand Down
217 changes: 111 additions & 106 deletions exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.drill.exec.server.options.OptionValue.OptionType;
import org.apache.drill.exec.server.rest.WebServer;
import org.apache.drill.exec.service.ServiceEngine;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.sys.CachingStoreProvider;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.store.sys.PStoreRegistry;
Expand All @@ -51,117 +52,14 @@
*/
public class Drillbit implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);

static {
Environment.logEnv("Drillbit environment: ", logger);
}

private boolean isClosed = false;

public static Drillbit start(final StartupOptions options) throws DrillbitStartupException {
return start(DrillConfig.create(options.getConfigLocation()), null);
}

public static Drillbit start(final DrillConfig config) throws DrillbitStartupException {
return start(config, null);
}

public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet)
throws DrillbitStartupException {
logger.debug("Starting new Drillbit.");
// TODO: allow passing as a parameter
ScanResult classpathScan = ClassPathScanner.fromPrescan(config);
Drillbit bit;
try {
bit = new Drillbit(config, remoteServiceSet, classpathScan);
} catch (final Exception ex) {
throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
}

try {
bit.run();
} catch (final Exception e) {
bit.close();
throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
}
logger.debug("Started new Drillbit.");
return bit;
}

private final static String SYSTEM_OPTIONS_NAME = "org.apache.drill.exec.server.Drillbit.system_options";

private static void throwInvalidSystemOption(final String systemProp, final String errorMessage) {
throw new IllegalStateException("Property \"" + SYSTEM_OPTIONS_NAME + "\" part \"" + systemProp
+ "\" " + errorMessage + ".");
}

private static String stripQuotes(final String s, final String systemProp) {
if (s.isEmpty()) {
return s;
}

final char cFirst = s.charAt(0);
final char cLast = s.charAt(s.length() - 1);
if ((cFirst == '"') || (cFirst == '\'')) {
if (cLast != cFirst) {
throwInvalidSystemOption(systemProp, "quoted value does not have closing quote");
}

return s.substring(1, s.length() - 2); // strip the quotes
}

if ((cLast == '"') || (cLast == '\'')) {
throwInvalidSystemOption(systemProp, "value has unbalanced closing quote");
}

// return as-is
return s;
}

private void javaPropertiesToSystemOptions() {
// get the system options property
final String allSystemProps = System.getProperty(SYSTEM_OPTIONS_NAME);
if ((allSystemProps == null) || allSystemProps.isEmpty()) {
return;
}

final OptionManager optionManager = getContext().getOptionManager();

// parse out the properties, validate, and then set them
final String systemProps[] = allSystemProps.split(",");
for(final String systemProp : systemProps) {
final String keyValue[] = systemProp.split("=");
if (keyValue.length != 2) {
throwInvalidSystemOption(systemProp, "does not contain a key=value assignment");
}

final String optionName = keyValue[0].trim();
if (optionName.isEmpty()) {
throwInvalidSystemOption(systemProp, "does not contain a key before the assignment");
}

final String optionString = stripQuotes(keyValue[1].trim(), systemProp);
if (optionString.isEmpty()) {
throwInvalidSystemOption(systemProp, "does not contain a value after the assignment");
}

final OptionValue defaultValue = optionManager.getOption(optionName);
if (defaultValue == null) {
throwInvalidSystemOption(systemProp, "does not specify a valid option name");
}
if (defaultValue.type != OptionType.SYSTEM) {
throwInvalidSystemOption(systemProp, "does not specify a SYSTEM option ");
}

final OptionValue optionValue = OptionValue.createOption(
defaultValue.kind, OptionType.SYSTEM, optionName, optionString);
optionManager.setOption(optionValue);
}
}

public static void main(final String[] cli) throws DrillbitStartupException {
final StartupOptions options = StartupOptions.parse(cli);
start(options);
}
private boolean isClosed = false;

private final ClusterCoordinator coord;
private final ServiceEngine engine;
Expand All @@ -170,6 +68,7 @@ public static void main(final String[] cli) throws DrillbitStartupException {
private final BootStrapContext context;
private final WebServer webServer;
private RegistrationHandle registrationHandle;
private volatile StoragePluginRegistry storageRegistry;

@VisibleForTesting
public Drillbit(
Expand Down Expand Up @@ -210,7 +109,8 @@ public void run() throws Exception {
final DrillbitEndpoint md = engine.start();
manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider);
final DrillbitContext drillbitContext = manager.getContext();
drillbitContext.getStorage().init();
storageRegistry = drillbitContext.getStorage();
storageRegistry.init();
drillbitContext.getOptionManager().init();
javaPropertiesToSystemOptions();
registrationHandle = coord.register(md);
Expand Down Expand Up @@ -252,6 +152,7 @@ public synchronized void close() {
storeProvider,
coord,
manager,
storageRegistry,
context);
} catch(Exception e) {
logger.warn("Failure on close()", e);
Expand All @@ -261,6 +162,47 @@ public synchronized void close() {
isClosed = true;
}

private void javaPropertiesToSystemOptions() {
// get the system options property
final String allSystemProps = System.getProperty(SYSTEM_OPTIONS_NAME);
if ((allSystemProps == null) || allSystemProps.isEmpty()) {
return;
}

final OptionManager optionManager = getContext().getOptionManager();

// parse out the properties, validate, and then set them
final String systemProps[] = allSystemProps.split(",");
for (final String systemProp : systemProps) {
final String keyValue[] = systemProp.split("=");
if (keyValue.length != 2) {
throwInvalidSystemOption(systemProp, "does not contain a key=value assignment");
}

final String optionName = keyValue[0].trim();
if (optionName.isEmpty()) {
throwInvalidSystemOption(systemProp, "does not contain a key before the assignment");
}

final String optionString = stripQuotes(keyValue[1].trim(), systemProp);
if (optionString.isEmpty()) {
throwInvalidSystemOption(systemProp, "does not contain a value after the assignment");
}

final OptionValue defaultValue = optionManager.getOption(optionName);
if (defaultValue == null) {
throwInvalidSystemOption(systemProp, "does not specify a valid option name");
}
if (defaultValue.type != OptionType.SYSTEM) {
throwInvalidSystemOption(systemProp, "does not specify a SYSTEM option ");
}

final OptionValue optionValue = OptionValue.createOption(
defaultValue.kind, OptionType.SYSTEM, optionName, optionString);
optionManager.setOption(optionValue);
}
}

/**
* Shutdown hook for Drillbit. Closes the drillbit, and reports on errors that
* occur during closure, as well as the location the drillbit was started from.
Expand Down Expand Up @@ -310,4 +252,67 @@ public DrillbitContext getContext() {
return manager.getContext();
}

public static void main(final String[] cli) throws DrillbitStartupException {
final StartupOptions options = StartupOptions.parse(cli);
start(options);
}

public static Drillbit start(final StartupOptions options) throws DrillbitStartupException {
return start(DrillConfig.create(options.getConfigLocation()), null);
}

public static Drillbit start(final DrillConfig config) throws DrillbitStartupException {
return start(config, null);
}

public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet)
throws DrillbitStartupException {
logger.debug("Starting new Drillbit.");
// TODO: allow passing as a parameter
ScanResult classpathScan = ClassPathScanner.fromPrescan(config);
Drillbit bit;
try {
bit = new Drillbit(config, remoteServiceSet, classpathScan);
} catch (final Exception ex) {
throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
}

try {
bit.run();
} catch (final Exception e) {
bit.close();
throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
}
logger.debug("Started new Drillbit.");
return bit;
}

private static void throwInvalidSystemOption(final String systemProp, final String errorMessage) {
throw new IllegalStateException("Property \"" + SYSTEM_OPTIONS_NAME + "\" part \"" + systemProp
+ "\" " + errorMessage + ".");
}

private static String stripQuotes(final String s, final String systemProp) {
if (s.isEmpty()) {
return s;
}

final char cFirst = s.charAt(0);
final char cLast = s.charAt(s.length() - 1);
if ((cFirst == '"') || (cFirst == '\'')) {
if (cLast != cFirst) {
throwInvalidSystemOption(systemProp, "quoted value does not have closing quote");
}

return s.substring(1, s.length() - 2); // strip the quotes
}

if ((cLast == '"') || (cLast == '\'')) {
throwInvalidSystemOption(systemProp, "value has unbalanced closing quote");
}

// return as-is
return s;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
import org.apache.drill.exec.store.sys.PStoreProvider;

import com.codahale.metrics.MetricRegistry;
Expand Down Expand Up @@ -80,7 +80,11 @@ public DrillbitContext(
this.endpoint = checkNotNull(endpoint);
this.provider = provider;
this.lpPersistence = new LogicalPlanPersistence(context.getConfig(), classpathScan);
this.storagePlugins = new StoragePluginRegistry(this); // TODO change constructor

// TODO remove escaping "this".
this.storagePlugins = context.getConfig()
.getInstance(StoragePluginRegistry.STORAGE_PLUGIN_REGISTRY_IMPL, StoragePluginRegistry.class, this);

this.reader = new PhysicalPlanReader(context.getConfig(), classpathScan, lpPersistence, endpoint, storagePlugins);
this.operatorCreatorRegistry = new OperatorCreatorRegistry(classpathScan);
this.systemOptions = new SystemOptionManager(lpPersistence, provider);
Expand Down Expand Up @@ -152,7 +156,7 @@ public PStoreProvider getPersistentStoreProvider() {
return provider;
}

public DrillSchemaFactory getSchemaFactory() {
public SchemaFactory getSchemaFactory() {
return storagePlugins.getSchemaFactory();
}

Expand Down
Loading