Skip to content

Commit b62bb08

Browse files
committed
Added from-proto Clickhouse integration tests and fixed Clickhouse partition on `_block_timestamp_
1 parent 1f0e4d6 commit b62bb08

File tree

18 files changed

+1266
-845
lines changed

18 files changed

+1266
-845
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## Unreleased
9+
10+
### Clickhouse & `substreams-sink-sql from-proto`
11+
12+
* If a `clickhouse_table_options.partition_fields` already contains some partition for `_block_timestamp_`, the default `(toYYYYMM(_block_timestamp_))` will not be added anymore.
13+
14+
* The default partition on `_block_timestamp_` is now `(toYYYYMM(_block_timestamp_))` instead of just `_block_timestamp_`.
15+
816
## v4.6.4
917

1018
### Clickhouse & `substreams-sink-sql from-proto`

buf.work.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
version: v1
22
directories:
33
- proto
4+
- db_proto/test/substreams/order/proto

cmd/substreams-sink-sql/from_proto.go

Lines changed: 14 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,9 @@ import (
1212
"github.com/streamingfast/substreams-sink-sql/db_changes/db"
1313
"github.com/streamingfast/substreams-sink-sql/db_proto"
1414
"github.com/streamingfast/substreams-sink-sql/db_proto/proto"
15-
protosql "github.com/streamingfast/substreams-sink-sql/db_proto/sql"
16-
clickhouse "github.com/streamingfast/substreams-sink-sql/db_proto/sql/click_house"
17-
"github.com/streamingfast/substreams-sink-sql/db_proto/sql/postgres"
18-
schema2 "github.com/streamingfast/substreams-sink-sql/db_proto/sql/schema"
19-
stats2 "github.com/streamingfast/substreams-sink-sql/db_proto/stats"
2015
pbsql "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/services/v1"
2116
"github.com/streamingfast/substreams-sink-sql/services"
2217
"github.com/streamingfast/substreams/manifest"
23-
"go.uber.org/zap"
2418
"google.golang.org/protobuf/types/descriptorpb"
2519
)
2620

@@ -174,13 +168,6 @@ func fromProtoE(cmd *cobra.Command, args []string) error {
174168
return fmt.Errorf("message descriptor not found for output type %q. Your substreams need to bundle its protobuf definitions", outputType)
175169
}
176170

177-
schemaName := dsn.Schema()
178-
179-
schema, err := schema2.NewSchema(schemaName, rootMessageDescriptor, useProtoOption, zlog)
180-
if err != nil {
181-
return fmt.Errorf("creating schema: %w", err)
182-
}
183-
184171
baseSink, err := sink.NewFromViper(
185172
cmd,
186173
outputType,
@@ -195,123 +182,28 @@ func fromProtoE(cmd *cobra.Command, args []string) error {
195182
return fmt.Errorf("new base sinker: %w", err)
196183
}
197184

198-
var database protosql.Database
199-
200-
switch dsn.Driver() {
201-
case "postgres":
202-
database, err = postgres.NewDatabase(schema, dsn, outputModuleName, rootMessageDescriptor, useProtoOption, useConstraints, zlog)
203-
if err != nil {
204-
return fmt.Errorf("creating postgres database: %w", err)
205-
}
206-
207-
case "clickhouse":
208-
database, err = clickhouse.NewDatabase(
209-
cmd.Context(),
210-
schema,
211-
dsn,
212-
outputModuleName,
213-
rootMessageDescriptor,
214-
sflags.MustGetString(cmd, "clickhouse-sink-info-folder"),
215-
sflags.MustGetString(cmd, "clickhouse-cursor-file-path"),
216-
true,
217-
zlog,
218-
tracer,
219-
)
220-
if err != nil {
221-
return fmt.Errorf("creating clickhouse database: %w", err)
222-
}
223-
default:
224-
panic(fmt.Sprintf("unsupported driver: %s", dsn.Driver()))
225-
226-
}
227-
228-
sinkInfo, err := database.FetchSinkInfo(schema.Name)
229-
if err != nil {
230-
return fmt.Errorf("fetching sink info: %w", err)
231-
}
232-
233-
zlog.Info("sink info read", zap.Reflect("sink_info", sinkInfo))
234-
if sinkInfo == nil {
235-
err := database.BeginTransaction()
236-
if err != nil {
237-
return fmt.Errorf("begin transaction: %w", err)
238-
}
239-
err = database.CreateDatabase(useConstraints)
240-
if err != nil {
241-
database.RollbackTransaction()
242-
return fmt.Errorf("creating database: %w", err)
243-
}
244-
245-
err = database.StoreSinkInfo(schemaName, database.GetDialect().SchemaHash())
246-
if err != nil {
247-
database.RollbackTransaction()
248-
return fmt.Errorf("storing sink info: %w", err)
249-
}
250-
251-
err = database.CommitTransaction()
252-
253-
} else {
254-
migrationNeeded := sinkInfo.SchemaHash != database.GetDialect().SchemaHash()
255-
if migrationNeeded {
256-
257-
tempSchemaName := schema.Name + "_" + database.GetDialect().SchemaHash()
258-
tempSinkInfo, err := database.FetchSinkInfo(tempSchemaName)
259-
if err != nil {
260-
return fmt.Errorf("fetching temp schema sink info: %w", err)
261-
}
262-
if tempSinkInfo != nil {
263-
hash, err := database.DatabaseHash(schema.Name)
264-
if err != nil {
265-
return fmt.Errorf("fetching schema %q hash: %w", schema.Name, err)
266-
}
267-
dbTempHash, err := database.DatabaseHash(tempSchemaName)
268-
if err != nil {
269-
return fmt.Errorf("fetching temp schema %q hash: %w", tempSchemaName, err)
270-
}
271-
272-
if hash != dbTempHash {
273-
return fmt.Errorf("schema %s and temp schema %s have different hash", schema.Name, tempSchemaName)
274-
}
275-
err = database.BeginTransaction()
276-
if err != nil {
277-
return fmt.Errorf("begin transaction: %w", err)
278-
}
279-
err = database.UpdateSinkInfoHash(schemaName, tempSinkInfo.SchemaHash)
280-
if err != nil {
281-
database.RollbackTransaction()
282-
return fmt.Errorf("updating sink info hash: %w", err)
283-
}
284-
285-
err = database.CommitTransaction()
286-
if err != nil {
287-
return fmt.Errorf("commit transaction: %w", err)
288-
}
185+
factory := db_proto.SinkerFactory(baseSink, outputModuleName, rootMessageDescriptor.UnwrapMessage(), db_proto.SinkerFactoryOptions{
186+
UseProtoOption: useProtoOption,
187+
UseConstraints: useConstraints,
188+
UseTransactions: useTransactions,
189+
BlockBatchSize: blockBatchSize,
190+
Parallel: parallel,
191+
Clickhouse: db_proto.SinkerFactoryClickhouse{
192+
SinkInfoFolder: sflags.MustGetString(cmd, "clickhouse-sink-info-folder"),
193+
CursorFilePath: sflags.MustGetString(cmd, "clickhouse-cursor-file-path"),
194+
},
195+
})
289196

290-
} else {
291-
//todo: create the temp schema ... and exit
292-
293-
//err = schema.ChangeName(tempSchemaName, dialect)
294-
//if err != nil {
295-
// return nil, fmt.Errorf("changing schema name: %w", err)
296-
//}
297-
//generateTempSchema = true
298-
}
299-
}
300-
}
301-
302-
err = database.Open()
197+
sinker, err := factory(cmd.Context(), dsnString, dsn.Schema(), zlog, tracer)
303198
if err != nil {
304-
return fmt.Errorf("opening database: %w", err)
199+
return fmt.Errorf("creating sinker: %w", err)
305200
}
306201

307-
stats := stats2.NewStats(zlog)
308-
sinker := db_proto.NewSinker(rootMessageDescriptor, baseSink, database, useTransactions, useConstraints, blockBatchSize, parallel, stats, zlog)
309-
310202
err = sinker.Run(cmd.Context())
311203
if err != nil {
312204
return fmt.Errorf("running sinker: %w", err)
313205
}
314206

315-
stats.Log()
207+
sinker.LogStats()
316208
return nil
317209
}

db_proto/sinker.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,11 @@ func (s *Sinker) Run(ctx context.Context) error {
6060

6161
s.stats.LastBlockProcessAt = time.Now()
6262
s.Sinker.Run(ctx, cursor, s)
63-
return nil
63+
return s.Sinker.Err()
64+
}
65+
66+
func (s *Sinker) LogStats() {
67+
s.stats.Log()
6468
}
6569

6670
type Holder struct {
@@ -157,7 +161,10 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp
157161
return fmt.Errorf("flushing: %w", err)
158162
}
159163

160-
flushDurationPerBlock := flushDuration / time.Duration(len(holding))
164+
var flushDurationPerBlock time.Duration
165+
if len(holding) > 0 {
166+
flushDurationPerBlock = flushDuration / time.Duration(len(holding))
167+
}
161168
s.stats.FlushDuration.Add(flushDurationPerBlock)
162169

163170
err = s.db.StoreCursor(cursor)

0 commit comments

Comments
 (0)