Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,33 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true),

PROACTIVE_CLUSTER_CHANGE_CHECK("proactiveClusterChangeCheck", true),
DIRECT_MEMORY_OOM("directMemoryOOMCount", true);
DIRECT_MEMORY_OOM("directMemoryOOMCount", true),

/**
* How many queries with joins have been executed.
* <p>
* For each query with at least one join, this meter is increased exactly once.
*/
QUERIES_WITH_JOINS("queries", true),
/**
* How many joins have been executed.
* <p>
* For each query with at least one join, this meter is increased as many times as joins in the query.
*/
JOIN_COUNT("queries", true),
/**
* How many queries with window functions have been executed.
* <p>
* For each query with at least one window function, this meter is increased exactly once.
*/
QUERIES_WITH_WINDOW("queries", true),
/**
* How many window functions have been executed.
* <p>
* For each query with at least one window function, this meter is increased as many times as window functions in the
* query.
*/
WINDOW_COUNT("queries", true),;

private final String _brokerMeterName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;

import static org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS;
Expand All @@ -33,22 +34,21 @@
*/
public class BrokerMetrics extends AbstractMetrics<BrokerQueryPhase, BrokerMeter, BrokerGauge, BrokerTimer> {

private static final AtomicReference<BrokerMetrics> BROKER_METRICS_INSTANCE = new AtomicReference<>();
private static final BrokerMetrics NOOP = new BrokerMetrics(new NoopPinotMetricsRegistry());
private static final AtomicReference<BrokerMetrics> BROKER_METRICS_INSTANCE = new AtomicReference<>(NOOP);

/**
* register the brokerMetrics onto this class, so that we don't need to pass it down as a parameter
*/
public static boolean register(BrokerMetrics brokerMetrics) {
return BROKER_METRICS_INSTANCE.compareAndSet(null, brokerMetrics);
return BROKER_METRICS_INSTANCE.compareAndSet(NOOP, brokerMetrics);
}

/**
* should always call after registration
*/
public static BrokerMetrics get() {
BrokerMetrics ret = BROKER_METRICS_INSTANCE.get();
assert ret != null;
return ret;
return BROKER_METRICS_INSTANCE.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.common.metrics;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;

import static org.apache.pinot.spi.utils.CommonConstants.Controller.DEFAULT_METRICS_PREFIX;
Expand All @@ -29,10 +30,11 @@
*/
public class ControllerMetrics
extends AbstractMetrics<AbstractMetrics.QueryPhase, ControllerMeter, ControllerGauge, ControllerTimer> {
private static final AtomicReference<ControllerMetrics> CONTROLLER_METRICS_INSTANCE = new AtomicReference<>();
private static final ControllerMetrics NOOP = new ControllerMetrics(new NoopPinotMetricsRegistry());
private static final AtomicReference<ControllerMetrics> CONTROLLER_METRICS_INSTANCE = new AtomicReference<>(NOOP);

public static boolean register(ControllerMetrics controllerMetrics) {
return CONTROLLER_METRICS_INSTANCE.compareAndSet(null, controllerMetrics);
return CONTROLLER_METRICS_INSTANCE.compareAndSet(NOOP, controllerMetrics);
}

public static ControllerMetrics get() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
package org.apache.pinot.common.metrics;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.utils.CommonConstants;


public class MinionMetrics extends AbstractMetrics<MinionQueryPhase, MinionMeter, MinionGauge, MinionTimer> {
private static final AtomicReference<MinionMetrics> MINION_METRICS_INSTANCE = new AtomicReference<>();
private static final MinionMetrics NOOP = new MinionMetrics(new NoopPinotMetricsRegistry());
private static final AtomicReference<MinionMetrics> MINION_METRICS_INSTANCE = new AtomicReference<>(NOOP);

public static boolean register(MinionMetrics minionMetrics) {
return MINION_METRICS_INSTANCE.compareAndSet(null, minionMetrics);
return MINION_METRICS_INSTANCE.compareAndSet(NOOP, minionMetrics);
}

public static MinionMetrics get() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;

import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_ENABLE_TABLE_LEVEL_METRICS;
Expand All @@ -34,18 +35,20 @@
*/
public class ServerMetrics extends AbstractMetrics<ServerQueryPhase, ServerMeter, ServerGauge, ServerTimer> {

private static final AtomicReference<ServerMetrics> SERVER_METRICS_INSTANCE = new AtomicReference<>();
private static final ServerMetrics NOOP = new ServerMetrics(new NoopPinotMetricsRegistry());

private static final AtomicReference<ServerMetrics> SERVER_METRICS_INSTANCE = new AtomicReference<>(NOOP);

/**
* register the serverMetrics onto this class, so that we don't need to pass it down as a parameter
*/
public static boolean register(ServerMetrics serverMetrics) {
return SERVER_METRICS_INSTANCE.compareAndSet(null, serverMetrics);
return SERVER_METRICS_INSTANCE.compareAndSet(NOOP, serverMetrics);
}

@VisibleForTesting
public static void deregister() {
SERVER_METRICS_INSTANCE.set(null);
SERVER_METRICS_INSTANCE.set(NOOP);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ private RelNode optimize(RelRoot relRoot, PlannerContext plannerContext) {
}

private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext, long requestId) {
SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot);
PinotLogicalQueryPlanner logicalQueryPlanner = new PinotLogicalQueryPlanner();
SubPlan plan = logicalQueryPlanner.makePlan(relRoot);
PinotDispatchPlanner pinotDispatchPlanner =
new PinotDispatchPlanner(plannerContext, _workerManager, requestId, _tableCache);
return pinotDispatchPlanner.createDispatchableSubPlan(plan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,30 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.SubPlan;
import org.apache.pinot.query.planner.SubPlanMetadata;
import org.apache.pinot.query.planner.plannode.JoinNode;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.planner.plannode.WindowNode;


/**
* PinotLogicalQueryPlanner walks top-down from {@link RelRoot} and construct a forest of trees with {@link PlanNode}.
*/
public class PinotLogicalQueryPlanner {
private PinotLogicalQueryPlanner() {
}

private boolean _windowFunctionFound = false;
private boolean _joinFound = false;

/**
* Converts a Calcite {@link RelRoot} into a Pinot {@link SubPlan}.
*/
public static SubPlan makePlan(RelRoot relRoot) {
public SubPlan makePlan(RelRoot relRoot) {
PlanNode rootNode = relNodeToPlanNode(relRoot.rel);
PlanFragment rootFragment = planNodeToPlanFragment(rootNode);
return new SubPlan(rootFragment,
Expand Down Expand Up @@ -79,16 +84,34 @@ public static SubPlan makePlan(RelRoot relRoot) {
// return subPlanMap.get(0);
}

private static PlanNode relNodeToPlanNode(RelNode node) {
private PlanNode relNodeToPlanNode(RelNode node) {
PlanNode planNode = RelToPlanNodeConverter.toPlanNode(node, -1);

if (planNode instanceof JoinNode) {
BrokerMetrics brokerMetrics = BrokerMetrics.get();
brokerMetrics.addMeteredGlobalValue(BrokerMeter.JOIN_COUNT, 1);
if (!_joinFound) {
brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_JOINS, 1);
_joinFound = true;
}
}
if (planNode instanceof WindowNode) {
BrokerMetrics brokerMetrics = BrokerMetrics.get();
brokerMetrics.addMeteredGlobalValue(BrokerMeter.WINDOW_COUNT, 1);
if (!_windowFunctionFound) {
brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_WINDOW, 1);
_windowFunctionFound = true;
}
}

List<RelNode> inputs = node.getInputs();
for (RelNode input : inputs) {
planNode.addInput(relNodeToPlanNode(input));
}
return planNode;
}

private static PlanFragment planNodeToPlanFragment(PlanNode node) {
private PlanFragment planNodeToPlanFragment(PlanNode node) {
PlanFragmenter fragmenter = new PlanFragmenter();
PlanFragmenter.Context fragmenterContext = fragmenter.createContext();
node = node.visit(fragmenter, fragmenterContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.function.Function;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry;
import org.apache.pinot.spi.metrics.PinotGauge;
import org.apache.pinot.spi.metrics.PinotJmxReporter;
import org.apache.pinot.spi.metrics.PinotMetricName;
Expand Down Expand Up @@ -60,4 +61,37 @@ public interface PinotMetricsFactory {
* Returns the name of metrics factory.
*/
String getMetricsFactoryName();

class Noop implements PinotMetricsFactory {
private final NoopPinotMetricsRegistry _registry = new NoopPinotMetricsRegistry();
@Override
public void init(PinotConfiguration metricsConfiguration) {
}

@Override
public PinotMetricsRegistry getPinotMetricsRegistry() {
return _registry;
}

@Override
public PinotMetricName makePinotMetricName(Class<?> klass, String name) {
return () -> "noopMetricName";
}

@Override
public <T> PinotGauge<T> makePinotGauge(Function<Void, T> condition) {
return _registry.newGauge();
}

@Override
public PinotJmxReporter makePinotJmxReporter(PinotMetricsRegistry metricsRegistry) {
return () -> {
};
}

@Override
public String getMetricsFactoryName() {
return "noop";
}
}
}
Loading