Skip to content

Commit

Permalink
chore: Add more logs for onEvent to faster find slowness sources (Pos…
Browse files Browse the repository at this point in the history
  • Loading branch information
tiina303 authored Sep 21, 2023
1 parent 50af54f commit 1b36e1f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ export async function processOnEventStep(runner: EventPipelineRunner, event: Pos

await runInstrumentedFunction({
timeoutContext: () => ({
event: JSON.stringify(processedPluginEvent),
team_id: event.teamId,
event_uuid: event.eventUuid,
}),
func: () => runOnEvent(runner.hub, processedPluginEvent),
statsKey: `kafka_queue.single_on_event`,
Expand Down
65 changes: 37 additions & 28 deletions plugin-server/src/worker/plugins/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,48 @@ async function runSingleTeamPluginOnEvent(
pluginConfig: PluginConfig,
onEvent: any
): Promise<void> {
// Runs onEvent for a single plugin without any retries
const metricName = 'plugin.on_event'
const metricTags = {
plugin: pluginConfig.plugin?.name ?? '?',
teamId: event.team_id.toString(),
}

const timer = new Date()
const timeout = setTimeout(() => {
console.log(
`⌛⌛⌛ Still running single onEvent plugin for team ${event.team_id} for plugin ${pluginConfig.id}`
)
}, 10 * 1000) // 10 seconds
try {
await onEvent!(event)
await hub.appMetrics.queueMetric({
teamId: event.team_id,
pluginConfigId: pluginConfig.id,
category: 'onEvent',
successes: 1,
})
} catch (error) {
hub.statsd?.increment(`${metricName}.ERROR`, metricTags)
await processError(hub, pluginConfig, error, event)
await hub.appMetrics.queueError(
{
// Runs onEvent for a single plugin without any retries
const metricName = 'plugin.on_event'
const metricTags = {
plugin: pluginConfig.plugin?.name ?? '?',
teamId: event.team_id.toString(),
}

const timer = new Date()
try {
await onEvent!(event)
await hub.appMetrics.queueMetric({
teamId: event.team_id,
pluginConfigId: pluginConfig.id,
category: 'onEvent',
failures: 1,
},
{
error,
event,
}
)
successes: 1,
})
} catch (error) {
hub.statsd?.increment(`${metricName}.ERROR`, metricTags)
await processError(hub, pluginConfig, error, event)
await hub.appMetrics.queueError(
{
teamId: event.team_id,
pluginConfigId: pluginConfig.id,
category: 'onEvent',
failures: 1,
},
{
error,
event,
}
)
}
hub.statsd?.timing(metricName, timer, metricTags)
} finally {
clearTimeout(timeout)
}
hub.statsd?.timing(metricName, timer, metricTags)
}

export async function runOnEvent(hub: Hub, event: ProcessedPluginEvent): Promise<void> {
Expand Down

0 comments on commit 1b36e1f

Please sign in to comment.