Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions pkg/generators/natsnames.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package generators

import (
"os"
"strconv"
)

// NType is an enum type for the different types of NATS connections
type NType int

// Enum values for NType
const (
NTypeBus NType = iota
NTypeKeyValue
NTypeRegistry
)

// String returns the string representation of a NType
func (n NType) String() string {
return []string{"bus", "kv", "reg"}[n]
}

// GenerateConnectionName generates a connection name for a NATS connection
// The connection name will be formatted as follows: "hostname:pid:service:type"
func GenerateConnectionName(service string, ntype NType) string {
host, err := os.Hostname()
if err != nil {
host = ""
}

return firstNRunes(host, 5) + ":" + strconv.Itoa(os.Getpid()) + ":" + service + ":" + ntype.String()
}

// firstNRunes returns the first n runes of a string
func firstNRunes(s string, n int) string {
runes := []rune(s)
if n > len(runes) {
n = len(runes)
}
return string(runes[:n])
}
12 changes: 12 additions & 0 deletions pkg/natsjsregistry/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

type storeOptionsKey struct{}
type defaultTTLKey struct{}
type serviceNameKey struct{}

// StoreOptions sets the options for the underlying store
func StoreOptions(opts []store.Option) registry.Option {
Expand All @@ -30,3 +31,14 @@ func DefaultTTL(t time.Duration) registry.Option {
o.Context = context.WithValue(o.Context, defaultTTLKey{}, t)
}
}

// ServiceName links the service name to the registry if possible.
// The name will be part of the connection name to the Nats registry
func ServiceName(name string) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, serviceNameKey{}, name)
}
}
18 changes: 16 additions & 2 deletions pkg/natsjsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/nats-io/nats.go"
"github.com/opencloud-eu/opencloud/pkg/generators"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/server"
"go-micro.dev/v4/store"
Expand All @@ -29,7 +30,15 @@ var (
)

func init() {
cmd.DefaultRegistries[_registryName] = NewRegistry
cmd.DefaultRegistries[_registryName] = NewRegistryMicro
}

// NewRegistryMicro returns a new natsjs registry, forcing the service name
// to be "_go-micro". This is the registry that is intended to be used by
// go-micro
func NewRegistryMicro(opts ...registry.Option) registry.Registry {
overwrittenOpts := append(opts, ServiceName("_go-micro"))
return NewRegistry(overwrittenOpts...)
}

// NewRegistry returns a new natsjs registry
Expand Down Expand Up @@ -186,6 +195,11 @@ func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
storeoptions = append(storeoptions, natsjskv.DefaultTTL(defaultTTL))
}

serviceName := "_unknown" // use "_unknown" as default service name if nothing else is provided
if name, ok := opts.Context.Value(serviceNameKey{}).(string); ok {
serviceName = name
}

addr := []string{"127.0.0.1:9233"}
if len(opts.Addrs) > 0 {
addr = opts.Addrs
Expand All @@ -195,7 +209,7 @@ func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
storeoptions = append(storeoptions, store.Nodes(addr...))

natsOptions := nats.GetDefaultOptions()
natsOptions.Name = "nats-js-kv-registry"
natsOptions.Name = generators.GenerateConnectionName(serviceName, generators.NTypeRegistry)
natsOptions.User, natsOptions.Password = getAuth()
natsOptions.ReconnectedCB = func(_ *nats.Conn) {
if err := n.Init(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func GetRegistry(opts ...Option) mRegistry.Registry {
_reg = natsjsregistry.NewRegistry(
mRegistry.Addrs(cfg.Addresses...),
natsjsregistry.DefaultTTL(cfg.RegisterTTL),
natsjsregistry.ServiceName("_oc"),
)
case "memory":
_reg = memr.NewRegistry()
Expand Down
4 changes: 3 additions & 1 deletion services/activitylog/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
microstore "go-micro.dev/v4/store"

"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/registry"
ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
Expand Down Expand Up @@ -70,7 +71,8 @@ func Server(cfg *config.Config) *cli.Command {

defer cancel()

evStream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
evStream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
logger.Error().Err(err).Msg("Failed to initialize event stream")
return err
Expand Down
4 changes: 3 additions & 1 deletion services/antivirus/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/opencloud-eu/reva/v2/pkg/rhttp"
"go.opentelemetry.io/otel/trace"

"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/services/antivirus/pkg/config"
"github.com/opencloud-eu/opencloud/services/antivirus/pkg/scanners"
Expand Down Expand Up @@ -114,7 +115,8 @@ func (av Antivirus) Run() error {
av.config.Events.TLSInsecure = false
}

natsStream, err := stream.NatsFromConfig(av.config.Service.Name, false, stream.NatsConfig(av.config.Events))
connName := generators.GenerateConnectionName(av.config.Service.Name, generators.NTypeBus)
natsStream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(av.config.Events))
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion services/audit/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/urfave/cli/v2"

"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/services/audit/pkg/config"
"github.com/opencloud-eu/opencloud/services/audit/pkg/config/parser"
"github.com/opencloud-eu/opencloud/services/audit/pkg/logging"
Expand All @@ -36,7 +37,8 @@ func Server(cfg *config.Config) *cli.Command {
)
defer cancel()

client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
client, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion services/clientlog/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/urfave/cli/v2"

"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/registry"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/pkg/version"
Expand Down Expand Up @@ -68,7 +69,8 @@ func Server(cfg *config.Config) *cli.Command {

defer cancel()

s, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
s, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion services/eventhistory/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
microstore "go-micro.dev/v4/store"

"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/pkg/version"
Expand Down Expand Up @@ -55,7 +56,8 @@ func Server(cfg *config.Config) *cli.Command {

m.BuildInfo.WithLabelValues(version.GetString()).Set(1)

consumer, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
consumer, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion services/frontend/pkg/command/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go-micro.dev/v4/metadata"
"google.golang.org/protobuf/types/known/fieldmaskpb"

"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/middleware"
"github.com/opencloud-eu/opencloud/pkg/registry"
Expand All @@ -34,7 +35,8 @@ var _registeredEvents = []events.Unmarshaller{

// ListenForEvents listens for events and acts accordingly
func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) error {
bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
bus, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
l.Error().Err(err).Msg("cannot connect to nats")
return err
Expand Down
4 changes: 3 additions & 1 deletion services/graph/pkg/server/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/opencloud-eu/opencloud/pkg/account"
"github.com/opencloud-eu/opencloud/pkg/cors"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/keycloak"
"github.com/opencloud-eu/opencloud/pkg/middleware"
"github.com/opencloud-eu/opencloud/pkg/registry"
Expand Down Expand Up @@ -56,7 +57,8 @@ func Server(opts ...Option) (http.Service, error) {

if options.Config.Events.Endpoint != "" {
var err error
eventsStream, err = stream.NatsFromConfig(options.Config.Service.Name, false, stream.NatsConfig(options.Config.Events))
connName := generators.GenerateConnectionName(options.Config.Service.Name, generators.NTypeBus)
eventsStream, err = stream.NatsFromConfig(connName, false, stream.NatsConfig(options.Config.Events))
if err != nil {
options.Logger.Error().
Err(err).
Expand Down
4 changes: 3 additions & 1 deletion services/notifications/pkg/command/send_email.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package command

import (
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/services/notifications/pkg/config"
"github.com/opencloud-eu/reva/v2/pkg/events"
"github.com/opencloud-eu/reva/v2/pkg/events/stream"
Expand Down Expand Up @@ -31,7 +32,8 @@ func SendEmail(cfg *config.Config) *cli.Command {
if !daily && !weekly {
return errors.New("at least one of '--daily' or '--weekly' must be set")
}
s, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
s, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Notifications.Events))
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion services/notifications/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"

"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/registry"
"github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
Expand Down Expand Up @@ -94,7 +95,8 @@ func Server(cfg *config.Config) *cli.Command {
registeredEvents[typ.String()] = e
}

client, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Notifications.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
client, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Notifications.Events))
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion services/policies/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/urfave/cli/v2"

"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
Expand Down Expand Up @@ -104,7 +105,8 @@ func Server(cfg *config.Config) *cli.Command {

{

bus, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
bus, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion services/postprocessing/pkg/command/postprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config"
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config/parser"
"github.com/opencloud-eu/reva/v2/pkg/events"
Expand Down Expand Up @@ -40,7 +41,8 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
stream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig{
Endpoint: cfg.Postprocessing.Events.Endpoint,
Cluster: cfg.Postprocessing.Events.Cluster,
EnableTLS: cfg.Postprocessing.Events.EnableTLS,
Expand Down
8 changes: 5 additions & 3 deletions services/postprocessing/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/version"
"github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config"
Expand Down Expand Up @@ -46,7 +47,8 @@ var (

// NewPostprocessingService returns a new instance of a postprocessing service
func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.Store, tp trace.TracerProvider, cfg *config.Config) (*PostprocessingService, error) {
pub, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
pub, err := stream.NatsFromConfig(connName, false, stream.NatsConfig{
Endpoint: cfg.Postprocessing.Events.Endpoint,
Cluster: cfg.Postprocessing.Events.Cluster,
EnableTLS: cfg.Postprocessing.Events.EnableTLS,
Expand All @@ -55,11 +57,11 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.
AuthUsername: cfg.Postprocessing.Events.AuthUsername,
AuthPassword: cfg.Postprocessing.Events.AuthPassword,
})

if err != nil {
return nil, err
}
raw, err := raw.FromConfig(ctx, cfg.Service.Name, raw.Config{

raw, err := raw.FromConfig(ctx, connName, raw.Config{
Endpoint: cfg.Postprocessing.Events.Endpoint,
Cluster: cfg.Postprocessing.Events.Cluster,
EnableTLS: cfg.Postprocessing.Events.EnableTLS,
Expand Down
4 changes: 3 additions & 1 deletion services/proxy/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/justinas/alice"
"github.com/oklog/run"
"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
pkgmiddleware "github.com/opencloud-eu/opencloud/pkg/middleware"
"github.com/opencloud-eu/opencloud/pkg/oidc"
Expand Down Expand Up @@ -157,7 +158,8 @@ func Server(cfg *config.Config) *cli.Command {
var publisher events.Stream
if cfg.Events.Endpoint != "" {
var err error
publisher, err = stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Events))
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
publisher, err = stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events))
if err != nil {
logger.Error().
Err(err).
Expand Down
4 changes: 3 additions & 1 deletion services/search/pkg/service/grpc/v0/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go-micro.dev/v4/metadata"
grpcmetadata "google.golang.org/grpc/metadata"

"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/log"
"github.com/opencloud-eu/opencloud/pkg/registry"
v0 "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/messages/search/v0"
Expand Down Expand Up @@ -115,7 +116,8 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error)

// setup event handling

stream, err := raw.FromConfig(context.Background(), cfg.Service.Name, raw.Config{
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
stream, err := raw.FromConfig(context.Background(), connName, raw.Config{
Endpoint: cfg.Events.Endpoint,
Cluster: cfg.Events.Cluster,
EnableTLS: cfg.Events.EnableTLS,
Expand Down
Loading