Skip to content

Commit

Permalink
Merge branch 'master' into clickhouse_LTS
Browse files Browse the repository at this point in the history
* master:
  fix: exclusion steps cannot be selected (#9762)
  feat(lemon-button): Support `status` for `primary` buttons (#9782)
  fix: healthcheck for kafka on plugin server (#9771)
  fix(billing): Update billing success message (#9739)
  fix(plugin-server): Set transpileOnly when importing piscina code in tests (#9777)
  fix(plugin-server): Remove unused kafka reset test code (#9779)
  fix: set kafka_skip_broken_messages on dead letter queue table (#9754)
  fix(plugin-server): remove dead code from worker.test.ts (#9776)
  refactor(plugin-server): Run ingestion only on worker threads (#9738)
  fix: Plugin-server tests with kafka need to have consumer stopped (#9774)
  chore(deps): Update posthog-js to 1.21.1 (#9773)
  chore(dep): upgrading rr-web (#9772)
  fix: ouroboros inputs (#9769)
  • Loading branch information
fuziontech committed May 13, 2022
2 parents 8423d59 + 7b0b96c commit 7478bbc
Show file tree
Hide file tree
Showing 32 changed files with 255 additions and 452 deletions.
11 changes: 11 additions & 0 deletions ee/clickhouse/migrations/0028_dead_letter_queue_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from infi.clickhouse_orm import migrations

from ee.clickhouse.sql.dead_letter_queue import DEAD_LETTER_QUEUE_TABLE_MV_SQL, KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL
from posthog.settings.data_stores import CLICKHOUSE_CLUSTER

operations = [
migrations.RunSQL(f"DROP TABLE IF EXISTS events_dead_letter_queue_mv ON CLUSTER {CLICKHOUSE_CLUSTER}"),
migrations.RunSQL(f"DROP TABLE IF EXISTS kafka_events_dead_letter_queue ON CLUSTER {CLICKHOUSE_CLUSTER}"),
migrations.RunSQL(KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL()),
migrations.RunSQL(DEAD_LETTER_QUEUE_TABLE_MV_SQL),
]
7 changes: 6 additions & 1 deletion ee/clickhouse/sql/dead_letter_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@
ttl_period=ttl_period("_timestamp", 4), # 4 weeks
)

KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL = lambda: DEAD_LETTER_QUEUE_TABLE_BASE_SQL.format(
# skip up to 1000 messages per block. blocks can be as large as 65505
# if a block has >1000 broken messages it probably means we're doing something wrong
# so it should fail and require manual intervention
KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL = lambda: (
DEAD_LETTER_QUEUE_TABLE_BASE_SQL + " SETTINGS kafka_skip_broken_messages=1000"
).format(
table_name="kafka_" + DEAD_LETTER_QUEUE_TABLE,
cluster=CLICKHOUSE_CLUSTER,
engine=kafka_engine(topic=KAFKA_DEAD_LETTER_QUEUE),
Expand Down
4 changes: 2 additions & 2 deletions ee/clickhouse/sql/test/__snapshots__/test_schema.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
tags Array(VARCHAR)

) ENGINE = Kafka('test.kafka.broker:9092', 'events_dead_letter_queue_test', 'group1', 'JSONEachRow')

SETTINGS kafka_skip_broken_messages=1000
'
---
# name: test_create_kafka_table_with_different_kafka_host[kafka_groups]
Expand Down Expand Up @@ -408,7 +408,7 @@
tags Array(VARCHAR)

) ENGINE = Kafka('kafka', 'events_dead_letter_queue_test', 'group1', 'JSONEachRow')

SETTINGS kafka_skip_broken_messages=1000
'
---
# name: test_create_table_query[kafka_groups]
Expand Down
28 changes: 26 additions & 2 deletions frontend/src/lib/components/LemonButton/LemonButton.scss
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,41 @@

.LemonButton--primary {
background: var(--primary);
color: #fff;
color: #fff !important;
&:not(:disabled):hover,
&.LemonButton--active {
background: var(--primary-hover);
}
&:not(:disabled):active {
background: var(--primary-active);
}
&.LemonRow--status-danger {
background: var(--danger);
&:not(:disabled):hover,
&.LemonButton--active {
background: var(--danger-hover);
}
&:not(:disabled):active {
background: var(--danger-active);
}
}
&.LemonRow--status-warning {
background: var(--warning);
&:not(:disabled):hover,
&.LemonButton--active {
background: var(--warning-hover);
}
&:not(:disabled):active {
background: var(--warning-active);
}
}
&.LemonRow--status-success {
background: var(--success);
// TODO: Hover/active colors are not defined for success yet
}
.LemonRow__icon,
.spinner {
color: #fff;
color: #fff !important;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,21 @@ export function PropertyValue({
const isMultiSelect = operator && isOperatorMulti(operator)
const isDateTimeProperty = operator && isOperatorDate(operator)

const [input, setInput] = useState(isMultiSelect ? '' : toString(value))
const [shouldBlur, setShouldBlur] = useState(false)
// what the human has typed into the box
const [input, setInput] = useState(Array.isArray(value) ? '' : toString(value) ?? '')
// options from the server for search
const [options, setOptions] = useState({} as Record<string, Option>)

const [shouldBlur, setShouldBlur] = useState(false)
const autoCompleteRef = useRef<HTMLElement>(null)

const { formatForDisplay } = useValues(propertyDefinitionsModel)

// update the input field if passed a new `value` prop
useEffect(() => {
if (!value) {
if (value == null) {
setInput('')
} else if (value !== input) {
} else if (!Array.isArray(value) && toString(value) !== input) {
const valueObject = options[propertyKey]?.values?.find((v) => v.id === value)
if (valueObject) {
setInput(toString(valueObject.name))
Expand Down
5 changes: 3 additions & 2 deletions frontend/src/scenes/billing/BillingSubscribed.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { router } from 'kea-router'
import { Link } from 'lib/components/Link'
import { billingSubscribedLogic, SubscriptionStatus } from './billingSubscribedLogic'
import { SceneExport } from 'scenes/sceneTypes'
import { dayjs } from 'lib/dayjs'

export const scene: SceneExport = {
component: BillingSubscribed,
Expand Down Expand Up @@ -63,8 +64,8 @@ function SubscriptionSuccess(): JSX.Element {
</p>
{billing?.plan?.key === 'standard' && (
<p className="text-muted-alt">
You will be billed within the <b>first 3 days of each month</b>. If you ingest less than 1M events,
you will not be billed.
You will be billed on each month on the <strong>{dayjs().format('D')}</strong>. If you ingest less
than 1M events, you will not be billed.
</p>
)}
<p>
Expand Down
8 changes: 7 additions & 1 deletion frontend/src/scenes/funnels/funnelLogic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1228,9 +1228,15 @@ export const funnelLogic = kea<funnelLogicType<openPersonsModelProps>>({
insightLogic(props).actions.setFilters(cleanedParams)
},
setEventExclusionFilters: ({ filters }) => {
const exclusions = (filters.events as FunnelStepRangeEntityFilter[]).map((exclusion) => {
exclusion.funnel_from_step =
exclusion.funnel_from_step || values.exclusionDefaultStepRange.funnel_from_step
exclusion.funnel_to_step = exclusion.funnel_to_step || values.exclusionDefaultStepRange.funnel_to_step
return exclusion
})
actions.setFilters({
...values.filters,
exclusions: filters.events as FunnelStepRangeEntityFilter[],
exclusions,
})
},
setOneEventExclusionFilter: ({ eventFilter, index }) => {
Expand Down
26 changes: 4 additions & 22 deletions frontend/src/scenes/funnels/funnelUtils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ describe('getIncompleteConversionWindowStartDate()', () => {
describe('getClampedStepRangeFilter', () => {
it('prefers step range to existing filters', () => {
const stepRange = {
funnel_from_step: 2,
funnel_to_step: 3,
funnel_from_step: 0,
funnel_to_step: 1,
} as FunnelStepRangeEntityFilter
const filters = {
funnel_from_step: 1,
Expand All @@ -162,8 +162,8 @@ describe('getClampedStepRangeFilter', () => {
filters,
})
expect(clampedStepRange).toEqual({
funnel_from_step: 2,
funnel_to_step: 3,
funnel_from_step: 0,
funnel_to_step: 1,
})
})

Expand All @@ -185,24 +185,6 @@ describe('getClampedStepRangeFilter', () => {
})
})

it('sets values to undefined if they match the event and action length', () => {
const stepRange = {} as FunnelStepRangeEntityFilter
const filters = {
funnel_from_step: 0,
funnel_to_step: 3,
actions: [{}, {}],
events: [{}, {}],
} as FilterType
const clampedStepRange = getClampedStepRangeFilter({
stepRange,
filters,
})
expect(clampedStepRange).toEqual({
funnel_from_step: undefined,
funnel_to_step: undefined,
})
})

it('returns undefined if the incoming filters are undefined', () => {
const stepRange = {} as FunnelStepRangeEntityFilter
const filters = {
Expand Down
28 changes: 15 additions & 13 deletions frontend/src/scenes/funnels/funnelUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,9 @@ export const deepCleanFunnelExclusionEvents = (filters: FilterType): FunnelStepR
return exclusions.length > 0 ? exclusions : undefined
}

const findFirstNumber = (candidates: (number | undefined)[]): number | undefined =>
candidates.find((s) => typeof s === 'number')

export const getClampedStepRangeFilter = ({
stepRange,
filters,
Expand All @@ -338,23 +341,22 @@ export const getClampedStepRangeFilter = ({
filters: FilterType
}): FunnelStepRangeEntityFilter => {
const maxStepIndex = Math.max((filters.events?.length || 0) + (filters.actions?.length || 0) - 1, 1)
const incomingFunnelFromStep = stepRange?.funnel_from_step || filters.funnel_from_step
const incomingFunnelToStep = stepRange?.funnel_to_step || filters.funnel_to_step

const funnelFromStepIsSet = typeof incomingFunnelFromStep === 'number' && incomingFunnelFromStep !== 0
const funnelToStepIsSet = typeof incomingFunnelToStep === 'number' && incomingFunnelToStep !== maxStepIndex
let funnel_from_step = findFirstNumber([stepRange?.funnel_from_step, filters.funnel_from_step])
let funnel_to_step = findFirstNumber([stepRange?.funnel_to_step, filters.funnel_to_step])

if (funnelFromStepIsSet || funnelToStepIsSet) {
const funnel_from_step = clamp(incomingFunnelFromStep ?? 0, 0, maxStepIndex)
return {
...(stepRange as FunnelStepRangeEntityFilter),
funnel_from_step,
funnel_to_step: clamp(incomingFunnelToStep ?? maxStepIndex, funnel_from_step + 1, maxStepIndex),
}
const funnelFromStepIsSet = typeof funnel_from_step === 'number'
const funnelToStepIsSet = typeof funnel_to_step === 'number'

if (funnelFromStepIsSet && funnelToStepIsSet) {
funnel_from_step = clamp(funnel_from_step ?? 0, 0, maxStepIndex)
funnel_to_step = clamp(funnel_to_step ?? maxStepIndex, funnel_from_step + 1, maxStepIndex)
}

return {
funnel_from_step: undefined,
funnel_to_step: undefined,
...(stepRange || {}),
funnel_from_step,
funnel_to_step,
}
}

Expand Down
5 changes: 4 additions & 1 deletion frontend/src/scenes/insights/utils/cleanFilters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ export function cleanFilters(
...cleanedParams,
...getClampedStepRangeFilter({ filters: cleanedParams }),
exclusions: (cleanedParams.exclusions || []).map((e) =>
getClampedStepRangeFilter({ stepRange: e, filters: cleanedParams })
getClampedStepRangeFilter({
stepRange: e,
filters: cleanedParams,
})
),
}
} else if (filters.insight === InsightType.PATHS) {
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/styles/global.scss
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ NOTE: $medium is a SCSS var; not a keyword
--primary-bg-active: #{$primary_bg_active};
--success: #{$success};
--danger: #{$danger};
--danger-hover: #{$danger_hover};
--danger-active: #{$danger_active};
--warning: #{$warning};
--warning-hover: #{$warning_hover};
--warning-active: #{$warning_active};
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/vars.scss
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ $warning: #f7a501;
$warning_hover: #f3ba3e;
$warning_active: #e69900;
$danger: #f96132;
$danger_hover: #fa754c;
$danger_active: #f9430b;
$danger_bridge: #df4313; // Used for bridge pages (posthog.com to PostHog App transition)

// Brand colors, as in logo
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
"kea-waitfor": "^0.2.1",
"kea-window-values": "^3.0.0-alpha.1",
"md5": "^2.3.0",
"posthog-js": "1.21.0",
"posthog-js": "1.21.1",
"posthog-js-lite": "^0.0.3",
"prop-types": "^15.7.2",
"query-selector-shadow-dom": "0.8.0",
Expand All @@ -119,7 +119,7 @@
"react-transition-group": "^4.4.2",
"react-virtualized": "^9.22.3",
"resize-observer-polyfill": "^1.5.1",
"rrweb": "^1.1.2",
"rrweb": "^1.1.3",
"sass": "^1.26.2",
"use-debounce": "^6.0.1",
"use-resize-observer": "^8.0.0",
Expand Down
49 changes: 1 addition & 48 deletions plugin-server/src/main/ingestion-queues/ingest-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ import { PluginEvent } from '@posthog/plugin-scaffold'

import { Hub, WorkerMethods } from '../../types'
import { status } from '../../utils/status'
import { onEvent } from '../runner/on-event'
import { runInstrumentedFunction } from '../utils'
import { Action } from './../../types'
import { processEvent } from './process-event'

export async function ingestEvent(
server: Hub,
Expand All @@ -14,55 +10,12 @@ export async function ingestEvent(
checkAndPause?: () => void // pause incoming messages if we are slow in getting them out again
): Promise<void> {
const eachEventStartTimer = new Date()
const isSnapshot = event.event === '$snapshot'

checkAndPause?.()

server.statsd?.increment('kafka_queue_ingest_event_hit')

const processedEvent = await processEvent(server, workerMethods, event)

checkAndPause?.()

if (processedEvent) {
let actionMatches: Action[] = []
await Promise.all([
runInstrumentedFunction({
server,
event: processedEvent,
func: async (event) => {
const result = await workerMethods.ingestEvent(event)
actionMatches = result.actionMatches || []
},
statsKey: 'kafka_queue.single_ingestion',
timeoutMessage: 'After 30 seconds still ingesting event',
}),
onEvent(server, workerMethods, processedEvent),
])

server.statsd?.increment('kafka_queue_single_event_processed_and_ingested')

if (actionMatches.length > 0) {
const promises = []
for (const actionMatch of actionMatches) {
promises.push(
runInstrumentedFunction({
server,
event: processedEvent,
func: (event) => workerMethods.onAction(actionMatch, event),
statsKey: `kafka_queue.on_action`,
timeoutMessage: 'After 30 seconds still running onAction',
})
)
}
await Promise.all(promises)
}
} else {
// processEvent might not return an event. This is expected and plugins, e.g. downsample plugin uses it.
server.statsd?.increment('kafka_queue.dropped_event', {
teamID: String(event.team_id),
})
}
await workerMethods.runEventPipeline(event)

server.statsd?.timing('kafka_queue.each_event', eachEventStartTimer)
server.internalMetrics?.incr('$$plugin_server_events_processed')
Expand Down
25 changes: 8 additions & 17 deletions plugin-server/src/main/ingestion-queues/kafka-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ import { PluginServerMode } from '../../types'
import { Hub, Queue, WorkerMethods } from '../../types'
import { status } from '../../utils/status'
import { groupIntoBatches, killGracefully, sanitizeEvent } from '../../utils/utils'
import { onEvent } from '../runner/on-event'
import { runInstrumentedFunction } from '../utils'
import { KAFKA_BUFFER } from './../../config/kafka-topics'
import { ingestEvent } from './ingest-event'

class DelayProcessing extends Error {}
Expand Down Expand Up @@ -44,21 +42,14 @@ export class KafkaQueue implements Queue {
}

private async eachMessageIngestion(message: KafkaMessage): Promise<void> {
// Currently the else part is never triggered. The plugin server can only be
// in "ingestion" mode at the moment, and onEvent is triggered in ingestEvent
if (this.pluginServerMode === PluginServerMode.Ingestion) {
const { data: dataStr, ...rawEvent } = JSON.parse(message.value!.toString())
const combinedEvent = { ...rawEvent, ...JSON.parse(dataStr) }
const event: PluginEvent = sanitizeEvent({
...combinedEvent,
site_url: combinedEvent.site_url || null,
ip: combinedEvent.ip || null,
})
await ingestEvent(this.pluginsServer, this.workerMethods, event)
} else {
const event = JSON.parse(message.value!.toString())
await onEvent(this.pluginsServer, this.workerMethods, event)
}
const { data: dataStr, ...rawEvent } = JSON.parse(message.value!.toString())
const combinedEvent = { ...rawEvent, ...JSON.parse(dataStr) }
const event: PluginEvent = sanitizeEvent({
...combinedEvent,
site_url: combinedEvent.site_url || null,
ip: combinedEvent.ip || null,
})
await ingestEvent(this.pluginsServer, this.workerMethods, event)
}

private async eachMessageBuffer(
Expand Down
Loading

0 comments on commit 7478bbc

Please sign in to comment.