Skip to content

Commit

Permalink
Implementation of s3 using new archival interface (#2991)
Browse files Browse the repository at this point in the history
* Initial commit of s3 using new archival interface

* Changes after first review
- Remove RunID from visibility query since there is no point in using this. Also the indexing is not set up for this to work
- Add metrics to visibility and history stores
- Update README with instructions on how data is stored and query syntax

* Add some notes about how the query syntax works

* Update README

* Validate URI against s3. Cleanup
Remove redundant declarations of ensureContextTimeout and add it to missing functions
  • Loading branch information
jontro authored and yycptt committed Jan 24, 2020
1 parent 1c9b86c commit 2342092
Show file tree
Hide file tree
Showing 14 changed files with 10,003 additions and 23 deletions.
64 changes: 41 additions & 23 deletions common/archiver/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/filestore"
"github.com/uber/cadence/common/archiver/s3store"
"github.com/uber/cadence/common/service/config"
)

Expand Down Expand Up @@ -126,26 +127,33 @@ func (p *archiverProvider) GetHistoryArchiver(scheme, serviceName string) (archi
if !ok {
return nil, ErrBootstrapContainerNotFound
}
var historyArchiver archiver.HistoryArchiver
var err error

switch scheme {
case filestore.URIScheme:
if p.historyArchiverConfigs.Filestore == nil {
return nil, ErrArchiverConfigNotFound
}
historyArchiver, err := filestore.NewHistoryArchiver(container, p.historyArchiverConfigs.Filestore)
if err != nil {
return nil, err
}

p.Lock()
defer p.Unlock()
if existingHistoryArchiver, ok := p.historyArchivers[archiverKey]; ok {
return existingHistoryArchiver, nil
historyArchiver, err = filestore.NewHistoryArchiver(container, p.historyArchiverConfigs.Filestore)
case s3store.URIScheme:
if p.historyArchiverConfigs.S3store == nil {
return nil, ErrArchiverConfigNotFound
}
p.historyArchivers[archiverKey] = historyArchiver
return historyArchiver, nil
historyArchiver, err = s3store.NewHistoryArchiver(container, p.historyArchiverConfigs.S3store)
default:
return nil, ErrUnknownScheme
}
if err != nil {
return nil, err
}
return nil, ErrUnknownScheme
p.Lock()
defer p.Unlock()
if existingHistoryArchiver, ok := p.historyArchivers[archiverKey]; ok {
return existingHistoryArchiver, nil
}
p.historyArchivers[archiverKey] = historyArchiver
return historyArchiver, nil
}

func (p *archiverProvider) GetVisibilityArchiver(scheme, serviceName string) (archiver.VisibilityArchiver, error) {
Expand All @@ -162,25 +170,35 @@ func (p *archiverProvider) GetVisibilityArchiver(scheme, serviceName string) (ar
return nil, ErrBootstrapContainerNotFound
}

var visibilityArchiver archiver.VisibilityArchiver
var err error

switch scheme {
case filestore.URIScheme:
if p.visibilityArchiverConfigs.Filestore == nil {
return nil, ErrArchiverConfigNotFound
}
visibilityArchiver, err := filestore.NewVisibilityArchiver(container, p.visibilityArchiverConfigs.Filestore)
if err != nil {
return nil, err
visibilityArchiver, err = filestore.NewVisibilityArchiver(container, p.visibilityArchiverConfigs.Filestore)
case s3store.URIScheme:
if p.visibilityArchiverConfigs.S3store == nil {
return nil, ErrArchiverConfigNotFound
}
visibilityArchiver, err = s3store.NewVisibilityArchiver(container, p.visibilityArchiverConfigs.S3store)
default:
return nil, ErrUnknownScheme
}
if err != nil {
return nil, err
}

p.Lock()
defer p.Unlock()
if existingVisibilityArchiver, ok := p.visibilityArchivers[archiverKey]; ok {
return existingVisibilityArchiver, nil
}
p.visibilityArchivers[archiverKey] = visibilityArchiver
return visibilityArchiver, nil
p.Lock()
defer p.Unlock()
if existingVisibilityArchiver, ok := p.visibilityArchivers[archiverKey]; ok {
return existingVisibilityArchiver, nil
}
return nil, ErrUnknownScheme
p.visibilityArchivers[archiverKey] = visibilityArchiver
return visibilityArchiver, nil

}

func (p *archiverProvider) getArchiverKey(scheme, serviceName string) string {
Expand Down
101 changes: 101 additions & 0 deletions common/archiver/s3store/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Amazon S3 blobstore
## Configuration
See https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials on how to set up authentication against s3

Enabling archival is done by using the configuration below. `Region` and `bucket URI` are required
```
archival:
history:
status: "enabled"
enableRead: true
provider:
s3store:
region: "us-east-1"
visibility:
status: "enabled"
enableRead: true
provider:
s3store:
region: "us-east-1"
domainDefaults:
archival:
history:
status: "enabled"
URI: "s3://<bucket-name>"
visibility:
status: "enabled"
URI: "s3://<bucket-name>"
```

## Visibility query syntax
You can query the visibility store by using the `cadence workflow listarchived` command

The syntax for the query is based on SQL

Supported column names are
- WorkflowID *String*
- StartTime *Date*
- CloseTime *Date*
- SearchPrecision *String - Day, Hour, Minute, Second*

WorkflowID and SearchPrecision are always required. One of StartTime and CloseTime are required and they are mutually exclusive.

Searching for a record will be done in times in the UTC timezone

SearchPrecision specifies what range you want to search for records. If you use `SearchPrecision = 'Day'`
it will search all records starting from `2020-01-21T00:00:00Z` to `2020-01-21T59:59:59Z`

### Limitations

- The only operator supported is `=` due to how records are stored in s3.

### Example

*Searches for all records done in day 2020-01-21 with the specified workflow id*

`./cadence --do samples-domain workflow listarchived -q "StartTime = '2020-01-21T00:00:00Z' AND WorkflowID='workflow-id' AND SearchPrecision='Day'"`
## Storage in S3
Workflow runs are stored in s3 using the following structure
```
s3://<bucket-name>/<domain-id>/<workflow-id>/
history/<run-id>
visibility/
startTimeout/2020-01-21T16:16:11Z/<run-id>
closeTimeout/2020-01-21T16:16:11Z/<run-id>
```

## Using localstack for local development
1. Install awscli from [here](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html)
2. Install localstack from [here](https://github.com/localstack/localstack#installing)
3. Launch localstack with `SERVICES=s3 localstack start`
4. Create a bucket using `aws --endpoint-url=http://localhost:4572 s3 mb s3://cadence-development`
5. Configure archival and domainDefaults with the following configuration
```
archival:
history:
status: "enabled"
enableRead: true
provider:
s3store:
region: "us-east-1"
endpoint: "http://127.0.0.1:4572"
s3ForcePathStyle: true
visibility:
status: "enabled"
enableRead: true
provider:
s3store:
region: "us-east-1"
endpoint: "http://127.0.0.1:4572"
s3ForcePathStyle: true
domainDefaults:
archival:
history:
status: "enabled"
URI: "s3://cadence-development"
visibility:
status: "enabled"
URI: "s3://cadence-development"
```
Loading

0 comments on commit 2342092

Please sign in to comment.