Skip to content

Commit cbfe1c3

Browse files
committed
fix: missing insertedAt field in logs
Also fixes an issue where pipeline exporters could not be stopped.
1 parent f8b2138 commit cbfe1c3

File tree

11 files changed

+99
-19
lines changed

11 files changed

+99
-19
lines changed

internal/replication/drivers/clickhouse/driver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ func (c *Driver) Start(ctx context.Context) error {
4444

4545
func (c *Driver) Accept(ctx context.Context, logs ...drivers.LogWithLedger) ([]error, error) {
4646

47+
c.logger.Debugf("Prepare new batch of %d logs", len(logs))
48+
4749
batch, err := c.db.PrepareBatch(ctx, "insert into logs(ledger, id, type, date, data)")
4850
if err != nil {
4951
return nil, errors.Wrap(err, "failed to prepare batch")

internal/replication/manager.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,12 @@ func (m *Manager) synchronizePipelines(ctx context.Context) error {
217217
for _, pipeline := range pipelines {
218218
m.logger.Debugf("restoring pipeline %s", pipeline.ID)
219219
if _, err := m.startPipeline(ctx, pipeline); err != nil {
220-
return err
220+
switch {
221+
case errors.Is(err, ledger.ErrAlreadyStarted("")):
222+
m.logger.Debugf("Pipeline already started, skipping")
223+
default:
224+
return err
225+
}
221226
}
222227
}
223228

internal/replication/pipeline.go

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,21 @@ type PipelineHandler struct {
6464
}
6565

6666
func (p *PipelineHandler) Run(ctx context.Context, ingestedLogs chan uint64) {
67+
p.logger.Debugf("Pipeline started.")
6768
nextInterval := time.Duration(0)
69+
70+
stop := func(ch chan error) {
71+
p.logger.Debugf("Pipeline terminated.")
72+
close(ch)
73+
}
74+
6875
for {
6976
select {
7077
case ch := <-p.stopChannel:
71-
close(ch)
78+
stop(ch)
7279
return
7380
case <-time.After(nextInterval):
81+
p.logger.Debugf("Fetch next batch.")
7482
var builder query.Builder
7583
if p.pipeline.LastLogID != nil {
7684
builder = query.Gt("id", *p.pipeline.LastLogID)
@@ -86,38 +94,57 @@ func (p *PipelineHandler) Run(ctx context.Context, ingestedLogs chan uint64) {
8694
if err != nil {
8795
p.logger.Errorf("Error fetching logs: %s", err)
8896
select {
89-
case <-ctx.Done():
97+
case ch := <-p.stopChannel:
98+
stop(ch)
9099
return
91100
case <-time.After(p.pipelineConfig.PullInterval):
92101
continue
93102
}
94103
}
95104

105+
p.logger.Debugf("Got %d items", len(logs.Data))
96106
if len(logs.Data) == 0 {
97107
nextInterval = p.pipelineConfig.PullInterval
98108
continue
99109
}
100110

101111
for {
102-
_, err := p.exporter.Accept(ctx, collectionutils.Map(logs.Data, func(log ledger.Log) drivers.LogWithLedger {
103-
return drivers.LogWithLedger{
104-
Log: log,
105-
Ledger: p.pipeline.Ledger,
106-
}
107-
})...)
108-
if err != nil {
109-
p.logger.Errorf("Error pushing data on exporter: %s, waiting for: %s", err, p.pipelineConfig.PushRetryPeriod)
110-
select {
111-
case <-ctx.Done():
112-
return
113-
case <-time.After(p.pipelineConfig.PushRetryPeriod):
114-
continue
112+
p.logger.Debugf("Send data to exporter.")
113+
errChan := make(chan error, 1)
114+
exportContext, cancel := context.WithCancel(ctx)
115+
go func() {
116+
_, err := p.exporter.Accept(exportContext, collectionutils.Map(logs.Data, func(log ledger.Log) drivers.LogWithLedger {
117+
return drivers.LogWithLedger{
118+
Log: log,
119+
Ledger: p.pipeline.Ledger,
120+
}
121+
})...)
122+
errChan <- err
123+
}()
124+
select {
125+
case err := <-errChan:
126+
cancel()
127+
if err != nil {
128+
p.logger.Errorf("Error pushing data on exporter: %s, waiting for: %s", err, p.pipelineConfig.PushRetryPeriod)
129+
select {
130+
case ch := <-p.stopChannel:
131+
stop(ch)
132+
return
133+
case <-time.After(p.pipelineConfig.PushRetryPeriod):
134+
continue
135+
}
115136
}
137+
case ch := <-p.stopChannel:
138+
cancel()
139+
stop(ch)
140+
return
116141
}
142+
117143
break
118144
}
119145

120146
lastLogID := logs.Data[len(logs.Data)-1].ID
147+
p.logger.Debugf("Move last log id to %d", *lastLogID)
121148
p.pipeline.LastLogID = lastLogID
122149

123150
select {
@@ -129,14 +156,15 @@ func (p *PipelineHandler) Run(ctx context.Context, ingestedLogs chan uint64) {
129156
if !logs.HasMore {
130157
nextInterval = p.pipelineConfig.PullInterval
131158
} else {
159+
p.logger.Debugf("Has more logs to fetch.")
132160
nextInterval = 0
133161
}
134162
}
135163
}
136164
}
137165

138166
func (p *PipelineHandler) Shutdown(ctx context.Context) error {
139-
p.logger.Infof("shutdowning pipeline")
167+
p.logger.Infof("Shutdowning pipeline")
140168
errorChannel := make(chan error, 1)
141169
select {
142170
case <-ctx.Done():
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
name: Add missing index
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
create index {{ if not .Transactional }}concurrently{{end}} logs_ids on "{{.Schema}}".logs (id);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
name: Fill inserted at field in log data
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
do $$
2+
declare
3+
_offset integer := 0;
4+
_batch_size integer := 1000;
5+
begin
6+
set search_path = '{{ .Schema }}';
7+
8+
create temp table logs_view as
9+
select row_number() over (order by id) as row_number, id, ledger
10+
from logs
11+
where type = 'NEW_TRANSACTION' or type = 'REVERTED_TRANSACTION';
12+
create index logs_view_row_numbers on logs_view(row_number);
13+
14+
perform pg_notify('migrations-{{ .Schema }}', 'init: ' || (select count(*) from logs_view));
15+
16+
loop
17+
with _rows as (
18+
select id, ledger, row_number
19+
from logs_view
20+
where row_number >= _offset and row_number < _offset + _batch_size
21+
)
22+
update logs
23+
set data = jsonb_set(data, '{transaction, insertedAt}', to_jsonb(to_jsonb(date)#>>'{}' || 'Z'))
24+
from _rows
25+
where logs.id = _rows.id and
26+
logs.ledger = _rows.ledger;
27+
28+
exit when not found;
29+
30+
_offset = _offset + _batch_size;
31+
32+
perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
33+
34+
commit;
35+
end loop;
36+
37+
drop table if exists logs_view;
38+
end
39+
$$;
40+

tools/provisioner/Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# syntax=docker/dockerfile:1
12
FROM golang:1.25-alpine AS compiler
23
WORKDIR /src
34
COPY --from=root pkg pkg
@@ -14,6 +15,7 @@ COPY cmd /src/tools/provisioner/cmd
1415
COPY pkg /src/tools/provisioner/pkg
1516
RUN --mount=type=cache,target=$GOPATH go build -o provisioner
1617

18+
# syntax=docker/dockerfile:1
1719
FROM alpine:3.21
1820
LABEL org.opencontainers.image.source=https://github.com/formancehq/ledger
1921
COPY --from=compiler /src/tools/provisioner/provisioner /bin/provisioner

0 commit comments

Comments
 (0)