Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions cmd/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (a *Agent) pg2pulsar(params *structpb.Struct) (*pb.AgentConfigResponse, err
}

func (a *Agent) pulsar2pg(params *structpb.Struct) (*pb.AgentConfigResponse, error) {
v, err := extract(params, "PGConnURL", "PulsarURL", "PulsarTopic", "?PGLogPath", "?BatchTxSize")
v, err := extract(params, "PGConnURL", "PulsarURL", "PulsarTopic", "?PGLogPath", "?BatchTxSize", "?ReaderPrefix")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -260,14 +260,19 @@ func (a *Agent) pulsar2pg(params *structpb.Struct) (*pb.AgentConfigResponse, err
a.pgSink = pgSink

logger := logrus.WithFields(logrus.Fields{
"PulsarURL": v["PulsarURL"].GetStringValue(),
"PulsarTopic": v["PulsarTopic"].GetStringValue(),
"PGLogPath": v["PGLogPath"].GetStringValue(),
"BatchTxSize": batchTXSize,
"PulsarURL": v["PulsarURL"].GetStringValue(),
"PulsarTopic": v["PulsarTopic"].GetStringValue(),
"PGLogPath": v["PGLogPath"].GetStringValue(),
"BatchTxSize": batchTXSize,
"ReaderPrefix": v["ReaderPrefix"].GetStringValue(),
})
logger.Info("start pulsar2pg")

pulsarSrc := &source.PulsarReaderSource{PulsarOption: pulsar.ClientOptions{URL: v["PulsarURL"].GetStringValue()}, PulsarTopic: v["PulsarTopic"].GetStringValue()}
pulsarSrc := &source.PulsarReaderSource{
PulsarOption: pulsar.ClientOptions{URL: v["PulsarURL"].GetStringValue()},
PulsarTopic: v["PulsarTopic"].GetStringValue(),
ReaderPrefix: v["ReaderPrefix"].GetStringValue(),
}
if err = a.sourceToSink(pulsarSrc, pgSink); err != nil {
logger.Fatalf("sourceToSink error: %v", err)
return nil, err
Expand Down
20 changes: 13 additions & 7 deletions cmd/pulsar2pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
)

var (
SinkPGConnURL string
SinkPGLogPath string
SourcePulsarURL string
SourcePulsarTopic string
Renice int64
BatchTXSize int
SinkPGConnURL string
SinkPGLogPath string
SourcePulsarURL string
SourcePulsarTopic string
SourceReaderPrefix string
Renice int64
BatchTXSize int
)

func init() {
Expand All @@ -24,6 +25,7 @@ func init() {
pulsar2pg.Flags().StringVarP(&SinkPGLogPath, "PGLogPath", "", "", "pg log path for finding last checkpoint lsn")
pulsar2pg.Flags().StringVarP(&SourcePulsarURL, "PulsarURL", "", "", "connection url to sink pulsar cluster")
pulsar2pg.Flags().StringVarP(&SourcePulsarTopic, "PulsarTopic", "", "", "the sink pulsar topic name and as well as the logical replication slot name")
pulsar2pg.Flags().StringVarP(&SourceReaderPrefix, "ReaderPrefix", "", "", "subscription role prefix for pulsar reader")
pulsar2pg.Flags().Int64VarP(&Renice, "Renice", "", -10, "try renice the sink pg process")
pulsar2pg.Flags().IntVarP(&BatchTXSize, "BatchTxSize", "", 100, "the max number of tx in a pipeline")
pulsar2pg.MarkFlagRequired("PGConnURL")
Expand All @@ -44,7 +46,11 @@ var pulsar2pg = &cobra.Command{
defer pgLog.Close()
pgSink.LogReader = pgLog
}
pulsarSrc := &source.PulsarReaderSource{PulsarOption: pulsar.ClientOptions{URL: SourcePulsarURL}, PulsarTopic: SourcePulsarTopic}
pulsarSrc := &source.PulsarReaderSource{
PulsarOption: pulsar.ClientOptions{URL: SourcePulsarURL},
PulsarTopic: SourcePulsarTopic,
ReaderPrefix: SourceReaderPrefix,
}
return sourceToSink(pulsarSrc, pgSink)
},
}
2 changes: 2 additions & 0 deletions pkg/source/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type PulsarReaderSource struct {

PulsarOption pulsar.ClientOptions
PulsarTopic string
ReaderPrefix string

client pulsar.Client
reader pulsar.Reader
Expand Down Expand Up @@ -67,6 +68,7 @@ func (p *PulsarReaderSource) Capture(cp cursor.Checkpoint) (changes chan Change,
StartMessageID: start,
StartMessageIDInclusive: true,
ReceiverQueueSize: ReceiverQueueSize,
SubscriptionRolePrefix: p.ReaderPrefix,
})
if err != nil {
return nil, err
Expand Down
11 changes: 6 additions & 5 deletions pkg/source/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,18 @@ func TestPulsarReaderSource(t *testing.T) {
}
producer.Flush()

newPulsarReaderSource := func() *PulsarReaderSource {
newPulsarReaderSource := func(readerPrefix string) *PulsarReaderSource {
return &PulsarReaderSource{
BaseSource: BaseSource{ReadTimeout: time.Millisecond * 100},
PulsarOption: option,
PulsarTopic: topic,
ReaderPrefix: readerPrefix,
seekOffset: time.Millisecond * -100,
}
}

// test from start
src := newPulsarReaderSource()
// test from start (without reader prefix)
src := newPulsarReaderSource("")
changes, err := src.Capture(cursor.Checkpoint{})
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -104,8 +105,8 @@ func TestPulsarReaderSource(t *testing.T) {

src.Stop()

// test from specified time and lsn, and not include specified lsn
src = newPulsarReaderSource()
// test from specified time and lsn, and not include specified lsn (with reader prefix)
src = newPulsarReaderSource("test-prefix")
changes, err = src.Capture(cursor.Checkpoint{LSN: uint64(1), Data: []byte(now.Add(time.Millisecond * 500).Format(time.RFC3339Nano))})
if err != nil {
t.Fatal(err)
Expand Down