Skip to content
This repository has been archived by the owner on Mar 7, 2024. It is now read-only.

Add bq_ingested_timestamp #9

Merged
merged 9 commits into from
Jun 15, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add bq_ingested_timestamp
  • Loading branch information
mariusandra committed May 27, 2021
commit ae6a873e689248c30d7af94063d00f51e500927c
81 changes: 62 additions & 19 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { createBuffer } from '@posthog/plugin-contrib'
import { Plugin, PluginMeta, PluginEvent } from '@posthog/plugin-scaffold'
import { BigQuery, Table } from '@google-cloud/bigquery'
import { BigQuery, Table, TableField, TableMetadata } from '@google-cloud/bigquery'

class RetryError extends Error {}
neilkakkar marked this conversation as resolved.
Show resolved Hide resolved

type BigQueryPlugin = Plugin<{
global: {
bigQueryClient: BigQuery
bigQueryTable: Table
bigQueryTableFields: TableField[]

exportEventsBuffer: ReturnType<typeof createBuffer>
exportEventsToIgnore: Set<string>
Expand Down Expand Up @@ -51,32 +52,70 @@ export const setupPlugin: BigQueryPlugin['setupPlugin'] = async (meta) => {
})
global.bigQueryTable = global.bigQueryClient.dataset(config.datasetId).table(config.tableId)

global.bigQueryTableFields = [
{ name: 'uuid', type: 'STRING' },
{ name: 'event', type: 'STRING' },
{ name: 'properties', type: 'STRING' },
{ name: 'elements', type: 'STRING' },
{ name: 'set', type: 'STRING' },
{ name: 'set_once', type: 'STRING' },
{ name: 'distinct_id', type: 'STRING' },
{ name: 'team_id', type: 'INT64' },
{ name: 'ip', type: 'STRING' },
{ name: 'site_url', type: 'STRING' },
{ name: 'timestamp', type: 'TIMESTAMP' },
{ name: 'bq_ingested_timestamp', type: 'TIMESTAMP' },
]

try {
// check if the table exists
await global.bigQueryTable.get()
const [metadata]: TableMetadata[] = await global.bigQueryTable.getMetadata()

if (!metadata.schema || !metadata.schema.fields) {
throw new Error('Can not get metadata for table')
}

const existingFields = metadata.schema.fields
const fieldsToAdd = global.bigQueryTableFields.filter(
({ name }) => !existingFields.find((f: any) => f.name === name)
)

if (fieldsToAdd.length > 0) {
console.info(
`Incomplete schema on BigQuery table! Adding the following fields to reach parity: ${JSON.stringify(
fieldsToAdd
)}`
)

let result: TableMetadata
try {
metadata.schema.fields = metadata.schema.fields.concat(fieldsToAdd)
;[result] = await global.bigQueryTable.setMetadata(metadata)
} catch (error) {
console.error(error)
neilkakkar marked this conversation as resolved.
Show resolved Hide resolved
const fieldsToStillAdd = global.bigQueryTableFields.filter(
({ name }) => !result.schema?.fields?.find((f: any) => f.name === name)
)

if (fieldsToStillAdd.length > 0) {
throw new Error(
`Tried adding fields ${JSON.stringify(fieldsToAdd)}, but ${JSON.stringify(
fieldsToStillAdd
)} still to add. Can not start plugin.`
)
}
}
}
} catch (error) {
// some other error? abort!
if (!error.message.includes('Not found')) {
throw new Error(error)
}
console.log(`Creating BigQuery Table - ${config.datasetId}:${config.tableId}`)

const schema = [
{ name: 'uuid', type: 'STRING' },
{ name: 'event', type: 'STRING' },
{ name: 'properties', type: 'STRING' },
{ name: 'elements', type: 'STRING' },
{ name: 'set', type: 'STRING' },
{ name: 'set_once', type: 'STRING' },
{ name: 'distinct_id', type: 'STRING' },
{ name: 'team_id', type: 'INT64' },
{ name: 'ip', type: 'STRING' },
{ name: 'site_url', type: 'STRING' },
{ name: 'timestamp', type: 'TIMESTAMP' },
]

try {
await global.bigQueryClient.dataset(config.datasetId).createTable(config.tableId, { schema })
await global.bigQueryClient
.dataset(config.datasetId)
.createTable(config.tableId, { schema: global.bigQueryTableFields })
} catch (error) {
// a different worker already created the table
if (!error.message.includes('Already Exists')) {
Expand Down Expand Up @@ -131,12 +170,16 @@ export async function exportEventsToBigQuery(events: PluginEvent[], { global }:
ip,
site_url,
timestamp: timestamp ? global.bigQueryClient.timestamp(timestamp) : null,
bq_ingested_timestamp: global.bigQueryClient.timestamp(new Date()),
}
})
await global.bigQueryTable.insert(rows)
console.log(`Inserted ${events.length} ${events.length > 1 ? 'events' : 'event'} to BigQuery`)
} catch (error) {
console.error(`Error inserting ${events.length} ${events.length > 1 ? 'events' : 'event'} into BigQuery: `, error)
console.error(
`Error inserting ${events.length} ${events.length > 1 ? 'events' : 'event'} into BigQuery: `,
error
)
throw new RetryError(`Error inserting into BigQuery! ${JSON.stringify(error.errors)}`)
}
}
Expand Down