Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: use runners to startup the services #9048

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Prev Previous commit
Next Next commit
feat: include policies and notifications services to the runners
  • Loading branch information
jvillafanez committed May 22, 2024
commit 0df34d596f551090e750215512abe2b025772bf9
43 changes: 24 additions & 19 deletions services/notifications/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package command
import (
"context"
"fmt"
"os/signal"

"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/handlers"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
Expand Down Expand Up @@ -51,17 +52,14 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr := run.Group{}

ctx, cancel := func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()

defer cancel()
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

gr := runner.NewGroup()
{
server := debug.NewService(
debug.Logger(logger),
Expand All @@ -75,10 +73,7 @@ func Server(cfg *config.Config) *cli.Command {
debug.Ready(handlers.Ready),
)

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("notifications_debug", server))
}

// evs defines a list of events to subscribe to
Expand Down Expand Up @@ -118,11 +113,21 @@ func Server(cfg *config.Config) *cli.Command {
valueService := settingssvc.NewValueService("com.owncloud.api.settings", grpcClient)
svc := service.NewEventsNotifier(evts, channel, logger, gatewaySelector, valueService, cfg.ServiceAccount.ServiceAccountID, cfg.ServiceAccount.ServiceAccountSecret, cfg.Notifications.EmailTemplatePath, cfg.Notifications.DefaultLanguage, cfg.WebUIURL)

gr.Add(svc.Run, func(error) {
cancel()
})
gr.Add(runner.New("notifications_svc", func() error {
return svc.Run()
}, func() {
svc.Close()
}))

return gr.Run()
grResults := gr.Run(ctx)

// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
40 changes: 31 additions & 9 deletions services/notifications/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (
"errors"
"fmt"
"net/url"
"os"
"os/signal"
"path"
"strings"
"syscall"
"sync"
"sync/atomic"

gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1"
Expand Down Expand Up @@ -47,22 +46,22 @@ func NewEventsNotifier(
logger: logger,
channel: channel,
events: events,
signals: make(chan os.Signal, 1),
gatewaySelector: gatewaySelector,
valueService: valueService,
serviceAccountID: serviceAccountID,
serviceAccountSecret: serviceAccountSecret,
emailTemplatePath: emailTemplatePath,
defaultLanguage: defaultLanguage,
ocisURL: ocisURL,
stopCh: make(chan struct{}, 1),
stopped: new(atomic.Bool),
}
}

type eventsNotifier struct {
logger log.Logger
channel channels.Channel
events <-chan events.Event
signals chan os.Signal
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
valueService settingssvc.ValueService
emailTemplatePath string
Expand All @@ -71,16 +70,27 @@ type eventsNotifier struct {
ocisURL string
serviceAccountID string
serviceAccountSecret string
stopCh chan struct{}
stopped *atomic.Bool
}

func (s eventsNotifier) Run() error {
signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM)
var wg sync.WaitGroup

s.logger.Debug().
Msg("eventsNotifier started")
EventLoop:
for {
select {
case evt := <-s.events:
case evt, ok := <-s.events:
if !ok {
break EventLoop
}
// TODO: needs to be replaced with a worker pool
wg.Add(1)
go func() {
defer wg.Done()

switch e := evt.Event.(type) {
case events.SpaceShared:
s.handleSpaceShared(e)
Expand All @@ -94,12 +104,24 @@ func (s eventsNotifier) Run() error {
s.handleShareExpired(e)
}
}()
case <-s.signals:

if s.stopped.Load() {
break EventLoop
}
case <-s.stopCh:
s.logger.Debug().
Msg("eventsNotifier stopped")
return nil
break EventLoop
}
}
// wait until all the goroutines processing events have finished
wg.Wait()
return nil
}

func (s eventsNotifier) Close() {
s.stopped.Store(true)
close(s.stopCh)
}

func (s eventsNotifier) render(ctx context.Context, template email.MessageTemplate,
Expand Down
47 changes: 25 additions & 22 deletions services/policies/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"fmt"
"io"
"net/http"
"os/signal"

"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/oklog/run"
"github.com/owncloud/ocis/v2/ocis-pkg/config/configlog"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
"github.com/owncloud/ocis/v2/ocis-pkg/runner"
"github.com/owncloud/ocis/v2/ocis-pkg/service/debug"
"github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/tracing"
Expand All @@ -33,16 +34,12 @@ func Server(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
var (
gr = run.Group{}
ctx, cancel = func() (context.Context, context.CancelFunc) {
if cfg.Context == nil {
return context.WithCancel(context.Background())
}
return context.WithCancel(cfg.Context)
}()
)
defer cancel()
var cancel context.CancelFunc
ctx := cfg.Context
if ctx == nil {
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
defer cancel()
}

logger := log.NewLogger(
log.Name(cfg.Service.Name),
Expand All @@ -62,6 +59,7 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr := runner.NewGroup()
{
grpcClient, err := grpc.NewClient(
append(
Expand Down Expand Up @@ -104,9 +102,7 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(svc.Run, func(_ error) {
cancel()
})
gr.Add(runner.NewGoMicroGrpcServerRunner("policies_grpc", svc))
}

{
Expand All @@ -121,9 +117,11 @@ func Server(cfg *config.Config) *cli.Command {
return err
}

gr.Add(eventSvc.Run, func(_ error) {
cancel()
})
gr.Add(runner.New("policies_svc", func() error {
return eventSvc.Run()
}, func() {
eventSvc.Close()
}))
}

{
Expand Down Expand Up @@ -165,13 +163,18 @@ func Server(cfg *config.Config) *cli.Command {
),
)

gr.Add(server.ListenAndServe, func(_ error) {
_ = server.Shutdown(ctx)
cancel()
})
gr.Add(runner.NewGolangHttpServerRunner("policies_debug", server))
}

return gr.Run()
grResults := gr.Run(ctx)

// return the first non-nil error found in the results
for _, grResult := range grResults {
if grResult.RunnerError != nil {
return grResult.RunnerError
}
}
return nil
},
}
}
62 changes: 46 additions & 16 deletions services/policies/pkg/service/event/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eventSVC

import (
"context"
"sync/atomic"

"github.com/cs3org/reva/v2/pkg/events"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
Expand All @@ -11,23 +12,27 @@ import (

// Service defines the service handlers.
type Service struct {
ctx context.Context
query string
log log.Logger
stream events.Stream
engine engine.Engine
tp trace.TracerProvider
ctx context.Context
query string
log log.Logger
stream events.Stream
engine engine.Engine
tp trace.TracerProvider
stopCh chan struct{}
stopped *atomic.Bool
}

// New returns a service implementation for Service.
func New(ctx context.Context, stream events.Stream, logger log.Logger, tp trace.TracerProvider, engine engine.Engine, query string) (Service, error) {
svc := Service{
ctx: ctx,
log: logger,
query: query,
tp: tp,
engine: engine,
stream: stream,
ctx: ctx,
log: logger,
query: query,
tp: tp,
engine: engine,
stream: stream,
stopCh: make(chan struct{}, 1),
stopped: new(atomic.Bool),
}

return svc, nil
Expand All @@ -40,16 +45,41 @@ func (s Service) Run() error {
return err
}

for e := range ch {
err := s.processEvent(e)
if err != nil {
return err
EventLoop:
for {
select {
case <-s.stopCh:
break EventLoop
case e, ok := <-ch:
if !ok {
break EventLoop
}

err := s.processEvent(e)
if err != nil {
return err
}

if s.stopped.Load() {
break EventLoop
}
}
}

return nil
}

// Close will make the policies service to stop processing, so the `Run`
// method can finish.
// TODO: Underlying services can't be stopped. This means that some goroutines
// will get stuck trying to push events through a channel nobody is reading
// from, so resources won't be freed and there will be memory leaks. For now,
// if the service is stopped, you should close the app soon after.
func (s Service) Close() {
s.stopped.Store(true)
close(s.stopCh)
}

func (s Service) processEvent(e events.Event) error {
ctx := e.GetTraceContext(s.ctx)
ctx, span := s.tp.Tracer("policies").Start(ctx, "processEvent")
Expand Down