Skip to content

Commit

Permalink
IGNITE-21990 Fix performance statistics query records aggregation - F…
Browse files Browse the repository at this point in the history
…ixes #259.

Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
  • Loading branch information
alex-plekhanov committed May 30, 2024
1 parent 64aade8 commit 42522a6
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 39 deletions.
2 changes: 1 addition & 1 deletion modules/performance-statistics-ext/report/js/sqlTab.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ function buildPropertiesSubTable($el, properties) {

$.each(properties, function (k, prop) {
data.push({
name: k,
name: prop["name"],
value: prop["value"],
count: prop["count"]
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
* "logicalReads" : $logicalReads,
* "physicalReads" : $physicalReads,
* "failures" : $failures,
* "properties" : {
* $propName : {"value" : $propValue, "count" : $propCount},
* "properties" : [
* {"name" : $propName, "value" : $propValue, "count" : $propCount},
* ...
* },
* ],
* "rows" : {
* $action : $rowsCount,
* ...
Expand All @@ -68,10 +68,10 @@
* "logicalReads" : $logicalReads,
* "physicalReads" : $physicalReads,
* "success" : $success,
* "properties" : {
* $propName : {"value" : $propValue, "count" : $propCount},
* "properties" : [
* {"name" : $propName, "value" : $propValue, "count" : $propCount},
* ...
* },
* ],
* "rows" : {
* $action : $rowsCount,
* ...
Expand All @@ -85,6 +85,10 @@ public class QueryHandler implements IgnitePerformanceStatisticsHandler {
private final Map<GridCacheQueryType, Map<String, AggregatedQueryInfo>> aggrQuery =
new EnumMap<>(GridCacheQueryType.class);

/** Queries results: queryType -> nodeId -> queryId -> aggregatedInfo. */
private final Map<GridCacheQueryType, Map<UUID, Map<Long, AggregatedQueryInfo>>> aggrQryById =
new EnumMap<>(GridCacheQueryType.class);

/** Parsed reads: queryType -> queryNodeId -> queryId -> reads. */
private final Map<GridCacheQueryType, Map<UUID, Map<Long, long[]>>> readsById =
new EnumMap<>(GridCacheQueryType.class);
Expand Down Expand Up @@ -134,6 +138,13 @@ public class QueryHandler implements IgnitePerformanceStatisticsHandler {
long logicalReads,
long physicalReads
) {
AggregatedQueryInfo info = aggregatedQueryInfoById(type, qryNodeId, id);

if (info != null) {
info.mergeReads(logicalReads, physicalReads);
return;
}

Map<Long, long[]> ids = readsById.computeIfAbsent(type, queryType -> new HashMap<>())
.computeIfAbsent(qryNodeId, node -> new HashMap<>());

Expand All @@ -152,6 +163,13 @@ public class QueryHandler implements IgnitePerformanceStatisticsHandler {
String action,
long rows
) {
AggregatedQueryInfo info = aggregatedQueryInfoById(type, qryNodeId, id);

if (info != null) {
info.mergeRows(action.intern(), rows);
return;
}

Map<String, long[]> actions = rowsById.computeIfAbsent(qryNodeId, node -> new HashMap<>())
.computeIfAbsent(id, qryId -> new HashMap<>());

Expand All @@ -169,11 +187,18 @@ public class QueryHandler implements IgnitePerformanceStatisticsHandler {
String name,
String val
) {
String key = (name + '=' + val).intern();

AggregatedQueryInfo info = aggregatedQueryInfoById(type, qryNodeId, id);

if (info != null) {
info.mergeProperty(key, name.intern(), val.intern(), 1);
return;
}

Map<String, T3<String, String, long[]>> props = propsById.computeIfAbsent(qryNodeId, node -> new HashMap<>())
.computeIfAbsent(id, qryId -> new HashMap<>());

String key = (name + '=' + val).intern();

T3<String, String, long[]> prop = props.computeIfAbsent(key,
nv -> new T3<>(name.intern(), val.intern(), new long[] {0}));

Expand Down Expand Up @@ -210,6 +235,19 @@ public class QueryHandler implements IgnitePerformanceStatisticsHandler {
return res;
}

/**
* Gets aggregeted query info by global query id.
* @param type Query type.
* @param nodeId Query originator node id.
* @param id Query id.
* @return Aggregated query info.
*/
private AggregatedQueryInfo aggregatedQueryInfoById(GridCacheQueryType type, UUID nodeId, long id) {
Map<UUID, Map<Long, AggregatedQueryInfo>> typeAggrs = aggrQryById.get(type);
Map<Long, AggregatedQueryInfo> nodeAggrs = typeAggrs == null ? null : typeAggrs.get(nodeId);
return nodeAggrs == null ? null : nodeAggrs.get(id);
}

/**
* Aggregates query reads/rows/properties and remove detailed info.
*/
Expand All @@ -221,46 +259,32 @@ private void aggregateQuery(Query qry) {

AggregatedQueryInfo info = typeAggrs.get(qry.text);

aggrQryById.computeIfAbsent(qry.type, t -> new HashMap<>())
.computeIfAbsent(qry.queryNodeId, n -> new HashMap<>())
.put(qry.id, info);

// Reads.
Map<UUID, Map<Long, long[]>> typeReads = readsById.get(qry.type);
Map<Long, long[]> nodeReads = typeReads == null ? null : typeReads.get(qry.queryNodeId);
long[] qryReads = nodeReads == null ? null : nodeReads.remove(qry.id);

if (qryReads != null) {
info.logicalReads += qryReads[0];
info.physicalReads += qryReads[1];
}
if (qryReads != null)
info.mergeReads(qryReads[0], qryReads[1]);

if (qry.type == GridCacheQueryType.SQL_FIELDS) {
// Properties.
Map<Long, Map<String, T3<String, String, long[]>>> nodeProps = propsById.get(qry.queryNodeId);
Map<String, T3<String, String, long[]>> qryProps = nodeProps == null ? null : nodeProps.remove(qry.id);

if (!F.isEmpty(qryProps)) {
qryProps.forEach((propKey0, prop0) -> info.props.compute(propKey0, (propKey1, prop1) -> {
if (prop1 == null)
return new T3<>(prop0.get1(), prop0.get2(), new long[] {prop0.get3()[0]});
else {
prop1.get3()[0] += prop0.get3()[0];
return prop1;
}
}));
}
if (!F.isEmpty(qryProps))
qryProps.forEach((propKey, prop) -> info.mergeProperty(propKey, prop.get1(), prop.get2(), prop.get3()[0]));

// Rows.
Map<Long, Map<String, long[]>> nodeRows = rowsById.get(qry.queryNodeId);
Map<String, long[]> qryRows = nodeRows == null ? null : nodeRows.remove(qry.id);

if (!F.isEmpty(qryRows)) {
qryRows.forEach((act0, rows0) -> info.rows.compute(act0, (act1, rows1) -> {
if (rows1 == null)
return new long[] {rows0[0]};
else {
rows1[0] += rows0[0];
return rows1;
}
}));
}
if (!F.isEmpty(qryRows))
qryRows.forEach((act, rows) -> info.mergeRows(act, rows[0]));
}
}

Expand Down Expand Up @@ -293,15 +317,16 @@ private void buildResult(GridCacheQueryType type, ObjectNode jsonRes) {
sql.put("failures", info.failures);

if (!F.isEmpty(info.props)) {
ObjectNode node = MAPPER.createObjectNode();
ArrayNode node = MAPPER.createArrayNode();

info.props.forEach((propKey, prop) -> {
ObjectNode valCntNode = MAPPER.createObjectNode();

valCntNode.put("name", prop.get1());
valCntNode.put("value", prop.get2());
valCntNode.put("count", prop.get3()[0]);

node.putIfAbsent(prop.get1(), valCntNode);
node.add(valCntNode);
});

sql.putIfAbsent("properties", node);
Expand Down Expand Up @@ -351,16 +376,17 @@ private void buildTopSlowResult(GridCacheQueryType type, ArrayNode jsonRes) {

if (type == GridCacheQueryType.SQL_FIELDS) {
if (propsById.containsKey(query.queryNodeId) && propsById.get(query.queryNodeId).containsKey(query.id)) {
ObjectNode node = MAPPER.createObjectNode();
ArrayNode node = MAPPER.createArrayNode();

Collection<T3<String, String, long[]>> props = propsById.get(query.queryNodeId).get(query.id).values();

props.forEach(prop -> {
ObjectNode valCntNode = MAPPER.createObjectNode();
valCntNode.put("name", prop.get1());
valCntNode.put("value", prop.get2());
valCntNode.put("count", prop.get3()[0]);

node.putIfAbsent(prop.get1(), valCntNode);
node.add(valCntNode);
});

json.putIfAbsent("properties", node);
Expand Down Expand Up @@ -410,6 +436,23 @@ public void merge(long duration, boolean success) {
if (!success)
failures += 1;
}

/** */
public void mergeReads(long logicalReads, long physicalReads) {
this.logicalReads += logicalReads;
this.physicalReads += physicalReads;
}

/** */
public void mergeRows(String action, long cnt) {
rows.computeIfAbsent(action, act -> new long[] {0})[0] += cnt;
}

/** */
public void mergeProperty(String key, String name, String val, long cnt) {
props.computeIfAbsent(key, k -> new T3<>(name, val, new long[] {0})).get3()[0] += cnt;
}

}

/** Query. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
Expand All @@ -36,8 +39,11 @@
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.performancestatistics.handlers.QueryHandler;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -47,7 +53,9 @@
import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.waitForStatisticsEnabled;
import static org.apache.ignite.internal.processors.performancestatistics.FilePerformanceStatisticsWriter.PERF_STAT_DIR;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.junits.GridAbstractTest.LOCAL_IP_FINDER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
Expand All @@ -58,10 +66,12 @@ public class PerformanceStatisticsReportSelfTest {
@Test
public void testCreateReport() throws Exception {
try (
Ignite srv = Ignition.start(new IgniteConfiguration().setIgniteInstanceName("srv"));
Ignite srv = Ignition.start(new IgniteConfiguration().setIgniteInstanceName("srv")
.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(LOCAL_IP_FINDER)));

IgniteEx client = (IgniteEx)Ignition.start(new IgniteConfiguration()
.setIgniteInstanceName("client")
.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(LOCAL_IP_FINDER))
.setClientMode(true))
) {
client.context().performanceStatistics().startCollectStatistics();
Expand Down Expand Up @@ -106,7 +116,8 @@ public void testCreateReport() throws Exception {

cache.query(new SqlFieldsQuery("select * from sys.tables").setEnforceJoinOrder(true)).getAll();

cache.query(new SqlFieldsQuery("select sum(_VAL) from \"cache\".Integer")).getAll();
for (int i = 0; i < 100; i++)
cache.query(new SqlFieldsQuery("select sum(_VAL) from \"cache\".Integer")).getAll();

cache.query(new IndexQuery<>(Integer.class).setCriteria(gt("_KEY", 0))).getAll();

Expand Down Expand Up @@ -139,6 +150,86 @@ public void testCreateReport() throws Exception {
}
}

/** @throws Exception If failed. */
@Test
public void testQueryHandlerAggregation() throws Exception {
QueryHandler qryHnd = new QueryHandler();

for (int nodeIdx = 0; nodeIdx < 10; nodeIdx++) {
UUID nodeId = new UUID(0, nodeIdx);

for (long id = 0; id < 1000; id++) {
String text = "query" + (id / 100);
UUID origNodeId = new UUID(0, id % 10);
qryHnd.queryReads(nodeId, GridCacheQueryType.SQL_FIELDS, origNodeId, id, 1, 1);
qryHnd.queryRows(nodeId, GridCacheQueryType.SQL_FIELDS, origNodeId, id, "ROWS", 1);
qryHnd.queryRows(nodeId, GridCacheQueryType.SQL_FIELDS, origNodeId, id, "ROWSx2", 2);
qryHnd.queryProperty(nodeId, GridCacheQueryType.SQL_FIELDS, origNodeId, id, "prop1", "val1");
qryHnd.queryProperty(nodeId, GridCacheQueryType.SQL_FIELDS, origNodeId, id, "prop1", "val2");
qryHnd.queryProperty(nodeId, GridCacheQueryType.SQL_FIELDS, origNodeId, id, "prop2", "val2");
if (nodeId.equals(origNodeId)) {
qryHnd.query(nodeId, GridCacheQueryType.SQL_FIELDS, text, id, 0,
TimeUnit.MILLISECONDS.toNanos(id), true);
}
}
}

Map<String, JsonNode> res = qryHnd.results();
JsonNode aggrSql = res.get("sql");
assertEquals(10, aggrSql.size());

for (int i = 0; i < 10; i++) {
JsonNode aggrQry = aggrSql.get("query" + i);
assertNotNull(aggrQry);
assertEquals(100, aggrQry.get("count").asInt());

assertEquals(1000, aggrQry.get("logicalReads").asInt());
assertEquals(1000, aggrQry.get("physicalReads").asInt());

JsonNode props = aggrQry.get("properties");
assertNotNull(props);
assertEquals(3, props.size());
for (int j = 0; j < 3; j++) {
JsonNode prop = props.get(j);
assertNotNull(prop);
assertTrue(prop.get("name").asText().startsWith("prop"));
assertTrue(prop.get("value").asText().startsWith("val"));
assertEquals(1000, prop.get("count").asInt());
}

JsonNode rows = aggrQry.get("rows");
assertNotNull(rows);
assertEquals(1000, rows.get("ROWS").asInt());
assertEquals(2000, rows.get("ROWSx2").asInt());
}

JsonNode slowSql = res.get("topSlowSql");
assertEquals(30, slowSql.size());

for (int i = 0; i < 30; i++) {
JsonNode slowQry = slowSql.get(i);
assertNotNull(slowQry);
assertEquals(10, slowQry.get("logicalReads").asInt());
assertEquals(10, slowQry.get("physicalReads").asInt());

JsonNode props = slowQry.get("properties");
assertNotNull(props);
assertEquals(3, props.size());
for (int j = 0; j < 3; j++) {
JsonNode prop = props.get(j);
assertNotNull(prop);
assertTrue(prop.get("name").asText().startsWith("prop"));
assertTrue(prop.get("value").asText().startsWith("val"));
assertEquals(10, prop.get("count").asInt());
}

JsonNode rows = slowQry.get("rows");
assertNotNull(rows);
assertEquals(10, rows.get("ROWS").asInt());
assertEquals(20, rows.get("ROWSx2").asInt());
}
}

/** */
private static class TaskWithoutJobs extends ComputeTaskAdapter<Object, Object> {
/** {@inheritDoc} */
Expand Down

0 comments on commit 42522a6

Please sign in to comment.