Skip to content

Commit

Permalink
[Profiling] Check caches before querying (#143089)
Browse files Browse the repository at this point in the history
* Add LRU cache for stackframes

* Add LRU cache for executables

* Remove splitting mgets into chunks

* Move LRU cache for stacktraces before query

* Summarize cache and query results

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
jbcrail and kibanamachine authored Oct 17, 2022
1 parent 39d1934 commit 35e8170
Showing 1 changed file with 148 additions and 79 deletions.
227 changes: 148 additions & 79 deletions x-pack/plugins/profiling/server/routes/stacktrace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
*/

import type { Logger } from '@kbn/core/server';
import { chunk } from 'lodash';
import LRUCache from 'lru-cache';
import { INDEX_EXECUTABLES, INDEX_FRAMES, INDEX_TRACES } from '../../common';
import {
Expand All @@ -32,8 +31,6 @@ import { withProfilingSpan } from '../utils/with_profiling_span';
import { DownsampledEventsIndex } from './downsampling';
import { ProjectTimeQuery } from './query';

const traceLRU = new LRUCache<StackTraceID, StackTrace>({ max: 20000 });

const BASE64_FRAME_ID_LENGTH = 32;

export type EncodedStackTrace = DedotObject<{
Expand Down Expand Up @@ -236,80 +233,102 @@ export async function searchEventsGroupByStackTrace({
return { totalCount, stackTraceEvents };
}

/**
 * Logs a summary of how many items were satisfied from the local LRU cache
 * versus how many had to be fetched by the follow-up Elasticsearch query.
 *
 * @param logger - destination for the summary messages
 * @param name - plural noun for the items being summarized (e.g. 'stacktraces')
 * @param cacheHits - number of items found in the cache
 * @param cacheTotal - total number of items looked up in the cache
 * @param queryHits - number of items returned by the query
 * @param queryTotal - total number of items requested from the query
 */
function summarizeCacheAndQuery(
  logger: Logger,
  name: string,
  cacheHits: number,
  cacheTotal: number,
  queryHits: number,
  queryTotal: number
) {
  logger.info(`found ${cacheHits} out of ${cacheTotal} ${name} in the cache`);

  // Everything was cached, so no query was issued — nothing more to report.
  const fullyCached = cacheHits === cacheTotal;
  if (fullyCached) {
    return;
  }

  logger.info(`found ${queryHits} out of ${queryTotal} ${name}`);

  const missing = queryTotal - queryHits;
  if (missing > 0) {
    logger.info(`failed to find ${missing} ${name}`);
  }
}

// LRU cache of decoded stacktraces keyed by stacktrace ID; checked before
// querying Elasticsearch so repeated lookups skip the mget round-trip.
const traceLRU = new LRUCache<StackTraceID, StackTrace>({ max: 20000 });

export async function mgetStackTraces({
logger,
client,
events,
concurrency = 1,
}: {
logger: Logger;
client: ProfilingESClient;
events: Map<StackTraceID, number>;
concurrency?: number;
}) {
const stackTraceIDs = [...events.keys()];
const chunkSize = Math.floor(events.size / concurrency);
let chunks = chunk(stackTraceIDs, chunkSize);

if (chunks.length !== concurrency) {
// The last array element contains the remainder, just drop it as irrelevant.
chunks = chunks.slice(0, concurrency);
}

const stackResponses = await withProfilingSpan('mget_stacktraces', () =>
Promise.all(
chunks.map((ids) => {
return client.mget<
PickFlattened<
ProfilingStackTrace,
ProfilingESField.StacktraceFrameIDs | ProfilingESField.StacktraceFrameTypes
>
>('mget_stacktraces_chunk', {
index: INDEX_TRACES,
ids,
realtime: true,
_source_includes: [
ProfilingESField.StacktraceFrameIDs,
ProfilingESField.StacktraceFrameTypes,
],
});
})
)
);
const stackTraceIDs = new Set([...events.keys()]);
const stackTraces = new Map<StackTraceID, StackTrace>();

let cacheHits = 0;
let totalFrames = 0;
const stackTraces = new Map<StackTraceID, StackTrace>();
const stackFrameDocIDs = new Set<string>();
const executableDocIDs = new Set<string>();

for (const stackTraceID of stackTraceIDs) {
const stackTrace = traceLRU.get(stackTraceID);
if (stackTrace) {
cacheHits++;
stackTraceIDs.delete(stackTraceID);
stackTraces.set(stackTraceID, stackTrace);

totalFrames += stackTrace.FrameIDs.length;
for (const frameID of stackTrace.FrameIDs) {
stackFrameDocIDs.add(frameID);
}
for (const fileID of stackTrace.FileIDs) {
executableDocIDs.add(fileID);
}
}
}

if (stackTraceIDs.size === 0) {
summarizeCacheAndQuery(logger, 'stacktraces', cacheHits, events.size, 0, 0);
return { stackTraces, totalFrames, stackFrameDocIDs, executableDocIDs };
}

const stackResponses = await client.mget<
PickFlattened<
ProfilingStackTrace,
ProfilingESField.StacktraceFrameIDs | ProfilingESField.StacktraceFrameTypes
>
>('mget_stacktraces', {
index: INDEX_TRACES,
ids: [...stackTraceIDs],
realtime: true,
_source_includes: [ProfilingESField.StacktraceFrameIDs, ProfilingESField.StacktraceFrameTypes],
});

let queryHits = 0;
const t0 = Date.now();

await withProfilingSpan('decode_stacktraces', async () => {
// flatMap() is significantly slower than an explicit for loop
for (const res of stackResponses) {
for (const trace of res.docs) {
if ('error' in trace) {
continue;
for (const trace of stackResponses.docs) {
if ('error' in trace) {
continue;
}
// Sometimes we don't find the trace.
// This is due to ES delays writing (data is not immediately seen after write).
// Also, ES doesn't know about transactions.
if (trace.found) {
queryHits++;
const traceid = trace._id as StackTraceID;
const stackTrace = decodeStackTrace(trace._source as EncodedStackTrace);

stackTraces.set(traceid, stackTrace);
traceLRU.set(traceid, stackTrace);

totalFrames += stackTrace.FrameIDs.length;
for (const frameID of stackTrace.FrameIDs) {
stackFrameDocIDs.add(frameID);
}
// Sometimes we don't find the trace.
// This is due to ES delays writing (data is not immediately seen after write).
// Also, ES doesn't know about transactions.
if (trace.found) {
const traceid = trace._id as StackTraceID;
let stackTrace = traceLRU.get(traceid) as StackTrace;
if (!stackTrace) {
stackTrace = decodeStackTrace(trace._source as EncodedStackTrace);
traceLRU.set(traceid, stackTrace);
}

totalFrames += stackTrace.FrameIDs.length;
stackTraces.set(traceid, stackTrace);
for (const frameID of stackTrace.FrameIDs) {
stackFrameDocIDs.add(frameID);
}
for (const fileID of stackTrace.FileIDs) {
executableDocIDs.add(fileID);
}
for (const fileID of stackTrace.FileIDs) {
executableDocIDs.add(fileID);
}
}
}
Expand All @@ -321,15 +340,20 @@ export async function mgetStackTraces({
logger.info('Average size of stacktrace: ' + totalFrames / stackTraces.size);
}

if (stackTraces.size < events.size) {
logger.info(
'failed to find ' + (events.size - stackTraces.size) + ' stacktraces (todo: find out why)'
);
}
summarizeCacheAndQuery(
logger,
'stacktraces',
cacheHits,
events.size,
queryHits,
stackTraceIDs.size
);

return { stackTraces, totalFrames, stackFrameDocIDs, executableDocIDs };
}

// LRU cache of stackframes keyed by frame ID; consulted before the mget
// query so previously-seen frames are served without hitting Elasticsearch.
const frameLRU = new LRUCache<StackFrameID, StackFrame>({ max: 100000 });

export async function mgetStackFrames({
logger,
client,
Expand All @@ -341,7 +365,20 @@ export async function mgetStackFrames({
}): Promise<Map<StackFrameID, StackFrame>> {
const stackFrames = new Map<StackFrameID, StackFrame>();

let cacheHits = 0;
const cacheTotal = stackFrameIDs.size;

for (const stackFrameID of stackFrameIDs) {
const stackFrame = frameLRU.get(stackFrameID);
if (stackFrame) {
cacheHits++;
stackFrames.set(stackFrameID, stackFrame);
stackFrameIDs.delete(stackFrameID);
}
}

if (stackFrameIDs.size === 0) {
summarizeCacheAndQuery(logger, 'frames', cacheHits, cacheTotal, 0, 0);
return stackFrames;
}

Expand All @@ -352,33 +389,40 @@ export async function mgetStackFrames({
});

// Create a lookup map StackFrameID -> StackFrame.
let framesFound = 0;
let queryHits = 0;
const t0 = Date.now();
const docs = resStackFrames.docs;
for (const frame of docs) {
if ('error' in frame) {
continue;
}
if (frame.found) {
stackFrames.set(frame._id, {
queryHits++;
const stackFrame = {
FileName: frame._source!.Stackframe.file?.name,
FunctionName: frame._source!.Stackframe.function?.name,
FunctionOffset: frame._source!.Stackframe.function?.offset,
LineNumber: frame._source!.Stackframe.line?.number,
SourceType: frame._source!.Stackframe.source?.type,
});
framesFound++;
} else {
stackFrames.set(frame._id, emptyStackFrame);
};
stackFrames.set(frame._id, stackFrame);
frameLRU.set(frame._id, stackFrame);
continue;
}

stackFrames.set(frame._id, emptyStackFrame);
frameLRU.set(frame._id, emptyStackFrame);
}

logger.info(`processing data took ${Date.now() - t0} ms`);

logger.info('found ' + framesFound + ' / ' + stackFrameIDs.size + ' frames');
summarizeCacheAndQuery(logger, 'frames', cacheHits, cacheTotal, queryHits, stackFrameIDs.size);

return stackFrames;
}

// LRU cache of executables keyed by file ID; consulted before the mget
// query so previously-seen executables are served without hitting Elasticsearch.
const executableLRU = new LRUCache<FileID, Executable>({ max: 100000 });

export async function mgetExecutables({
logger,
client,
Expand All @@ -390,7 +434,20 @@ export async function mgetExecutables({
}): Promise<Map<FileID, Executable>> {
const executables = new Map<FileID, Executable>();

let cacheHits = 0;
const cacheTotal = executableIDs.size;

for (const fileID of executableIDs) {
const executable = executableLRU.get(fileID);
if (executable) {
cacheHits++;
executables.set(fileID, executable);
executableIDs.delete(fileID);
}
}

if (executableIDs.size === 0) {
summarizeCacheAndQuery(logger, 'frames', cacheHits, cacheTotal, 0, 0);
return executables;
}

Expand All @@ -400,26 +457,38 @@ export async function mgetExecutables({
_source_includes: [ProfilingESField.ExecutableFileName],
});

// Create a lookup map StackFrameID -> StackFrame.
let exeFound = 0;
// Create a lookup map FileID -> Executable.
let queryHits = 0;
const t0 = Date.now();
const docs = resExecutables.docs;
for (const exe of docs) {
if ('error' in exe) {
continue;
}
if (exe.found) {
executables.set(exe._id, {
queryHits++;
const executable = {
FileName: exe._source!.Executable.file.name,
});
exeFound++;
} else {
executables.set(exe._id, emptyExecutable);
};
executables.set(exe._id, executable);
executableLRU.set(exe._id, executable);
continue;
}

executables.set(exe._id, emptyExecutable);
executableLRU.set(exe._id, emptyExecutable);
}

logger.info(`processing data took ${Date.now() - t0} ms`);

logger.info('found ' + exeFound + ' / ' + executableIDs.size + ' executables');
summarizeCacheAndQuery(
logger,
'executables',
cacheHits,
cacheTotal,
queryHits,
executableIDs.size
);

return executables;
}

0 comments on commit 35e8170

Please sign in to comment.