Skip to content

Commit 0d35ad8

Browse files
authored
Merge branch 'main' into dz/storage-load-metrics
2 parents 0723a81 + c1be236 commit 0d35ad8

File tree

11 files changed

+135
-98
lines changed

11 files changed

+135
-98
lines changed

codegenerator/cli/npm/envio/src/FetchState.res

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -93,25 +93,6 @@ let copy = (fetchState: t) => {
9393
}
9494
}
9595

96-
/*
97-
Comapritor for two events from the same chain. No need for chain id or timestamp
98-
*/
99-
let eventItemGt = (a: Internal.eventItem, b: Internal.eventItem) =>
100-
if a.blockNumber > b.blockNumber {
101-
true
102-
} else if a.blockNumber === b.blockNumber {
103-
a.logIndex > b.logIndex
104-
} else {
105-
false
106-
}
107-
108-
/*
109-
Merges two event queues on a single event fetcher
110-
111-
Pass the shorter list into A for better performance
112-
*/
113-
let mergeSortedEventList = (a, b) => Utils.Array.mergeSorted(eventItemGt, a, b)
114-
11596
let mergeIntoPartition = (p: partition, ~target: partition, ~maxAddrInPartition) => {
11697
switch (p, target) {
11798
| ({selection: {dependsOnAddresses: true}}, {selection: {dependsOnAddresses: true}}) => {
@@ -565,6 +546,18 @@ type query = {
565546
exception UnexpectedPartitionNotFound({partitionId: string})
566547
exception UnexpectedMergeQueryResponse({message: string})
567548

549+
/*
550+
Comparitor for two events from the same chain. No need for chain id or timestamp
551+
*/
552+
let compareBufferItem = (a: Internal.eventItem, b: Internal.eventItem) => {
553+
let blockDiff = b.blockNumber - a.blockNumber
554+
if blockDiff === 0 {
555+
b.logIndex - a.logIndex
556+
} else {
557+
blockDiff
558+
}
559+
}
560+
568561
/*
569562
Updates fetchState with a response for a given query.
570563
Returns Error if the partition with given query cannot be found (unexpected)
@@ -576,7 +569,7 @@ let handleQueryResult = (
576569
{partitions} as fetchState: t,
577570
~query: query,
578571
~latestFetchedBlock: blockNumberAndTimestamp,
579-
~reversedNewItems,
572+
~newItems,
580573
~currentBlockHeight,
581574
): result<t, exn> =>
582575
{
@@ -633,7 +626,12 @@ let handleQueryResult = (
633626
fetchState->updateInternal(
634627
~partitions,
635628
~currentBlockHeight,
636-
~queue=mergeSortedEventList(reversedNewItems, fetchState.queue),
629+
~queue=fetchState.queue
630+
->Array.concat(newItems)
631+
// Theoretically it could be faster to asume that
632+
// the items are sorted, but there are cases
633+
// when the data source returns them unsorted
634+
->Js.Array2.sortInPlaceWith(compareBufferItem),
637635
)
638636
})
639637

codegenerator/cli/npm/envio/src/db/Table.res

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@ let toSqlParams = (table: table, ~schema, ~pgSchema) => {
240240
switch field {
241241
| Field(f) =>
242242
switch f.fieldType {
243+
// The case for `BigDecimal! @config(precision: 10, scale: 8)`
244+
| Custom(fieldType) if fieldType->Js.String2.startsWith("NUMERIC(") => fieldType
243245
| Custom(fieldType) => `${(Text :> string)}[]::"${pgSchema}".${(fieldType :> string)}`
244246
| Boolean => `${(Integer :> string)}[]::${(f.fieldType :> string)}`
245247
| fieldType => (fieldType :> string)

codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ and makeProcessEvents = (mockDb: t, ~chainId=?) => async (
318318
events: array<Types.eventLog<unknown>>,
319319
) => {
320320
let config = RegisterHandlers.getConfig()
321-
let reversedWithContractRegister = []
321+
let itemsWithContractRegister = []
322322

323323
let processingChainId = ref(chainId)
324324
let eventItems = events->Array.map(event => {
@@ -355,7 +355,7 @@ and makeProcessEvents = (mockDb: t, ~chainId=?) => async (
355355
blockNumber: event.block->Types.Block.getNumber,
356356
}
357357
if eventConfig.contractRegister->Option.isSome {
358-
reversedWithContractRegister->Js.Array2.unshift(eventItem)->ignore
358+
itemsWithContractRegister->Js.Array2.push(eventItem)->ignore
359359
}
360360
eventItem
361361
})
@@ -387,8 +387,8 @@ and makeProcessEvents = (mockDb: t, ~chainId=?) => async (
387387
//No need to check contract is registered or return anything.
388388
//The only purpose is to test the registerContract function and to
389389
//add the entity to the in memory store for asserting registrations
390-
if reversedWithContractRegister->Utils.Array.notEmpty {
391-
let dcs = await ChainFetcher.runContractRegistersOrThrow(~reversedWithContractRegister, ~config)
390+
if itemsWithContractRegister->Utils.Array.notEmpty {
391+
let dcs = await ChainFetcher.runContractRegistersOrThrow(~itemsWithContractRegister, ~config)
392392

393393
// TODO: Reuse FetchState logic to clean up duplicate dcs
394394
if dcs->Utils.Array.notEmpty {

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ let getContractStartBlock = (
353353
}
354354

355355
let runContractRegistersOrThrow = async (
356-
~reversedWithContractRegister: array<Internal.eventItem>,
356+
~itemsWithContractRegister: array<Internal.eventItem>,
357357
~config: Config.t,
358358
) => {
359359
let dynamicContracts = []
@@ -396,8 +396,8 @@ let runContractRegistersOrThrow = async (
396396
}
397397

398398
let promises = []
399-
for idx in reversedWithContractRegister->Array.length - 1 downto 0 {
400-
let eventItem = reversedWithContractRegister->Array.getUnsafe(idx)
399+
for idx in 0 to itemsWithContractRegister->Array.length - 1 {
400+
let eventItem = itemsWithContractRegister->Array.getUnsafe(idx)
401401
let contractRegister = switch eventItem.eventConfig.contractRegister {
402402
| Some(contractRegister) => contractRegister
403403
| None =>
@@ -452,7 +452,7 @@ Returns Error if the node with given id cannot be found (unexpected)
452452
let handleQueryResult = (
453453
chainFetcher: t,
454454
~query: FetchState.query,
455-
~reversedNewItems,
455+
~newItems,
456456
~dynamicContracts,
457457
~latestFetchedBlock,
458458
~currentBlockHeight,
@@ -470,7 +470,7 @@ let handleQueryResult = (
470470
->FetchState.handleQueryResult(
471471
~query,
472472
~latestFetchedBlock,
473-
~reversedNewItems,
473+
~newItems,
474474
~currentBlockHeight,
475475
)
476476
->Result.map(fetchState => {

codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ type action =
123123
// because when processing the response, there might be an async contract registration.
124124
// So after it's finished we dispatch the submit action to get the latest fetch state.
125125
| SubmitPartitionQueryResponse({
126-
reversedNewItems: array<Internal.eventItem>,
126+
newItems: array<Internal.eventItem>,
127127
dynamicContracts: array<FetchState.indexingContract>,
128128
currentBlockHeight: int,
129129
latestFetchedBlock: FetchState.blockNumberAndTimestamp,
@@ -469,7 +469,7 @@ let validatePartitionQueryResponse = (
469469

470470
let submitPartitionQueryResponse = (
471471
state,
472-
~reversedNewItems,
472+
~newItems,
473473
~dynamicContracts,
474474
~currentBlockHeight,
475475
~latestFetchedBlock,
@@ -484,7 +484,7 @@ let submitPartitionQueryResponse = (
484484
~query,
485485
~currentBlockHeight,
486486
~latestFetchedBlock,
487-
~reversedNewItems,
487+
~newItems,
488488
~dynamicContracts,
489489
)
490490
->Utils.unwrapResultExn
@@ -538,12 +538,10 @@ let processPartitionQueryResponse = async (
538538
latestFetchedBlockTimestamp,
539539
} = response
540540

541-
let reversedWithContractRegister = []
542-
let reversedNewItems = []
541+
let itemsWithContractRegister = []
542+
let newItems = []
543543

544-
// It's cheaper to reverse only items with contract register
545-
// Then all items. That's why we use downto loop
546-
for idx in parsedQueueItems->Array.length - 1 downto 0 {
544+
for idx in 0 to parsedQueueItems->Array.length - 1 {
547545
let item = parsedQueueItems->Array.getUnsafe(idx)
548546
if (
549547
switch chainFetcher.processingFilters {
@@ -552,29 +550,29 @@ let processPartitionQueryResponse = async (
552550
}
553551
) {
554552
if item.eventConfig.contractRegister !== None {
555-
reversedWithContractRegister->Array.push(item)
553+
itemsWithContractRegister->Array.push(item)
556554
}
557555

558556
// TODO: Don't really need to keep it in the queue
559557
// when there's no handler (besides raw_events, processed counter, and dcsToStore consuming)
560-
reversedNewItems->Array.push(item)
558+
newItems->Array.push(item)
561559
}
562560
}
563561

564-
let dynamicContracts = switch reversedWithContractRegister {
562+
let dynamicContracts = switch itemsWithContractRegister {
565563
| [] as empty =>
566564
// A small optimisation to not recreate an empty array
567565
empty->(Utils.magic: array<Internal.eventItem> => array<FetchState.indexingContract>)
568566
| _ =>
569567
await ChainFetcher.runContractRegistersOrThrow(
570-
~reversedWithContractRegister,
568+
~itemsWithContractRegister,
571569
~config=state.config,
572570
)
573571
}
574572

575573
dispatchAction(
576574
SubmitPartitionQueryResponse({
577-
reversedNewItems,
575+
newItems,
578576
dynamicContracts,
579577
currentBlockHeight,
580578
latestFetchedBlock: {
@@ -643,15 +641,15 @@ let actionReducer = (state: t, action: action) => {
643641
| ValidatePartitionQueryResponse(partitionQueryResponse) =>
644642
state->validatePartitionQueryResponse(partitionQueryResponse)
645643
| SubmitPartitionQueryResponse({
646-
reversedNewItems,
644+
newItems,
647645
dynamicContracts,
648646
currentBlockHeight,
649647
latestFetchedBlock,
650648
query,
651649
chain,
652650
}) =>
653651
state->submitPartitionQueryResponse(
654-
~reversedNewItems,
652+
~newItems,
655653
~dynamicContracts,
656654
~currentBlockHeight,
657655
~latestFetchedBlock,
@@ -878,24 +876,23 @@ let injectedTaskReducer = (
878876
state.writeThrottlers.deepCleanCount = state.writeThrottlers.deepCleanCount + 1
879877
false
880878
}
881-
let timeRef = Hrtime.makeTimer()
882-
let _ = await Promise.all(
883-
Entities.allEntities->Belt.Array.map(entityConfig => {
884-
Db.sql->DbFunctions.EntityHistory.pruneStaleEntityHistory(
885-
~entityName=entityConfig.name,
886-
~safeChainIdAndBlockNumberArray,
887-
~shouldDeepClean,
888-
)
889-
}),
890-
)
891-
892-
if Env.Benchmark.shouldSaveData {
893-
let elapsedTimeMillis = Hrtime.timeSince(timeRef)->Hrtime.toMillis->Hrtime.floatFromMillis
894879

895-
Benchmark.addSummaryData(
896-
~group="Other",
897-
~label="Prune Stale History Time (ms)",
898-
~value=elapsedTimeMillis,
880+
for idx in 0 to Entities.allEntities->Array.length - 1 {
881+
if idx !== 0 {
882+
// Add some delay between entities
883+
// To unblock the pg client if it's needed for something else
884+
await Utils.delay(400)
885+
}
886+
let entityConfig = Entities.allEntities->Array.getUnsafe(idx)
887+
let timeRef = Hrtime.makeTimer()
888+
await Db.sql->DbFunctions.EntityHistory.pruneStaleEntityHistory(
889+
~entityName=entityConfig.name,
890+
~safeChainIdAndBlockNumberArray,
891+
~shouldDeepClean,
892+
)
893+
Prometheus.RollbackHistoryPrune.increment(
894+
~timeMillis=Hrtime.timeSince(timeRef)->Hrtime.toMillis,
895+
~entityName=entityConfig.name,
899896
)
900897
}
901898
}

scenarios/test_codegen/schema.graphql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ type EntityWithAllTypes {
114114
arrayOfBigInts: [BigInt!]!
115115
bigDecimal: BigDecimal!
116116
optBigDecimal: BigDecimal
117+
bigDecimalWithConfig: BigDecimal! @config(precision: 10, scale: 8)
117118
arrayOfBigDecimals: [BigDecimal!]!
118119
# NOTE: Timestamp serialization is currently just a type cast and so testing is non deterministic
119120
# timestamp: Timestamp!
@@ -141,6 +142,7 @@ type EntityWithAllNonArrayTypes {
141142
optBigInt: BigInt
142143
bigDecimal: BigDecimal!
143144
optBigDecimal: BigDecimal
145+
bigDecimalWithConfig: BigDecimal! @config(precision: 10, scale: 8)
144146
enumField: AccountType!
145147
optEnumField: AccountType
146148
}

scenarios/test_codegen/src/EventHandlers.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ Gravatar.FactoryEvent.handlerWithLoader({
540540
optBigInt: 2n,
541541
arrayOfBigInts: [3n, 4n],
542542
bigDecimal: new BigDecimal("1.1"),
543+
bigDecimalWithConfig: new BigDecimal("1.1"),
543544
optBigDecimal: new BigDecimal("2.2"),
544545
arrayOfBigDecimals: [new BigDecimal("3.3"), new BigDecimal("4.4")],
545546
json: { foo: ["bar"] },
@@ -563,6 +564,7 @@ Gravatar.FactoryEvent.handlerWithLoader({
563564
optBigInt: 2n,
564565
arrayOfBigInts: [3n, 4n],
565566
bigDecimal: new BigDecimal("1.1"),
567+
bigDecimalWithConfig: new BigDecimal("1.1"),
566568
optBigDecimal: new BigDecimal("2.2"),
567569
arrayOfBigDecimals: [new BigDecimal("3.3"), new BigDecimal("4.4")],
568570
json: { foo: ["bar"] },

scenarios/test_codegen/test/ChainManager_test.res

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ let populateChainQueuesWithRandomEvents = (~runTime=1000, ~maxBlockTime=15, ())
8080
blockNumber: batchItem.blockNumber,
8181
blockTimestamp: batchItem.timestamp,
8282
},
83-
~reversedNewItems=[batchItem],
83+
~newItems=[batchItem],
8484
~currentBlockHeight=currentBlockNumber.contents,
8585
)
8686
->Result.getExn

scenarios/test_codegen/test/SerDe_Test.res

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ describe("SerDe Test", () => {
3434
optBigInt: Some(BigInt.fromInt(2)),
3535
arrayOfBigInts: [BigInt.fromInt(3), BigInt.fromInt(4)],
3636
bigDecimal: BigDecimal.fromStringUnsafe("1.1"),
37+
bigDecimalWithConfig: BigDecimal.fromStringUnsafe("1.1"),
3738
optBigDecimal: Some(BigDecimal.fromStringUnsafe("2.2")),
3839
arrayOfBigDecimals: [BigDecimal.fromStringUnsafe("3.3"), BigDecimal.fromStringUnsafe("4.4")],
3940
//TODO: get timestamp working
@@ -123,7 +124,7 @@ describe("SerDe Test", () => {
123124
Entities.EntityWithAllNonArrayTypes.table->PgStorage.makeCreateTableQuery(~pgSchema="public")
124125
Assert.equal(
125126
createQuery,
126-
`CREATE TABLE IF NOT EXISTS "public"."EntityWithAllNonArrayTypes"("bigDecimal" NUMERIC NOT NULL, "bigInt" NUMERIC NOT NULL, "bool" BOOLEAN NOT NULL, "enumField" "public".AccountType NOT NULL, "float_" DOUBLE PRECISION NOT NULL, "id" TEXT NOT NULL, "int_" INTEGER NOT NULL, "optBigDecimal" NUMERIC, "optBigInt" NUMERIC, "optBool" BOOLEAN, "optEnumField" "public".AccountType, "optFloat" DOUBLE PRECISION, "optInt" INTEGER, "optString" TEXT, "string" TEXT NOT NULL, "db_write_timestamp" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY("id"));`,
127+
`CREATE TABLE IF NOT EXISTS "public"."EntityWithAllNonArrayTypes"("bigDecimal" NUMERIC NOT NULL, "bigDecimalWithConfig" NUMERIC(10, 8) NOT NULL, "bigInt" NUMERIC NOT NULL, "bool" BOOLEAN NOT NULL, "enumField" "public".AccountType NOT NULL, "float_" DOUBLE PRECISION NOT NULL, "id" TEXT NOT NULL, "int_" INTEGER NOT NULL, "optBigDecimal" NUMERIC, "optBigInt" NUMERIC, "optBool" BOOLEAN, "optEnumField" "public".AccountType, "optFloat" DOUBLE PRECISION, "optInt" INTEGER, "optString" TEXT, "string" TEXT NOT NULL, "db_write_timestamp" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY("id"));`,
127128
)
128129
let query = PgStorage.makeInsertUnnestSetQuery(
129130
~table=Entities.EntityWithAllNonArrayTypes.table,
@@ -134,8 +135,8 @@ describe("SerDe Test", () => {
134135

135136
Assert.equal(
136137
query,
137-
`INSERT INTO "public"."EntityWithAllNonArrayTypes" ("bigDecimal", "bigInt", "bool", "enumField", "float_", "id", "int_", "optBigDecimal", "optBigInt", "optBool", "optEnumField", "optFloat", "optInt", "optString", "string")
138-
SELECT * FROM unnest($1::NUMERIC[],$2::NUMERIC[],$3::INTEGER[]::BOOLEAN[],$4::TEXT[]::"public".AccountType[],$5::DOUBLE PRECISION[],$6::TEXT[],$7::INTEGER[],$8::NUMERIC[],$9::NUMERIC[],$10::INTEGER[]::BOOLEAN[],$11::TEXT[]::"public".AccountType[],$12::DOUBLE PRECISION[],$13::INTEGER[],$14::TEXT[],$15::TEXT[])ON CONFLICT("id") DO UPDATE SET "bigDecimal" = EXCLUDED."bigDecimal","bigInt" = EXCLUDED."bigInt","bool" = EXCLUDED."bool","enumField" = EXCLUDED."enumField","float_" = EXCLUDED."float_","int_" = EXCLUDED."int_","optBigDecimal" = EXCLUDED."optBigDecimal","optBigInt" = EXCLUDED."optBigInt","optBool" = EXCLUDED."optBool","optEnumField" = EXCLUDED."optEnumField","optFloat" = EXCLUDED."optFloat","optInt" = EXCLUDED."optInt","optString" = EXCLUDED."optString","string" = EXCLUDED."string";`,
138+
`INSERT INTO "public"."EntityWithAllNonArrayTypes" ("bigDecimal", "bigDecimalWithConfig", "bigInt", "bool", "enumField", "float_", "id", "int_", "optBigDecimal", "optBigInt", "optBool", "optEnumField", "optFloat", "optInt", "optString", "string")
139+
SELECT * FROM unnest($1::NUMERIC[],$2::NUMERIC(10, 8)[],$3::NUMERIC[],$4::INTEGER[]::BOOLEAN[],$5::TEXT[]::"public".AccountType[],$6::DOUBLE PRECISION[],$7::TEXT[],$8::INTEGER[],$9::NUMERIC[],$10::NUMERIC[],$11::INTEGER[]::BOOLEAN[],$12::TEXT[]::"public".AccountType[],$13::DOUBLE PRECISION[],$14::INTEGER[],$15::TEXT[],$16::TEXT[])ON CONFLICT("id") DO UPDATE SET "bigDecimal" = EXCLUDED."bigDecimal","bigDecimalWithConfig" = EXCLUDED."bigDecimalWithConfig","bigInt" = EXCLUDED."bigInt","bool" = EXCLUDED."bool","enumField" = EXCLUDED."enumField","float_" = EXCLUDED."float_","int_" = EXCLUDED."int_","optBigDecimal" = EXCLUDED."optBigDecimal","optBigInt" = EXCLUDED."optBigInt","optBool" = EXCLUDED."optBool","optEnumField" = EXCLUDED."optEnumField","optFloat" = EXCLUDED."optFloat","optInt" = EXCLUDED."optInt","optString" = EXCLUDED."optString","string" = EXCLUDED."string";`,
139140
)
140141
})
141142

@@ -156,6 +157,7 @@ SELECT * FROM unnest($1::NUMERIC[],$2::NUMERIC[],$3::INTEGER[]::BOOLEAN[],$4::TE
156157
optBigInt: Some(BigInt.fromInt(2)),
157158
bigDecimal: BigDecimal.fromStringUnsafe("1.1"),
158159
optBigDecimal: Some(BigDecimal.fromStringUnsafe("2.2")),
160+
bigDecimalWithConfig: BigDecimal.fromStringUnsafe("1.1"),
159161
enumField: ADMIN,
160162
optEnumField: Some(ADMIN),
161163
}

0 commit comments

Comments
 (0)