Skip to content

Commit

Permalink
Improve archival related code (uber#2268)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Jul 25, 2019
1 parent af42a50 commit 895cecc
Show file tree
Hide file tree
Showing 45 changed files with 803 additions and 523 deletions.
13 changes: 7 additions & 6 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,17 @@ func (s *server) startService() common.Daemon {
params.PublicClient = workflowserviceclient.New(dispatcher.ClientConfig(common.FrontendServiceName))

configuredForHistoryArchival := params.ClusterMetadata.HistoryArchivalConfig().ClusterConfiguredForArchival()
configuredForVisibilityArchival := params.ClusterMetadata.VisibilityArchivalConfig().ClusterConfiguredForArchival()
historyArchiverProvider := s.cfg.Archival.History.ArchiverProvider
visibilityArchiverProvider := s.cfg.Archival.Visibility.ArchiverProvider
if (configuredForHistoryArchival && historyArchiverProvider == nil) || (!configuredForHistoryArchival && historyArchiverProvider != nil) {
historyArchiverProviderCfg := s.cfg.Archival.History.Provider
if (configuredForHistoryArchival && historyArchiverProviderCfg == nil) || (!configuredForHistoryArchival && historyArchiverProviderCfg != nil) {
log.Fatalf("invalid history archival config")
}
if (configuredForVisibilityArchival && visibilityArchiverProvider == nil) || (!configuredForVisibilityArchival && visibilityArchiverProvider != nil) {

configuredForVisibilityArchival := params.ClusterMetadata.VisibilityArchivalConfig().ClusterConfiguredForArchival()
visibilityArchiverProviderCfg := s.cfg.Archival.Visibility.Provider
if (configuredForVisibilityArchival && visibilityArchiverProviderCfg == nil) || (!configuredForVisibilityArchival && visibilityArchiverProviderCfg != nil) {
log.Fatalf("invalid visibility archival config")
}
params.ArchiverProvider = provider.NewArchiverProvider(historyArchiverProvider, visibilityArchiverProvider)
params.ArchiverProvider = provider.NewArchiverProvider(historyArchiverProviderCfg, visibilityArchiverProviderCfg)

params.PersistenceConfig.TransactionSizeLimit = dc.GetIntProperty(dynamicconfig.TransactionSizeLimit, common.DefaultTransactionSizeLimit)

Expand Down
93 changes: 93 additions & 0 deletions common/archiver/URI.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package archiver

import (
"errors"
"net/url"
)

type (
// URI identifies the archival resource to which records are written to and read from.
URI interface {
Scheme() string
Path() string
Hostname() string
Port() string
Username() string
Password() string
String() string
}

uri struct {
url *url.URL
}
)

// NewURI constructs a new archiver URI from string.
func NewURI(s string) (URI, error) {
url, err := url.ParseRequestURI(s)
if err != nil {
return nil, err
}
if url.Opaque != "" {
return nil, errors.New("URI should begin with scheme://")
}
return &uri{url: url}, nil
}

func (u *uri) Scheme() string {
return u.url.Scheme
}

func (u *uri) Path() string {
return u.url.Path
}

func (u *uri) Hostname() string {
return u.url.Hostname()
}

func (u *uri) Port() string {
return u.url.Port()
}

func (u *uri) Username() string {
if u.url.User == nil {
return ""
}
return u.url.User.Username()
}

func (u *uri) Password() string {
if u.url.User == nil {
return ""
}
password, exist := u.url.User.Password()
if !exist {
return ""
}
return password
}

func (u *uri) String() string {
return u.url.String()
}
126 changes: 126 additions & 0 deletions common/archiver/URI_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package archiver

import (
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

type (
URISuite struct {
*require.Assertions
suite.Suite
}
)

func TestURISuite(t *testing.T) {
suite.Run(t, new(URISuite))
}

func (s *URISuite) SetupTest() {
s.Assertions = require.New(s.T())
}

func (s *URISuite) TestURI() {
testCases := []struct {
URIString string
valid bool
scheme string
path string
hostname string
port string
username string
password string
}{
{
URIString: "",
valid: false,
},
{
URIString: "some random string",
valid: false,
},
{
URIString: "mailto:a@b.com",
valid: false,
},
{
URIString: "test://",
valid: true,
scheme: "test",
},
{
URIString: "http://example.com/path",
valid: true,
scheme: "http",
hostname: "example.com",
path: "/path",
},
{
URIString: "http://example.com/path with space",
valid: true,
scheme: "http",
hostname: "example.com",
path: "/path with space",
},
{
URIString: "https://localhost:8080",
valid: true,
scheme: "https",
hostname: "localhost",
port: "8080",
},
{
URIString: "file:///absolute/path/to/dir",
valid: true,
scheme: "file",
path: "/absolute/path/to/dir",
},
{
URIString: "test://person:password@host/path",
valid: true,
scheme: "test",
hostname: "host",
path: "/path",
username: "person",
password: "password",
},
}

for _, tc := range testCases {
URI, err := NewURI(tc.URIString)
if !tc.valid {
s.Error(err)
continue
}

s.NoError(err)
s.Equal(tc.scheme, URI.Scheme())
s.Equal(tc.path, URI.Path())
s.Equal(tc.hostname, URI.Hostname())
s.Equal(tc.port, URI.Port())
s.Equal(tc.username, URI.Username())
s.Equal(tc.password, URI.Password())
}
}
38 changes: 21 additions & 17 deletions common/archiver/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,39 @@ package archiver

import (
"errors"

"github.com/uber/cadence/.gen/go/shared"
)

const (
// ArchiveNonRetriableErrorMsg is the log message when the Archive() method encounters a non-retriable error
ArchiveNonRetriableErrorMsg = "Archive method encountered an non-retriable error."
// ArchiveTransientErrorMsg is the log message when the Archive() method encounters a transient error
ArchiveTransientErrorMsg = "Archive method encountered a transient error."

// ErrInvalidURI is the error reason for invalid URI
ErrInvalidURI = "URI is invalid"
// ErrInvalidArchiveRequest is the error reason for invalid archive request
ErrInvalidArchiveRequest = "archive request is invalid"
// ErrConstructHistoryIterator is the error reason for failing to construct history iterator
ErrConstructHistoryIterator = "failed to construct history iterator"
// ErrReadHistory is the error reason for failing to read history
ErrReadHistory = "failed to read history batches"
// ErrHistoryMutated is the error reason for mutated history
ErrHistoryMutated = "history was mutated"
// ErrReasonInvalidURI is the error reason for invalid URI
ErrReasonInvalidURI = "URI is invalid"
// ErrReasonInvalidArchiveRequest is the error reason for invalid archive request
ErrReasonInvalidArchiveRequest = "archive request is invalid"
// ErrReasonConstructHistoryIterator is the error reason for failing to construct history iterator
ErrReasonConstructHistoryIterator = "failed to construct history iterator"
// ErrReasonReadHistory is the error reason for failing to read history
ErrReasonReadHistory = "failed to read history batches"
// ErrReasonHistoryMutated is the error reason for mutated history
ErrReasonHistoryMutated = "history was mutated"
)

var (
// ErrInvalidURIScheme is the error for invalid URI
ErrInvalidURIScheme = errors.New("URI scheme is invalid")
// ErrInvalidURI is the error for invalid URI
ErrInvalidURI = errors.New("URI is invalid")
// ErrURISchemeMismatch is the error for mismatch between URI scheme and archiver
ErrURISchemeMismatch = errors.New("URI scheme does not match the archiver")
// ErrHistoryMutated is the error for mutated history
ErrHistoryMutated = errors.New("history was mutated")
// ErrContextTimeout is the error for context timeout
ErrContextTimeout = errors.New("archive aborted because context timed out")
// ErrInvalidGetHistoryRequest is the error for invalid GetHistory request
ErrInvalidGetHistoryRequest = &shared.BadRequestError{Message: "Get archived history request is invalid"}
ErrInvalidGetHistoryRequest = errors.New("get archived history request is invalid")
// ErrGetHistoryTokenCorrupted is the error for corrupted GetHistory token
ErrGetHistoryTokenCorrupted = &shared.BadRequestError{Message: "Next page token is corrupted."}
ErrGetHistoryTokenCorrupted = errors.New("next page token is corrupted")
// ErrHistoryNotExist is the error for non-exist history
ErrHistoryNotExist = &shared.BadRequestError{Message: "Requested workflow history does not exist."}
ErrHistoryNotExist = errors.New("requested workflow history does not exist")
)
Loading

0 comments on commit 895cecc

Please sign in to comment.