Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Flush sessions on partition revoke #17436

Merged
merged 22 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix up locking mechanism
  • Loading branch information
benjackwhite committed Sep 14, 2023
commit 83625b50d9fb41ac3605fb88d5c527f550474aa5
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class OffsetHighWaterMarker {
await client.zadd(key, 'GT', offset, id)
})
} catch (error) {
status.error('🧨', 'WrittenOffsetCache failed to add high-water mark for partition', {
status.error('🧨', 'OffsetHighWaterMarker failed to add high-water mark for partition', {
error: error.message,
key,
...tp,
Expand Down Expand Up @@ -127,7 +127,7 @@ export class OffsetHighWaterMarker {
await client.zremrangebyscore(key, '-Inf', offset)
})
} catch (error) {
status.error('🧨', 'WrittenOffsetCache failed to commit high-water mark for partition', {
status.error('🧨', 'OffsetHighWaterMarker failed to commit high-water mark for partition', {
error: error.message,
key,
...tp,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
import { captureException } from '@sentry/node'
import { randomUUID } from 'crypto'
import { Redis } from 'ioredis'
import { TopicPartition } from 'node-rdkafka-acosom'

import { RedisPool } from '../../../../types'
import { timeoutGuard } from '../../../../utils/db/utils'
import { status } from '../../../../utils/status'

/** Builds the redis key for a topic/partition pair, namespaced under the given prefix. */
export const topicPartitionKey = (prefix: string, tp: TopicPartition): string => {
    return [prefix, tp.topic, tp.partition].join('/')
}

/**
 * Due to the nature of batching, we can't rely solely on Kafka for consumer locking.
 *
 * When a rebalance occurs we try to flush data to S3 so that the new consumer doesn't have to re-process it.
 * To do this we keep a "lock" in place until we have flushed as much data as possible.
 */
export class PartitionLocker {
consumerID = randomUUID()
delay = 1000

constructor(private redisPool: RedisPool, private keyPrefix = '@posthog/replay/locks') {}

private async run<T>(description: string, fn: (client: Redis) => Promise<T>): Promise<T> {
Expand All @@ -30,30 +34,86 @@ export class PartitionLocker {
}
}

/*
## Check that partitions are locked to this consumer
- If the lock is claimed and not cleared after the timeout, this will eventually throw an error
*/
public async check(tps: TopicPartition[]) {
await new Promise((r) => setTimeout(r, 1000))
private keys(tps: TopicPartition[]): string[] {
return tps.map((tp) => topicPartitionKey(this.keyPrefix, tp))
}

/*
## Claim the lock for partitions for this consumer
Claim the lock for partitions for this consumer
- If already locked, we extend the TTL
- If it is claimed, we wait and retry until it is cleared
- If unclaimed, we claim it
*/
public async claim(tps: TopicPartition[]) {
await new Promise((r) => setTimeout(r, 1000))
const keys = this.keys(tps)
const unclaimedKeys = [...keys]

try {
while (unclaimedKeys.length > 0) {
await this.run(`claim keys that belong to this consumer`, async (client) => {
await Promise.allSettled(
keys.map(async (key) => {
const existingClaim = await client.get(key)

if (existingClaim && existingClaim !== this.consumerID) {
// Still claimed by someone else!
return
}

// Set the key so it is claimed by us
const success = await client.set(key, this.consumerID, 'NX', 'EX', 30)

if (success) {
unclaimedKeys.splice(unclaimedKeys.indexOf(key), 1)
}
})
)
})

if (unclaimedKeys.length > 0) {
status.warn('🧨', `PartitionLocker failed to claim keys. Waiting ${this.delay} before retrying...`)
}
}
} catch (error) {
status.error('🧨', 'PartitionLocker failed to claim keys', {
error: error.message,
keys,
})
captureException(error, {
extra: {
keys,
},
})
}
}

/*
## Release a lock for a partition
Release a lock for a partition
- Clear our claim if it is set to our consumer so that another can claim it
*/
public async releae(tps: TopicPartition[]) {
await new Promise((r) => setTimeout(r, 1000))
public async release(tps: TopicPartition[]) {
const keys = this.keys(tps)
try {
await this.run(`release keys that belong to this consumer`, async (client) => {
await Promise.allSettled(
keys.map(async (key) => {
const value = await client.get(key)
if (value === this.consumerID) {
benjackwhite marked this conversation as resolved.
Show resolved Hide resolved
await client.del(key)
}
})
)
})
} catch (error) {
status.error('🧨', 'PartitionLocker failed to release keys', {
error: error.message,
keys,
})
captureException(error, {
extra: {
keys,
},
})
}
}

// public async getWaterMarks(tp: TopicPartition): Promise<OffsetHighWaterMarks> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ require('@sentry/tracing')

const groupId = 'session-recordings-blob'
const sessionTimeout = 30000
const HIGH_WATERMARK_KEY = 'session_replay_blob_ingester'

// const flushIntervalTimeoutMs = 30000

const gaugeSessionsHandled = new Gauge({
Expand Down Expand Up @@ -96,7 +98,7 @@ export class SessionRecordingIngesterV2 {
offsetHighWaterMarker: OffsetHighWaterMarker
realtimeManager: RealtimeManager
replayEventsIngester: ReplayEventsIngester
paritionLocker: PartitionLocker
partitionLocker: PartitionLocker
batchConsumer?: BatchConsumer
flushInterval: NodeJS.Timer | null = null
partitionAssignments: Record<number, PartitionMetrics> = {}
Expand All @@ -113,7 +115,7 @@ export class SessionRecordingIngesterV2 {
) {
this.recordingConsumerConfig = sessionRecordingConsumerConfig(this.serverConfig)
this.realtimeManager = new RealtimeManager(this.redisPool, this.recordingConsumerConfig)
this.paritionLocker = new PartitionLocker(this.redisPool)
this.partitionLocker = new PartitionLocker(this.redisPool)

this.offsetHighWaterMarker = new OffsetHighWaterMarker(
this.redisPool,
Expand Down Expand Up @@ -171,13 +173,13 @@ export class SessionRecordingIngesterV2 {
const { team_id, session_id } = event
const key = `${team_id}-${session_id}`

const { partition, topic, offset } = event.metadata
const { offset } = event.metadata

const highWaterMarkSpan = sentrySpan?.startChild({
op: 'checkHighWaterMark',
})

if (await this.offsetHighWaterMarker.isBelowHighWaterMark({ topic, partition }, session_id, offset)) {
if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, session_id, offset)) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
Expand All @@ -189,6 +191,19 @@ export class SessionRecordingIngesterV2 {
return
}

// Check that we are not past the high water mark for this partition
if (await this.offsetHighWaterMarker.isBelowHighWaterMark(event.metadata, HIGH_WATERMARK_KEY, offset)) {
eventDroppedCounter
.labels({
event_type: 'session_recordings_blob_ingestion',
drop_cause: 'high_water_mark_partition',
})
.inc()

highWaterMarkSpan?.finish()
return
}

if (!this.sessions[key]) {
const { partition, topic } = event.metadata

Expand Down Expand Up @@ -293,18 +308,10 @@ export class SessionRecordingIngesterV2 {
{ message: 'Processing batch is taking longer than 60 seconds', timeout: 60 * 1000 },
async () => {
const transaction = Sentry.startTransaction({ name: `blobIngestion_handleEachBatch` }, {})

histogramKafkaBatchSize.observe(messages.length)

const recordingMessages: IncomingRecordingMessage[] = []

const parititonTopics = messages.map((message) => ({
partition: message.partition,
topic: message.topic,
}))

await this.paritionLocker.check(parititonTopics)

for (const message of messages) {
const { partition, offset, timestamp } = message

Expand Down Expand Up @@ -493,7 +500,7 @@ export class SessionRecordingIngesterV2 {
this.partitionAssignments[topicPartition.partition] = {}
})

await this.paritionLocker.claim(topicPartitions)
await this.partitionLocker.claim(topicPartitions)
await this.offsetsRefresher.refresh()
}

Expand Down Expand Up @@ -537,7 +544,7 @@ export class SessionRecordingIngesterV2 {

await this.destroySessions(sessionsToDrop)
await this.offsetsRefresher.refresh()
await this.paritionLocker.releae(topicPartitions)
await this.partitionLocker.release(topicPartitions)
}

async flushAllReadySessions(): Promise<void> {
Expand Down Expand Up @@ -587,6 +594,7 @@ export class SessionRecordingIngesterV2 {
// that is no longer found across any of the existing sessions.
// This approach is fault-tolerant in that if anything goes wrong, the next commit on that partition will work
public async commitOffset(topic: string, partition: number, offset: number): Promise<void> {
const topicPartition = { topic, partition }
let potentiallyBlockingSession: SessionManager | undefined

for (const sessionManager of Object.values(this.sessions)) {
Expand Down Expand Up @@ -627,13 +635,13 @@ export class SessionRecordingIngesterV2 {
})

this.batchConsumer?.consumer.commit({
topic,
partition,
...topicPartition,
// see https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html for example
// for some reason you commit the next offset you expect to read and not the one you actually have
offset: highestOffsetToCommit + 1,
})

await this.offsetHighWaterMarker.add(topicPartition, HIGH_WATERMARK_KEY, highestOffsetToCommit)
await this.offsetHighWaterMarker.clear({ topic, partition }, highestOffsetToCommit)
gaugeOffsetCommitted.set({ partition }, highestOffsetToCommit)
}
Expand Down