Skip to content

Commit

Permalink
[DPP-1327] Data migration and ingestion support for new transaction r…
Browse files Browse the repository at this point in the history
…elated tables
  • Loading branch information
pbatko-da committed Dec 1, 2022
1 parent 695d680 commit 0ff9811
Show file tree
Hide file tree
Showing 23 changed files with 722 additions and 150 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
367299fee938fdfdaedc92a6cf643ea7659df543837fb89fd0db177e1e1bc0e0
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
------------------------------------ ETQ Data migration -------------------------------

-- Removes all elements from a that are present in b, essentially computes a - b.
CREATE OR REPLACE FUNCTION etq_array_diff(
arrayClob1 IN CLOB,
arrayClob2 IN CLOB
)
RETURN CLOB
IS
arrayJson1 json_array_t := json_array_t.parse(arrayClob1);
outputJsonArray json_array_t := json_array_t ('[]');
filterExpression varchar2(100);
BEGIN
FOR i IN 0 .. arrayJson1.get_size - 1
LOOP
-- `$[*]` selects each element of the array
-- `(@ == v)` is a filter expression that check whether each matched element is equal to some value `v`
filterExpression := '$[*]?(@ == ' || (arrayJson1.get(i).to_clob()) ||')';
IF NOT json_exists(arrayClob2, filterExpression)
THEN
outputJsonArray.append(arrayJson1.get(i));
END IF;
END LOOP;
RETURN outputJsonArray.to_clob();
END;
/

-- Populate pe_create_id_filter_non_stakeholder_informee
INSERT INTO pe_create_id_filter_non_stakeholder_informee(event_sequential_id, party_id)
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
etq_array_diff(tree_event_witnesses, flat_event_witnesses) AS ps
FROM participant_events_create
)
SELECT i, p
FROM input1, json_table(ps, '$[*]' columns (p NUMBER PATH '$'));

-- Populate pe_consuming_id_filter_stakeholder
INSERT INTO pe_consuming_id_filter_stakeholder(event_sequential_id, template_id, party_id)
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
template_id AS t,
flat_event_witnesses AS ps
FROM participant_events_consuming_exercise
)
SELECT i, t, p
FROM input1, json_table(ps, '$[*]' columns (p NUMBER PATH '$'));

-- Populate pe_consuming_id_filter_non_stakeholder_informee
INSERT INTO pe_consuming_id_filter_non_stakeholder_informee(event_sequential_id, party_id)
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
etq_array_diff(tree_event_witnesses, flat_event_witnesses) AS ps
FROM participant_events_consuming_exercise
)
SELECT i, p
FROM input1, json_table(ps, '$[*]' columns (p NUMBER PATH '$'));

-- Populate pe_non_consuming_exercise_filter_nonstakeholder_informees
INSERT INTO pe_non_consuming_id_filter_informee(event_sequential_id, party_id)
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
etq_array_diff(tree_event_witnesses, flat_event_witnesses) AS ps
FROM participant_events_non_consuming_exercise
)
SELECT i, p
FROM input1, json_table(ps, '$[*]' columns (p NUMBER PATH '$'));

-- Populate participant_transaction_meta
INSERT INTO participant_transaction_meta(transaction_id, event_offset, event_sequential_id_first, event_sequential_id_last)
WITH
input1 AS (
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_create
UNION ALL
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_consuming_exercise
UNION ALL
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_non_consuming_exercise
UNION ALL
SELECT
c.transaction_id AS t,
c.event_offset AS o,
d.event_sequential_id AS i
FROM participant_events_divulgence d
JOIN participant_events_create c ON d.contract_id = c.contract_id
),
input2 AS (
SELECT
t,
o,
min(i) as first_i,
max(i) as last_i
FROM input1
GROUP BY t, o
)
SELECT t, o, first_i, last_i FROM input2;

DROP FUNCTION etq_array_diff;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
045d9250d86a224a140fd591ccb76e32b44d9c063dcc8b47371d93c4a1cd48ad
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
------------------------------------ ETQ Data migration -------------------------------

-- Removes all elements from a that are present in b, essentially computes a - b.
CREATE OR REPLACE FUNCTION etq_array_diff(a int[], b int[])
RETURNS int[]
AS
$$
SELECT coalesce(array_agg(el), '{}')
FROM unnest(a) as el
WHERE el <> all(b)
$$
LANGUAGE SQL;

-- Populate pe_create_id_filter_non_stakeholder_informee
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
etq_array_diff(tree_event_witnesses, flat_event_witnesses) AS ps
FROM participant_events_create
)
INSERT INTO pe_create_id_filter_non_stakeholder_informee(event_sequential_id, party_id)
SELECT i, unnest(ps) FROM input1;

-- Populate pe_consuming_id_filter_stakeholder
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
template_id AS t,
flat_event_witnesses AS ps
FROM participant_events_consuming_exercise
)
INSERT INTO pe_consuming_id_filter_stakeholder(event_sequential_id, template_id, party_id)
SELECT i, t, unnest(ps) FROM input1;

-- Populate pe_consuming_id_filter_non_stakeholder_informee
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
etq_array_diff(tree_event_witnesses, flat_event_witnesses) AS ps
FROM participant_events_consuming_exercise
)
INSERT INTO pe_consuming_id_filter_non_stakeholder_informee(event_sequential_id, party_id)
SELECT i, unnest(ps) FROM input1;

-- Populate pe_non_consuming_exercise_filter_nonstakeholder_informees
WITH
input1 AS
(
SELECT
event_sequential_id AS i,
tree_event_witnesses AS ps
FROM participant_events_non_consuming_exercise
)
INSERT INTO pe_non_consuming_id_filter_informee(event_sequential_id, party_id)
SELECT i, unnest(ps) FROM input1;

-- Populate participant_transaction_meta
WITH
input1 AS (
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_create
UNION ALL
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_consuming_exercise
UNION ALL
SELECT
transaction_id AS t,
event_offset AS o,
event_sequential_id AS i
FROM participant_events_non_consuming_exercise
UNION ALL
-- NOTE: Divulgence offsets with no corresponding create events will not
-- have an entry in transaction_meta table
SELECT
c.transaction_id AS t,
c.event_offset AS o,
d.event_sequential_id AS i
FROM participant_events_divulgence d
JOIN participant_events_create c ON d.contract_id = c.contract_id
),
input2 AS (
SELECT
t,
o,
min(i) as first_i,
max(i) as last_i
FROM input1
GROUP BY t, o
)
INSERT INTO participant_transaction_meta(transaction_id, event_offset, event_sequential_id_first, event_sequential_id_last)
SELECT t, o, first_i, last_i FROM input2;

DROP FUNCTION etq_array_diff;
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.daml.platform.store.dao.DbDispatcher
import com.daml.platform.store.dao.events.{CompressionStrategy, LfValueTranslation}
import com.daml.platform.store.interning.{InternizingStringInterningView, StringInterning}
import java.sql.Connection
import scala.util.chaining._

import com.daml.metrics.api.MetricsContext

Expand Down Expand Up @@ -186,6 +187,7 @@ object ParallelIndexerSubscription {
Timed.value(
metrics.daml.parallelIndexer.seqMapping.duration, {
var eventSeqId = previous.lastSeqEventId
var lastTransactionMetaLastEventId = eventSeqId
val batchWithSeqIds = current.batch.map {
case dbDto: DbDto.EventCreate =>
eventSeqId += 1
Expand All @@ -199,10 +201,24 @@ object ParallelIndexerSubscription {
eventSeqId += 1
dbDto.copy(event_sequential_id = eventSeqId)

case dbDto: DbDto.CreateFilter =>
// we do not increase the event_seq_id here, because all the CreateFilter DbDto-s must have the same eventSeqId as the preceding EventCreate
case dbDto: DbDto.IdFilterCreateStakeholder =>
// we do not increase the event_seq_id here, because all the IdFilterCreateStakeholder DbDto-s must have the same eventSeqId as the preceding EventCreate
dbDto.copy(event_sequential_id = eventSeqId)

case dbDto: DbDto.IdFilterCreateNonStakeholderInformee =>
dbDto.copy(event_sequential_id = eventSeqId)
case dbDto: DbDto.IdFilterConsumingStakeholder =>
dbDto.copy(event_sequential_id = eventSeqId)
case dbDto: DbDto.IdFilterConsumingNonStakeholderInformee =>
dbDto.copy(event_sequential_id = eventSeqId)
case dbDto: DbDto.IdFilterNonConsumingInformee =>
dbDto.copy(event_sequential_id = eventSeqId)
case dbDto: DbDto.TransactionMeta =>
dbDto
.copy(
event_sequential_id_first = lastTransactionMetaLastEventId + 1,
event_sequential_id_last = eventSeqId,
)
.tap(_ => lastTransactionMetaLastEventId = eventSeqId)
case unChanged => unChanged
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,40 @@ object DbDto {
StringInterningDto(entry._1, entry._2)
}

final case class CreateFilter(
final case class IdFilterCreateStakeholder(
event_sequential_id: Long,
template_id: String,
party_id: String,
) extends DbDto

final case class IdFilterCreateNonStakeholderInformee(
event_sequential_id: Long,
party_id: String,
) extends DbDto

final case class IdFilterConsumingStakeholder(
event_sequential_id: Long,
template_id: String,
party_id: String,
) extends DbDto

final case class IdFilterConsumingNonStakeholderInformee(
event_sequential_id: Long,
party_id: String,
) extends DbDto

final case class IdFilterNonConsumingInformee(
event_sequential_id: Long,
party_id: String,
) extends DbDto

final case class TransactionMeta(
transaction_id: String,
event_offset: String,
event_sequential_id_first: Long,
event_sequential_id_last: Long,
) extends DbDto

final case class TransactionMetering(
application_id: String,
action_count: Int,
Expand Down
Loading

0 comments on commit 0ff9811

Please sign in to comment.