diff --git a/client/pom.xml b/client/pom.xml
index 27cc070c5190..6979f20000c0 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>
diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java
index 68f978929d67..d01032817f72 100644
--- a/client/src/main/java/com/metamx/druid/QueryableNode.java
+++ b/client/src/main/java/com/metamx/druid/QueryableNode.java
@@ -43,6 +43,7 @@
 import com.metamx.druid.coordination.DataSegmentAnnouncer;
 import com.metamx.druid.coordination.DruidServerMetadata;
 import com.metamx.druid.curator.announcement.Announcer;
+import com.metamx.druid.http.NoopRequestLogger;
 import com.metamx.druid.http.RequestLogger;
 import com.metamx.druid.initialization.CuratorConfig;
 import com.metamx.druid.initialization.Initialization;
@@ -374,12 +375,14 @@ private void initializeRequestLogger()
               getEmitter()
           ));
         }
-        else {
+        else if ("file".equalsIgnoreCase(loggingType)) {
           setRequestLogger(Initialization.makeFileRequestLogger(
               getJsonMapper(),
               getScheduledExecutorFactory(),
               getProps()
           ));
+        } else {
+          setRequestLogger(new NoopRequestLogger());
         }
       }
       catch (IOException e) {
diff --git a/client/src/main/java/com/metamx/druid/http/NoopRequestLogger.java b/client/src/main/java/com/metamx/druid/http/NoopRequestLogger.java
new file mode 100644
index 000000000000..ddc422c91be4
--- /dev/null
+++ b/client/src/main/java/com/metamx/druid/http/NoopRequestLogger.java
@@ -0,0 +1,31 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.http;
+
+/**
+ */
+public class NoopRequestLogger implements RequestLogger
+{
+  @Override
+  public void log(RequestLogLine requestLogLine) throws Exception
+  {
+    // do nothing
+  }
+}
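Review note: with this change an unrecognized request-logging type no longer falls through to the file logger; anything other than "emitter" or "file" now gets the new no-op logger. A minimal, self-contained sketch of the selection behavior; the interface, class names, and the property key `druid.request.logging.type` are illustrative stand-ins, not Druid's actual API:

```java
import java.util.Properties;

interface RequestLog { void log(String requestLine) throws Exception; }

class NoopLog implements RequestLog
{
  @Override
  public void log(String requestLine) { /* intentionally does nothing */ }
}

class LoggerSelection
{
  static RequestLog select(Properties props)
  {
    final String loggingType = props.getProperty("druid.request.logging.type"); // assumed key
    if ("emitter".equalsIgnoreCase(loggingType)) {
      return line -> System.out.println("emit: " + line);  // stand-in for the emitting logger
    } else if ("file".equalsIgnoreCase(loggingType)) {
      return line -> System.out.println("file: " + line);  // stand-in for the file logger
    }
    return new NoopLog(); // new default instead of unconditionally using the file logger
  }
}
```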
diff --git a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java
index 0dd878c7aad5..8acc43a75856 100644
--- a/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java
+++ b/client/src/main/java/com/metamx/druid/query/ChainedExecutionQueryRunner.java
@@ -20,6 +20,7 @@
 package com.metamx.druid.query;
 
 import com.google.common.base.Function;
+import com.google.common.base.Predicates;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -76,7 +77,7 @@ public ChainedExecutionQueryRunner(
   {
     this.exec = exec;
     this.ordering = ordering;
-    this.queryables = Iterables.unmodifiableIterable(queryables);
+    this.queryables = Iterables.unmodifiableIterable(Iterables.filter(queryables, Predicates.notNull()));
   }
 
   @Override
diff --git a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
index 2210a0f5a7b4..1cfceac4db81 100644
--- a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
+++ b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -46,13 +46,16 @@
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 public class SegmentMetadataQueryQueryToolChest extends QueryToolChest
 {
-  private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>(){};
+  private static final TypeReference<SegmentAnalysis> TYPE_REFERENCE = new TypeReference<SegmentAnalysis>()
+  {
+  };
   private static final byte[] SEGMENT_METADATA_CACHE_PREFIX = new byte[]{0x4};
 
   @Override
@@ -228,6 +231,6 @@ public int compare(SegmentAnalysis left, SegmentAnalysis right)
       {
         return left.getId().compareTo(right.getId());
       }
-    };
+    }.nullsFirst();
   }
 }
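Review note: both changes above harden merge paths against nulls — `ChainedExecutionQueryRunner` now silently drops null queryables, and the `SegmentAnalysis` comparator is wrapped with Guava's `nullsFirst()`. A small self-contained demo of the two Guava idioms the patch relies on:

```java
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NullHardeningDemo
{
  public static void main(String[] args)
  {
    // Iterables.filter(..., Predicates.notNull()) lazily drops null elements,
    // exactly as the ChainedExecutionQueryRunner constructor now does.
    Iterable<String> queryables = Iterables.filter(
        Arrays.asList("runnerA", null, "runnerB"),
        Predicates.notNull()
    );
    System.out.println(Iterables.toString(queryables)); // [runnerA, runnerB]

    // Ordering.nullsFirst() makes a comparator total over null inputs, matching
    // the .nullsFirst() added to the SegmentAnalysis and DoubleSum comparators.
    List<String> ids = Arrays.asList("seg2", null, "seg1");
    Collections.sort(ids, Ordering.<String>natural().nullsFirst());
    System.out.println(ids); // [null, seg1, seg2]
  }
}
```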
diff --git a/common/pom.xml b/common/pom.xml
index ba47a0296bdb..1964f5658ffc 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>
diff --git a/common/src/main/java/com/metamx/druid/aggregation/CountAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/CountAggregatorFactory.java
index fe7d53e9ff4d..559003948600 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/CountAggregatorFactory.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/CountAggregatorFactory.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Longs;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -40,6 +41,8 @@ public CountAggregatorFactory(
       @JsonProperty("name") String name
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+
     this.name = name;
   }
diff --git a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java
index 15dc4736b63b..278cd686a3bd 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregator.java
@@ -19,6 +19,7 @@
 
 package com.metamx.druid.aggregation;
 
+import com.google.common.collect.Ordering;
 import com.google.common.primitives.Doubles;
 import com.metamx.druid.processing.FloatMetricSelector;
 
@@ -28,14 +29,14 @@
  */
 public class DoubleSumAggregator implements Aggregator
 {
-  static final Comparator COMPARATOR = new Comparator()
+  static final Comparator COMPARATOR = new Ordering()
   {
     @Override
     public int compare(Object o, Object o1)
     {
       return Doubles.compare(((Number) o).doubleValue(), ((Number) o1).doubleValue());
     }
-  };
+  }.nullsFirst();
 
   static double combineValues(Object lhs, Object rhs)
   {
diff --git a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregatorFactory.java
index 86469a377923..ec89a79f39d7 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregatorFactory.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/DoubleSumAggregatorFactory.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Doubles;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -44,6 +45,9 @@ public DoubleSumAggregatorFactory(
       @JsonProperty("fieldName") final String fieldName
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
     this.name = name;
     this.fieldName = fieldName;
   }
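Review note: the constructors above now reject null names and field names at construction time, so a malformed JSON query spec fails during deserialization with a descriptive message instead of surfacing as a `NullPointerException` deep in query processing. A stand-in sketch of the pattern (not Druid's actual factory); note that `Preconditions.checkNotNull` returns its argument, so check and assignment can also be fused, although the patch keeps them separate:

```java
import com.google.common.base.Preconditions;

public class SumSpec
{
  private final String name;
  private final String fieldName;

  public SumSpec(String name, String fieldName)
  {
    // Throws NullPointerException with the given message if the argument is null.
    this.name = Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
    this.fieldName = Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
  }
}
```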
diff --git a/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregatorFactory.java
index 8d39cbb89351..6c65cd95f289 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregatorFactory.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/HistogramAggregatorFactory.java
@@ -22,6 +22,8 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.primitives.Floats;
 import com.google.common.primitives.Longs;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -49,12 +51,18 @@ public HistogramAggregatorFactory(
       @JsonProperty("breaks") final List<Float> breaksList
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
     this.name = name;
     this.fieldName = fieldName;
-    this.breaksList = breaksList;
-    this.breaks = new float[breaksList.size()];
-    for(int i = 0; i < breaksList.size(); ++i) this.breaks[i] = breaksList.get(i);
+    this.breaksList = (breaksList == null) ? Lists.newArrayList() : breaksList;
+    this.breaks = new float[this.breaksList.size()];
+    for (int i = 0; i < this.breaksList.size(); ++i) {
+      this.breaks[i] = this.breaksList.get(i);
+    }
   }
+
   @Override
   public Aggregator factorize(ColumnSelectorFactory metricFactory)
   {
@@ -95,14 +103,12 @@ public AggregatorFactory getCombiningFactory()
   @Override
   public Object deserialize(Object object)
   {
-    if (object instanceof byte []) {
-      return Histogram.fromBytes((byte []) object);
-    }
-    else if (object instanceof ByteBuffer) {
+    if (object instanceof byte[]) {
+      return Histogram.fromBytes((byte[]) object);
+    } else if (object instanceof ByteBuffer) {
       return Histogram.fromBytes((ByteBuffer) object);
-    }
-    else if(object instanceof String) {
-      byte[] bytes = Base64.decodeBase64(((String)object).getBytes(Charsets.UTF_8));
+    } else if (object instanceof String) {
+      byte[] bytes = Base64.decodeBase64(((String) object).getBytes(Charsets.UTF_8));
       return Histogram.fromBytes(bytes);
     }
     return object;
@@ -111,7 +117,7 @@
   @Override
   public Object finalizeComputation(Object object)
   {
-    return ((Histogram)object).asVisual();
+    return ((Histogram) object).asVisual();
   }
 
   @Override
@@ -149,7 +155,7 @@ public byte[] getCacheKey()
   @Override
   public String getTypeName()
   {
-    throw new UnsupportedOperationException("HistogramAggregatorFactory does not support getTypeName()");
+    throw new UnsupportedOperationException("HistogramAggregatorFactory does not support getTypeName()");
   }
 
   @Override
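Review note: the reason the `breaks` null check matters is that Jackson leaves an absent JSON property null rather than substituting an empty list. A runnable demonstration with a stand-in POJO (Druid's real factory also carries `name` and `fieldName`):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class BreaksDemo
{
  public static class Spec
  {
    public List<Float> breaks; // stays null when the JSON omits it
  }

  public static void main(String[] args) throws Exception
  {
    Spec spec = new ObjectMapper().readValue("{}", Spec.class);
    System.out.println(spec.breaks); // null -> the factory now substitutes an empty list
  }
}
```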
diff --git a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java
index af68b26df42a..a8375f294fac 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/JavaScriptAggregatorFactory.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Doubles;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -51,7 +52,6 @@ public class JavaScriptAggregatorFactory implements AggregatorFactory
   private final String fnCombine;
 
-
   private final JavaScriptAggregator.ScriptAggregator compiledScript;
 
   @JsonCreator
@@ -63,6 +63,12 @@ public JavaScriptAggregatorFactory(
       @JsonProperty("fnCombine") final String fnCombine
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldNames, "Must have a valid, non-null fieldNames");
+    Preconditions.checkNotNull(fnAggregate, "Must have a valid, non-null fnAggregate");
+    Preconditions.checkNotNull(fnReset, "Must have a valid, non-null fnReset");
+    Preconditions.checkNotNull(fnCombine, "Must have a valid, non-null fnCombine");
+
     this.name = name;
     this.fieldNames = fieldNames;
@@ -83,7 +89,8 @@ public Aggregator factorize(final ColumnSelectorFactory columnFactory)
         new com.google.common.base.Function<String, ObjectColumnSelector>()
         {
           @Override
-          public ObjectColumnSelector apply(@Nullable String s) {
+          public ObjectColumnSelector apply(@Nullable String s)
+          {
             return columnFactory.makeObjectColumnSelector(s);
           }
         }
@@ -101,7 +108,8 @@ public BufferAggregator factorizeBuffered(final ColumnSelectorFactory columnSele
         new com.google.common.base.Function<String, ObjectColumnSelector>()
         {
           @Override
-          public ObjectColumnSelector apply(@Nullable String s) {
+          public ObjectColumnSelector apply(@Nullable String s)
+          {
             return columnSelectorFactory.makeObjectColumnSelector(s);
           }
         }
@@ -148,7 +156,8 @@ public String getName()
   }
 
   @JsonProperty
-  public List<String> getFieldNames() {
+  public List<String> getFieldNames()
+  {
     return fieldNames;
   }
@@ -182,7 +191,7 @@ public byte[] getCacheKey()
     try {
       MessageDigest md = MessageDigest.getInstance("SHA-1");
       byte[] fieldNameBytes = Joiner.on(",").join(fieldNames).getBytes(Charsets.UTF_8);
-      byte[] sha1 = md.digest((fnAggregate+fnReset+fnCombine).getBytes(Charsets.UTF_8));
+      byte[] sha1 = md.digest((fnAggregate + fnReset + fnCombine).getBytes(Charsets.UTF_8));
 
       return ByteBuffer.allocate(1 + fieldNameBytes.length + sha1.length)
                        .put(CACHE_TYPE_ID)
@@ -225,7 +234,11 @@ public String toString()
            '}';
   }
 
-  public static JavaScriptAggregator.ScriptAggregator compileScript(final String aggregate, final String reset, final String combine)
+  public static JavaScriptAggregator.ScriptAggregator compileScript(
+      final String aggregate,
+      final String reset,
+      final String combine
+  )
   {
     final ContextFactory contextFactory = ContextFactory.getGlobal();
     Context context = contextFactory.enterContext();
@@ -234,8 +247,8 @@ public static JavaScriptAggregator.ScriptAggregator compileScript(final String a
     final ScriptableObject scope = context.initStandardObjects();
 
     final Function fnAggregate = context.compileFunction(scope, aggregate, "aggregate", 1, null);
-    final Function fnReset = context.compileFunction(scope, reset, "reset", 1, null);
-    final Function fnCombine = context.compileFunction(scope, combine, "combine", 1, null);
+    final Function fnReset = context.compileFunction(scope, reset, "reset", 1, null);
+    final Function fnCombine = context.compileFunction(scope, combine, "combine", 1, null);
     Context.exit();
 
     return new JavaScriptAggregator.ScriptAggregator()
@@ -244,7 +257,9 @@ public static JavaScriptAggregator.ScriptAggregator compileScript(final String a
       public double aggregate(final double current, final ObjectColumnSelector[] selectorList)
       {
         Context cx = Context.getCurrentContext();
-        if(cx == null) cx = contextFactory.enterContext();
+        if (cx == null) {
+          cx = contextFactory.enterContext();
+        }
 
         final int size = selectorList.length;
         final Object[] args = new Object[size + 1];
@@ -292,8 +307,9 @@ public Object run(final Context cx)
       }
 
       @Override
-      public void close() {
-        if(Context.getCurrentContext() != null) {
+      public void close()
+      {
+        if (Context.getCurrentContext() != null) {
           Context.exit();
         }
       }
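Review note: `compileScript` compiles the three JavaScript functions once up front and then re-enters a Rhino `Context` on each call, entering only when the current thread has none. A minimal sketch of that compile-once / enter-per-call pattern, assuming Rhino is on the classpath (the script body is illustrative):

```java
import org.mozilla.javascript.Context;
import org.mozilla.javascript.ContextFactory;
import org.mozilla.javascript.Function;
import org.mozilla.javascript.ScriptableObject;

public class RhinoSketch
{
  public static void main(String[] args)
  {
    final ContextFactory contextFactory = ContextFactory.getGlobal();
    Context context = contextFactory.enterContext();

    final ScriptableObject scope = context.initStandardObjects();
    // Compile once; the compiled Function can be invoked later from any thread
    // that has an active Context.
    final Function fnAggregate = context.compileFunction(
        scope, "function aggregate(current, x) { return current + x; }", "aggregate", 1, null
    );
    Context.exit();

    // Each caller re-enters a Context, which is cheap if one is already active.
    Context cx = Context.getCurrentContext();
    if (cx == null) {
      cx = contextFactory.enterContext();
    }
    Object result = fnAggregate.call(cx, scope, scope, new Object[]{0.0, 42.0});
    System.out.println(Context.toNumber(result)); // 42.0
    Context.exit();
  }
}
```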
diff --git a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java
index d84e22041db7..07e04254f762 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/LongSumAggregatorFactory.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -44,6 +45,9 @@ public LongSumAggregatorFactory(
       @JsonProperty("fieldName") final String fieldName
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
     this.name = name;
     this.fieldName = fieldName;
   }
diff --git a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java
index fd66aeae90dc..45cd85257f7e 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/MaxAggregatorFactory.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Doubles;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -44,6 +45,9 @@ public MaxAggregatorFactory(
       @JsonProperty("fieldName") final String fieldName
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
     this.name = name;
     this.fieldName = fieldName;
   }
diff --git a/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java b/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java
index ed1e82cc9b5e..a6d19ebd8e56 100644
--- a/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java
+++ b/common/src/main/java/com/metamx/druid/aggregation/MinAggregatorFactory.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.primitives.Doubles;
 import com.metamx.druid.processing.ColumnSelectorFactory;
@@ -44,6 +45,9 @@ public MinAggregatorFactory(
       @JsonProperty("fieldName") final String fieldName
   )
   {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+
     this.name = name;
     this.fieldName = fieldName;
   }
diff --git a/common/src/main/java/com/metamx/druid/db/DbConnector.java b/common/src/main/java/com/metamx/druid/db/DbConnector.java
index cb202b1a8f20..a76536bed6e3 100644
--- a/common/src/main/java/com/metamx/druid/db/DbConnector.java
+++ b/common/src/main/java/com/metamx/druid/db/DbConnector.java
@@ -140,15 +140,16 @@ public static void createTable(
         @Override
         public Void withHandle(Handle handle) throws Exception
         {
-          List<Map<String, Object>> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
-
-          if (table.isEmpty()) {
-            log.info("Creating table[%s]", tableName);
-            handle.createStatement(sql).execute();
-          } else {
-            log.info("Table[%s] existed: [%s]", tableName, table);
+          if (!handle.getConnection().getMetaData().getDatabaseProductName().contains("PostgreSQL")) {
+            List<Map<String, Object>> table = handle.select(String.format("SHOW tables LIKE '%s'", tableName));
+
+            if (table.isEmpty()) {
+              log.info("Creating table[%s]", tableName);
+              handle.createStatement(sql).execute();
+            } else {
+              log.info("Table[%s] existed: [%s]", tableName, table);
+            }
           }
-
           return null;
         }
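Review note: `SHOW tables LIKE ...` is MySQL-only syntax, so `DbConnector` now skips the automatic table-creation probe when the backend identifies itself as PostgreSQL; detection goes through standard JDBC metadata. A self-contained sketch of that check (URL and credentials are placeholders):

```java
import java.sql.Connection;
import java.sql.DriverManager;

public class DialectCheck
{
  public static void main(String[] args) throws Exception
  {
    try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/druid", "user", "pass")) {
      // JDBC exposes the backend name, e.g. "PostgreSQL" or "MySQL".
      String product = conn.getMetaData().getDatabaseProductName();
      if (product.contains("PostgreSQL")) {
        // On PostgreSQL the tables must be created out-of-band,
        // see install/postgresql-schema.sql added below.
        System.out.println("Skipping automatic table creation");
      }
    }
  }
}
```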
diff --git a/examples/pom.xml b/examples/pom.xml
index 4a1cbc929ac5..d3bd28bd2360 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -9,7 +9,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>
diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index 3971a922c5b3..fd727328c496 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 1da0d489862f..a8b8d429c79c 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index d27563fceeac..37cb5b9c2e40 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
index 0ee2a6e76324..5bbfd73abbef 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java
@@ -24,6 +24,10 @@
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.client.ServerView;
 import com.metamx.druid.coordination.DataSegmentAnnouncer;
+import com.metamx.druid.indexing.common.actions.TaskActionClient;
+import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
+import com.metamx.druid.indexing.common.config.TaskConfig;
+import com.metamx.druid.indexing.common.task.Task;
 import com.metamx.druid.loading.DataSegmentKiller;
 import com.metamx.druid.loading.DataSegmentPusher;
 import com.metamx.druid.loading.MMappedQueryableIndexFactory;
@@ -31,10 +35,6 @@
 import com.metamx.druid.loading.SegmentLoaderConfig;
 import com.metamx.druid.loading.SegmentLoadingException;
 import com.metamx.druid.loading.SingleSegmentLoader;
-import com.metamx.druid.indexing.common.actions.TaskActionClient;
-import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
-import com.metamx.druid.indexing.common.config.TaskConfig;
-import com.metamx.druid.indexing.common.task.Task;
 import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
 import com.metamx.emitter.service.ServiceEmitter;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@@ -141,9 +141,9 @@ public Map<DataSegment, File> getSegments(List<DataSegment> segments)
         new SegmentLoaderConfig()
         {
           @Override
-          public File getCacheDirectory()
+          public String getCacheDirectory()
           {
-            return new File(getTaskWorkDir(), "fetched_segments");
+            return new File(getTaskWorkDir(), "fetched_segments").toString();
          }
         }
     );
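Review note: `SegmentLoaderConfig.getCacheDirectory()` now returns a `String` rather than a `java.io.File` because the value can encode several storage locations (see the `SegmentLoaderConfig` and `SingleSegmentLoader` changes later in this patch), so `TaskToolbox`'s anonymous override converts its single task directory with `File.toString()`. A stand-in sketch of that override (the abstract class and paths here are illustrative):

```java
import java.io.File;

abstract class LoaderConfig
{
  public abstract String getCacheDirectory();
}

class TaskScopedConfig extends LoaderConfig
{
  private final File taskWorkDir = new File("/tmp/task"); // placeholder path

  @Override
  public String getCacheDirectory()
  {
    // A single-directory spec is just the path itself; multi-location specs
    // are comma-separated, which a File could not represent.
    return new File(taskWorkDir, "fetched_segments").toString();
  }
}
```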
diff --git a/install/postgresql-schema.sql b/install/postgresql-schema.sql
new file mode 100644
index 000000000000..180ec5e67023
--- /dev/null
+++ b/install/postgresql-schema.sql
@@ -0,0 +1,49 @@
+-- Table structure for table `config`
+--
+
+
+DROP TABLE IF EXISTS prod_config;
+/*!40101 SET @saved_cs_client     = @@character_set_client */;
+/*!40101 SET character_set_client = utf8 */;
+CREATE TABLE prod_config (
+  name varchar(255) NOT NULL,
+  payload bytea NOT NULL,
+  PRIMARY KEY (name)
+);
+/*!40101 SET character_set_client = @saved_cs_client */;
+
+--
+-- Table structure for table `rules`
+--
+DROP TABLE IF EXISTS prod_rules;
+/*!40101 SET @saved_cs_client     = @@character_set_client */;
+/*!40101 SET character_set_client = utf8 */;
+CREATE TABLE prod_rules (
+  id varchar(255) NOT NULL,
+  dataSource varchar(255) NOT NULL,
+  version text NOT NULL,
+  payload text NOT NULL,
+  PRIMARY KEY (id)
+);
+/*!40101 SET character_set_client = @saved_cs_client */;
+
+--
+-- Table structure for table `segments`
+--
+
+DROP TABLE IF EXISTS prod_segments;
+/*!40101 SET @saved_cs_client     = @@character_set_client */;
+/*!40101 SET character_set_client = utf8 */;
+CREATE TABLE prod_segments (
+  id varchar(255) NOT NULL,
+  dataSource varchar(255) NOT NULL,
+  created_date text NOT NULL,
+  start text NOT NULL,
+  "end" text NOT NULL,
+  partitioned SMALLINT NOT NULL,
+  version text NOT NULL,
+  used boolean NOT NULL,
+  payload text NOT NULL,
+  PRIMARY KEY (id)
+);
+
diff --git a/pom.xml b/pom.xml
index 83dcd5177829..ede50cdf143b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
     <packaging>pom</packaging>
-    <version>0.5.5-SNAPSHOT</version>
+    <version>0.5.6-SNAPSHOT</version>
     <name>druid</name>
     <description>druid</description>
diff --git a/realtime/pom.xml b/realtime/pom.xml
index 829d724d95ff..abe217c1af26 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java
index 7a7e0e8ed7fc..67177e8f45c2 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/DbSegmentPublisher.java
@@ -62,7 +62,7 @@ public Void withHandle(Handle handle) throws Exception
         {
           handle.createStatement(
               String.format(
-                  "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) "
+                  "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
                   + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
                   config.getSegmentTable()
               )
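Review note: `end` is a reserved word in PostgreSQL, which is why the new schema and the INSERT above both double-quote it. One caveat worth flagging: stock MySQL only honors double-quoted identifiers when the `ANSI_QUOTES` SQL mode is enabled, so this shared statement implicitly assumes that mode (or PostgreSQL). The statement as a plain string constant, table name left as the `%s` placeholder it is in the patch:

```java
public class SegmentInsertSql
{
  // "end" must be quoted on PostgreSQL; MySQL needs ANSI_QUOTES for the same text.
  public static final String INSERT =
      "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
      + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)";
}
```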
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
index cee1bfb00ff6..a429fbef9d58 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java
@@ -55,6 +55,9 @@
 import com.metamx.druid.query.QueryRunnerFactory;
 import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
 import com.metamx.druid.query.QueryToolChest;
+import com.metamx.druid.query.segment.SegmentDescriptor;
+import com.metamx.druid.query.segment.SpecificSegmentQueryRunner;
+import com.metamx.druid.query.segment.SpecificSegmentSpec;
 import com.metamx.druid.realtime.FireDepartmentMetrics;
 import com.metamx.druid.realtime.FireHydrant;
 import com.metamx.druid.realtime.Schema;
@@ -253,23 +256,32 @@ public ServiceMetricEvent.Builder apply(@Nullable Query input)
           new Function<Sink, QueryRunner>()
           {
             @Override
-            public QueryRunner apply(@Nullable Sink input)
+            public QueryRunner apply(Sink input)
             {
-              return new MetricsEmittingQueryRunner(
-                  emitter,
-                  builderFn,
-                  factory.mergeRunners(
-                      EXEC,
-                      Iterables.transform(
-                          input,
-                          new Function<FireHydrant, QueryRunner>()
-                          {
-                            @Override
-                            public QueryRunner apply(@Nullable FireHydrant input)
-                            {
-                              return factory.createRunner(input.getSegment());
-                            }
-                          }
+              return new SpecificSegmentQueryRunner(
+                  new MetricsEmittingQueryRunner(
+                      emitter,
+                      builderFn,
+                      factory.mergeRunners(
+                          EXEC,
+                          Iterables.transform(
+                              input,
+                              new Function<FireHydrant, QueryRunner>()
+                              {
+                                @Override
+                                public QueryRunner apply(FireHydrant input)
+                                {
+                                  return factory.createRunner(input.getSegment());
+                                }
+                              }
+                          )
+                      )
+                  ),
+                  new SpecificSegmentSpec(
+                      new SegmentDescriptor(
+                          input.getInterval(),
+                          input.getSegment().getVersion(),
+                          input.getSegment().getShardSpec().getPartitionNum()
                       )
                   )
               );
@@ -380,12 +392,16 @@ private void bootstrapSinksFromDisk()
 
       //final File[] sinkFiles = sinkDir.listFiles();
       // To avoid reading and listing of "merged" dir
-      final File[] sinkFiles = sinkDir.listFiles(new FilenameFilter() {
-        @Override
-        public boolean accept(File dir, String fileName) {
-          return !(Ints.tryParse(fileName) == null);
-        }
-      });
+      final File[] sinkFiles = sinkDir.listFiles(
+          new FilenameFilter()
+          {
+            @Override
+            public boolean accept(File dir, String fileName)
+            {
+              return !(Ints.tryParse(fileName) == null);
+            }
+          }
+      );
       Arrays.sort(
           sinkFiles,
           new Comparator()
@@ -408,15 +424,14 @@ public int compare(File o1, File o2)
       List<FireHydrant> hydrants = Lists.newArrayList();
       for (File segmentDir : sinkFiles) {
         log.info("Loading previously persisted segment at [%s]", segmentDir);
-        
+
         // Although this has been tackled at start of this method.
         // Just a doubly-check added to skip "merged" dir. from being added to hydrants
         // If 100% sure that this is not needed, this check can be removed.
-        if(Ints.tryParse(segmentDir.getName()) == null)
-        {
+        if (Ints.tryParse(segmentDir.getName()) == null) {
           continue;
         }
-        
+
         hydrants.add(
             new FireHydrant(
                 new QueryableIndexSegment(null, IndexIO.loadIndex(segmentDir)),
@@ -647,6 +662,14 @@ private File computePersistDir(Schema schema, Interval interval)
    */
   private int persistHydrant(FireHydrant indexToPersist, Schema schema, Interval interval)
   {
+    if (indexToPersist.hasSwapped()) {
+      log.info(
+          "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
+          schema.getDataSource(), interval, indexToPersist
+      );
+      return 0;
+    }
+
     log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", schema.getDataSource(), interval, indexToPersist);
     try {
       int numRows = indexToPersist.getIndex().size();
diff --git a/server/pom.xml b/server/pom.xml
index 3f9b33f5ba87..e32b027219a1 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>
diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
index 1befc1df888c..0740be81b5d2 100644
--- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
+++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java
@@ -261,7 +261,7 @@ public void removeSegment(DataSegment segment)
       announcer.unannounceSegment(segment);
     }
     catch (Exception e) {
-      log.makeAlert("Failed to remove segment")
+      log.makeAlert(e, "Failed to remove segment")
          .addData("segment", segment)
          .emit();
     }
diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java
index 708c61d9e24a..44ea802082cb 100644
--- a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java
+++ b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java
@@ -192,7 +192,7 @@ public Map<String, List<Rule>> withHandle(Handle handle) throws Exception
             return handle.createQuery(
                 // Return latest version rule by dataSource
                 String.format(
-                    "SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version, payload FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version",
+                    "SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version",
                     config.getRuleTable()
                 )
             ).fold(
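Review note: the rules query had to change because the original subquery selected `payload` alongside `max(version)` without grouping or aggregating it — PostgreSQL rejects that outright, and MySQL resolves it nondeterministically. The fix aggregates only the grouped key and the max, then joins back to the base table to fetch the payload of the winning row. Spelled out with an illustrative literal table name in place of the patch's `%1$s` placeholder:

```java
public class RulesQueryFix
{
  // Before: invalid under GROUP BY on PostgreSQL (payload is neither grouped nor aggregated).
  public static final String BEFORE =
      "SELECT dataSource, max(version) as version, payload FROM rules GROUP BY dataSource";

  // After: aggregate only, then join back for the payload of the latest-version row.
  public static final String AFTER =
      "SELECT rules.dataSource, rules.payload FROM rules "
      + "INNER JOIN (SELECT dataSource, max(version) as version FROM rules GROUP BY dataSource) ds "
      + "ON rules.dataSource = ds.dataSource AND rules.version = ds.version";
}
```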
diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java
index 2ea9056627fc..53c853d5a217 100644
--- a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java
+++ b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java
@@ -203,7 +203,7 @@ public Void withHandle(Handle handle) throws Exception
             for (DataSegment segment : segments) {
               batch.add(
                   String.format(
-                      "UPDATE %s SET used=1 WHERE id = '%s'",
+                      "UPDATE %s SET used=true WHERE id = '%s'",
                       config.getSegmentTable(),
                       segment.getIdentifier()
                   )
@@ -234,7 +234,7 @@ public boolean enableSegment(final String segmentId)
           public Void withHandle(Handle handle) throws Exception
           {
             handle.createStatement(
-                String.format("UPDATE %s SET used=1 WHERE id = :id", config.getSegmentTable())
+                String.format("UPDATE %s SET used=true WHERE id = :id", config.getSegmentTable())
             )
                   .bind("id", segmentId)
                   .execute();
@@ -268,7 +268,7 @@ public boolean removeDatasource(final String ds)
           public Void withHandle(Handle handle) throws Exception
           {
             handle.createStatement(
-                String.format("UPDATE %s SET used=0 WHERE dataSource = :dataSource", config.getSegmentTable())
+                String.format("UPDATE %s SET used=false WHERE dataSource = :dataSource", config.getSegmentTable())
             )
                   .bind("dataSource", ds)
                   .execute();
@@ -298,7 +298,7 @@ public boolean removeSegment(String ds, final String segmentID)
           public Void withHandle(Handle handle) throws Exception
           {
             handle.createStatement(
-                String.format("UPDATE %s SET used=0 WHERE id = :segmentID", config.getSegmentTable())
+                String.format("UPDATE %s SET used=false WHERE id = :segmentID", config.getSegmentTable())
             ).bind("segmentID", segmentID)
              .execute();
@@ -398,7 +398,7 @@ public void poll()
           public List<Map<String, Object>> withHandle(Handle handle) throws Exception
           {
             return handle.createQuery(
-                String.format("SELECT payload FROM %s WHERE used=1", config.getSegmentTable())
+                String.format("SELECT payload FROM %s WHERE used=true", config.getSegmentTable())
             ).list();
           }
         }
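Review note: MySQL's `BOOLEAN` is really `TINYINT(1)`, so `used=1`/`used=0` work there, but PostgreSQL's native `boolean` column (as declared in the new schema) requires `true`/`false` literals. The four rewritten statements, collected as constants with the same `%s` table placeholder used in the patch:

```java
public class UsedFlagSql
{
  public static final String ENABLE     = "UPDATE %s SET used=true WHERE id = :id";
  public static final String DISABLE    = "UPDATE %s SET used=false WHERE id = :segmentID";
  public static final String DISABLE_DS = "UPDATE %s SET used=false WHERE dataSource = :dataSource";
  public static final String POLL       = "SELECT payload FROM %s WHERE used=true";
}
```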
diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java
index b821c653a6e5..306d7f449af6 100644
--- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPuller.java
@@ -39,9 +39,13 @@ public interface DataSegmentPuller
   /**
    * Returns the last modified time of the given segment.
    *
+   * Note, this is not actually used at this point and doesn't need to actually be implemented. It's just still here
+   * to not break compatibility.
+   *
    * @param segment The segment to check the last modified time for
    * @return the last modified time in millis from the epoch
    * @throws SegmentLoadingException if there are any errors
    */
+  @Deprecated
   public long getLastModified(DataSegment segment) throws SegmentLoadingException;
 }
diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
index bc44f82f3dd6..e72bd787bb33 100644
--- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
+++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
@@ -20,32 +20,14 @@
 package com.metamx.druid.loading;
 
 import com.google.common.base.Joiner;
-import com.metamx.common.MapUtils;
 import com.metamx.druid.client.DataSegment;
 
-import java.util.Map;
-
 /**
  */
 public class DataSegmentPusherUtil
 {
   private static final Joiner JOINER = Joiner.on("/").skipNulls();
 
-  public static String getLegacyStorageDir(DataSegment segment)
-  {
-    final Map<String, Object> loadSpec = segment.getLoadSpec();
-
-    String specType = MapUtils.getString(loadSpec, "type");
-    if (specType.startsWith("s3")) {
-      String s3Bucket = MapUtils.getString(loadSpec, "bucket");
-      String s3Path = MapUtils.getString(loadSpec, "key");
-
-      return String.format("%s/%s", s3Bucket, s3Path.substring(0, s3Path.lastIndexOf("/")));
-    }
-
-    return null;
-  }
-
   public static String getStorageDir(DataSegment segment)
   {
     return JOINER.join(
diff --git a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherConfig.java b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherConfig.java
index c3f6d603ccb8..b27d03672bc5 100644
--- a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherConfig.java
+++ b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherConfig.java
@@ -21,12 +21,10 @@
 
 import org.skife.config.Config;
 
-import java.io.File;
-
 /**
  */
 public abstract class HdfsDataSegmentPusherConfig
 {
   @Config("druid.pusher.hdfs.storageDirectory")
-  public abstract File getStorageDirectory();
+  public abstract String getStorageDirectory();
 }
diff --git a/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java b/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java
index 294c91b9a387..8a0e32484e1f 100644
--- a/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/com/metamx/druid/loading/SegmentLoaderConfig.java
@@ -21,14 +21,18 @@
 
 import org.skife.config.Config;
 
-import java.io.File;
-
 /**
  */
 public abstract class SegmentLoaderConfig
 {
   @Config({"druid.paths.indexCache", "druid.segmentCache.path"})
-  public abstract File getCacheDirectory();
+  public abstract String getCacheDirectory();
+
+  @Config("druid.server.maxSize")
+  public long getServerMaxSize()
+  {
+    return Long.MAX_VALUE;
+  }
 
   @Config("druid.segmentCache.deleteOnRemove")
   public boolean deleteOnRemove()
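Review note: the cache directory is now a `String` because it can carry several storage locations: comma-separated entries, each optionally suffixed with `|maxSizeBytes`, with `druid.server.maxSize` as the default per-location cap. The parser lives in `SingleSegmentLoader` below; here is a self-contained sketch of the format it accepts (the paths in the example value are hypothetical):

```java
import com.google.common.primitives.Longs;
import java.io.File;

public class CacheSpecParser
{
  public static void main(String[] args)
  {
    // e.g. druid.segmentCache.path=/mnt/ssd/cache|100000000000,/mnt/hdd/cache
    String spec = "/mnt/ssd/cache|100000000000,/mnt/hdd/cache"; // placeholder paths
    long defaultMax = Long.MAX_VALUE;                           // druid.server.maxSize default

    for (String dirSpec : spec.split(",")) {
      String[] dirSplit = dirSpec.split("\\|");
      File path = new File(dirSplit[0]);
      Long maxSize = (dirSplit.length == 2) ? Longs.tryParse(dirSplit[1]) : defaultMax;
      System.out.println(path + " capped at " + maxSize + " bytes");
    }
  }
}
```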
diff --git a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
index 61e9986f484c..f0e9a7f20e82 100644
--- a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
+++ b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java
@@ -19,9 +19,13 @@
 
 package com.metamx.druid.loading;
 
-import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
 import com.google.inject.Inject;
-import com.metamx.common.StreamUtils;
+import com.metamx.common.IAE;
+import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.index.QueryableIndex;
@@ -29,7 +33,11 @@
 import com.metamx.druid.index.Segment;
 import org.apache.commons.io.FileUtils;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
 /**
  */
@@ -39,8 +47,8 @@ public class SingleSegmentLoader implements SegmentLoader
 
   private final DataSegmentPuller dataSegmentPuller;
   private final QueryableIndexFactory factory;
-  private final SegmentLoaderConfig config;
-  private static final Joiner JOINER = Joiner.on("/").skipNulls();
+
+  private final List<StorageLocation> locations;
 
   @Inject
   public SingleSegmentLoader(
@@ -51,22 +59,52 @@ public SingleSegmentLoader(
   {
     this.dataSegmentPuller = dataSegmentPuller;
     this.factory = factory;
-    this.config = config;
+
+    final ImmutableList.Builder<StorageLocation> locBuilder = ImmutableList.builder();
+
+    // This is a really, really stupid way of getting this information. Splitting on commas and bars is error-prone
+    // We should instead switch it up to be a JSON Array of JSON Object or something and cool stuff like that
+    // But, that'll have to wait for some other day.
+    for (String dirSpec : config.getCacheDirectory().split(",")) {
+      String[] dirSplit = dirSpec.split("\\|");
+      if (dirSplit.length == 1) {
+        locBuilder.add(new StorageLocation(new File(dirSplit[0]), config.getServerMaxSize()));
+      }
+      else if (dirSplit.length == 2) {
+        final Long maxSize = Longs.tryParse(dirSplit[1]);
+        if (maxSize == null) {
+          throw new IAE("Size of a local segment storage location must be an integral number, got[%s]", dirSplit[1]);
+        }
+        locBuilder.add(new StorageLocation(new File(dirSplit[0]), maxSize));
+      }
+      else {
+        throw new ISE(
+            "Unknown segment storage location[%s]=>[%s], config[%s].",
+            dirSplit.length, dirSpec, config.getCacheDirectory()
+        );
+      }
+    }
+    locations = locBuilder.build();
+
+    Preconditions.checkArgument(locations.size() > 0, "Must have at least one segment cache directory.");
+    log.info("Using storage locations[%s]", locations);
   }
 
   @Override
   public boolean isSegmentLoaded(final DataSegment segment)
   {
-    File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
-    if (localStorageDir.exists()) {
-      return true;
-    }
+    return findStorageLocationIfLoaded(segment) != null;
+  }
 
-    final File legacyStorageDir = new File(
-        config.getCacheDirectory(),
-        DataSegmentPusherUtil.getLegacyStorageDir(segment)
-    );
-    return legacyStorageDir.exists();
+  public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
+  {
+    for (StorageLocation location : locations) {
+      File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+      if (localStorageDir.exists()) {
+        return location;
+      }
+    }
+    return null;
   }
 
   @Override
@@ -80,111 +118,129 @@ public Segment getSegment(DataSegment segment) throws SegmentLoadingException
 
   public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
   {
-    File localStorageDir = new File(config.getCacheDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
-
-    final String legacyDir = DataSegmentPusherUtil.getLegacyStorageDir(segment);
-    if (legacyDir != null) {
-      File legacyStorageDir = new File(config.getCacheDirectory(), legacyDir);
-
-      if (legacyStorageDir.exists()) {
-        log.info("Found legacyStorageDir[%s], moving to new storage location[%s]", legacyStorageDir, localStorageDir);
-        if (localStorageDir.exists()) {
-          try {
-            FileUtils.deleteDirectory(localStorageDir);
-          }
-          catch (IOException e) {
-            throw new SegmentLoadingException(e, "Error deleting localDir[%s]", localStorageDir);
-          }
-        }
-        final File parentDir = localStorageDir.getParentFile();
-        if (!parentDir.exists()) {
-          log.info("Parent[%s] didn't exist, creating.", parentDir);
-          if (!parentDir.mkdirs()) {
-            log.warn("Unable to make parentDir[%s]", parentDir);
-          }
-        }
-
-        if (!legacyStorageDir.renameTo(localStorageDir)) {
-          log.warn("Failed moving [%s] to [%s]", legacyStorageDir, localStorageDir);
-        }
-      }
-    }
-
-    if (localStorageDir.exists()) {
-      long localLastModified = localStorageDir.lastModified();
-      long remoteLastModified = dataSegmentPuller.getLastModified(segment);
-      if (remoteLastModified > 0 && localLastModified >= remoteLastModified) {
-        log.info(
-            "Found localStorageDir[%s] with modified[%s], which is same or after remote[%s]. Using.",
-            localStorageDir, localLastModified, remoteLastModified
-        );
-        return localStorageDir;
-      }
-    }
-
-    if (localStorageDir.exists()) {
-      try {
-        FileUtils.deleteDirectory(localStorageDir);
-      }
-      catch (IOException e) {
-        log.warn(e, "Exception deleting previously existing local dir[%s]", localStorageDir);
-      }
-    }
-
-    if (!localStorageDir.mkdirs()) {
-      log.info("Unable to make parent file[%s]", localStorageDir);
-    }
-
-    dataSegmentPuller.getSegmentFiles(segment, localStorageDir);
-
-    return localStorageDir;
+    StorageLocation loc = findStorageLocationIfLoaded(segment);
+
+    final File retVal;
+
+    if (loc == null) {
+      Iterator<StorageLocation> locIter = locations.iterator();
+      loc = locIter.next();
+      while (locIter.hasNext()) {
+        loc = loc.mostEmpty(locIter.next());
+      }
+
+      if (!loc.canHandle(segment.getSize())) {
+        throw new ISE(
+            "Segment[%s:%,d] too large for storage[%s:%,d].",
+            segment.getIdentifier(), segment.getSize(), loc.getPath(), loc.available()
+        );
+      }
+
+      File storageDir = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+      if (!storageDir.mkdirs()) {
+        log.debug("Unable to make parent file[%s]", storageDir);
+      }
+
+      dataSegmentPuller.getSegmentFiles(segment, storageDir);
+      loc.addSegment(segment);
+
+      retVal = storageDir;
+    }
+    else {
+      retVal = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+    }
+
+    loc.addSegment(segment);
+
+    return retVal;
   }
 
-  private File getLocalStorageDir(DataSegment segment)
+  @Override
+  public void cleanup(DataSegment segment) throws SegmentLoadingException
   {
-    String outputKey = JOINER.join(
-        segment.getDataSource(),
-        String.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
-        segment.getVersion(),
-        segment.getShardSpec().getPartitionNum()
-    );
-
-    return new File(config.getCacheDirectory(), outputKey);
+    StorageLocation loc = findStorageLocationIfLoaded(segment);
+
+    if (loc == null) {
+      log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
+      return;
+    }
+
+    try {
+      File cacheFile = new File(loc.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
+      log.info("Deleting directory[%s]", cacheFile);
+      FileUtils.deleteDirectory(cacheFile);
+      loc.removeSegment(segment);
+    }
+    catch (IOException e) {
+      throw new SegmentLoadingException(e, e.getMessage());
+    }
   }
 
-  private void moveToCache(File pulledFile, File cacheFile) throws SegmentLoadingException
+  private static class StorageLocation
   {
-    log.info("Rename pulledFile[%s] to cacheFile[%s]", pulledFile, cacheFile);
-    if (!pulledFile.renameTo(cacheFile)) {
-      log.warn("Error renaming pulledFile[%s] to cacheFile[%s]. Copying instead.", pulledFile, cacheFile);
-
-      try {
-        StreamUtils.copyToFileAndClose(new FileInputStream(pulledFile), cacheFile);
-      }
-      catch (IOException e) {
-        throw new SegmentLoadingException(
-            e,
-            "Problem moving pulledFile[%s] to cache[%s]",
-            pulledFile,
-            cacheFile
-        );
-      }
-      if (!pulledFile.delete()) {
-        log.error("Could not delete pulledFile[%s].", pulledFile);
-      }
-    }
-  }
+    private final File path;
+    private final long maxSize;
+    private final Set<DataSegment> segments;
 
-  @Override
-  public void cleanup(DataSegment segment) throws SegmentLoadingException
-  {
-    File cacheFile = getLocalStorageDir(segment);
+    private volatile long currSize = 0;
 
-    try {
-      log.info("Deleting directory[%s]", cacheFile);
-      FileUtils.deleteDirectory(cacheFile);
-    }
-    catch (IOException e) {
-      throw new SegmentLoadingException(e, e.getMessage());
-    }
-  }
+    StorageLocation(
+        File path,
+        long maxSize
+    )
+    {
+      this.path = path;
+      this.maxSize = maxSize;
+
+      this.segments = Sets.newHashSet();
+    }
+
+    private File getPath()
+    {
+      return path;
+    }
+
+    private Long getMaxSize()
+    {
+      return maxSize;
+    }
+
+    private synchronized void addSegment(DataSegment segment)
+    {
+      // count the segment's size only the first time it is added
+      if (segments.add(segment)) {
+        currSize += segment.getSize();
+      }
+    }
+
+    private synchronized void removeSegment(DataSegment segment)
+    {
+      if (segments.remove(segment)) {
+        currSize -= segment.getSize();
+      }
+    }
+
+    private boolean canHandle(long size)
+    {
+      return available() > size;
+    }
+
+    private synchronized long available()
+    {
+      return maxSize - currSize;
+    }
+
+    private StorageLocation mostEmpty(StorageLocation other)
+    {
+      return available() > other.available() ? this : other;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "StorageLocation{" +
+             "path=" + path +
+             ", maxSize=" + maxSize +
+             '}';
+    }
  }
 }
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
index 13fe5a58c1ee..36b0b0b15962 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
@@ -668,7 +668,7 @@ public void run()
           synchronized (lock) {
             final LeaderLatch latch = leaderLatch.get();
             if (latch == null || !latch.hasLeadership()) {
-              log.info("[%s] is master, not me. Phooey.", latch == null ? null : latch.getLeader().getId());
+              log.info("LEGGO MY EGGO. [%s] is master.", latch == null ? null : latch.getLeader().getId());
               stopBeingMaster();
               return;
             }
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
index 9f031146cce3..05e5992466d2 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterBalancer.java
@@ -112,15 +112,16 @@ public DruidMasterRuntimeParams run(DruidMasterRuntimeParams params)
         continue;
       }
 
-      int iter = 0;
-      while (iter < maxSegmentsToMove) {
-        iter++;
+      for (int iter = 0; iter < maxSegmentsToMove; iter++) {
         final BalancerSegmentHolder segmentToMove = analyzer.pickSegmentToMove(serverHolderList);
-        final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList);
-        if (holder == null) {
-          continue;
+
+        if (params.getAvailableSegments().contains(segmentToMove.getSegment())) {
+          final ServerHolder holder = analyzer.findNewSegmentHomeBalance(segmentToMove.getSegment(), serverHolderList);
+
+          if (holder != null) {
+            moveSegment(segmentToMove, holder.getServer(), params);
+          }
         }
-        moveSegment(segmentToMove, holder.getServer(), params);
       }
 
       final double initialTotalCost = analyzer.calculateInitialTotalCost(serverHolderList);
diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
index affc0b93bdd3..9677b5028bc5 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
@@ -62,16 +62,18 @@ public MasterStats run(DruidMaster master, DruidMasterRuntimeParams params, Data
     final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp();
     final BalancerCostAnalyzer analyzer = params.getBalancerCostAnalyzer(referenceTimestamp);
 
-    stats.accumulate(
-        assign(
-            params.getReplicationManager(),
-            expectedReplicants,
-            totalReplicants,
-            analyzer,
-            serverHolderList,
-            segment
-        )
-    );
+    if (params.getAvailableSegments().contains(segment)) {
+      stats.accumulate(
+          assign(
+              params.getReplicationManager(),
+              expectedReplicants,
+              totalReplicants,
+              analyzer,
+              serverHolderList,
+              segment
+          )
+      );
+    }
 
     stats.accumulate(drop(expectedReplicants, clusterReplicants, segment, params));
diff --git a/server/src/main/java/com/metamx/druid/master/rules/Rule.java b/server/src/main/java/com/metamx/druid/master/rules/Rule.java
index efc6a91c5abc..1c77a0ebc8ff 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/Rule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/Rule.java
@@ -35,7 +35,9 @@
     @JsonSubTypes.Type(name = "loadByPeriod", value = PeriodLoadRule.class),
     @JsonSubTypes.Type(name = "loadByInterval", value = IntervalLoadRule.class),
     @JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
-    @JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class)
+    @JsonSubTypes.Type(name = "dropByInterval", value = IntervalDropRule.class),
+    @JsonSubTypes.Type(name = "loadBySize", value = SizeLoadRule.class),
+    @JsonSubTypes.Type(name = "dropBySize", value = SizeDropRule.class)
 })
 
 public interface Rule
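Review note: the `@JsonSubTypes` registration above makes the two new size-based rules (added below) available in rule sets under the type names `loadBySize` and `dropBySize`. How such rules might look in a configured rule list, with illustrative field values matching the `low`/`high`/`replicants`/`tier` properties the new classes declare:

```java
public class SizeRulesJson
{
  public static final String RULES =
      "[{\"type\": \"loadBySize\", \"low\": 0, \"high\": 500000000, "
      + "\"replicants\": 2, \"tier\": \"hot\"},"
      + " {\"type\": \"dropBySize\", \"low\": 500000000, \"high\": 9223372036854775807}]";
}
```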
diff --git a/server/src/main/java/com/metamx/druid/master/rules/SizeDropRule.java b/server/src/main/java/com/metamx/druid/master/rules/SizeDropRule.java
new file mode 100644
index 000000000000..0bd9f94cd006
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/rules/SizeDropRule.java
@@ -0,0 +1,71 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.master.rules;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Range;
+import com.metamx.druid.client.DataSegment;
+import org.joda.time.DateTime;
+
+/**
+ */
+public class SizeDropRule extends DropRule
+{
+  private final long low;
+  private final long high;
+  private final Range<Long> range;
+
+  @JsonCreator
+  public SizeDropRule(
+      @JsonProperty("low") long low,
+      @JsonProperty("high") long high
+  )
+  {
+    this.low = low;
+    this.high = high;
+    this.range = Range.closedOpen(low, high);
+  }
+
+  @Override
+  @JsonProperty
+  public String getType()
+  {
+    return "dropBySize";
+  }
+
+  @JsonProperty
+  public long getLow()
+  {
+    return low;
+  }
+
+  @JsonProperty
+  public long getHigh()
+  {
+    return high;
+  }
+
+  @Override
+  public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+  {
+    return range.contains(segment.getSize());
+  }
+}
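Review note: both new rules match segments with Guava's `Range.closedOpen(low, high)`, so a segment exactly at `low` matches while one exactly at `high` does not — adjacent `[low, high)` rules therefore tile the size axis without overlap or gaps. A quick runnable check of those boundary semantics:

```java
import com.google.common.collect.Range;

public class RangeDemo
{
  public static void main(String[] args)
  {
    Range<Long> range = Range.closedOpen(0L, 1000L);
    System.out.println(range.contains(0L));    // true  (closed at low)
    System.out.println(range.contains(999L));  // true
    System.out.println(range.contains(1000L)); // false (open at high)
  }
}
```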
diff --git a/server/src/main/java/com/metamx/druid/master/rules/SizeLoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/SizeLoadRule.java
new file mode 100644
index 000000000000..421432ec6b98
--- /dev/null
+++ b/server/src/main/java/com/metamx/druid/master/rules/SizeLoadRule.java
@@ -0,0 +1,96 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
+package com.metamx.druid.master.rules;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Range;
+import com.metamx.druid.client.DataSegment;
+import org.joda.time.DateTime;
+
+/**
+ */
+public class SizeLoadRule extends LoadRule
+{
+  private final long low;
+  private final long high;
+  private final Integer replicants;
+  private final String tier;
+  private final Range<Long> range;
+
+  @JsonCreator
+  public SizeLoadRule(
+      @JsonProperty("low") long low,
+      @JsonProperty("high") long high,
+      @JsonProperty("replicants") Integer replicants,
+      @JsonProperty("tier") String tier
+  )
+  {
+    this.low = low;
+    this.high = high;
+    this.replicants = replicants;
+    this.tier = tier;
+    this.range = Range.closedOpen(low, high);
+  }
+
+  @Override
+  @JsonProperty
+  public int getReplicants()
+  {
+    return replicants;
+  }
+
+  @Override
+  public int getReplicants(String tier)
+  {
+    return (this.tier.equalsIgnoreCase(tier)) ? replicants : 0;
+  }
+
+  @Override
+  @JsonProperty
+  public String getTier()
+  {
+    return tier;
+  }
+
+  @Override
+  public String getType()
+  {
+    return "loadBySize";
+  }
+
+  @JsonProperty
+  public long getLow()
+  {
+    return low;
+  }
+
+  @JsonProperty
+  public long getHigh()
+  {
+    return high;
+  }
+
+  @Override
+  public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+  {
+    return range.contains(segment.getSize());
+  }
+}
diff --git a/services/pom.xml b/services/pom.xml
index 18abf630390f..d01c1f447068 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
     <artifactId>druid-services</artifactId>
     <name>druid-services</name>
    <description>druid-services</description>
-    <version>0.5.5-SNAPSHOT</version>
+    <version>0.5.6-SNAPSHOT</version>
     <parent>
         <groupId>com.metamx</groupId>
         <artifactId>druid</artifactId>
-        <version>0.5.5-SNAPSHOT</version>
+        <version>0.5.6-SNAPSHOT</version>
     </parent>