feat: Improved instrumentation around core functions (PostHog#17574)
benjackwhite authored Sep 21, 2023
1 parent b6c2524 commit c7a797d
Showing 11 changed files with 137 additions and 88 deletions.
4 changes: 3 additions & 1 deletion plugin-server/src/kafka/consumer.ts
@@ -116,7 +116,9 @@ export const instrumentConsumerMetrics = (
}
}
} else if (error.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
status.info('📝️', `librdkafka ${strategyString} rebalance started, partitions revoked`, { assignments })
status.info('📝️', `librdkafka ${strategyString} rebalance started, partitions revoked`, {
revocations: assignments,
})
for (const [topic, count] of countPartitionsPerTopic(assignments)) {
if (cooperativeRebalance) {
kafkaRebalancePartitionCount.labels({ topic: topic }).dec(count)
@@ -28,10 +28,12 @@ export async function eachMessageAppsOnEventHandlers(

const event = convertToIngestionEvent(clickHouseEvent, skipElementsChain)
await runInstrumentedFunction({
event: event,
func: () => queue.workerMethods.runAppsOnEventPipeline(event),
statsKey: `kafka_queue.process_async_handlers_on_event`,
timeoutMessage: 'After 30 seconds still running runAppsOnEventPipeline',
timeoutContext: () => ({
event: JSON.stringify(event),
}),
teamId: event.teamId,
})
} else {
@@ -155,10 +155,12 @@ export async function eachMessageWebhooksHandlers(
convertToProcessedPluginEvent(event)

await runInstrumentedFunction({
event: event,
func: () => runWebhooks(statsd, actionMatcher, hookCannon, event),
statsKey: `kafka_queue.process_async_handlers_webhooks`,
timeoutMessage: 'After 30 seconds still running runWebhooksHandlersEventPipeline',
timeoutContext: () => ({
event: JSON.stringify(event),
}),
teamId: event.teamId,
})
}
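Both async-handler call sites above follow the same migration: the old `event` field and `func: (event) => ...` pair is replaced by a zero-argument `func` plus a lazy `timeoutContext` thunk, so the `JSON.stringify(event)` cost is only paid if the timeout guard actually fires. A minimal sketch of the new call shape (the handler and stats key here are placeholders, not from the diff):

```ts
await runInstrumentedFunction({
    statsKey: 'kafka_queue.example_handler', // hypothetical stats key
    func: () => handleEvent(event), // zero-arg closure over the event
    timeoutMessage: 'Still running handleEvent after 30 seconds',
    // Only evaluated if the timeout guard fires, keeping stringify off the hot path.
    timeoutContext: () => ({ event: JSON.stringify(event) }),
    teamId: event.teamId,
})
```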
@@ -20,7 +20,7 @@ const FLAG_EXPIRE_MS = 'PX'
* To do this we keep a "lock" in place until we have flushed as much data as possible.
*/
export class PartitionLocker {
consumerID = randomUUID()
consumerID = process.env.HOSTNAME ?? randomUUID()
delay = 1000
ttl = 30000

@@ -62,8 +62,6 @@ export class PartitionLocker {
keys.map(async (key) => {
const existingClaim = await client.get(key)

status.info('🔒', `PartitionLocker claim: ${key}:${existingClaim}`)

if (existingClaim && existingClaim !== this.consumerID) {
// Still claimed by someone else!
blockingConsumers.add(existingClaim)
@@ -92,7 +90,7 @@ export class PartitionLocker {
}
}

status.info('🔒', 'PartitionLocker claimed all required keys')
status.debug('🔒', 'PartitionLocker claimed all required keys')
} catch (error) {
status.error('🧨', 'PartitionLocker errored to claim keys', {
error: error.message,
@@ -54,7 +54,6 @@ export class RealtimeManager extends EventEmitter {
try {
const subMessage = JSON.parse(message) as { team_id: number; session_id: string }
this.emitSubscriptionEvent(subMessage.team_id, subMessage.session_id)
status.info('🔌', 'RealtimeManager recevied realtime request', subMessage)
} catch (e) {
captureException('Failed to parse message from redis pubsub', e)
}
@@ -528,7 +528,7 @@ export class SessionManager {
})

this.realtimeTail.on('line', async (data: string) => {
status.info('⚡️', '[session-manager][realtime] writing to redis', {
status.debug('⚡️', '[session-manager][realtime] writing to redis', {
sessionId: this.sessionId,
teamId: this.teamId,
})
@@ -8,12 +8,11 @@ import { sessionRecordingConsumerConfig } from '../../../config/config'
import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { runInstrumentedFunction } from '../../../main/utils'
import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, TeamId } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { PostgresRouter } from '../../../utils/db/postgres'
import { timeoutGuard } from '../../../utils/db/utils'
import { status } from '../../../utils/status'
import { asyncTimeoutGuard } from '../../../utils/timing'
import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager'
import { ObjectStorage } from '../../services/object_storage'
import { addSentryBreadcrumbsEventListeners } from '../kafka-metrics'
@@ -183,24 +182,24 @@ export class SessionRecordingIngesterV2 {
op: 'checkHighWaterMark',
})

if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, session_id, offset)) {
// Check that we are not below the high water mark for this partition (another consumer may have flushed further than us when revoking)
if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, HIGH_WATERMARK_KEY, offset)) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
drop_cause: 'high_water_mark',
drop_cause: 'high_water_mark_partition',
})
.inc()

highWaterMarkSpan?.finish()
return
}

// Check that we are not below the high water mark for this partition (another consumer may have flushed further than us when revoking)
if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, HIGH_WATERMARK_KEY, offset)) {
if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, session_id, offset)) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
drop_cause: 'high_water_mark_partition',
drop_cause: 'high_water_mark',
})
.inc()

@@ -309,9 +308,10 @@ export class SessionRecordingIngesterV2 {
}

public async handleEachBatch(messages: Message[]): Promise<void> {
await asyncTimeoutGuard(
{ message: 'Processing batch is taking longer than 60 seconds', timeout: 60 * 1000 },
async () => {
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch`,
logExecutionTime: true,
func: async () => {
const transaction = Sentry.startTransaction({ name: `blobIngestion_handleEachBatch` }, {})
histogramKafkaBatchSize.observe(messages.length)

@@ -321,68 +321,86 @@
await this.partitionLocker.claim(messages)
}

for (const message of messages) {
const { partition, offset, timestamp } = message
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.parseKafkaMessages`,
func: async () => {
for (const message of messages) {
const { partition, offset, timestamp } = message

if (timestamp && this.partitionAssignments[partition]) {
const metrics = this.partitionAssignments[partition]
if (timestamp && this.partitionAssignments[partition]) {
const metrics = this.partitionAssignments[partition]

// For some reason timestamp can be null. If it isn't, update our ingestion metrics
metrics.lastMessageTimestamp = timestamp
// If we don't have a last known commit then set it to this offset as we can't commit lower than that
metrics.lastKnownCommit = metrics.lastKnownCommit ?? offset
metrics.lastMessageOffset = offset
// For some reason timestamp can be null. If it isn't, update our ingestion metrics
metrics.lastMessageTimestamp = timestamp
// If we don't have a last known commit then set it to this offset as we can't commit lower than that
metrics.lastKnownCommit = metrics.lastKnownCommit ?? offset
metrics.lastMessageOffset = offset

counterKafkaMessageReceived.inc({ partition })
counterKafkaMessageReceived.inc({ partition })

gaugeLagMilliseconds
.labels({
partition: partition.toString(),
})
.set(now() - timestamp)
gaugeLagMilliseconds
.labels({
partition: partition.toString(),
})
.set(now() - timestamp)

const offsetsByPartition = await this.offsetsRefresher.get()
const highOffset = offsetsByPartition[partition]
const offsetsByPartition = await this.offsetsRefresher.get()
const highOffset = offsetsByPartition[partition]

if (highOffset) {
// NOTE: This is an important metric used by the autoscaler
gaugeLag.set({ partition }, Math.max(0, highOffset - metrics.lastMessageOffset))
}
}
if (highOffset) {
// NOTE: This is an important metric used by the autoscaler
gaugeLag.set({ partition }, Math.max(0, highOffset - metrics.lastMessageOffset))
}
}

const recordingMessage = await this.parseKafkaMessage(message, (token) =>
this.teamsRefresher.get().then((teams) => teams[token] || null)
)
const recordingMessage = await this.parseKafkaMessage(message, (token) =>
this.teamsRefresher.get().then((teams) => teams[token] || null)
)

if (recordingMessage) {
recordingMessages.push(recordingMessage)
}
}
if (recordingMessage) {
recordingMessages.push(recordingMessage)
}
}
},
})

for (const message of recordingMessages) {
const consumeSpan = transaction?.startChild({
op: 'blobConsume',
})
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.consumeSerial`,
func: async () => {
for (const message of recordingMessages) {
const consumeSpan = transaction?.startChild({
op: 'blobConsume',
})

await this.consume(message, consumeSpan)
// TODO: We could do this as batch of offsets for the whole lot...
consumeSpan?.finish()
}
await this.consume(message, consumeSpan)
// TODO: We could do this as batch of offsets for the whole lot...
consumeSpan?.finish()
}
},
})

for (const message of messages) {
// Now that we have consumed everything, attempt to commit all messages in this batch
const { partition, offset } = message
await this.commitOffset(message.topic, partition, offset)
}

await this.replayEventsIngester.consumeBatch(recordingMessages)
const timeout = timeoutGuard(`Flushing sessions timed out`, {}, 120 * 1000)
await this.flushAllReadySessions()
clearTimeout(timeout)
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.consumeReplayEvents`,
func: async () => {
await this.replayEventsIngester.consumeBatch(recordingMessages)
},
})
await runInstrumentedFunction({
statsKey: `recordingingester.handleEachBatch.flushAllReadySessions`,
func: async () => {
await this.flushAllReadySessions()
},
})

transaction.finish()
}
)
},
})
}

public async start(): Promise<void> {
@@ -549,12 +567,19 @@ export class SessionRecordingIngesterV2 {
// - have some sort of timeout so we don't get stuck here forever
if (this.serverConfig.SESSION_RECORDING_PARTITION_REVOKE_OPTIMIZATION) {
status.info('🔁', `blob_ingester_consumer - flushing ${sessionsToDrop.length} sessions on revoke...`)
await Promise.allSettled(
sessionsToDrop
.map(([_, x]) => x)
.sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity)
.map((x) => x.flush('partition_shutdown'))
)

await runInstrumentedFunction({
statsKey: `recordingingester.onRevokePartitions.flushSessions`,
logExecutionTime: true,
func: async () => {
await Promise.allSettled(
sessionsToDrop
.map(([_, x]) => x)
.sort((x) => x.buffer.oldestKafkaTimestamp ?? Infinity)
.map((x) => x.flush('partition_shutdown'))
)
},
})
}

topicPartitions.forEach((topicPartition: TopicPartition) => {
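The batch handler is now instrumented at two levels: the outer `handleEachBatch` wrapper logs its own execution time, and each stage gets a nested `runInstrumentedFunction` with a dotted `statsKey`, so per-stage durations show up as separate series. A condensed sketch of the nesting, reusing the stage keys from the diff with the stage bodies elided:

```ts
await runInstrumentedFunction({
    statsKey: `recordingingester.handleEachBatch`,
    logExecutionTime: true,
    func: async () => {
        await runInstrumentedFunction({
            statsKey: `recordingingester.handleEachBatch.parseKafkaMessages`,
            func: async () => {
                // parse each Kafka message into a recording message
            },
        })
        await runInstrumentedFunction({
            statsKey: `recordingingester.handleEachBatch.consumeSerial`,
            func: async () => {
                // consume each recording message in order
            },
        })
        // consumeReplayEvents and flushAllReadySessions are wrapped the same way
    },
})
```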
42 changes: 29 additions & 13 deletions plugin-server/src/main/utils.ts
@@ -4,38 +4,54 @@ import { exponentialBuckets, Histogram } from 'prom-client'
import { timeoutGuard } from '../utils/db/utils'
import { status } from '../utils/status'

interface FunctionInstrumentation<T, E> {
event: E
timeoutMessage: string
interface FunctionInstrumentation<T> {
statsKey: string
func: (event: E) => Promise<T>
teamId: number
func: () => Promise<T>
timeout?: number
timeoutMessage?: string
timeoutContext?: () => Record<string, any>
teamId?: number
logExecutionTime?: boolean
}

export async function runInstrumentedFunction<T, E>({
const logTime = (startTime: number, statsKey: string, error?: any) => {
status.info('⏱️', `${statsKey} took ${Math.round(performance.now() - startTime)}ms`, {
error,
})
}

export async function runInstrumentedFunction<T>({
timeoutMessage,
event,
timeout,
timeoutContext,
func,
statsKey,
teamId,
}: FunctionInstrumentation<T, E>): Promise<T> {
const timeout = timeoutGuard(timeoutMessage, {
event: JSON.stringify(event),
})
logExecutionTime = false,
}: FunctionInstrumentation<T>): Promise<T> {
const t = timeoutGuard(timeoutMessage ?? `Timeout warning for '${statsKey}'!`, timeoutContext, timeout)
const startTime = performance.now()
const end = instrumentedFunctionDuration.startTimer({
function: statsKey,
})

try {
const result = await func(event)
const result = await func()
end({ success: 'true' })
if (logExecutionTime) {
logTime(startTime, statsKey)
}
return result
} catch (error) {
end({ success: 'false' })
status.info('🔔', error)
if (logExecutionTime) {
logTime(startTime, statsKey, error)
}
Sentry.captureException(error, { tags: { team_id: teamId } })
throw error
} finally {
clearTimeout(timeout)
clearTimeout(t)
}
}
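The `instrumentedFunctionDuration` histogram itself is defined outside this hunk. A plausible shape, inferred from the `prom-client` imports and the `function`/`success` labels used above (the metric name and buckets below are assumptions, not taken from the repository):

```ts
import { exponentialBuckets, Histogram } from 'prom-client'

// Assumed definition: only the label names are confirmed by the hunk above.
const instrumentedFunctionDuration = new Histogram({
    name: 'instrumented_fn_duration_seconds', // hypothetical name
    help: 'Duration of instrumented functions',
    labelNames: ['function', 'success'],
    buckets: exponentialBuckets(0.005, 2, 15), // hypothetical buckets
})

// startTimer() records the initial labels and returns an end() callback
// that observes the elapsed seconds together with the remaining labels.
const end = instrumentedFunctionDuration.startTimer({ function: 'kafka_queue.single_event' })
end({ success: 'true' })
```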

7 changes: 4 additions & 3 deletions plugin-server/src/utils/db/utils.ts
@@ -30,12 +30,13 @@ export function sanitizeEventName(eventName: any): string {

export function timeoutGuard(
message: string,
context?: Record<string, any>,
context?: Record<string, any> | (() => Record<string, any>),
timeout = defaultConfig.TASK_TIMEOUT * 1000
): NodeJS.Timeout {
return setTimeout(() => {
console.log(`⌛⌛⌛ ${message}`, context)
Sentry.captureMessage(message, context ? { extra: context } : undefined)
const ctx = typeof context === 'function' ? context() : context
console.log(`⌛⌛⌛ ${message}`, ctx)
Sentry.captureMessage(message, ctx ? { extra: ctx } : undefined)
}, timeout)
}
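`timeoutGuard` keeps its old return type, but the context argument can now be a thunk that is only evaluated when the timer actually fires, which keeps serialisation off the happy path. A minimal usage sketch (`doWork` and `event` are placeholders):

```ts
const guard = timeoutGuard(
    'Still processing event after 30 seconds',
    // Evaluated lazily: only called if the guard actually fires.
    () => ({ event: JSON.stringify(event) }),
    30 * 1000
)
try {
    await doWork(event)
} finally {
    clearTimeout(guard)
}
```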

@@ -9,8 +9,10 @@ export async function pluginsProcessEventStep(
event: PluginEvent
): Promise<PluginEvent | null> {
const processedEvent = await runInstrumentedFunction({
event,
func: (event) => runProcessEvent(runner.hub, event),
timeoutContext: () => ({
event: JSON.stringify(event),
}),
func: () => runProcessEvent(runner.hub, event),
statsKey: 'kafka_queue.single_event',
timeoutMessage: 'Still running plugins on event. Timeout warning after 30 sec!',
teamId: event.team_id,