chore: App metrics no hub no promise manager #17581

Merged 1 commit on Sep 25, 2023
1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
@@ -62,6 +62,7 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_MAX_MESSAGE_BATCH_SIZE: isDevEnv() ? 0 : 900_000,
KAFKA_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 500,
APP_METRICS_FLUSH_FREQUENCY_MS: isTestEnv() ? 5 : 20_000,
APP_METRICS_FLUSH_MAX_QUEUE_SIZE: isTestEnv() ? 5 : 1000,
REDIS_URL: 'redis://127.0.0.1',
POSTHOG_REDIS_PASSWORD: '',
POSTHOG_REDIS_HOST: '',
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
@@ -145,6 +145,7 @@ export interface PluginsServerConfig {
KAFKA_MAX_MESSAGE_BATCH_SIZE: number
KAFKA_FLUSH_FREQUENCY_MS: number
APP_METRICS_FLUSH_FREQUENCY_MS: number
APP_METRICS_FLUSH_MAX_QUEUE_SIZE: number
BASE_DIR: string // base path for resolving local plugins
PLUGINS_RELOAD_PUBSUB_CHANNEL: string // Redis channel for reload events'
LOG_LEVEL: LogLevel
10 changes: 9 additions & 1 deletion plugin-server/src/utils/db/hub.ts
@@ -28,6 +28,7 @@ import { AppMetrics } from '../../worker/ingestion/app-metrics'
import { OrganizationManager } from '../../worker/ingestion/organization-manager'
import { EventsProcessor } from '../../worker/ingestion/process-event'
import { TeamManager } from '../../worker/ingestion/team-manager'
import { isTestEnv } from '../env-utils'
import { status } from '../status'
import { createRedisPool, UUIDT } from '../utils'
import { PluginsApiKeyManager } from './../../worker/vm/extensions/helpers/api-key-manager'
@@ -192,9 +193,16 @@
// :TODO: This is only used on worker threads, not main
hub.eventsProcessor = new EventsProcessor(hub as Hub)

hub.appMetrics = new AppMetrics(hub as Hub)
hub.appMetrics = new AppMetrics(
kafkaProducer,
serverConfig.APP_METRICS_FLUSH_FREQUENCY_MS,
serverConfig.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)

const closeHub = async () => {
if (!isTestEnv()) {
await hub.appMetrics?.flush()
}
await Promise.allSettled([kafkaProducer.disconnect(), redisPool.drain(), hub.postgres?.end()])
await redisPool.clear()

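A side note on the hub.ts change above: AppMetrics is now built from a Kafka producer plus two config values rather than the whole Hub, so it can be constructed against any producer-shaped object. A hedged sketch of that decoupling (ProducerLike and stubProducer are illustrative names, not the real KafkaProducerWrapper API):

// Anything that can queue a batch of messages is enough for AppMetrics now.
interface ProducerLike {
    queueMessage(record: { topic: string; messages: { value: string }[] }): Promise<void>
}

// A test double that captures batches instead of talking to Kafka.
const captured: { topic: string; messages: { value: string }[] }[] = []
const stubProducer: ProducerLike = {
    async queueMessage(record) {
        captured.push(record)
    },
}

// Assumed constructor shape from this PR: (producer, flushFrequencyMs, maxQueueSize).
// const appMetrics = new AppMetrics(stubProducer as any, 20_000, 1_000)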
85 changes: 37 additions & 48 deletions plugin-server/src/worker/ingestion/app-metrics.ts
@@ -2,9 +2,10 @@ import * as Sentry from '@sentry/node'
import { Message } from 'kafkajs'
import { DateTime } from 'luxon'
import { configure } from 'safe-stable-stringify'
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'

import { KAFKA_APP_METRICS } from '../../config/kafka-topics'
import { Hub, TeamId, TimestampFormat } from '../../types'
import { TeamId, TimestampFormat } from '../../types'
import { cleanErrorStackTrace } from '../../utils/db/error'
import { status } from '../../utils/status'
import { castTimestampOrNow, UUIDT } from '../../utils/utils'
@@ -61,52 +62,43 @@ const safeJSONStringify = configure({
})

export class AppMetrics {
hub: Hub
kafkaProducer: KafkaProducerWrapper
queuedData: Record<string, QueuedMetric>

flushFrequencyMs: number
maxQueueSize: number

timer: NodeJS.Timeout | null
lastFlushTime: number
// For quick access to queueSize instead of using Object.keys(queuedData).length every time
queueSize: number
Comment on lines +72 to +73 (Contributor):
nit: This adds an extra attribute that we have to keep track of with every method that manipulates the queue. Should be fine for now, but it could be bug-prone as it introduces a problem we didn't have: forgetting to update the queueSize.

If this was Python I would suggest using a property that does the equivalent Object.keys(queuedData).length call, e.g.:

@property
def queue_size(self):
    return len(self.queuedData)

But with js I'm not sure if there's anything better.

The issue grows only if we add new methods that manipulate the queue, so when we do that we can tackle this too.
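For comparison, a minimal TypeScript sketch of the same idea using a getter (hypothetical, not part of this PR). It derives the size from queuedData on demand, so there is no separate counter to keep in sync, at the cost of an Object.keys() scan on every access:

class AppMetricsQueue {
    queuedData: Record<string, unknown> = {}

    // Derived on demand; nothing to forget to update when the queue changes.
    get queueSize(): number {
        return Object.keys(this.queuedData).length
    }
}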


constructor(hub: Hub) {
this.hub = hub
constructor(kafkaProducer: KafkaProducerWrapper, flushFrequencyMs: number, maxQueueSize: number) {
this.queuedData = {}

this.flushFrequencyMs = hub.APP_METRICS_FLUSH_FREQUENCY_MS
this.timer = null
this.kafkaProducer = kafkaProducer
this.flushFrequencyMs = flushFrequencyMs
this.maxQueueSize = maxQueueSize
this.lastFlushTime = Date.now()
this.queueSize = 0
}

async isAvailable(metric: AppMetric, errorWithContext?: ErrorWithContext): Promise<boolean> {
if (this.hub.APP_METRICS_GATHERED_FOR_ALL) {
return true
}

// :TRICKY: If postgres connection is down, we ignore this metric
try {
return await this.hub.organizationManager.hasAvailableFeature(metric.teamId, 'app_metrics')
} catch (err) {
status.warn(
'⚠️',
'Error querying whether app_metrics is available. Ignoring this metric',
metric,
errorWithContext,
err
)
return false
async queueMetric(metric: AppMetric, timestamp?: number): Promise<void> {
// We don't want to immediately flush all the metrics every time as we can internally
// aggregate them quite a bit and reduce the message count by a lot.
// However, we also don't want to wait too long, nor have the queue grow too big resulting in
// the flush taking a long time.
const now = Date.now()
if (now - this.lastFlushTime > this.flushFrequencyMs || this.queueSize > this.maxQueueSize) {
await this.flush()
}
}

async queueMetric(metric: AppMetric, timestamp?: number): Promise<void> {
timestamp = timestamp || Date.now()
timestamp = timestamp || now
const key = this._key(metric)

if (!(await this.isAvailable(metric))) {
return
}

const { successes, successesOnRetry, failures, errorUuid, errorType, errorDetails, ...metricInfo } = metric

if (!this.queuedData[key]) {
this.queueSize += 1
this.queuedData[key] = {
successes: 0,
successesOnRetry: 0,
@@ -131,33 +123,29 @@ export class AppMetrics {
this.queuedData[key].failures += failures
}
this.queuedData[key].lastTimestamp = timestamp

if (this.timer === null) {
this.timer = setTimeout(() => {
this.hub.promiseManager.trackPromise(this.flush(), 'app metrics')
this.timer = null
}, this.flushFrequencyMs)
}
}

async queueError(metric: AppMetric, errorWithContext: ErrorWithContext, timestamp?: number) {
if (await this.isAvailable(metric, errorWithContext)) {
await this.queueMetric(
{
...metric,
...this._metricErrorParameters(errorWithContext),
},
timestamp
)
}
await this.queueMetric(
{
...metric,
...this._metricErrorParameters(errorWithContext),
},
timestamp
)
}

async flush(): Promise<void> {
console.log(`Flushing app metrics`)
const startTime = Date.now()
this.lastFlushTime = startTime
if (Object.keys(this.queuedData).length === 0) {
return
}

// TODO: We might be dropping some metrics here if someone wrote between queue assignment and queuedData={} assignment
const queue = this.queuedData
this.queueSize = 0
this.queuedData = {}

const kafkaMessages: Message[] = Object.values(queue).map((value) => ({
@@ -178,10 +166,11 @@
}),
}))

await this.hub.kafkaProducer.queueMessage({
await this.kafkaProducer.queueMessage({
Comment (Contributor): Nice, no hub 🥳

topic: KAFKA_APP_METRICS,
messages: kafkaMessages,
})
console.log(`Finished flushing app metrics, took ${Date.now() - startTime}ms`)
}

_metricErrorParameters(errorWithContext: ErrorWithContext): Partial<AppMetric> {
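To summarize the new control flow outside the diff: the setTimeout plus promiseManager scheduling is gone, and flushing now happens inline whenever enough time has passed or the queue has grown too large. A minimal, self-contained sketch of that pattern under simplified assumptions (BatchingQueue and sink are illustrative names, not the PR's exact code):

// Flush when flushFrequencyMs has elapsed since the last flush, or when the
// queue has grown past maxQueueSize; otherwise keep aggregating in memory.
class BatchingQueue<T> {
    private queued: T[] = []
    private lastFlushTime = Date.now()

    constructor(
        private readonly flushFrequencyMs: number,
        private readonly maxQueueSize: number,
        private readonly sink: (batch: T[]) => Promise<void>
    ) {}

    async add(item: T): Promise<void> {
        const now = Date.now()
        if (now - this.lastFlushTime > this.flushFrequencyMs || this.queued.length > this.maxQueueSize) {
            await this.flush()
        }
        this.queued.push(item)
    }

    async flush(): Promise<void> {
        this.lastFlushTime = Date.now()
        if (this.queued.length === 0) {
            return
        }
        const batch = this.queued // swap the queue out before the async write, mirroring the PR
        this.queued = []
        await this.sink(batch)
    }
}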
66 changes: 28 additions & 38 deletions plugin-server/tests/worker/ingestion/app-metrics.test.ts
@@ -23,18 +23,18 @@ describe('AppMetrics()', () => {
let closeHub: () => Promise<void>

beforeEach(async () => {
;[hub, closeHub] = await createHub({ APP_METRICS_FLUSH_FREQUENCY_MS: 100 })
appMetrics = new AppMetrics(hub)

jest.spyOn(hub.organizationManager, 'hasAvailableFeature').mockResolvedValue(true)
;[hub, closeHub] = await createHub({ APP_METRICS_FLUSH_FREQUENCY_MS: 100, APP_METRICS_FLUSH_MAX_QUEUE_SIZE: 5 })
appMetrics = new AppMetrics(
hub.kafkaProducer,
hub.APP_METRICS_FLUSH_FREQUENCY_MS,
hub.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)
jest.spyOn(hub.kafkaProducer, 'queueMessage').mockReturnValue(Promise.resolve())
})

afterEach(async () => {
jest.useRealTimers()
if (appMetrics.timer) {
clearTimeout(appMetrics.timer)
}
await closeHub()
})

@@ -164,44 +164,34 @@ describe('AppMetrics()', () => {
])
})

it('creates timer to flush if no timer before', async () => {
jest.spyOn(appMetrics, 'flush')
jest.useFakeTimers()

await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)

const timer = appMetrics.timer
expect(timer).not.toBeNull()

jest.advanceTimersByTime(120)
it('flushes when time is up', async () => {
Date.now = jest.fn(() => 1600000000)
await appMetrics.flush()

expect(appMetrics.timer).toBeNull()
expect(appMetrics.flush).toHaveBeenCalled()
})
jest.spyOn(appMetrics, 'flush')
Date.now = jest.fn(() => 1600000120)

it('does not create a timer on subsequent requests', async () => {
await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)
const originalTimer = appMetrics.timer
await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)

expect(originalTimer).not.toBeNull()
expect(appMetrics.timer).toEqual(originalTimer)
})

it('does nothing if feature is not available', async () => {
jest.mocked(hub.organizationManager.hasAvailableFeature).mockResolvedValue(false)

expect(appMetrics.flush).toHaveBeenCalledTimes(1)
// doesn't flush again on the next call, i.e. the flushed metrics were reset
Date.now = jest.fn(() => 1600000130)
await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)
expect(appMetrics.queuedData).toEqual({})
expect(appMetrics.flush).toHaveBeenCalledTimes(1)
})

it('does not query `hasAvailableFeature` if not needed', async () => {
hub.APP_METRICS_GATHERED_FOR_ALL = true

await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)

expect(appMetrics.queuedData).not.toEqual({})
expect(hub.organizationManager.hasAvailableFeature).not.toHaveBeenCalled()
it('flushes when max queue size is hit', async () => {
jest.spyOn(appMetrics, 'flush')
// parallel could trigger multiple flushes and make the test flaky
for (let i = 0; i < 7; i++) {
await appMetrics.queueMetric({ ...metric, successes: 1, teamId: i }, timestamp)
}
expect(appMetrics.flush).toHaveBeenCalledTimes(1)
// we only count different keys, so this should not trigger a flush
for (let i = 0; i < 7; i++) {
await appMetrics.queueMetric({ ...metric, successes: 1 }, timestamp)
}
expect(appMetrics.flush).toHaveBeenCalledTimes(1)
})
})
