From c0c38f7e926a025adedf4b08f2660fb5cc61c688 Mon Sep 17 00:00:00 2001 From: Andrew Dawson Date: Tue, 18 Dec 2018 11:53:58 -0800 Subject: [PATCH] Add blobstore interface and plumb into frontend (#1340) --- cmd/server/server.go | 2 + common/blobstore/client.go | 74 +++++++++++++++++++ common/mocks/Client.go | 92 ++++++++++++++++++++++++ common/service/service.go | 2 + host/onebox.go | 5 +- service/frontend/service.go | 2 +- service/frontend/workflowHandler.go | 5 +- service/frontend/workflowHandler_test.go | 29 +++++--- 8 files changed, 199 insertions(+), 12 deletions(-) create mode 100644 common/blobstore/client.go create mode 100644 common/mocks/Client.go diff --git a/cmd/server/server.go b/cmd/server/server.go index 633745e6d45..38531f6f23f 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -22,6 +22,7 @@ package main import ( "github.com/uber/cadence/common/archival" + "github.com/uber/cadence/common/blobstore" "log" "time" @@ -112,6 +113,7 @@ func (s *server) startService() common.Daemon { dc := dynamicconfig.NewCollection(params.DynamicConfig, params.Logger) params.ArchivalClient = archival.NewNopClient() + params.BlobstoreClient = blobstore.NewNopClient() svcCfg := s.cfg.Services[s.name] params.MetricScope = svcCfg.Metrics.NewScope() diff --git a/common/blobstore/client.go b/common/blobstore/client.go new file mode 100644 index 00000000000..de83eb8b461 --- /dev/null +++ b/common/blobstore/client.go @@ -0,0 +1,74 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package blobstore + +import ( + "context" + "errors" + "io" +) + +// CompressionType defines the type of compression used for a blob +type CompressionType int + +const ( + // NoCompression indicates that blob is not compressed + NoCompression CompressionType = iota +) + +// Blob defines a blob +type Blob struct { + Body io.Reader + CompressionType CompressionType + Tags map[string]string +} + +// BucketMetadataResponse contains information relating to a bucket's configuration +type BucketMetadataResponse struct { + Owner string + RetentionDays int +} + +// Client is used to operate on blobs in a blobstore +type Client interface { + UploadBlob(ctx context.Context, bucket string, path string, blob *Blob) error + DownloadBlob(ctx context.Context, bucket string, path string) (*Blob, error) + BucketMetadata(ctx context.Context, bucket string) (*BucketMetadataResponse, error) +} + +type nopClient struct{} + +func (c *nopClient) UploadBlob(ctx context.Context, bucket string, path string, blob *Blob) error { + return errors.New("not implemented") +} + +func (c *nopClient) DownloadBlob(ctx context.Context, bucket string, path string) (*Blob, error) { + return nil, errors.New("not implemented") +} + +func (c *nopClient) BucketMetadata(ctx context.Context, bucket string) (*BucketMetadataResponse, error) { + return nil, errors.New("not implemented") +} + +// NewNopClient creates a nop client +func NewNopClient() Client { + return &nopClient{} +} diff --git a/common/mocks/Client.go b/common/mocks/Client.go new file mode 100644 index 00000000000..3f2b2de6223 --- /dev/null +++ b/common/mocks/Client.go @@ -0,0 +1,92 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import blobstore "github.com/uber/cadence/common/blobstore" +import context "context" +import mock "github.com/stretchr/testify/mock" + +// Client is an autogenerated mock type for the Client type +type Client struct { + mock.Mock +} + +// BucketMetadata provides a mock function with given fields: ctx, bucket +func (_m *Client) BucketMetadata(ctx context.Context, bucket string) (*blobstore.BucketMetadataResponse, error) { + ret := _m.Called(ctx, bucket) + + var r0 *blobstore.BucketMetadataResponse + if rf, ok := ret.Get(0).(func(context.Context, string) *blobstore.BucketMetadataResponse); ok { + r0 = rf(ctx, bucket) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*blobstore.BucketMetadataResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, bucket) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DownloadBlob provides a mock function with given fields: ctx, bucket, path +func (_m *Client) DownloadBlob(ctx context.Context, bucket string, path string) (*blobstore.Blob, error) { + ret := _m.Called(ctx, bucket, path) + + var r0 *blobstore.Blob + if rf, ok := ret.Get(0).(func(context.Context, string, string) *blobstore.Blob); ok { + r0 = rf(ctx, bucket, path) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*blobstore.Blob) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, bucket, path) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UploadBlob provides a mock function with given fields: ctx, bucket, path, blob +func (_m *Client) UploadBlob(ctx context.Context, bucket string, path string, blob *blobstore.Blob) error { + ret := _m.Called(ctx, bucket, path, blob) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, *blobstore.Blob) error); ok { + r0 = rf(ctx, bucket, path, blob) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/common/service/service.go b/common/service/service.go index 0ebe847b01b..37e69ad2321 100644 --- a/common/service/service.go +++ b/common/service/service.go @@ -22,6 +22,7 @@ package service import ( "github.com/uber/cadence/common/archival" + "github.com/uber/cadence/common/blobstore" "math/rand" "os" "time" @@ -66,6 +67,7 @@ type ( DynamicConfig dynamicconfig.Client DispatcherProvider client.DispatcherProvider ArchivalClient archival.Client + BlobstoreClient blobstore.Client } // RingpopFactory provides a bootstrapped ringpop diff --git a/host/onebox.go b/host/onebox.go index e443303d148..b4f0b7bf393 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -24,6 +24,7 @@ import ( "errors" "flag" "fmt" + "github.com/uber/cadence/common/blobstore" "reflect" "sync" "time" @@ -289,6 +290,7 @@ func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup) { } params.DynamicConfig = dynamicconfig.NewNopClient() + params.BlobstoreClient = blobstore.NewNopClient() // TODO when cross DC is public, remove this temporary override var kafkaProducer messaging.Producer @@ -305,7 +307,8 @@ func (c *cadenceImpl) startFrontend(rpHosts []string, startWG *sync.WaitGroup) { c.frontEndService = service.New(params) c.frontendHandler = frontend.NewWorkflowHandler( - c.frontEndService, frontend.NewConfig(dynamicconfig.NewNopCollection()), c.metadataMgr, c.historyMgr, c.historyV2Mgr, c.visibilityMgr, kafkaProducer) + c.frontEndService, frontend.NewConfig(dynamicconfig.NewNopCollection()), + c.metadataMgr, c.historyMgr, c.historyV2Mgr, c.visibilityMgr, kafkaProducer, params.BlobstoreClient) err = c.frontendHandler.Start() if err != nil { c.logger.WithField("error", err).Fatal("Failed to start frontend") diff --git a/service/frontend/service.go b/service/frontend/service.go index 4aa1a8c8226..5a77e5b90e3 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -135,7 +135,7 @@ func (s *Service) Start() { kafkaProducer = &mocks.KafkaProducer{} } - wfHandler := NewWorkflowHandler(base, s.config, metadata, history, historyV2, visibility, kafkaProducer) + wfHandler := NewWorkflowHandler(base, s.config, metadata, history, historyV2, visibility, kafkaProducer, params.BlobstoreClient) wfHandler.Start() adminHandler := NewAdminHandler(base, pConfig.NumHistoryShards, metadata) diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index bc2d2466949..e94d58df8d0 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -25,6 +25,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/uber/cadence/common/blobstore" "sync" "time" @@ -71,6 +72,7 @@ type ( rateLimiter common.TokenBucket config *Config domainReplicator DomainReplicator + blobstoreClient blobstore.Client service.Service } @@ -122,7 +124,7 @@ var ( // NewWorkflowHandler creates a thrift handler for the cadence service func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persistence.MetadataManager, historyMgr persistence.HistoryManager, historyV2Mgr persistence.HistoryV2Manager, visibilityMgr persistence.VisibilityManager, - kafkaProducer messaging.Producer) *WorkflowHandler { + kafkaProducer messaging.Producer, blobstoreClient blobstore.Client) *WorkflowHandler { handler := &WorkflowHandler{ Service: sVice, config: config, @@ -134,6 +136,7 @@ func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persi domainCache: cache.NewDomainCache(metadataMgr, sVice.GetClusterMetadata(), sVice.GetMetricsClient(), sVice.GetLogger()), rateLimiter: common.NewTokenBucket(config.RPS(), common.NewRealTimeSource()), domainReplicator: NewDomainReplicator(kafkaProducer, sVice.GetLogger()), + blobstoreClient: blobstoreClient, } // prevent us from trying to serve requests before handler's Start() is complete handler.startWG.Add(1) diff --git a/service/frontend/workflowHandler_test.go b/service/frontend/workflowHandler_test.go index 15e806a5ce9..b82199c8b92 100644 --- a/service/frontend/workflowHandler_test.go +++ b/service/frontend/workflowHandler_test.go @@ -57,6 +57,7 @@ type ( mockVisibilityMgr *mocks.VisibilityManager mockDomainCache *cache.DomainCacheMock mockService cs.Service + mockBlobstoreClient *mocks.Client } ) @@ -87,6 +88,7 @@ func (s *workflowHandlerSuite) SetupTest() { s.mockHistoryV2Mgr = &mocks.HistoryV2Manager{} s.mockVisibilityMgr = &mocks.VisibilityManager{} s.mockService = cs.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, s.mockMetricClient, s.logger) + s.mockBlobstoreClient = &mocks.Client{} } func (s *workflowHandlerSuite) TestMergeDomainData_Overriding() { @@ -162,7 +164,8 @@ func (s *workflowHandlerSuite) TestDisableListVisibilityByFilter() { config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger)) config.DisableListVisibilityByFilter = dc.GetBoolPropertyFnFilteredByDomain(true) - wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer) + wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, + s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient) mockDomainCache := &cache.DomainCacheMock{} wh.metricsClient = wh.Service.GetMetricsClient() wh.domainCache = mockDomainCache @@ -230,7 +233,8 @@ func (s *workflowHandlerSuite) TestDisableListVisibilityByFilter() { func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_RequestIdNotSet() { config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger)) config.RPS = dc.GetIntPropertyFn(10) - wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer) + wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, + s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() @@ -261,7 +265,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_RequestIdNotSet func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_StartRequestNotSet() { config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger)) config.RPS = dc.GetIntPropertyFn(10) - wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer) + wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, + s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() @@ -273,7 +278,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_StartRequestNot func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_DomainNotSet() { config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger)) config.RPS = dc.GetIntPropertyFn(10) - wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer) + wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, + s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() @@ -304,7 +310,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_DomainNotSet() func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_WorkflowIdNotSet() { config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger)) config.RPS = dc.GetIntPropertyFn(10) - wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer) + wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, + s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() @@ -335,7 +342,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_WorkflowIdNotSe func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_WorkflowTypeNotSet() { config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger)) config.RPS = dc.GetIntPropertyFn(10) - wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer) + wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, + s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() @@ -367,7 +375,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_WorkflowTypeNot func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_TaskListNotSet() { config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger)) config.RPS = dc.GetIntPropertyFn(10) - wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer) + wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, + s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() @@ -399,7 +408,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_TaskListNotSet( func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_InvalidExecutionStartToCloseTimeout() { config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger)) config.RPS = dc.GetIntPropertyFn(10) - wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer) + wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, + s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() @@ -431,7 +441,8 @@ func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_InvalidExecutio func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_InvalidTaskStartToCloseTimeout() { config := NewConfig(dc.NewCollection(dc.NewNopClient(), s.logger)) config.RPS = dc.GetIntPropertyFn(10) - wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, s.mockVisibilityMgr, s.mockProducer) + wh := NewWorkflowHandler(s.mockService, config, s.mockMetadataMgr, s.mockHistoryMgr, s.mockHistoryV2Mgr, + s.mockVisibilityMgr, s.mockProducer, s.mockBlobstoreClient) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done()