Skip to content

Commit

Permalink
[Scanner] handle current execution to read from blobstore (cadence-wo…
Browse files Browse the repository at this point in the history
…rkflow#3435)

* [Scanner] handle current execution to read from blobstore (cadence-workflow#3435)
  • Loading branch information
mkolodezny committed Aug 20, 2020
1 parent aad128d commit 64df193
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 12 deletions.
47 changes: 37 additions & 10 deletions common/reconciliation/common/blobstoreIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,19 @@ type (
}
)

var scanTypeExecFnMap = map[ScanType]func(data []byte) (*ScanOutputEntity, error){
ConcreteExecutionType: deserializeConcreteExecution,
CurrentExecutionType: deserializeCurrentExecution,
}

// NewBlobstoreIterator constructs a new iterator backed by blobstore.
func NewBlobstoreIterator(
client blobstore.Client,
keys Keys,
scanType ScanType,
) ScanOutputIterator {
return &blobstoreIterator{
itr: pagination.NewIterator(keys.MinPage, getBlobstoreFetchPageFn(client, keys)),
itr: pagination.NewIterator(keys.MinPage, getBlobstoreFetchPageFn(client, keys, scanTypeExecFnMap[scanType])),
}
}

Expand All @@ -64,6 +70,7 @@ func (i *blobstoreIterator) HasNext() bool {
func getBlobstoreFetchPageFn(
client blobstore.Client,
keys Keys,
execDeserializeFunc func(data []byte) (*ScanOutputEntity, error),
) pagination.FetchFn {
return func(token pagination.PageToken) (pagination.Page, error) {
index := token.(int)
Expand All @@ -83,17 +90,11 @@ func getBlobstoreFetchPageFn(
if len(p) == 0 {
continue
}
// TODO handle multiple execution types
soe := ScanOutputEntity{
Execution: &ConcreteExecution{},
}
if err := json.Unmarshal(p, &soe); err != nil {
return pagination.Page{}, err
}
if err := ValidateConcreteExecution(soe.Execution.(*ConcreteExecution)); err != nil {
soe, err := execDeserializeFunc(p)
if err != nil {
return pagination.Page{}, err
}
executions = append(executions, &soe)
executions = append(executions, soe)
}
var nextPageToken interface{} = index + 1
if nextPageToken.(int) > keys.MaxPage {
Expand All @@ -106,3 +107,29 @@ func getBlobstoreFetchPageFn(
}, nil
}
}

func deserializeConcreteExecution(data []byte) (*ScanOutputEntity, error) {
soe := &ScanOutputEntity{
Execution: &ConcreteExecution{},
}
if err := json.Unmarshal(data, &soe); err != nil {
return nil, err
}
if err := ValidateConcreteExecution(soe.Execution.(*ConcreteExecution)); err != nil {
return nil, err
}
return soe, nil
}

func deserializeCurrentExecution(data []byte) (*ScanOutputEntity, error) {
soe := &ScanOutputEntity{
Execution: &CurrentExecution{},
}
if err := json.Unmarshal(data, &soe); err != nil {
return nil, err
}
if err := ValidateCurrentExecution(soe.Execution.(*CurrentExecution)); err != nil {
return nil, err
}
return soe, nil
}
3 changes: 2 additions & 1 deletion common/reconciliation/common/writerIterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ func (s *WriterIteratorSuite) TestWriterIterator() {
s.Equal(0, flushedKeys.MinPage)
s.Equal(9, flushedKeys.MaxPage)
s.Equal(Extension("test"), flushedKeys.Extension)
blobstoreItr := NewBlobstoreIterator(blobstore, *flushedKeys)
blobstoreItr := NewBlobstoreIterator(blobstore, *flushedKeys, ConcreteExecutionType)
i := 0
s.True(blobstoreItr.HasNext())
for blobstoreItr.HasNext() {
exec, err := blobstoreItr.Next()
s.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion service/worker/scanner/executions/shard/fixer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewFixer(
id := uuid.New()
return &fixer{
shardID: shardID,
itr: common.NewBlobstoreIterator(blobstoreClient, keys),
itr: common.NewBlobstoreIterator(blobstoreClient, keys, scanType),
skippedWriter: common.NewBlobstoreWriter(id, common.SkippedExtension, blobstoreClient, blobstoreFlushThreshold),
failedWriter: common.NewBlobstoreWriter(id, common.FailedExtension, blobstoreClient, blobstoreFlushThreshold),
fixedWriter: common.NewBlobstoreWriter(id, common.FixedExtension, blobstoreClient, blobstoreFlushThreshold),
Expand Down

0 comments on commit 64df193

Please sign in to comment.