Skip to content

Commit

Permalink
IGNITE-21863 Reduce memory consumption by performance statistics QueryHandler (#258)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-plekhanov authored Mar 29, 2024
1 parent ee9488d commit 8f90140
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -102,27 +100,42 @@ public class QueryHandler implements IgnitePerformanceStatisticsHandler {
new EnumMap<>(GridCacheQueryType.class);

/** {@inheritDoc} */
@Override public void query(UUID nodeId, GridCacheQueryType type, String text, long id, long startTime,
long duration, boolean success) {
Query query = new Query(type, text, nodeId, id, startTime, duration, success);
@Override public void query(
UUID nodeId,
GridCacheQueryType type,
String text,
long id,
long startTime,
long duration,
boolean success
) {
Query qry = new Query(type, text, nodeId, id, startTime, duration, success);

OrderedFixedSizeStructure<Long, Query> tree = topSlow.computeIfAbsent(type,
queryType -> new OrderedFixedSizeStructure<>());

tree.put(duration, query);

AggregatedQueryInfo info = aggrQuery.computeIfAbsent(type, queryType -> new HashMap<>())
.computeIfAbsent(text, queryText -> new AggregatedQueryInfo());

info.merge(nodeId, id, duration, success);
info.merge(duration, success);

Query evicted = tree.put(duration, qry);

if (evicted != null)
aggregateQuery(evicted);
}

/** {@inheritDoc} */
@Override public void queryReads(UUID nodeId, GridCacheQueryType type, UUID queryNodeId, long id, long logicalReads,
long physicalReads) {

@Override public void queryReads(
UUID nodeId,
GridCacheQueryType type,
UUID qryNodeId,
long id,
long logicalReads,
long physicalReads
) {
Map<Long, long[]> ids = readsById.computeIfAbsent(type, queryType -> new HashMap<>())
.computeIfAbsent(queryNodeId, node -> new HashMap<>());
.computeIfAbsent(qryNodeId, node -> new HashMap<>());

long[] readsArr = ids.computeIfAbsent(id, queryId -> new long[] {0, 0});

Expand Down Expand Up @@ -169,90 +182,105 @@ public class QueryHandler implements IgnitePerformanceStatisticsHandler {

/** {@inheritDoc} */
@Override public Map<String, JsonNode> results() {
ObjectNode sqlRes = MAPPER.createObjectNode();
ObjectNode scanRes = MAPPER.createObjectNode();
ObjectNode indexRes = MAPPER.createObjectNode();

buildResult(GridCacheQueryType.SQL_FIELDS, sqlRes);
buildResult(GridCacheQueryType.SCAN, scanRes);
buildResult(GridCacheQueryType.INDEX, indexRes);

ArrayNode topSlowSql = MAPPER.createArrayNode();
ArrayNode topSlowScan = MAPPER.createArrayNode();
ArrayNode topSlowIndex = MAPPER.createArrayNode();
ArrayNode topSlowIdx = MAPPER.createArrayNode();

buildTopSlowResult(GridCacheQueryType.SQL_FIELDS, topSlowSql);
buildTopSlowResult(GridCacheQueryType.SCAN, topSlowScan);
buildTopSlowResult(GridCacheQueryType.INDEX, topSlowIndex);
buildTopSlowResult(GridCacheQueryType.INDEX, topSlowIdx);

ObjectNode sqlRes = MAPPER.createObjectNode();
ObjectNode scanRes = MAPPER.createObjectNode();
ObjectNode idxRes = MAPPER.createObjectNode();

buildResult(GridCacheQueryType.SQL_FIELDS, sqlRes);
buildResult(GridCacheQueryType.SCAN, scanRes);
buildResult(GridCacheQueryType.INDEX, idxRes);

Map<String, JsonNode> res = new HashMap<>();

res.put("sql", sqlRes);
res.put("scan", scanRes);
res.put("index", indexRes);
res.put("index", idxRes);
res.put("topSlowSql", topSlowSql);
res.put("topSlowScan", topSlowScan);
res.put("topSlowIndex", topSlowIndex);
res.put("topSlowIndex", topSlowIdx);

return res;
}

/** Builds JSON. */
private void buildResult(GridCacheQueryType type, ObjectNode jsonRes) {
if (!aggrQuery.containsKey(type))
/**
* Aggregates query reads/rows/properties and remove detailed info.
*/
private void aggregateQuery(Query qry) {
if (!aggrQuery.containsKey(qry.type))
return;

Map<String, AggregatedQueryInfo> res = aggrQuery.get(type);
Map<String, AggregatedQueryInfo> typeAggrs = aggrQuery.get(qry.type);

res.forEach((text, info) -> {
info.ids.forEach((uuid, ids) -> {
if (readsById.containsKey(type) && readsById.get(type).containsKey(uuid)) {
Map<Long, long[]> reads = readsById.get(type).get(uuid);
AggregatedQueryInfo info = typeAggrs.get(qry.text);

ids.forEach(id -> {
long[] readsArr = reads.get(id);
// Reads.
Map<UUID, Map<Long, long[]>> typeReads = readsById.get(qry.type);
Map<Long, long[]> nodeReads = typeReads == null ? null : typeReads.get(qry.queryNodeId);
long[] qryReads = nodeReads == null ? null : nodeReads.remove(qry.id);

if (readsArr != null) {
info.logicalReads += readsArr[0];
info.physicalReads += readsArr[1];
}
});
}
if (qryReads != null) {
info.logicalReads += qryReads[0];
info.physicalReads += qryReads[1];
}

if (type == GridCacheQueryType.SQL_FIELDS) {
Map<Long, Map<String, long[]>> nodeRows = rowsById.get(uuid);
Map<Long, Map<String, T3<String, String, long[]>>> nodeProps = propsById.get(uuid);

ids.forEach(id -> {
Map<String, T3<String, String, long[]>> qryProps = nodeProps == null ? null : nodeProps.get(id);

if (!F.isEmpty(qryProps)) {
qryProps.forEach((propKey0, prop0) -> info.props.compute(propKey0, (propKey1, prop1) -> {
if (prop1 == null)
return new T3<>(prop0.get1(), prop0.get2(), new long[] {prop0.get3()[0]});
else {
prop1.get3()[0] += prop0.get3()[0];
return prop1;
}
}));
}

Map<String, long[]> qryRows = nodeRows == null ? null : nodeRows.get(id);

if (!F.isEmpty(qryRows)) {
qryRows.forEach((act0, rows0) -> info.rows.compute(act0, (act1, rows1) -> {
if (rows1 == null)
return new long[] {rows0[0]};
else {
rows1[0] += rows0[0];
return rows1;
}
}));
}
});
}
});
if (qry.type == GridCacheQueryType.SQL_FIELDS) {
// Properties.
Map<Long, Map<String, T3<String, String, long[]>>> nodeProps = propsById.get(qry.queryNodeId);
Map<String, T3<String, String, long[]>> qryProps = nodeProps == null ? null : nodeProps.remove(qry.id);

if (!F.isEmpty(qryProps)) {
qryProps.forEach((propKey0, prop0) -> info.props.compute(propKey0, (propKey1, prop1) -> {
if (prop1 == null)
return new T3<>(prop0.get1(), prop0.get2(), new long[] {prop0.get3()[0]});
else {
prop1.get3()[0] += prop0.get3()[0];
return prop1;
}
}));
}

// Rows.
Map<Long, Map<String, long[]>> nodeRows = rowsById.get(qry.queryNodeId);
Map<String, long[]> qryRows = nodeRows == null ? null : nodeRows.remove(qry.id);

if (!F.isEmpty(qryRows)) {
qryRows.forEach((act0, rows0) -> info.rows.compute(act0, (act1, rows1) -> {
if (rows1 == null)
return new long[] {rows0[0]};
else {
rows1[0] += rows0[0];
return rows1;
}
}));
}
}
}

/** Builds JSON. */
private void buildResult(GridCacheQueryType type, ObjectNode jsonRes) {
OrderedFixedSizeStructure<Long, Query> topSlowForType = topSlow.get(type);

// Up to this moment we've aggregated and removed detailed info for all queries except top slow. Now (after
// result for top slow queries was built) we can aggregate and remove details for top slow queries too.
if (topSlowForType != null) {
for (Query qry : topSlowForType.values())
aggregateQuery(qry);
}

Map<String, AggregatedQueryInfo> res = aggrQuery.get(type);

if (res == null)
return;

res.forEach((text, info) -> {
ObjectNode sql = (ObjectNode)jsonRes.get(text);

if (sql == null) {
Expand Down Expand Up @@ -374,19 +402,13 @@ private static class AggregatedQueryInfo {
/** Number of processed rows (by different actions). */
Map<String, long[]> rows = new TreeMap<>();

/** Query ids. Parsed from global query id: NodeId -> queryIds */
final Map<UUID, Set<Long>> ids = new HashMap<>();

/** */
public void merge(UUID queryNodeId, long id, long duration, boolean success) {
public void merge(long duration, boolean success) {
count += 1;
totalDuration += duration;

if (!success)
failures += 1;

ids.computeIfAbsent(queryNodeId, k -> new HashSet<>())
.add(id);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.ignite.internal.performancestatistics.util;

import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.jetbrains.annotations.Nullable;

/**
* Data structure for keeping the top N elements in DESC sort order.
Expand Down Expand Up @@ -52,19 +54,24 @@ public OrderedFixedSizeStructure(int capacity) {
/**
* @param key Key.
* @param value Value.
* @return Evicted value.
*/
public void put(K key, V value) {
public @Nullable V put(K key, V value) {
if (map.size() < capacity) {
map.put(key, value);

return;
return null;
}

if (map.firstKey().compareTo(key) < 0) {
map.pollFirstEntry();
Map.Entry<K, V> old = map.pollFirstEntry();

map.put(key, value);

return old.getValue();
}

return value;
}

/** @return Values. */
Expand Down

0 comments on commit 8f90140

Please sign in to comment.