Skip to content

Commit

Permalink
Merge pull request #526 from tidepool-org/tk-async-deletes
Browse files Browse the repository at this point in the history
[BACK-1737] Delete blobs, data, data sources, restricted tokens and provider sessions when users are deleted
  • Loading branch information
toddkazakov authored Oct 30, 2020
2 parents 6c78388 + b4d320f commit b4a931b
Show file tree
Hide file tree
Showing 641 changed files with 80,978 additions and 1,188 deletions.
47 changes: 47 additions & 0 deletions auth/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package events

import (
"context"

ev "github.com/tidepool-org/go-common/events"

"github.com/tidepool-org/platform/auth"
"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/log"
)

type userDeletionEventsHandler struct {
ev.NoopUserEventsHandler

ctx context.Context
client auth.Client
}

func NewUserDataDeletionHandler(ctx context.Context, client auth.Client) ev.EventHandler {
return ev.NewUserEventsHandler(&userDeletionEventsHandler{
ctx: ctx,
client: client,
})
}

func (u *userDeletionEventsHandler) HandleDeleteUserEvent(payload ev.DeleteUserEvent) error {
var errs []error
logger := log.LoggerFromContext(u.ctx).WithField("userId", payload.UserID)

logger.Infof("Deleting restricted tokens for user")
if err := u.client.DeleteAllRestrictedTokens(u.ctx, payload.UserID); err != nil {
errs = append(errs, err)
logger.WithError(err).Error("unable to delete restricted tokens for user")
}

logger.Infof("Deleting provider sessions for user")
if err := u.client.DeleteAllProviderSessions(u.ctx, payload.UserID); err != nil {
errs = append(errs, err)
logger.WithError(err).Error("unable to delete provider sessions for user")
}

if len(errs) != 0 {
return errors.New("Unable to delete auth data for user")
}
return nil
}
58 changes: 51 additions & 7 deletions auth/service/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package service
import (
"context"

eventsCommon "github.com/tidepool-org/go-common/events"

"github.com/tidepool-org/platform/application"
"github.com/tidepool-org/platform/auth/client"
authEvents "github.com/tidepool-org/platform/auth/events"
"github.com/tidepool-org/platform/auth/service"
"github.com/tidepool-org/platform/auth/service/api"
authServiceApiV1 "github.com/tidepool-org/platform/auth/service/api/v1"
Expand All @@ -14,6 +17,8 @@ import (
dataSourceClient "github.com/tidepool-org/platform/data/source/client"
dexcomProvider "github.com/tidepool-org/platform/dexcom/provider"
"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/events"
logInternal "github.com/tidepool-org/platform/log"
"github.com/tidepool-org/platform/platform"
"github.com/tidepool-org/platform/provider"
providerFactory "github.com/tidepool-org/platform/provider/factory"
Expand All @@ -25,12 +30,13 @@ import (

type Service struct {
*serviceService.Service
domain string
authStore *authMongo.Store
dataSourceClient *dataSourceClient.Client
taskClient task.Client
providerFactory provider.Factory
authClient *Client
domain string
authStore *authMongo.Store
dataSourceClient *dataSourceClient.Client
taskClient task.Client
providerFactory provider.Factory
authClient *Client
userEventsHandler events.Runner
}

func New() *Service {
Expand All @@ -39,6 +45,18 @@ func New() *Service {
}
}

func (s *Service) Run() error {
errs := make(chan error, 0)
go func() {
errs <- s.userEventsHandler.Run(context.Background())
}()
go func() {
errs <- s.Service.Run()
}()

return <-errs
}

func (s *Service) Initialize(provider application.Provider) error {
if err := s.Service.Initialize(provider); err != nil {
return err
Expand All @@ -62,10 +80,14 @@ func (s *Service) Initialize(provider application.Provider) error {
if err := s.initializeProviderFactory(); err != nil {
return err
}
return s.initializeAuthClient()
if err := s.initializeAuthClient(); err != nil {
return err
}
return s.initializeUserEventsHandler()
}

func (s *Service) Terminate() {
s.terminateUserEventsHandler()
s.terminateAuthClient()
s.terminateProviderFactory()
s.terminateTaskClient()
Expand Down Expand Up @@ -306,3 +328,25 @@ func (s *Service) terminateAuthClient() {
s.SetAuthClient(nil)
}
}

func (s *Service) initializeUserEventsHandler() error {
s.Logger().Debug("Initializing user events handler")

ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := authEvents.NewUserDataDeletionHandler(ctx, s.authClient)
handlers := []eventsCommon.EventHandler{handler}
runner := events.NewRunner(handlers)

if err := runner.Initialize(); err != nil {
return errors.Wrap(err, "unable to initialize events runner")
}
s.userEventsHandler = runner

return nil
}

func (s *Service) terminateUserEventsHandler() {
if s.userEventsHandler != nil {
s.userEventsHandler.Terminate()
}
}
5 changes: 5 additions & 0 deletions auth/service/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"net/http"
"os"

eventsTest "github.com/tidepool-org/platform/events/test"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/ghttp"
Expand Down Expand Up @@ -36,6 +38,7 @@ var _ = Describe("Service", func() {
var taskClientConfig map[string]interface{}
var authServiceConfig map[string]interface{}
var service *authServiceService.Service
var oldKafkaConfig map[string]string

BeforeEach(func() {
provider = applicationTest.NewProviderWithDefaults()
Expand Down Expand Up @@ -92,6 +95,7 @@ var _ = Describe("Service", func() {
}

(*provider.ConfigReporterOutput).(*configTest.Reporter).Config = authServiceConfig
oldKafkaConfig = eventsTest.SetTestEnvironmentVariables()

service = authServiceService.New()
Expect(service).ToNot(BeNil())
Expand All @@ -101,6 +105,7 @@ var _ = Describe("Service", func() {
if server != nil {
server.Close()
}
eventsTest.RestoreOldEnvironmentVariables(oldKafkaConfig)
})

Context("Initialize", func() {
Expand Down
29 changes: 29 additions & 0 deletions blob/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package events

import (
"context"

ev "github.com/tidepool-org/go-common/events"

"github.com/tidepool-org/platform/blob"
"github.com/tidepool-org/platform/log"
)

type userDeletionEventsHandler struct {
ev.NoopUserEventsHandler

ctx context.Context
blobClient blob.Client
}

func NewUserDataDeletionHandler(ctx context.Context, blobClient blob.Client) ev.EventHandler {
return ev.NewUserEventsHandler(&userDeletionEventsHandler{
ctx: ctx,
blobClient: blobClient,
})
}

func (u *userDeletionEventsHandler) HandleDeleteUserEvent(payload ev.DeleteUserEvent) error {
log.LoggerFromContext(u.ctx).WithField("userId", payload.UserID).Infof("Deleting blobs for user")
return u.blobClient.DeleteAll(u.ctx, payload.UserID)
}
28 changes: 27 additions & 1 deletion blob/service/api/v1/test/provider.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package test

import "github.com/tidepool-org/platform/blob"
import (
"github.com/tidepool-org/platform/auth"
"github.com/tidepool-org/platform/blob"
)

type Provider struct {
AuthClientInvocations int
AuthClientStub func() auth.Client
AuthClientOutputs []auth.Client
AuthClientOutput *auth.Client
BlobClientInvocations int
BlobClientStub func() blob.Client
BlobClientOutputs []blob.Client
Expand All @@ -29,7 +36,26 @@ func (p *Provider) BlobClient() blob.Client {
panic("BlobClient has no output")
}

func (p *Provider) AuthClient() auth.Client {
p.AuthClientInvocations++
if p.AuthClientStub != nil {
return p.AuthClientStub()
}
if len(p.AuthClientOutputs) > 0 {
output := p.AuthClientOutputs[0]
p.AuthClientOutputs = p.AuthClientOutputs[1:]
return output
}
if p.AuthClientOutput != nil {
return *p.AuthClientOutput
}
panic("AuthClient has no output")
}

func (p *Provider) AssertOutputsEmpty() {
if len(p.AuthClientOutputs) > 0 {
panic("AuthClientOutputs is not empty")
}
if len(p.BlobClientOutputs) > 0 {
panic("BlobClientOutputs is not empty")
}
Expand Down
51 changes: 43 additions & 8 deletions blob/service/api/v1/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import (
"fmt"
"net/http"

"github.com/tidepool-org/platform/permission"

"github.com/ant0ine/go-json-rest/rest"

"github.com/tidepool-org/platform/auth"

"github.com/tidepool-org/platform/blob"
"github.com/tidepool-org/platform/errors"
"github.com/tidepool-org/platform/page"
Expand All @@ -14,11 +18,12 @@ import (
)

type Provider interface {
AuthClient() auth.Client
BlobClient() blob.Client
}

type Router struct {
provider Provider
Provider
}

func NewRouter(provider Provider) (*Router, error) {
Expand All @@ -27,7 +32,7 @@ func NewRouter(provider Provider) (*Router, error) {
}

return &Router{
provider: provider,
Provider: provider,
}, nil
}

Expand Down Expand Up @@ -58,7 +63,12 @@ func (r *Router) List(res rest.ResponseWriter, req *rest.Request) {
return
}

result, err := r.provider.BlobClient().List(req.Context(), userID, filter, pagination)
err = r.AuthClient().EnsureAuthorizedService(req.Context())
if responder.RespondIfError(err) {
return
}

result, err := r.Provider.BlobClient().List(req.Context(), userID, filter, pagination)
if responder.RespondIfError(err) {
return
}
Expand Down Expand Up @@ -94,7 +104,12 @@ func (r *Router) Create(res rest.ResponseWriter, req *rest.Request) {
content.DigestMD5 = digestMD5
content.MediaType = mediaType

result, err := r.provider.BlobClient().Create(req.Context(), userID, content)
_, err = r.AuthClient().EnsureAuthorizedUser(req.Context(), userID, permission.Write)
if responder.RespondIfError(err) {
return
}

result, err := r.Provider.BlobClient().Create(req.Context(), userID, content)
if err != nil {
if errors.Code(err) == request.ErrorCodeDigestsNotEqual {
responder.Error(http.StatusBadRequest, err)
Expand All @@ -116,7 +131,12 @@ func (r *Router) DeleteAll(res rest.ResponseWriter, req *rest.Request) {
return
}

if responder.RespondIfError(r.provider.BlobClient().DeleteAll(req.Context(), userID)) {
err = r.AuthClient().EnsureAuthorizedService(req.Context())
if responder.RespondIfError(err) {
return
}

if responder.RespondIfError(r.Provider.BlobClient().DeleteAll(req.Context(), userID)) {
return
}

Expand All @@ -132,7 +152,12 @@ func (r *Router) Get(res rest.ResponseWriter, req *rest.Request) {
return
}

result, err := r.provider.BlobClient().Get(req.Context(), id)
err = r.AuthClient().EnsureAuthorizedService(req.Context())
if responder.RespondIfError(err) {
return
}

result, err := r.Provider.BlobClient().Get(req.Context(), id)
if responder.RespondIfError(err) {
return
} else if result == nil {
Expand All @@ -153,7 +178,12 @@ func (r *Router) GetContent(res rest.ResponseWriter, req *rest.Request) {
return
}

content, err := r.provider.BlobClient().GetContent(req.Context(), id)
err = r.AuthClient().EnsureAuthorizedService(req.Context())
if responder.RespondIfError(err) {
return
}

content, err := r.Provider.BlobClient().GetContent(req.Context(), id)
if responder.RespondIfError(err) {
return
} else if content == nil {
Expand Down Expand Up @@ -189,7 +219,12 @@ func (r *Router) Delete(res rest.ResponseWriter, req *rest.Request) {
return
}

deleted, err := r.provider.BlobClient().Delete(req.Context(), id, condition)
err = r.AuthClient().EnsureAuthorizedService(req.Context())
if responder.RespondIfError(err) {
return
}

deleted, err := r.Provider.BlobClient().Delete(req.Context(), id, condition)
if responder.RespondIfError(err) {
return
} else if !deleted {
Expand Down
Loading

0 comments on commit b4a931b

Please sign in to comment.