Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Scanner] handle current execution to read from blobstore #3435

Merged
merged 10 commits into from
Aug 5, 2020
Prev Previous commit
Next Next commit
[Scanner] handle current execution for blobstore
  • Loading branch information
mkolodezny committed Aug 5, 2020
commit cc27612802f1a677444e70399ccecba4aaef4978
47 changes: 37 additions & 10 deletions common/reconciliation/common/blobstoreIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,19 @@ type (
}
)

var scanTypeExecFnMap = map[ScanType]func(data []byte) (*ScanOutputEntity, error){
ConcreteExecutionType: deserializeConcreteExecution,
CurrentExecutionType: deserializeCurrentExecution,
}

// NewBlobstoreIterator constructs a new iterator backed by blobstore.
func NewBlobstoreIterator(
client blobstore.Client,
keys Keys,
scanType ScanType,
) ScanOutputIterator {
return &blobstoreIterator{
itr: pagination.NewIterator(keys.MinPage, getBlobstoreFetchPageFn(client, keys)),
itr: pagination.NewIterator(keys.MinPage, getBlobstoreFetchPageFn(client, keys, scanTypeExecFnMap[scanType])),
}
}

Expand All @@ -64,6 +70,7 @@ func (i *blobstoreIterator) HasNext() bool {
func getBlobstoreFetchPageFn(
client blobstore.Client,
keys Keys,
execDeserializeFunc func(data []byte) (*ScanOutputEntity, error),
) pagination.FetchFn {
return func(token pagination.PageToken) (pagination.Page, error) {
index := token.(int)
Expand All @@ -83,17 +90,11 @@ func getBlobstoreFetchPageFn(
if len(p) == 0 {
continue
}
// TODO handle multiple execution types
soe := ScanOutputEntity{
Execution: &ConcreteExecution{},
}
if err := json.Unmarshal(p, &soe); err != nil {
return pagination.Page{}, err
}
if err := ValidateConcreteExecution(soe.Execution.(*ConcreteExecution)); err != nil {
soe, err := execDeserializeFunc(p)
if err != nil {
return pagination.Page{}, err
}
executions = append(executions, &soe)
executions = append(executions, soe)
}
var nextPageToken interface{} = index + 1
if nextPageToken.(int) > keys.MaxPage {
Expand All @@ -106,3 +107,29 @@ func getBlobstoreFetchPageFn(
}, nil
}
}

func deserializeConcreteExecution(data []byte) (*ScanOutputEntity, error) {
soe := &ScanOutputEntity{
Execution: &ConcreteExecution{},
}
if err := json.Unmarshal(data, &soe); err != nil {
return nil, err
}
if err := ValidateConcreteExecution(soe.Execution.(*ConcreteExecution)); err != nil {
return nil, err
}
return soe, nil
}

func deserializeCurrentExecution(data []byte) (*ScanOutputEntity, error) {
soe := &ScanOutputEntity{
Execution: &CurrentExecution{},
}
if err := json.Unmarshal(data, &soe); err != nil {
return nil, err
}
if err := ValidateCurrentExecution(soe.Execution.(*CurrentExecution)); err != nil {
return nil, err
}
return soe, nil
}
3 changes: 2 additions & 1 deletion common/reconciliation/common/writerIterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ func (s *WriterIteratorSuite) TestWriterIterator() {
s.Equal(0, flushedKeys.MinPage)
s.Equal(9, flushedKeys.MaxPage)
s.Equal(Extension("test"), flushedKeys.Extension)
blobstoreItr := NewBlobstoreIterator(blobstore, *flushedKeys)
blobstoreItr := NewBlobstoreIterator(blobstore, *flushedKeys, ConcreteExecutionType)
i := 0
s.True(blobstoreItr.HasNext())
for blobstoreItr.HasNext() {
exec, err := blobstoreItr.Next()
s.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion service/worker/scanner/executions/shard/fixer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewFixer(
id := uuid.New()
return &fixer{
shardID: shardID,
itr: common.NewBlobstoreIterator(blobstoreClient, keys),
itr: common.NewBlobstoreIterator(blobstoreClient, keys, scanType),
skippedWriter: common.NewBlobstoreWriter(id, common.SkippedExtension, blobstoreClient, blobstoreFlushThreshold),
failedWriter: common.NewBlobstoreWriter(id, common.FailedExtension, blobstoreClient, blobstoreFlushThreshold),
fixedWriter: common.NewBlobstoreWriter(id, common.FixedExtension, blobstoreClient, blobstoreFlushThreshold),
Expand Down