diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 7f6b05d4134..f7650ae1d60 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -595,7 +595,6 @@ func (m *Launcher) run(ctx context.Context) (err error) { userLogSvc platform.UserOperationLogService = m.kvService bucketLogSvc platform.BucketOperationLogService = m.kvService orgLogSvc platform.OrganizationOperationLogService = m.kvService - onboardingSvc platform.OnboardingService = m.kvService scraperTargetSvc platform.ScraperTargetStoreService = m.kvService telegrafSvc platform.TelegrafConfigStore = m.kvService userResourceSvc platform.UserResourceMappingService = m.kvService @@ -605,6 +604,12 @@ func (m *Launcher) run(ctx context.Context) (err error) { notificationEndpointStore platform.NotificationEndpointService = m.kvService ) + store, err := tenant.NewStore(m.kvStore) + if err != nil { + m.log.Error("Failed creating new meta store", zap.Error(err)) + return err + } + if m.enableNewMetaStore { var ts platform.TenantService if m.newMetaStoreReadOnly { @@ -617,11 +622,6 @@ func (m *Launcher) run(ctx context.Context) (err error) { newSvc := tenant.NewService(store) ts = tenant.NewDuplicateReadTenantService(m.log, oldSvc, newSvc) } else { - store, err := tenant.NewStore(m.kvStore) - if err != nil { - m.log.Error("Failed creating new meta store", zap.Error(err)) - return err - } ts = tenant.NewService(store) } @@ -888,7 +888,6 @@ func (m *Launcher) run(ctx context.Context) (err error) { SourceService: sourceSvc, VariableService: variableSvc, PasswordsService: passwdsSvc, - OnboardingService: onboardingSvc, InfluxQLService: storageQueryService, FluxService: storageQueryService, TaskService: taskSvc, @@ -943,8 +942,18 @@ func (m *Launcher) run(ctx context.Context) (err error) { pkgHTTPServer = pkger.NewHTTPServer(pkgServerLogger, pkgSVC) } + var onboardHTTPServer *tenant.OnboardHandler + { + onboardSvc := tenant.NewOnboardService(store, authSvc) // basic service + onboardSvc = tenant.NewAuthedOnboardSvc(onboardSvc) // with auth + onboardSvc = tenant.NewOnboardingMetrics(m.reg, onboardSvc, tenant.WithSuffix("new")) // with metrics + onboardSvc = tenant.NewOnboardingLogger(m.log.With(zap.String("handler", "onboard")), onboardSvc) // with logging + + onboardHTTPServer = tenant.NewHTTPOnboardHandler(m.log, onboardSvc) + } + { - platformHandler := http.NewPlatformHandler(m.apibackend, http.WithResourceHandler(pkgHTTPServer)) + platformHandler := http.NewPlatformHandler(m.apibackend, http.WithResourceHandler(pkgHTTPServer), http.WithResourceHandler(onboardHTTPServer)) httpLogger := m.log.With(zap.String("service", "http")) m.httpServer.Handler = http.NewHandlerFromRegistry( diff --git a/http/api_handler.go b/http/api_handler.go index e955af1aab7..47e81eb36df 100644 --- a/http/api_handler.go +++ b/http/api_handler.go @@ -69,7 +69,6 @@ type APIBackend struct { SourceService influxdb.SourceService VariableService influxdb.VariableService PasswordsService influxdb.PasswordsService - OnboardingService influxdb.OnboardingService InfluxQLService query.ProxyQueryService FluxService query.ProxyQueryService TaskService influxdb.TaskService @@ -180,9 +179,6 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler { h.Mount(prefixSignIn, sessionHandler) h.Mount(prefixSignOut, sessionHandler) - setupBackend := NewSetupBackend(b.Logger.With(zap.String("handler", "setup")), b) - h.Mount(prefixSetup, NewSetupHandler(b.Logger, setupBackend)) - sourceBackend := NewSourceBackend(b.Logger.With(zap.String("handler", "source")), b) sourceBackend.SourceService = authorizer.NewSourceService(b.SourceService) sourceBackend.BucketService = authorizer.NewBucketService(b.BucketService, noAuthUserResourceMappingService) diff --git a/http/onboarding.go b/http/onboarding.go index ce39be6d81a..2dc7deec47a 100644 --- a/http/onboarding.go +++ b/http/onboarding.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "net/http" @@ -23,9 +24,9 @@ type SetupBackend struct { // NewSetupBackend returns a new instance of SetupBackend. func NewSetupBackend(log *zap.Logger, b *APIBackend) *SetupBackend { return &SetupBackend{ - HTTPErrorHandler: b.HTTPErrorHandler, - log: log, - OnboardingService: b.OnboardingService, + HTTPErrorHandler: b.HTTPErrorHandler, + log: log, + // OnboardingService: b.OnboardingService, } } @@ -210,3 +211,7 @@ func (s *SetupService) OnboardInitialUser(ctx context.Context, or *platform.Onbo Bucket: bkt, }, nil } + +func (s *SetupService) OnboardUser(ctx context.Context, or *platform.OnboardingRequest) (*platform.OnboardingResults, error) { + return nil, errors.New("not yet implemented") +} diff --git a/kv/onboarding.go b/kv/onboarding.go index 0ef15bbde51..821a4193d1b 100644 --- a/kv/onboarding.go +++ b/kv/onboarding.go @@ -2,6 +2,7 @@ package kv import ( "context" + "errors" "fmt" "time" @@ -171,3 +172,7 @@ func (s *Service) OnboardInitialUser(ctx context.Context, req *influxdb.Onboardi Auth: auth, }, nil } + +func (s *Service) OnboardUser(ctx context.Context, req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { + return nil, errors.New("not yet implemented") +} diff --git a/mock/onboarding_service.go b/mock/onboarding_service.go index ed73dd505e6..c0197ab406c 100644 --- a/mock/onboarding_service.go +++ b/mock/onboarding_service.go @@ -16,15 +16,19 @@ type OnboardingService struct { UserService AuthorizationService - IsOnboardingFn func(context.Context) (bool, error) - GenerateFn func(context.Context, *platform.OnboardingRequest) (*platform.OnboardingResults, error) + IsOnboardingFn func(context.Context) (bool, error) + OnboardInitialUserFn func(context.Context, *platform.OnboardingRequest) (*platform.OnboardingResults, error) + OnboardUserFn func(context.Context, *platform.OnboardingRequest) (*platform.OnboardingResults, error) } // NewOnboardingService returns a mock of OnboardingService where its methods will return zero values. func NewOnboardingService() *OnboardingService { return &OnboardingService{ IsOnboardingFn: func(context.Context) (bool, error) { return false, nil }, - GenerateFn: func(context.Context, *platform.OnboardingRequest) (*platform.OnboardingResults, error) { + OnboardInitialUserFn: func(context.Context, *platform.OnboardingRequest) (*platform.OnboardingResults, error) { + return nil, nil + }, + OnboardUserFn: func(context.Context, *platform.OnboardingRequest) (*platform.OnboardingResults, error) { return nil, nil }, } @@ -37,5 +41,10 @@ func (s *OnboardingService) IsOnboarding(ctx context.Context) (bool, error) { // OnboardInitialUser OnboardingResults. func (s *OnboardingService) OnboardInitialUser(ctx context.Context, req *platform.OnboardingRequest) (*platform.OnboardingResults, error) { - return s.GenerateFn(ctx, req) + return s.OnboardInitialUserFn(ctx, req) +} + +// OnboardUser OnboardingResults. +func (s *OnboardingService) OnboardUser(ctx context.Context, req *platform.OnboardingRequest) (*platform.OnboardingResults, error) { + return s.OnboardUserFn(ctx, req) } diff --git a/onboarding.go b/onboarding.go index 6674c7fb21e..7005821288c 100644 --- a/onboarding.go +++ b/onboarding.go @@ -4,16 +4,14 @@ import "context" // OnboardingService represents a service for the first run. type OnboardingService interface { - PasswordsService - BucketService - OrganizationService - UserService - AuthorizationService - // IsOnboarding determine if onboarding request is allowed. IsOnboarding(ctx context.Context) (bool, error) + // OnboardInitialUser OnboardingResults. OnboardInitialUser(ctx context.Context, req *OnboardingRequest) (*OnboardingResults, error) + + // OnboardUser creates a new user/org/buckets + OnboardUser(ctx context.Context, req *OnboardingRequest) (*OnboardingResults, error) } // OnboardingResults is a group of elements required for first run. diff --git a/tenant/error.go b/tenant/error.go index 6732481f31e..7d214e575f2 100644 --- a/tenant/error.go +++ b/tenant/error.go @@ -27,6 +27,17 @@ var ( Code: influxdb.EInternal, Msg: "unable to generate valid id", } + + // ErrOnboardingNotAllowed occurs when request to onboard comes in and we are not allowing this request + ErrOnboardingNotAllowed = &influxdb.Error{ + Code: influxdb.EConflict, + Msg: "onboarding has already been completed", + } + + ErrOnboardInvalid = &influxdb.Error{ + Code: influxdb.EEmptyValue, + Msg: "onboard failed, missing value", + } ) // ErrCorruptID the ID stored in the Store is corrupt. diff --git a/tenant/http_client_onboarding.go b/tenant/http_client_onboarding.go new file mode 100644 index 00000000000..a7c4a29c53a --- /dev/null +++ b/tenant/http_client_onboarding.go @@ -0,0 +1,79 @@ +package tenant + +import ( + "context" + "path" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/pkg/httpc" +) + +// OnboardClientService connects to Influx via HTTP to perform onboarding operations +type OnboardClientService struct { + Client *httpc.Client +} + +// IsOnboarding determine if onboarding request is allowed. +func (s *OnboardClientService) IsOnboarding(ctx context.Context) (bool, error) { + var resp isOnboardingResponse + err := s.Client. + Get(prefixOrganizations). + DecodeJSON(&resp). + Do(ctx) + + if err != nil { + return false, err + } + return resp.Allowed, nil +} + +// OnboardInitialUser OnboardingResults. +func (s *OnboardClientService) OnboardInitialUser(ctx context.Context, or *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { + res := &onboardingResponse{} + + err := s.Client. + PostJSON(or, prefixOnboard). + DecodeJSON(res). + Do(ctx) + + if err != nil { + return nil, err + } + + bkt, err := res.Bucket.toInfluxDB() + if err != nil { + return nil, err + } + + return &influxdb.OnboardingResults{ + Org: &res.Organization.Organization, + User: &res.User.User, + Auth: res.Auth.toPlatform(), + Bucket: bkt, + }, nil +} + +func (s *OnboardClientService) OnboardUser(ctx context.Context, or *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { + res := &onboardingResponse{} + + err := s.Client. + PostJSON(or, path.Join(prefixOnboard, "user")). + DecodeJSON(res). + Do(ctx) + + if err != nil { + return nil, err + } + + bkt, err := res.Bucket.toInfluxDB() + if err != nil { + return nil, err + } + + return &influxdb.OnboardingResults{ + Org: &res.Organization.Organization, + User: &res.User.User, + Auth: res.Auth.toPlatform(), + Bucket: bkt, + }, nil +} diff --git a/tenant/http_server_onboarding.go b/tenant/http_server_onboarding.go new file mode 100644 index 00000000000..c7c67e31881 --- /dev/null +++ b/tenant/http_server_onboarding.go @@ -0,0 +1,174 @@ +package tenant + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/go-chi/chi" + "github.com/go-chi/chi/middleware" + "github.com/influxdata/influxdb/v2" + kithttp "github.com/influxdata/influxdb/v2/kit/transport/http" + "go.uber.org/zap" +) + +// OnboardHandler represents an HTTP API handler for users. +type OnboardHandler struct { + chi.Router + api *kithttp.API + log *zap.Logger + onboardingSvc influxdb.OnboardingService +} + +const ( + prefixOnboard = "/api/v2/setup" +) + +// NewHTTPOnboardHandler constructs a new http server. +func NewHTTPOnboardHandler(log *zap.Logger, onboardSvc influxdb.OnboardingService) *OnboardHandler { + svr := &OnboardHandler{ + api: kithttp.NewAPI(kithttp.WithLog(log)), + log: log, + onboardingSvc: onboardSvc, + } + + r := chi.NewRouter() + r.Use( + middleware.Recoverer, + middleware.RequestID, + middleware.RealIP, + ) + + // RESTy routes for "articles" resource + r.Route("/", func(r chi.Router) { + r.Post("/", svr.handleInitialOnboardRequest) + r.Get("/", svr.handleIsOnboarding) + r.Post("/user", svr.handleOnboardRequest) + + }) + + svr.Router = r + return svr +} + +func (h *OnboardHandler) Prefix() string { + return prefixOnboard +} + +type isOnboardingResponse struct { + Allowed bool `json:"allowed"` +} + +// isOnboarding is the HTTP handler for the POST /api/v2/setup route. +func (h *OnboardHandler) handleIsOnboarding(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + result, err := h.onboardingSvc.IsOnboarding(ctx) + if err != nil { + h.api.Err(w, err) + return + } + h.log.Debug("Onboarding eligibility check finished", zap.String("result", fmt.Sprint(result))) + + h.api.Respond(w, http.StatusOK, isOnboardingResponse{result}) +} + +// handleInitialOnboardRequest is the HTTP handler for the GET /api/v2/setup route. +func (h *OnboardHandler) handleInitialOnboardRequest(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := decodeOnboardRequest(ctx, r) + if err != nil { + h.api.Err(w, err) + return + } + results, err := h.onboardingSvc.OnboardInitialUser(ctx, req) + if err != nil { + h.api.Err(w, err) + return + } + h.log.Debug("Onboarding setup completed", zap.String("results", fmt.Sprint(results))) + + h.api.Respond(w, http.StatusCreated, NewOnboardingResponse(results)) +} + +// isOnboarding is the HTTP handler for the POST /api/v2/setup route. +func (h *OnboardHandler) handleOnboardRequest(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := decodeOnboardRequest(ctx, r) + if err != nil { + h.api.Err(w, err) + return + } + results, err := h.onboardingSvc.OnboardUser(ctx, req) + if err != nil { + h.api.Err(w, err) + return + } + h.log.Debug("Onboarding setup completed", zap.String("results", fmt.Sprint(results))) + + h.api.Respond(w, http.StatusCreated, NewOnboardingResponse(results)) +} + +type onboardingResponse struct { + User *userResponse `json:"user"` + Bucket *bucketResponse `json:"bucket"` + Organization orgResponse `json:"org"` + Auth *authResponse `json:"auth"` +} + +func NewOnboardingResponse(results *influxdb.OnboardingResults) *onboardingResponse { + return &onboardingResponse{ + User: newUserResponse(results.User), + Bucket: NewBucketResponse(results.Bucket), + Organization: newOrgResponse(*results.Org), + Auth: newAuthResponse(results.Auth), + } +} + +func decodeOnboardRequest(ctx context.Context, r *http.Request) (*influxdb.OnboardingRequest, error) { + req := &influxdb.OnboardingRequest{} + if err := json.NewDecoder(r.Body).Decode(req); err != nil { + return nil, err + } + + return req, nil +} + +type authResponse struct { + influxdb.Authorization + Links map[string]string `json:"links"` +} + +func newAuthResponse(a *influxdb.Authorization) *authResponse { + if a == nil { + return nil + } + + res := &authResponse{ + Authorization: *a, + Links: map[string]string{ + "self": fmt.Sprintf("/api/v2/authorizations/%s", a.ID), + "user": fmt.Sprintf("/api/v2/users/%s", a.UserID), + }, + } + return res +} + +func (a *authResponse) toPlatform() *influxdb.Authorization { + res := &influxdb.Authorization{ + ID: a.ID, + Token: a.Token, + Status: a.Status, + Description: a.Description, + OrgID: a.OrgID, + UserID: a.UserID, + CRUDLog: influxdb.CRUDLog{ + CreatedAt: a.CreatedAt, + UpdatedAt: a.UpdatedAt, + }, + } + for _, p := range a.Permissions { + res.Permissions = append(res.Permissions, influxdb.Permission{Action: p.Action, Resource: p.Resource}) + } + return res +} diff --git a/tenant/http_server_onboarding_test.go b/tenant/http_server_onboarding_test.go new file mode 100644 index 00000000000..ec1b4fbc329 --- /dev/null +++ b/tenant/http_server_onboarding_test.go @@ -0,0 +1,65 @@ +package tenant_test + +import ( + "context" + "net/http/httptest" + "testing" + + "github.com/go-chi/chi" + ihttp "github.com/influxdata/influxdb/v2/http" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/tenant" + itesting "github.com/influxdata/influxdb/v2/testing" + "github.com/influxdata/influxdb/v2" + "go.uber.org/zap/zaptest" +) + +func initOnboardHttpService(f itesting.OnboardingFields, t *testing.T) (influxdb.OnboardingService, func()) { + t.Helper() + + s, stCloser, err := NewTestInmemStore(t) + if err != nil { + t.Fatal(err) + } + storage, err := tenant.NewStore(s) + if err != nil { + t.Fatal(err) + } + + authsvc := kv.NewService(zaptest.NewLogger(t), s) + + ten := tenant.NewService(storage) + + svc := tenant.NewOnboardService(storage, authsvc) + + ctx := context.Background() + if !f.IsOnboarding { + // create a dummy so so we can no longer onboard + err := ten.CreateUser(ctx, &influxdb.User{Name: "dummy", Status: influxdb.Active}) + if err != nil { + t.Fatal(err) + } + } + + handler := tenant.NewHTTPOnboardHandler(zaptest.NewLogger(t), svc) + r := chi.NewRouter() + r.Mount(handler.Prefix(), handler) + server := httptest.NewServer(r) + httpClient, err := ihttp.NewHTTPClient(server.URL, "", false) + if err != nil { + t.Fatal(err) + } + + client := tenant.OnboardClientService{ + Client: httpClient, + } + + return &client, func() { + server.Close() + stCloser() + } +} + +func TestOnboardService(t *testing.T) { + itesting.OnboardInitialUser(initOnboardHttpService, t) +} diff --git a/tenant/middleware_onboarding_auth.go b/tenant/middleware_onboarding_auth.go new file mode 100644 index 00000000000..daf5304b7cd --- /dev/null +++ b/tenant/middleware_onboarding_auth.go @@ -0,0 +1,49 @@ +package tenant + +import ( + "context" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/authorizer" +) + +var _ influxdb.OnboardingService = (*AuthedOnboardSvc)(nil) + +// TODO (al): remove authorizer/org when the org service moves to tenant + +// AuthedOnboardSvc wraps a influxdb.OnboardingService and authorizes actions +// against it appropriately. +type AuthedOnboardSvc struct { + s influxdb.OnboardingService +} + +// NewAuthedOnboardSvc constructs an instance of an authorizing org serivce. +func NewAuthedOnboardSvc(s influxdb.OnboardingService) *AuthedOnboardSvc { + return &AuthedOnboardSvc{ + s: s, + } +} + +// IsOnboarding pass through. this is handled by the underlieing service layer +func (s *AuthedOnboardSvc) IsOnboarding(ctx context.Context) (bool, error) { + return s.s.IsOnboarding(ctx) +} + +// OnboardInitialUser pass through. this is handled by the underlieing service layer +func (s *AuthedOnboardSvc) OnboardInitialUser(ctx context.Context, req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { + return s.s.OnboardInitialUser(ctx, req) +} + +// OnboardUser needs to confirm this user has access to do global create for multiple resources +func (s *AuthedOnboardSvc) OnboardUser(ctx context.Context, req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { + if _, _, err := authorizer.AuthorizeWriteGlobal(ctx, influxdb.OrgsResourceType); err != nil { + return nil, err + } + if _, _, err := authorizer.AuthorizeWriteGlobal(ctx, influxdb.UsersResourceType); err != nil { + return nil, err + } + if _, _, err := authorizer.AuthorizeWriteGlobal(ctx, influxdb.BucketsResourceType); err != nil { + return nil, err + } + return s.s.OnboardUser(ctx, req) +} diff --git a/tenant/middleware_onboarding_logging.go b/tenant/middleware_onboarding_logging.go new file mode 100644 index 00000000000..e2cb438ac46 --- /dev/null +++ b/tenant/middleware_onboarding_logging.go @@ -0,0 +1,63 @@ +package tenant + +import ( + "context" + "fmt" + "time" + + "github.com/influxdata/influxdb/v2" + "go.uber.org/zap" +) + +type OnboardingLogger struct { + logger *zap.Logger + onboardingService influxdb.OnboardingService +} + +// NewOnboardingLogger returns a logging service middleware for the Bucket Service. +func NewOnboardingLogger(log *zap.Logger, s influxdb.OnboardingService) *OnboardingLogger { + return &OnboardingLogger{ + logger: log, + onboardingService: s, + } +} + +var _ influxdb.OnboardingService = (*OnboardingLogger)(nil) + +func (l *OnboardingLogger) IsOnboarding(ctx context.Context) (available bool, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + l.logger.Error("failed to check onboarding", zap.Error(err), dur) + return + } + l.logger.Debug("is onboarding", dur) + }(time.Now()) + return l.onboardingService.IsOnboarding(ctx) +} + +func (l *OnboardingLogger) OnboardInitialUser(ctx context.Context, req *influxdb.OnboardingRequest) (res *influxdb.OnboardingResults, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + msg := fmt.Sprintf("failed to onboard user %s", req.User) + l.logger.Error(msg, zap.Error(err), dur) + return + } + l.logger.Debug("onboard initial user", dur) + }(time.Now()) + return l.onboardingService.OnboardInitialUser(ctx, req) +} + +func (l *OnboardingLogger) OnboardUser(ctx context.Context, req *influxdb.OnboardingRequest) (res *influxdb.OnboardingResults, err error) { + defer func(start time.Time) { + dur := zap.Duration("took", time.Since(start)) + if err != nil { + msg := fmt.Sprintf("failed to onboard user %s", req.User) + l.logger.Error(msg, zap.Error(err), dur) + return + } + l.logger.Debug("onboard user", dur) + }(time.Now()) + return l.onboardingService.OnboardUser(ctx, req) +} diff --git a/tenant/middleware_onboarding_metrics.go b/tenant/middleware_onboarding_metrics.go new file mode 100644 index 00000000000..9ed3f75fabe --- /dev/null +++ b/tenant/middleware_onboarding_metrics.go @@ -0,0 +1,44 @@ +package tenant + +import ( + "context" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kit/metric" + "github.com/influxdata/influxdb/v2/kit/prom" +) + +var _ influxdb.OnboardingService = (*OnboardingMetrics)(nil) + +type OnboardingMetrics struct { + // RED metrics + rec *metric.REDClient + + onboardingService influxdb.OnboardingService +} + +// NewOnboardingMetrics returns a metrics service middleware for the User Service. +func NewOnboardingMetrics(reg *prom.Registry, s influxdb.OnboardingService, opts ...MetricsOption) *OnboardingMetrics { + o := applyOpts(opts...) + return &OnboardingMetrics{ + rec: metric.New(reg, o.applySuffix("onboard")), + onboardingService: s, + } +} + +func (m *OnboardingMetrics) IsOnboarding(ctx context.Context) (bool, error) { + rec := m.rec.Record("is_onboarding") + available, err := m.onboardingService.IsOnboarding(ctx) + return available, rec(err) +} + +func (m *OnboardingMetrics) OnboardInitialUser(ctx context.Context, req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { + rec := m.rec.Record("onboard_initial_user") + res, err := m.onboardingService.OnboardInitialUser(ctx, req) + return res, rec(err) +} +func (m *OnboardingMetrics) OnboardUser(ctx context.Context, req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { + rec := m.rec.Record("onboard_user") + res, err := m.onboardingService.OnboardUser(ctx, req) + return res, rec(err) +} diff --git a/tenant/service_bucket.go b/tenant/service_bucket.go index 1e411b18132..bf5f92bbf24 100644 --- a/tenant/service_bucket.go +++ b/tenant/service_bucket.go @@ -156,12 +156,7 @@ func (s *Service) CreateBucket(ctx context.Context, b *influxdb.Bucket) error { return err } - err := s.store.CreateBucket(ctx, tx, b) - if err != nil { - return err - } - - return s.addOrgRelationToResource(ctx, tx, b.OrgID, b.ID, influxdb.BucketsResourceType) + return s.store.CreateBucket(ctx, tx, b) }) } diff --git a/tenant/service_onboarding.go b/tenant/service_onboarding.go new file mode 100644 index 00000000000..cd635e5ac8c --- /dev/null +++ b/tenant/service_onboarding.go @@ -0,0 +1,165 @@ +package tenant + +import ( + "context" + "fmt" + "time" + + "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kv" +) + +type OnboardService struct { + store *Store + authSvc influxdb.AuthorizationService +} + +func NewOnboardService(st *Store, as influxdb.AuthorizationService) influxdb.OnboardingService { + return &OnboardService{ + store: st, + authSvc: as, + } +} + +// IsOnboarding determine if onboarding request is allowed. +func (s *OnboardService) IsOnboarding(ctx context.Context) (bool, error) { + allowed := false + err := s.store.View(ctx, func(tx kv.Tx) error { + // we are allowed to onboard a user if we have no users or orgs + users, _ := s.store.ListUsers(ctx, tx, influxdb.FindOptions{Limit: 1}) + orgs, _ := s.store.ListOrgs(ctx, tx, influxdb.FindOptions{Limit: 1}) + if len(users) == 0 && len(orgs) == 0 { + allowed = true + } + return nil + }) + return allowed, err +} + +// OnboardInitialUser allows us to onboard a new user if is onboarding is allowd +func (s *OnboardService) OnboardInitialUser(ctx context.Context, req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { + allowed, err := s.IsOnboarding(ctx) + if err != nil { + return nil, err + } + + if !allowed { + return nil, ErrOnboardingNotAllowed + } + + return s.onboardUser(ctx, req, func(influxdb.ID) []influxdb.Permission { return influxdb.OperPermissions() }) +} + +// OnboardUser allows us to onboard a new user if is onboarding is allowd +func (s *OnboardService) OnboardUser(ctx context.Context, req *influxdb.OnboardingRequest) (*influxdb.OnboardingResults, error) { + return s.onboardUser(ctx, req, influxdb.OwnerPermissions) +} + +// onboardUser allows us to onboard new users. +func (s *OnboardService) onboardUser(ctx context.Context, req *influxdb.OnboardingRequest, permFn func(orgID influxdb.ID) []influxdb.Permission) (*influxdb.OnboardingResults, error) { + if req == nil || req.User == "" || req.Password == "" || req.Org == "" || req.Bucket == "" { + return nil, ErrOnboardInvalid + } + + result := &influxdb.OnboardingResults{} + + err := s.store.Update(ctx, func(tx kv.Tx) error { + // create a user + user := &influxdb.User{ + Name: req.User, + Status: influxdb.Active, + } + + if err := s.store.CreateUser(ctx, tx, user); err != nil { + return err + } + + // create users password + if req.Password != "" { + passHash, err := encryptPassword(req.Password) + if err != nil { + return err + } + + s.store.SetPassword(ctx, tx, user.ID, passHash) + } + + // create users org + org := &influxdb.Organization{ + Name: req.Org, + } + + if err := s.store.CreateOrg(ctx, tx, org); err != nil { + return err + } + + // create urm + err := s.store.CreateURM(ctx, tx, &influxdb.UserResourceMapping{ + UserID: user.ID, + UserType: influxdb.Owner, + MappingType: influxdb.UserMappingType, + ResourceType: influxdb.OrgsResourceType, + ResourceID: org.ID, + }) + if err != nil { + return err + } + + // create orgs buckets + ub := &influxdb.Bucket{ + OrgID: org.ID, + Name: req.Bucket, + Type: influxdb.BucketTypeUser, + RetentionPeriod: time.Duration(req.RetentionPeriod) * time.Hour, + } + + if err := s.store.CreateBucket(ctx, tx, ub); err != nil { + return err + } + + tb := &influxdb.Bucket{ + OrgID: org.ID, + Type: influxdb.BucketTypeSystem, + Name: influxdb.TasksSystemBucketName, + RetentionPeriod: influxdb.TasksSystemBucketRetention, + Description: "System bucket for task logs", + } + + if err := s.store.CreateBucket(ctx, tx, tb); err != nil { + return err + } + + mb := &influxdb.Bucket{ + OrgID: org.ID, + Type: influxdb.BucketTypeSystem, + Name: influxdb.MonitoringSystemBucketName, + RetentionPeriod: influxdb.MonitoringSystemBucketRetention, + Description: "System bucket for monitoring logs", + } + + if err := s.store.CreateBucket(ctx, tx, mb); err != nil { + return err + } + + result.User = user + result.Org = org + result.Bucket = ub + return nil + }) + + if err != nil { + return result, err + } + + // bolt doesn't lock per collection or record so we have to close our transaction + // before we can reach out to the auth service. + result.Auth = &influxdb.Authorization{ + Description: fmt.Sprintf("%s's Token", req.User), + Permissions: permFn(result.Org.ID), + Token: req.Token, + UserID: result.User.ID, + OrgID: result.Org.ID, + } + + return result, s.authSvc.CreateAuthorization(ctx, result.Auth) +} diff --git a/tenant/service_onboarding_test.go b/tenant/service_onboarding_test.go new file mode 100644 index 00000000000..e9a8ccedb1f --- /dev/null +++ b/tenant/service_onboarding_test.go @@ -0,0 +1,52 @@ +package tenant_test + +import ( + "context" + "testing" + + influxdb "github.com/influxdata/influxdb/v2" + "github.com/influxdata/influxdb/v2/kv" + "github.com/influxdata/influxdb/v2/tenant" + influxdbtesting "github.com/influxdata/influxdb/v2/testing" + "go.uber.org/zap/zaptest" +) + +func TestBoltOnboardingService(t *testing.T) { + influxdbtesting.OnboardInitialUser(initBoltOnboardingService, t) +} + +func initBoltOnboardingService(f influxdbtesting.OnboardingFields, t *testing.T) (influxdb.OnboardingService, func()) { + s, closeStore, err := NewTestInmemStore(t) + if err != nil { + t.Fatalf("failed to create new bolt kv store: %v", err) + } + + svc := initOnboardingService(s, f, t) + return svc, func() { + closeStore() + } +} + +func initOnboardingService(s kv.Store, f influxdbtesting.OnboardingFields, t *testing.T) influxdb.OnboardingService { + storage, err := tenant.NewStore(s) + if err != nil { + t.Fatal(err) + } + ten := tenant.NewService(storage) + + // we will need an auth service as well + svc := tenant.NewOnboardService(storage, kv.NewService(zaptest.NewLogger(t), s)) + + ctx := context.Background() + + t.Logf("Onboarding: %v", f.IsOnboarding) + if !f.IsOnboarding { + // create a dummy so so we can no longer onboard + err := ten.CreateUser(ctx, &influxdb.User{Name: "dummy", Status: influxdb.Active}) + if err != nil { + t.Fatal(err) + } + } + + return svc +} diff --git a/tenant/service_org.go b/tenant/service_org.go index 51cf24eac08..4ef2a92c189 100644 --- a/tenant/service_org.go +++ b/tenant/service_org.go @@ -129,28 +129,6 @@ func (s *Service) CreateOrganization(ctx context.Context, o *influxdb.Organizati if err != nil { return err } - err = s.store.CreateURM(ctx, tx, &influxdb.UserResourceMapping{ - UserID: userID, - UserType: influxdb.Owner, - MappingType: influxdb.UserMappingType, - ResourceType: influxdb.BucketsResourceType, - ResourceID: tb.ID, - }) - if err != nil { - return err - } - err = s.store.CreateURM(ctx, tx, &influxdb.UserResourceMapping{ - UserID: userID, - UserType: influxdb.Owner, - MappingType: influxdb.UserMappingType, - ResourceType: influxdb.BucketsResourceType, - ResourceID: mb.ID, - }) - - if err != nil { - return err - } - } return nil }) diff --git a/tenant/service_urm.go b/tenant/service_urm.go index 76587fdda0a..f03bdfa5ece 100644 --- a/tenant/service_urm.go +++ b/tenant/service_urm.go @@ -44,30 +44,6 @@ func (s *Service) DeleteUserResourceMapping(ctx context.Context, resourceID, use return err } -// addOrgRelationToResource duplicates the organizations user resource mappings for this new resource -func (s *Service) addOrgRelationToResource(ctx context.Context, tx kv.Tx, orgID, resourceID influxdb.ID, resourceType influxdb.ResourceType) error { - urms, err := s.store.ListURMs(ctx, tx, influxdb.UserResourceMappingFilter{ - ResourceID: orgID, - }) - if err != nil { - return err - } - for _, urm := range urms { - err := s.store.CreateURM(ctx, tx, &influxdb.UserResourceMapping{ - UserID: urm.UserID, - UserType: urm.UserType, - MappingType: urm.MappingType, - ResourceType: resourceType, - ResourceID: resourceID, - }) - if err != nil { - return err - } - } - return nil - -} - // removeResourceRelations allows us to clean up any resource relationship that would have normally been left over after a delete action of a resource. func (s *Service) removeResourceRelations(ctx context.Context, tx kv.Tx, resourceID influxdb.ID) error { urms, err := s.store.ListURMs(ctx, tx, influxdb.UserResourceMappingFilter{ diff --git a/tenant/service_user.go b/tenant/service_user.go index 91ae0551b6b..2e8aaf8e743 100644 --- a/tenant/service_user.go +++ b/tenant/service_user.go @@ -192,3 +192,11 @@ func (s *Service) CompareAndSetPassword(ctx context.Context, userID influxdb.ID, return s.SetPassword(ctx, userID, new) } + +func encryptPassword(password string) (string, error) { + passHash, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) + if err != nil { + return "", err + } + return string(passHash), nil +} diff --git a/testing/onboarding.go b/testing/onboarding.go index 56dae05b1ce..17b2f3f9f38 100644 --- a/testing/onboarding.go +++ b/testing/onboarding.go @@ -10,6 +10,22 @@ import ( "github.com/influxdata/influxdb/v2/mock" ) +var onboardCmpOptions = cmp.Options{ + cmp.Comparer(func(x, y *platform.OnboardingResults) bool { + if x == nil && y == nil { + return true + } + if x != nil && y == nil || y != nil && x == nil { + return false + } + + return x.User.Name == y.User.Name && x.User.OAuthID == y.User.OAuthID && x.User.Status == y.User.Status && + x.Org.Name == y.Org.Name && x.Org.Description == y.Org.Description && + x.Bucket.Type == y.Bucket.Type && x.Bucket.Description == y.Bucket.Description && x.Bucket.RetentionPolicyName == y.Bucket.RetentionPolicyName && x.Bucket.RetentionPeriod == y.Bucket.RetentionPeriod && x.Bucket.Name == y.Bucket.Name && + (x.Auth != nil && y.Auth != nil && cmp.Equal(x.Auth.Permissions, y.Auth.Permissions)) // its possible auth wont exist on the basic service level + }), +} + // OnboardingFields will include the IDGenerator, TokenGenerator // and IsOnboarding type OnboardingFields struct { @@ -28,9 +44,8 @@ func OnboardInitialUser( request *platform.OnboardingRequest } type wants struct { - errCode string - results *platform.OnboardingResults - password string + errCode string + results *platform.OnboardingResults } tests := []struct { name string @@ -148,7 +163,6 @@ func OnboardInitialUser( }, }, wants: wants{ - password: "password1", results: &platform.OnboardingResults{ User: &platform.User{ ID: MustIDBase16(oneID), @@ -206,14 +220,9 @@ func OnboardInitialUser( t.Fatalf("expected error code to match '%s' got '%v'", tt.wants.errCode, code) } } - if diff := cmp.Diff(results, tt.wants.results); diff != "" { + if diff := cmp.Diff(results, tt.wants.results, onboardCmpOptions); diff != "" { t.Errorf("onboarding results are different -got/+want\ndiff %s", diff) } - if results != nil { - if err = s.ComparePassword(ctx, results.User.ID, tt.wants.password); err != nil { - t.Errorf("onboarding set password is wrong") - } - } }) }