Skip to content

Commit

Permalink
Support Union in Decoupled planning (apache#17354)
Browse files Browse the repository at this point in the history
* introduces `UnionQuery`
* some changes to enable a `UnionQuery` to have multiple input datasources
* `UnionQuery` execution is driven by the `QueryLogic`, which could later make it possible to reduce some complexity in `ClientQuerySegmentWalker`
* to run the subqueries of `UnionQuery`, the `Runner` needed access to the `conglomerate`; some refactoring was done to enable that
* renamed `UnionQueryRunner` to `UnionDataSourceQueryRunner`
* `QueryRunnerFactoryConglomerate` has taken the place of `QueryToolChestWarehouse`, which shaves off some unnecessary things here and there
* small cleanup/refactors
  • Loading branch information
kgyrtkirk authored Nov 5, 2024
1 parent ba76264 commit 2eac831
Show file tree
Hide file tree
Showing 107 changed files with 4,328 additions and 826 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
Expand Down Expand Up @@ -167,7 +165,6 @@ public class CachingClusteredClientBenchmark
@Param({"all", "minute"})
private String queryGranularity;

private QueryToolChestWarehouse toolChestWarehouse;
private QueryRunnerFactoryConglomerate conglomerate;
private CachingClusteredClient cachingClusteredClient;
private ExecutorService processingPool;
Expand Down Expand Up @@ -258,48 +255,37 @@ public int getNumThreads()
}
};

conglomerate = new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
conglomerate = DefaultQueryRunnerFactoryConglomerate.buildFromQueryRunnerFactories(ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
.put(
TopNQuery.class,
new TopNQueryRunnerFactory(
new StupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(PROCESSING_BUFFER_SIZE)
),
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
.put(
TopNQuery.class,
new TopNQueryRunnerFactory(
new StupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(PROCESSING_BUFFER_SIZE)
),
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
.put(
GroupByQuery.class,
makeGroupByQueryRunnerFactory(
GroupByQueryRunnerTest.DEFAULT_MAPPER,
new GroupByQueryConfig()
{
},
processingConfig
)
)
.put(
GroupByQuery.class,
makeGroupByQueryRunnerFactory(
GroupByQueryRunnerTest.DEFAULT_MAPPER,
new GroupByQueryConfig()
{
},
processingConfig
)
.build()
);

toolChestWarehouse = new QueryToolChestWarehouse()
{
@Override
public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query)
{
return conglomerate.findFactory(query).getToolchest();
}
};
)
.build());

SimpleServerView serverView = new SimpleServerView();
int serverSuffx = 1;
Expand All @@ -319,7 +305,7 @@ public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest
true
);
cachingClusteredClient = new CachingClusteredClient(
toolChestWarehouse,
conglomerate,
serverView,
MapCache.create(0),
JSON_MAPPER,
Expand Down Expand Up @@ -468,7 +454,7 @@ private <T> List<T> runQuery()
QueryRunner<T> theRunner = FluentQueryRunner
.create(
cachingClusteredClient.getQueryRunnerForIntervals(query, query.getIntervals()),
toolChestWarehouse.getToolChest(query)
conglomerate.getToolChest(query)
)
.applyPreMergeDecoration()
.mergeResults(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
Expand Down Expand Up @@ -307,7 +307,7 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)

DirectDruidClientFactory druidClientFactory = new DirectDruidClientFactory(
new NoopServiceEmitter(),
EasyMock.createMock(QueryToolChestWarehouse.class),
EasyMock.createMock(QueryRunnerFactoryConglomerate.class),
EasyMock.createMock(QueryWatcher.class),
getSmileMapper(),
EasyMock.createMock(HttpClient.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.Result;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
Expand Down Expand Up @@ -104,7 +104,7 @@
public class MovingAverageQueryTest extends InitializedNullHandlingTest
{
private final ObjectMapper jsonMapper;
private final QueryToolChestWarehouse warehouse;
private final QueryRunnerFactoryConglomerate conglomerate;
private final RetryQueryRunnerConfig retryConfig;
private final ServerConfig serverConfig;

Expand Down Expand Up @@ -167,7 +167,7 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
Injector injector = Initialization.makeInjectorWithModules(baseInjector, modules);

jsonMapper = injector.getInstance(ObjectMapper.class);
warehouse = injector.getInstance(QueryToolChestWarehouse.class);
conglomerate = injector.getInstance(QueryRunnerFactoryConglomerate.class);
retryConfig = injector.getInstance(RetryQueryRunnerConfig.class);
serverConfig = injector.getInstance(ServerConfig.class);

Expand Down Expand Up @@ -321,7 +321,7 @@ public long getMaxQueuedBytes()
};

CachingClusteredClient baseClient = new CachingClusteredClient(
warehouse,
conglomerate,
new TimelineServerView()
{
@Override
Expand Down Expand Up @@ -375,7 +375,7 @@ public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback c
new NoopServiceEmitter(),
baseClient,
null /* local client; unused in this test, so pass in null */,
warehouse,
conglomerate,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
retryConfig,
jsonMapper,
Expand All @@ -392,7 +392,7 @@ public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback c
final Sequence<?> res = query.getRunner(walker).run(queryPlus);

List actualResults = new ArrayList();
actualResults = (List<MapBasedRow>) res.accumulate(actualResults, Accumulators.list());
actualResults = res.accumulate(actualResults, Accumulators.list());

expectedResults = consistentTypeCasting(expectedResults);
actualResults = consistentTypeCasting(actualResults);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2891,28 +2891,26 @@ private static DataSchema cloneDataSchema(final DataSchema dataSchema)
@Override
protected QueryRunnerFactoryConglomerate makeQueryRunnerConglomerate()
{
return new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
(query, future) -> {
// do nothing
}
)
return DefaultQueryRunnerFactoryConglomerate.buildFromQueryRunnerFactories(ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
(query, future) -> {
// do nothing
}
)
.put(
ScanQuery.class,
new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()),
new ScanQueryEngine(),
new ScanQueryConfig()
)
)
.put(
ScanQuery.class,
new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()),
new ScanQueryEngine(),
new ScanQueryConfig()
)
.build()
);
)
.build());
}

private void makeToolboxFactory() throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2418,18 +2418,16 @@ private static DataSchema cloneDataSchema(final DataSchema dataSchema)
@Override
protected QueryRunnerFactoryConglomerate makeQueryRunnerConglomerate()
{
return new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.of(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
(query, future) -> {
// do nothing
}
)
return DefaultQueryRunnerFactoryConglomerate.buildFromQueryRunnerFactories(ImmutableMap.of(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
(query, future) -> {
// do nothing
}
)
);
));
}

private void makeToolboxFactory() throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,5 +523,4 @@ public String getErrorCode()

protected abstract DruidException makeException(DruidExceptionBuilder bob);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
*/
public class ArenaMemoryAllocatorFactory implements MemoryAllocatorFactory
{
private static final int FRAME_SIZE = 8_000_000;

private final int capacity;

public ArenaMemoryAllocatorFactory(final int capacity)
Expand All @@ -42,4 +44,9 @@ public long allocatorCapacity()
{
return capacity;
}

/**
 * Creates an {@link ArenaMemoryAllocatorFactory} whose capacity is the class constant
 * {@code FRAME_SIZE} (8_000_000).
 *
 * NOTE(review): the capacity is presumably a size in bytes -- confirm against the allocator.
 *
 * @return a new factory instance; a fresh object is created on every call
 */
public static MemoryAllocatorFactory makeDefault()
{
return new ArenaMemoryAllocatorFactory(FRAME_SIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -147,4 +146,9 @@ public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulat
return Yielders.done(initValue, null);
}
}

/**
 * Creates a {@link Sequence} over the given values, in argument order.
 *
 * The varargs array is only handed to {@link Arrays#asList(Object[])} and nothing is stored
 * into it here, which is why the method is annotated {@code @SafeVarargs}. Without the
 * annotation every call site with a generic element type gets an unchecked
 * generic-array-creation warning.
 *
 * @param values the elements of the sequence; may be empty
 * @param <T>    element type
 * @return the result of {@code simple(...)} applied to a list view of {@code values}
 */
@SafeVarargs
public static <T> Sequence<T> of(T... values)
{
  return simple(Arrays.asList(values));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,58 @@

package org.apache.druid.query;

import com.google.common.collect.Maps;
import com.google.inject.Inject;

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

/**
 * {@link QueryRunnerFactoryConglomerate} implementation backed by three lookup tables keyed by
 * the query's class: runner factories, toolchests and query logics.
 *
 * Lookups use {@code query.getClass()} against {@link IdentityHashMap}s, so only an entry
 * registered for the exact runtime class of the query matches; a lookup with no entry yields
 * {@code null}.
 */
public class DefaultQueryRunnerFactoryConglomerate implements QueryRunnerFactoryConglomerate
{
// Query class -> factory / toolchest / query logic. Populated once in the constructor.
private final Map<Class<? extends Query>, QueryRunnerFactory> factories;
private final Map<Class<? extends Query>, QueryToolChest> toolchests;
private final Map<Class<? extends Query>, QueryLogic> querylogics;

/**
 * Builds a conglomerate from runner factories only.
 *
 * The toolchest table is derived by calling {@code getToolchest()} on each supplied factory,
 * and the query-logic table is left empty.
 *
 * @param factories query class to runner factory mapping
 * @return a conglomerate whose toolchests come from the given factories
 */
public static DefaultQueryRunnerFactoryConglomerate buildFromQueryRunnerFactories(
Map<Class<? extends Query>, QueryRunnerFactory> factories)
{
return new DefaultQueryRunnerFactoryConglomerate(
factories,
// transformValues yields a view; the constructor copies it into its own map.
Maps.transformValues(factories, f -> f.getToolchest()),
Collections.emptyMap()
);
}

/**
 * Copies the three supplied tables into identity-keyed maps owned by this instance.
 *
 * @param factories   query class to runner factory
 * @param toolchests  query class to toolchest
 * @param querylogics query class to query logic
 */
@Inject
// NOTE(review): the next line looks like the pre-change one-argument constructor header left
// over from a diff rendering; the three-argument header follows it.
public DefaultQueryRunnerFactoryConglomerate(Map<Class<? extends Query>, QueryRunnerFactory> factories)
public DefaultQueryRunnerFactoryConglomerate(
Map<Class<? extends Query>, QueryRunnerFactory> factories,
Map<Class<? extends Query>, QueryToolChest> toolchests,
Map<Class<? extends Query>, QueryLogic> querylogics)
{
// Accesses to IdentityHashMap should be faster than to HashMap or ImmutableMap.
// Class doesn't override Object.equals().
this.factories = new IdentityHashMap<>(factories);
this.toolchests = new IdentityHashMap<>(toolchests);
this.querylogics = new IdentityHashMap<>(querylogics);
}

/**
 * Looks up the runner factory registered for the runtime class of {@code query}.
 *
 * @param query the query whose class is used as the lookup key
 * @return the registered factory, or {@code null} when none is registered for that class
 */
@Override
@SuppressWarnings("unchecked")
public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query)
{
// NOTE(review): two consecutive returns -- these look like the pre-change (explicit cast) and
// post-change (no cast) versions of the same statement from a diff rendering.
return (QueryRunnerFactory<T, QueryType>) factories.get(query.getClass());
return factories.get(query.getClass());
}

/**
 * Looks up the toolchest registered for the runtime class of {@code query}.
 *
 * @param query the query whose class is used as the lookup key
 * @return the registered toolchest, or {@code null} when none is registered for that class
 */
@Override
@SuppressWarnings("unchecked")
public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(QueryType query)
{
return toolchests.get(query.getClass());
}

/**
 * Looks up the query logic registered for the runtime class of {@code query}.
 *
 * @param query the query whose class is used as the lookup key
 * @return the registered query logic, or {@code null} when none is registered for that class
 */
@Override
@SuppressWarnings("unchecked")
public <T, QueryType extends Query<T>> QueryLogic getQueryLogic(QueryType query)
{
return querylogics.get(query.getClass());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public FluentQueryRunner<T> applyPostMergeDecoration()

public FluentQueryRunner<T> applyPreMergeDecoration()
{
return from(new UnionQueryRunner<>(toolChest.preMergeQueryDecoration(baseRunner)));
return from(new UnionDataSourceQueryRunner<>(toolChest.preMergeQueryDecoration(baseRunner)));
}

public FluentQueryRunner<T> emitCPUTimeMetric(ServiceEmitter emitter)
Expand Down
Loading

0 comments on commit 2eac831

Please sign in to comment.