Skip to content

Commit

Permalink
Support NDC raw histroy in message parser (uber#3227)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored May 1, 2020
1 parent fdc989c commit f38cfde
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 6 deletions.
1 change: 1 addition & 0 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,7 @@ func (entry *DomainCacheEntry) IsDomainActive() bool {
return entry.clusterMetadata.GetCurrentClusterName() == entry.replicationConfig.ActiveClusterName && !entry.IsDomainPendingActive()
}

// IsDomainPendingActive returns whether the domain is in pending active state
func (entry *DomainCacheEntry) IsDomainPendingActive() bool {
if !entry.isGlobalDomain {
// domain is not a global domain, meaning domain is always "active" within each cluster
Expand Down
59 changes: 53 additions & 6 deletions tools/cli/adminKafkaCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,18 @@ import (
"github.com/uber/cadence/service/history"
)

type filterFn func(*replicator.ReplicationTask) bool
type filterFnForVisibility func(*indexer.Message) bool
type (
filterFn func(*replicator.ReplicationTask) bool
filterFnForVisibility func(*indexer.Message) bool

type kafkaMessageType int
kafkaMessageType int

historyV2Task struct {
Task *replicator.ReplicationTask
Events []*shared.HistoryEvent
NewRunEvents []*shared.HistoryEvent
}
)

const (
kafkaMessageTypeReplicationTask kafkaMessageType = iota
Expand Down Expand Up @@ -121,13 +129,14 @@ func AdminKafkaParse(c *cli.Context) {
readerCh := make(chan []byte, chanBufferSize)
writerCh := newWriterChannel(kafkaMessageType(c.Int(FlagMessageType)))
doneCh := make(chan struct{})
serializer := persistence.NewPayloadSerializer()

var skippedCount int32
skipErrMode := c.Bool(FlagSkipErrorMode)

go startReader(inputFile, readerCh)
go startParser(readerCh, writerCh, skipErrMode, &skippedCount)
go startWriter(outputFile, writerCh, doneCh, &skippedCount, c)
go startWriter(outputFile, writerCh, doneCh, &skippedCount, serializer, c)

<-doneCh

Expand Down Expand Up @@ -242,6 +251,7 @@ func startWriter(
writerCh *writerChannel,
doneCh chan struct{},
skippedCount *int32,
serializer persistence.PayloadSerializer,
c *cli.Context,
) {

Expand All @@ -252,7 +262,7 @@ func startWriter(

switch writerCh.Type {
case kafkaMessageTypeReplicationTask:
writeReplicationTask(outputFile, writerCh, skippedCount, skipErrMode, headerMode, c)
writeReplicationTask(outputFile, writerCh, skippedCount, skipErrMode, headerMode, serializer, c)
case kafkaMessageTypeVisibilityMsg:
writeVisibilityMessage(outputFile, writerCh, skippedCount, skipErrMode, headerMode, c)
}
Expand All @@ -264,6 +274,7 @@ func writeReplicationTask(
skippedCount *int32,
skipErrMode bool,
headerMode bool,
serializer persistence.PayloadSerializer,
c *cli.Context,
) {
filter := buildFilterFn(c.String(FlagWorkflowID), c.String(FlagRunID))
Expand All @@ -275,7 +286,7 @@ Loop:
break Loop
}
if filter(task) {
jsonStr, err := json.Marshal(task)
jsonStr, err := decodeReplicationTask(task, serializer)
if err != nil {
if !skipErrMode {
ErrorAndExit(malformedMessage, fmt.Errorf("failed to encode into json, err: %v", err))
Expand Down Expand Up @@ -899,3 +910,39 @@ func loadBrokerConfig(hostFile string, cluster string) ([]string, *tls.Config, e
}
return nil, nil, fmt.Errorf("failed to load broker for cluster %v", cluster)
}

func decodeReplicationTask(
task *replicator.ReplicationTask,
serializer persistence.PayloadSerializer,
) ([]byte, error) {

switch task.GetTaskType() {
case replicator.ReplicationTaskTypeHistoryV2:
historyV2 := task.GetHistoryTaskV2Attributes()
events, err := serializer.DeserializeBatchEvents(
persistence.NewDataBlobFromThrift(historyV2.Events),
)
if err != nil {
return nil, err
}
var newRunEvents []*shared.HistoryEvent
if historyV2.IsSetNewRunEvents() {
newRunEvents, err = serializer.DeserializeBatchEvents(
persistence.NewDataBlobFromThrift(historyV2.NewRunEvents),
)
if err != nil {
return nil, err
}
}
historyV2.Events = nil
historyV2.NewRunEvents = nil
historyV2Attributes := &historyV2Task{
Task: task,
Events: events,
NewRunEvents: newRunEvents,
}
return json.Marshal(historyV2Attributes)
default:
return json.Marshal(task)
}
}

0 comments on commit f38cfde

Please sign in to comment.