Merged
3 changes: 0 additions & 3 deletions codegenerator/cli/npm/envio/src/Persistence.res
@@ -47,7 +47,6 @@ type t = {
mutable storageStatus: storageStatus,
storage: storage,
onStorageInitialize: option<unit => promise<unit>>,
- cacheStorage: storage,
}

let entityHistoryActionEnumConfig: Internal.enumConfig<EntityHistory.RowAction.t> = {
@@ -64,7 +63,6 @@ let make = (
~allEnums,
~staticTables,
~storage,
- ~cacheStorage,
~onStorageInitialize=?,
) => {
let allEntities = userEntities->Js.Array2.concat([dcRegistryEntityConfig])
@@ -78,7 +76,6 @@
storageStatus: Unknown,
storage,
onStorageInitialize,
- cacheStorage,
}
}

11 changes: 5 additions & 6 deletions codegenerator/cli/npm/envio/src/db/EntityHistory.res
@@ -173,8 +173,9 @@ let batchInsertRows = (self: t<'entity>, ~sql, ~rows: array<historyRow<'entity>>
type entityInternal

external castInternal: t<'entity> => t<entityInternal> = "%identity"
+ external eval: string => 'a = "eval"

- let fromTable = (table: table, ~schema: S.t<'entity>): t<'entity> => {
+ let fromTable = (table: table, ~pgSchema, ~schema: S.t<'entity>): t<'entity> => {
let entity_history_block_timestamp = "entity_history_block_timestamp"
let entity_history_chain_id = "entity_history_chain_id"
let entity_history_block_number = "entity_history_block_number"
@@ -235,12 +236,10 @@ let fromTable = (table: table, ~schema: S.t<'entity>): t<'entity> => {
let dataFieldNames = dataFields->Belt.Array.map(field => field->getFieldName)

let originTableName = table.tableName
- let originSchemaName = table.schemaName
let historyTableName = originTableName ++ "_history"
//ignore composite indices
let table = mkTable(
historyTableName,
- ~schemaName=originSchemaName,
~fields=Belt.Array.concatMany([
currentHistoryFields,
previousHistoryFields,
@@ -251,8 +250,8 @@

let insertFnName = `"insert_${table.tableName}"`
let historyRowArg = "history_row"
- let historyTablePath = `"${originSchemaName}"."${historyTableName}"`
- let originTablePath = `"${originSchemaName}"."${originTableName}"`
+ let historyTablePath = `"${pgSchema}"."${historyTableName}"`
+ let originTablePath = `"${pgSchema}"."${originTableName}"`

let previousHistoryFieldsAreNullStr =
previousChangeFieldNames
@@ -335,7 +334,7 @@ let fromTable = (table: table, ~schema: S.t<'entity>): t<'entity> => {
\${shouldCopyCurrentEntity});\``

let insertFn: (Postgres.sql, Js.Json.t, ~shouldCopyCurrentEntity: bool) => promise<unit> =
- insertFnString->Table.PostgresInterop.eval
+ insertFnString->eval
Collaborator: @DZakh, I can't remember when this was added, but don't you think eval should happen at start-up time and not at runtime of the insert function?


let schema = makeHistoryRowSchema(schema)

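On the eval-timing question above: `entityHistory` values are created by module-level `let` bindings, so `fromTable` — and with it the `eval` of `insertFnString` — appears to run once at startup, with only the compiled function invoked per insert. A minimal self-contained sketch of the distinction (illustrative strings and names, not code from this PR):

```rescript
// A minimal sketch contrasting eval at definition time with eval per call.
// `fnString` stands in for insertFnString; the real code evals far larger SQL.
external eval: string => 'a = "eval"

let fnString = `(x) => x + 1`

// Evaluated once, when the module loads — analogous to fromTable building
// insertFn up front and reusing it for every insert.
let addOnce: int => int = fnString->eval

// Evaluated on every call — the runtime-eval pattern the reviewer asks about.
let addPerCall = (x: int) => {
  let f: int => int = fnString->eval
  f(x)
}

Js.log(addOnce(1)) // 2
Js.log(addPerCall(1)) // 2
```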
60 changes: 1 addition & 59 deletions codegenerator/cli/npm/envio/src/db/Table.res
@@ -89,14 +89,12 @@ let getFieldType = (field: field) => {

type table = {
tableName: string,
- schemaName: string,
fields: array<fieldOrDerived>,
compositeIndices: array<array<string>>,
}

- let mkTable = (tableName, ~schemaName, ~compositeIndices=[], ~fields) => {
+ let mkTable = (tableName, ~compositeIndices=[], ~fields) => {
tableName,
- schemaName,
fields,
compositeIndices,
}
@@ -299,59 +297,3 @@ let getCompositeIndices = (table): array<array<string>> => {
->getUnfilteredCompositeIndicesUnsafe
->Array.keep(ind => ind->Array.length > 1)
}

- module PostgresInterop = {
-   type pgFn<'payload, 'return> = (Postgres.sql, 'payload) => promise<'return>
-   type batchSetFn<'a> = (Postgres.sql, array<'a>) => promise<unit>
-   external eval: string => 'a = "eval"
-
-   let makeBatchSetFnString = (table: table) => {
-     let fieldNamesInQuotes =
-       table->getNonDefaultFieldNames->Array.map(fieldName => `"${fieldName}"`)
-     `(sql, rows) => {
-       return sql\`
-         INSERT INTO "${table.schemaName}"."${table.tableName}"
-         \${sql(rows, ${fieldNamesInQuotes->Js.Array2.joinWith(", ")})}
-         ON CONFLICT(${table->getPrimaryKeyFieldNames->Js.Array2.joinWith(", ")}) DO UPDATE
-         SET
-         ${fieldNamesInQuotes
-       ->Array.map(fieldNameInQuotes => `${fieldNameInQuotes} = EXCLUDED.${fieldNameInQuotes}`)
-       ->Js.Array2.joinWith(", ")};\`
-     }`
-   }
-
-   let chunkBatchQuery = (
-     sql,
-     entityDataArray: array<'entity>,
-     queryToExecute: pgFn<array<'entity>, 'return>,
-     ~maxItemsPerQuery=500,
-   ): promise<array<'return>> => {
-     let responses = []
-     let i = ref(0)
-     let shouldContinue = () => i.contents < entityDataArray->Array.length
-     // Split entityDataArray into chunks of maxItemsPerQuery
-     while shouldContinue() {
-       let chunk =
-         entityDataArray->Js.Array2.slice(~start=i.contents, ~end_=i.contents + maxItemsPerQuery)
-       let response = queryToExecute(sql, chunk)
-       responses->Js.Array2.push(response)->ignore
-       i := i.contents + maxItemsPerQuery
-     }
-     Promise.all(responses)
-   }
-
-   let makeBatchSetFn = (~table, ~schema: S.t<'a>): batchSetFn<'a> => {
-     let batchSetFn: pgFn<array<Js.Json.t>, unit> = table->makeBatchSetFnString->eval
-     let parseOrThrow = S.compile(
-       S.array(schema),
-       ~input=Value,
-       ~output=Json,
-       ~mode=Sync,
-       ~typeValidation=true,
-     )
-     async (sql, rows) => {
-       let rowsJson = rows->parseOrThrow->(Utils.magic: Js.Json.t => array<Js.Json.t>)
-       let _res = await chunkBatchQuery(sql, rowsJson, batchSetFn)
-     }
-   }
- }
@@ -263,7 +263,7 @@ let getEntityOperations = (mockDb: t, ~entityConfig: Internal.entityConfig): ent
}

let makeLoadEntitiesByIds = (mockDb: t) => {
- (ids, ~entityConfig, ~logger as _=?) => {
+ (ids, ~entityConfig) => {
let operations = mockDb->getEntityOperations(~entityConfig)
ids->Array.keepMap(id => operations.get(id))->Promise.resolve
}
@@ -67,7 +67,6 @@ module {{entity.name.capitalized}} = {

let table = mkTable(
(name :> string),
- ~schemaName=Env.Db.publicSchema,
~fields=[
{{#each entity.postgres_fields as | pg_field |}}
mkField(
@@ -105,7 +104,7 @@ module {{entity.name.capitalized}} = {
{{/if}}
)

- let entityHistory = table->EntityHistory.fromTable(~schema)
+ let entityHistory = table->EntityHistory.fromTable(~pgSchema=Env.Db.publicSchema, ~schema)

external castToInternal: t => Internal.entity = "%identity"
}
1 change: 0 additions & 1 deletion codegenerator/cli/templates/static/codegen/src/Config.res
@@ -87,7 +87,6 @@ let codegenPersistence = Persistence.make(
)->Entities.entityModToInternal,
~allEnums=Enums.allEnums,
~storage=PgStorage.make(~sql=Db.sql, ~pgSchema=storagePgSchema, ~pgUser=Env.Db.user),
- ~cacheStorage=PgStorage.make(~sql=Db.sql, ~pgSchema=Env.Db.cacheSchema, ~pgUser=Env.Db.user),
~onStorageInitialize=() => {
Hasura.trackDatabase(
~endpoint=Env.Hasura.graphqlEndpoint,
1 change: 0 additions & 1 deletion codegenerator/cli/templates/static/codegen/src/Env.res
@@ -110,7 +110,6 @@ module Db = {
let password = envSafe->EnvSafe.get("ENVIO_POSTGRES_PASSWORD", S.string, ~devFallback="testing")
let database = envSafe->EnvSafe.get("ENVIO_PG_DATABASE", S.string, ~devFallback="envio-dev")
let publicSchema = envSafe->EnvSafe.get("ENVIO_PG_PUBLIC_SCHEMA", S.string, ~fallback="public")
- let cacheSchema = envSafe->EnvSafe.get("ENVIO_PG_CACHE_SCHEMA", S.string, ~fallback="envio_cache")
let ssl = envSafe->EnvSafe.get(
"ENVIO_PG_SSL_MODE",
Postgres.sslOptionsSchema,
10 changes: 3 additions & 7 deletions codegenerator/cli/templates/static/codegen/src/LoadLayer.res
@@ -7,7 +7,6 @@ type t = {
loadEntitiesByIds: (
array<Types.id>,
~entityConfig: Internal.entityConfig,
- ~logger: Pino.t=?,
) => promise<array<Internal.entity>>,
loadEntitiesByField: (
~operator: TableIndices.Operator.t,
@@ -33,7 +32,7 @@ let makeWithDbConnection = (~persistence=Config.codegenPersistence) => {
let storage = Persistence.getInitializedStorageOrThrow(persistence)
{
loadManager: LoadManager.make(),
- loadEntitiesByIds: (ids, ~entityConfig, ~logger as _=?) =>
+ loadEntitiesByIds: (ids, ~entityConfig) =>
storage.loadByIdsOrThrow(
~table=entityConfig.table,
~rowsSchema=entityConfig.rowsSchema,
@@ -55,13 +54,10 @@ let loadById = (
let inMemTable = inMemoryStore->InMemoryStore.getInMemTable(~entityConfig)

let load = async idsToLoad => {
- // Since makeLoader prevents registering entities already existing in the inMemoryStore,
+ // Since LoadManager.call prevents registering entities already existing in the inMemoryStore,
// we can be sure that we load only the new ones.
let dbEntities = try {
- await idsToLoad->loadLayer.loadEntitiesByIds(
-   ~entityConfig,
-   ~logger=eventItem->Logging.getEventLogger,
- )
+ await idsToLoad->loadLayer.loadEntitiesByIds(~entityConfig)
} catch {
| Persistence.StorageError({message, reason}) =>
reason->ErrorHandling.mkLogAndRaise(~logger=eventItem->Logging.getEventLogger, ~msg=message)
@@ -6,7 +6,6 @@ let make: (
~loadEntitiesByIds: (
array<Types.id>,
~entityConfig: Internal.entityConfig,
- ~logger: Pino.t=?,
) => promise<array<Internal.entity>>,
~loadEntitiesByField: (
~operator: TableIndices.Operator.t,
12 changes: 2 additions & 10 deletions codegenerator/cli/templates/static/codegen/src/db/TablesStatic.res
@@ -5,8 +5,6 @@ let isPrimaryKey = true
let isNullable = true
let isIndex = true

- let publicSchema = Env.Db.publicSchema

module EventSyncState = {
//Used unsafely in DbFunctions.res so just enforcing the naming here
let blockTimestampFieldName = "block_timestamp"
@@ -24,7 +22,6 @@

let table = mkTable(
"event_sync_state",
- ~schemaName=publicSchema,
~fields=[
mkField("chain_id", Integer, ~fieldSchema=S.int, ~isPrimaryKey),
mkField(blockNumberFieldName, Integer, ~fieldSchema=S.int),
@@ -42,7 +39,7 @@

// We need to update values here, not delete the rows, since restarting without a row
// has a different behaviour to restarting with an initialised row with zero values
- let resetCurrentCurrentSyncStateQuery = `UPDATE ${table.schemaName}.${table.tableName}
+ let resetCurrentCurrentSyncStateQuery = `UPDATE "${Env.Db.publicSchema}"."${table.tableName}"
Collaborator: Why make this change? I don't mind either way, and I see this file used to access the env anyhow, but it seems counterproductive for librifying if we remove these from the state/config of the table. I personally think ultimately the whole system should run off a config outside of the environment, and the generated registration can apply the environment variables.

Collaborator: I think I see the point in that: the PgStorage module is what accepts the public schema, not the table level. I wonder if this shouldn't then be moved to the PgStorage module.

Member Author: Yes. We should move it there at some point. Table shouldn't know anything about the Postgres implementation.

SET ${blockNumberFieldName} = 0,
${logIndexFieldName} = 0,
${blockTimestampFieldName} = 0,
@@ -67,7 +64,6 @@ module ChainMetadata = {

let table = mkTable(
"chain_metadata",
- ~schemaName=publicSchema,
~fields=[
mkField("chain_id", Integer, ~fieldSchema=S.int, ~isPrimaryKey),
mkField("start_block", Integer, ~fieldSchema=S.int),
@@ -103,7 +99,6 @@ module PersistedState = {

let table = mkTable(
"persisted_state",
- ~schemaName=publicSchema,
~fields=[
mkField("id", Serial, ~fieldSchema=S.int, ~isPrimaryKey),
mkField("envio_version", Text, ~fieldSchema=S.string),
@@ -125,7 +120,6 @@ module EndOfBlockRangeScannedData = {

let table = mkTable(
"end_of_block_range_scanned_data",
- ~schemaName=publicSchema,
~fields=[
mkField("chain_id", Integer, ~fieldSchema=S.int, ~isPrimaryKey),
mkField("block_number", Integer, ~fieldSchema=S.int, ~isPrimaryKey),
@@ -168,7 +162,6 @@ module RawEvents = {

let table = mkTable(
PgStorage.rawEventsTableName,
- ~schemaName=publicSchema,
~fields=[
mkField("chain_id", Integer, ~fieldSchema=S.int),
mkField("event_id", Numeric, ~fieldSchema=S.bigint),
@@ -231,7 +224,6 @@ module DynamicContractRegistry = {

let table = mkTable(
"dynamic_contract_registry",
- ~schemaName=publicSchema,
~fields=[
mkField("id", Text, ~isPrimaryKey, ~fieldSchema=S.string),
mkField("chain_id", Integer, ~fieldSchema=S.int),
@@ -250,5 +242,5 @@
],
)

- let entityHistory = table->EntityHistory.fromTable(~schema)
+ let entityHistory = table->EntityHistory.fromTable(~pgSchema=Env.Db.publicSchema, ~schema)
}
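On the review thread above about schema handling: a rough sketch of the suggested direction, where only the storage layer knows about Postgres schemas and `Table` stays Postgres-agnostic. All names below are hypothetical, not code from this PR:

```rescript
// Sketch only: a storage-layer helper owns schema qualification, so table
// definitions never carry a schemaName. Names are illustrative.
module PgStorageSketch = {
  type table = {tableName: string} // Postgres-agnostic table description

  // Only the storage layer combines its ~pgSchema config with table names.
  let tablePath = (~pgSchema, ~table) => `"${pgSchema}"."${table.tableName}"`

  let resetSyncStateQuery = (~pgSchema, ~table) =>
    `UPDATE ${tablePath(~pgSchema, ~table)} SET block_number = 0`
}

// Usage: the schema comes from storage config (e.g. Env.Db.publicSchema in
// the generated code), never from the table definition itself.
Js.log(
  PgStorageSketch.resetSyncStateQuery(~pgSchema="public", ~table={tableName: "event_sync_state"}),
)
```

This matches the direction the PR already takes: `mkTable` loses `~schemaName`, and `~pgSchema` is passed where queries are built.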
2 changes: 1 addition & 1 deletion scenarios/test_codegen/test/helpers/Mock.res
@@ -54,7 +54,7 @@ module LoadLayer = {
let loadEntitiesByIdsCalls = []
let loadEntitiesByFieldCalls = []
let loadLayer = LoadLayer.make(
- ~loadEntitiesByIds=async (entityIds, ~entityConfig, ~logger as _=?) => {
+ ~loadEntitiesByIds=async (entityIds, ~entityConfig) => {
loadEntitiesByIdsCalls
->Js.Array2.push({
entityIds,
15 changes: 9 additions & 6 deletions scenarios/test_codegen/test/lib_tests/EntityHistory_test.res
@@ -45,26 +45,29 @@ module TestEntity = {
let rowsSchema = S.array(schema)
let table = Table.mkTable(
"TestEntity",
- ~schemaName="public",
~fields=[
Table.mkField("id", Text, ~fieldSchema=S.string, ~isPrimaryKey=true),
Table.mkField("fieldA", Integer, ~fieldSchema=S.int),
Table.mkField("fieldB", Text, ~fieldSchema=S.null(S.string), ~isNullable=true),
],
)

- let entityHistory = table->EntityHistory.fromTable(~schema)
+ let entityHistory = table->EntityHistory.fromTable(~pgSchema="public", ~schema)

external castToInternal: t => Internal.entity = "%identity"
}

type testEntityHistory = EntityHistory.historyRow<TestEntity.t>
let testEntityHistorySchema = EntityHistory.makeHistoryRowSchema(TestEntity.schema)

- let batchSetMockEntity = Table.PostgresInterop.makeBatchSetFn(
-   ~table=TestEntity.table,
-   ~schema=TestEntity.schema,
- )
+ let batchSetMockEntity = (sql, items) =>
+   PgStorage.setOrThrow(
+     sql,
+     ~items,
+     ~pgSchema="public",
+     ~table=TestEntity.table,
+     ~itemSchema=TestEntity.schema,
+   )

let getAllMockEntity = sql =>
sql
3 changes: 0 additions & 3 deletions scenarios/test_codegen/test/lib_tests/Persistence_test.res
@@ -69,7 +69,6 @@ describe("Test Persistence layer init", () => {
)->Entities.entityModToInternal,
~allEnums=[],
~storage=storageMock.storage,
- ~cacheStorage=makeStorageMock().storage,
)

Assert.deepEqual(
@@ -191,7 +190,6 @@
)->Entities.entityModToInternal,
~allEnums=[],
~storage=storageMock.storage,
- ~cacheStorage=makeStorageMock().storage,
)

let p = persistence->Persistence.init
@@ -228,7 +226,6 @@
)->Entities.entityModToInternal,
~allEnums=[],
~storage=storageMock.storage,
- ~cacheStorage=makeStorageMock().storage,
)

let _ = persistence->Persistence.init(~skipIsInitializedCheck=true)