Skip to content

Commit 059deb1

Browse files
committed
fix: missing inserted at in logs
1 parent f8b2138 commit 059deb1

File tree

10 files changed

+81
-5
lines changed

10 files changed

+81
-5
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ require (
125125
github.com/buger/jsonparser v1.1.1 // indirect
126126
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
127127
github.com/containerd/continuity v0.4.5 // indirect
128-
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
128+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
129129
github.com/dlclark/regexp2 v1.11.5 // indirect
130130
github.com/dnwe/otelsarama v0.0.0-20240308230250-9388d9d40bc0 // indirect
131131
github.com/docker/cli v27.4.1+incompatible // indirect

internal/replication/manager.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,12 @@ func (m *Manager) synchronizePipelines(ctx context.Context) error {
217217
for _, pipeline := range pipelines {
218218
m.logger.Debugf("restoring pipeline %s", pipeline.ID)
219219
if _, err := m.startPipeline(ctx, pipeline); err != nil {
220-
return err
220+
switch {
221+
case errors.Is(err, ledger.ErrAlreadyStarted("")):
222+
m.logger.Debugf("Pipeline already started, skipping")
223+
default:
224+
return err
225+
}
221226
}
222227
}
223228

internal/replication/pipeline.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package replication
22

33
import (
44
"context"
5+
"fmt"
6+
"runtime/debug"
57
"time"
68

9+
"github.com/davecgh/go-spew/spew"
710
"github.com/formancehq/go-libs/v3/bun/bunpaginate"
811
"github.com/formancehq/go-libs/v3/collectionutils"
912
"github.com/formancehq/go-libs/v3/logging"
@@ -64,17 +67,31 @@ type PipelineHandler struct {
6467
}
6568

6669
func (p *PipelineHandler) Run(ctx context.Context, ingestedLogs chan uint64) {
70+
p.logger.Debugf("Pipeline started.")
6771
nextInterval := time.Duration(0)
72+
73+
defer func() {
74+
fmt.Println("Terminated.")
75+
if e := recover(); e != nil {
76+
fmt.Printf("error: %s\r\n", e)
77+
debug.PrintStack()
78+
}
79+
}()
80+
6881
for {
6982
select {
7083
case ch := <-p.stopChannel:
84+
p.logger.Debugf("Pipeline terminated.")
7185
close(ch)
7286
return
7387
case <-time.After(nextInterval):
88+
p.logger.Debugf("Fetch next batch.")
89+
spew.Dump(p.pipeline)
7490
var builder query.Builder
7591
if p.pipeline.LastLogID != nil {
7692
builder = query.Gt("id", *p.pipeline.LastLogID)
7793
}
94+
spew.Dump(builder)
7895
logs, err := p.store.ListLogs(ctx, common.InitialPaginatedQuery[any]{
7996
PageSize: p.pipelineConfig.LogsPageSize,
8097
Column: "id",
@@ -93,18 +110,21 @@ func (p *PipelineHandler) Run(ctx context.Context, ingestedLogs chan uint64) {
93110
}
94111
}
95112

113+
p.logger.Debugf("Got %d items", len(logs.Data))
96114
if len(logs.Data) == 0 {
97115
nextInterval = p.pipelineConfig.PullInterval
98116
continue
99117
}
100118

101119
for {
120+
p.logger.Debugf("send to exporter")
102121
_, err := p.exporter.Accept(ctx, collectionutils.Map(logs.Data, func(log ledger.Log) drivers.LogWithLedger {
103122
return drivers.LogWithLedger{
104123
Log: log,
105124
Ledger: p.pipeline.Ledger,
106125
}
107126
})...)
127+
p.logger.Debugf("exported: %s", err)
108128
if err != nil {
109129
p.logger.Errorf("Error pushing data on exporter: %s, waiting for: %s", err, p.pipelineConfig.PushRetryPeriod)
110130
select {
@@ -118,6 +138,7 @@ func (p *PipelineHandler) Run(ctx context.Context, ingestedLogs chan uint64) {
118138
}
119139

120140
lastLogID := logs.Data[len(logs.Data)-1].ID
141+
p.logger.Debugf("Move last log id to %d", lastLogID)
121142
p.pipeline.LastLogID = lastLogID
122143

123144
select {

internal/replication/store.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ func (s *storageAdapter) OpenLedger(ctx context.Context, name string) (LogFetche
6060
store, l, err := s.storageDriver.OpenLedger(ctx, name)
6161

6262
return LogFetcherFn(func(ctx context.Context, query common.PaginatedQuery[any]) (*bunpaginate.Cursor[ledger.Log], error) {
63-
return store.Logs().Paginate(ctx, query)
63+
ret, err := store.Logs().Paginate(ctx, query)
64+
if err != nil {
65+
return nil, err
66+
}
67+
68+
return ret, nil
6469
}), l, err
6570
}
6671

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
name: Add missing index
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
create index {{ if not .Transactional }}concurrently{{end}} logs_ids on "{{.Schema}}".logs (id);
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
name: Fill inserted at field in log data
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
do $$
2+
declare
3+
_offset integer := 0;
4+
_batch_size integer := 1000;
5+
begin
6+
set search_path = '{{ .Schema }}';
7+
8+
create temp table logs_view as
9+
select row_number() over (order by id) as row_number, id, ledger
10+
from logs
11+
where type = 'NEW_TRANSACTION';
12+
create index logs_view_row_numbers on logs_view(row_number);
13+
14+
perform pg_notify('migrations-{{ .Schema }}', 'init: ' || (select count(*) from logs_view));
15+
16+
loop
17+
with _rows as (
18+
select id, ledger, row_number
19+
from logs_view
20+
where row_number >= _offset and row_number < _offset + _batch_size
21+
)
22+
update logs
23+
set data = jsonb_set(data, '{transaction, insertedAt}', to_jsonb(to_jsonb(date)#>>'{}' || 'Z'))
24+
from _rows
25+
where logs.id = _rows.id and
26+
logs.ledger = _rows.ledger;
27+
28+
exit when not found;
29+
30+
_offset = _offset + _batch_size;
31+
32+
perform pg_notify('migrations-{{ .Schema }}', 'continue: ' || _batch_size);
33+
34+
commit;
35+
end loop;
36+
37+
drop table if exists logs_view;
38+
end
39+
$$;
40+

tools/provisioner/Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# syntax=docker/dockerfile:1
12
FROM golang:1.25-alpine AS compiler
23
WORKDIR /src
34
COPY --from=root pkg pkg
@@ -14,6 +15,7 @@ COPY cmd /src/tools/provisioner/cmd
1415
COPY pkg /src/tools/provisioner/pkg
1516
RUN --mount=type=cache,target=$GOPATH go build -o provisioner
1617

18+
# syntax=docker/dockerfile:1
1719
FROM alpine:3.21
1820
LABEL org.opencontainers.image.source=https://github.com/formancehq/ledger
1921
COPY --from=compiler /src/tools/provisioner/provisioner /bin/provisioner

tools/provisioner/justfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
set positional-arguments
44

5-
push-image version='latest':
5+
push-image version='latest' registry='ghcr.io':
66
docker buildx build . \
77
--build-context root=../.. \
8-
-t ghcr.io/formancehq/ledger-provisioner:{{ version }} \
8+
-t {{ registry }}/formancehq/ledger-provisioner:{{ version }} \
99
--push \
1010
--platform linux/amd64,linux/arm64

0 commit comments

Comments
 (0)