Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 147 additions & 23 deletions services/search/pkg/command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,31 @@ import (
"fmt"
"os/signal"

"github.com/opencloud-eu/reva/v2/pkg/events/raw"
"github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool"
opensearchgo "github.com/opensearch-project/opensearch-go/v4"
opensearchgoAPI "github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/urfave/cli/v2"

"github.com/opencloud-eu/opencloud/pkg/config/configlog"
"github.com/opencloud-eu/opencloud/pkg/generators"
"github.com/opencloud-eu/opencloud/pkg/registry"
"github.com/opencloud-eu/opencloud/pkg/runner"
ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc"
"github.com/opencloud-eu/opencloud/pkg/tracing"
"github.com/opencloud-eu/opencloud/pkg/version"
"github.com/opencloud-eu/opencloud/services/search/pkg/bleve"
"github.com/opencloud-eu/opencloud/services/search/pkg/config"
"github.com/opencloud-eu/opencloud/services/search/pkg/config/parser"
"github.com/opencloud-eu/opencloud/services/search/pkg/content"
"github.com/opencloud-eu/opencloud/services/search/pkg/logging"
"github.com/opencloud-eu/opencloud/services/search/pkg/metrics"
"github.com/opencloud-eu/opencloud/services/search/pkg/opensearch"
bleveQuery "github.com/opencloud-eu/opencloud/services/search/pkg/query/bleve"
"github.com/opencloud-eu/opencloud/services/search/pkg/search"
"github.com/opencloud-eu/opencloud/services/search/pkg/server/debug"
"github.com/opencloud-eu/opencloud/services/search/pkg/server/grpc"
"github.com/urfave/cli/v2"
svcEvent "github.com/opencloud-eu/opencloud/services/search/pkg/service/event"
)

// Server is the entrypoint for the server command.
Expand Down Expand Up @@ -52,36 +65,147 @@ func Server(cfg *config.Config) *cli.Command {
mtrcs := metrics.New()
mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1)

gr := runner.NewGroup()
// initialize search engine
var eng search.Engine
switch cfg.Engine.Type {
case "bleve":
idx, err := bleve.NewIndex(cfg.Engine.Bleve.Datapath)
if err != nil {
return err
}

grpcServer, teardown, err := grpc.Server(
grpc.Config(cfg),
grpc.Logger(logger),
grpc.Name(cfg.Service.Name),
grpc.Context(ctx),
grpc.Metrics(mtrcs),
grpc.JWTSecret(cfg.TokenManager.JWTSecret),
grpc.TraceProvider(traceProvider),
)
defer teardown()
defer func() {
if err = idx.Close(); err != nil {
logger.Error().Err(err).Msg("could not close bleve index")
}
}()

eng = bleve.NewBackend(idx, bleveQuery.DefaultCreator, logger)
case "open-search":
client, err := opensearchgoAPI.NewClient(opensearchgoAPI.Config{
Client: opensearchgo.Config{
Addresses: cfg.Engine.OpenSearch.Client.Addresses,
Username: cfg.Engine.OpenSearch.Client.Username,
Password: cfg.Engine.OpenSearch.Client.Password,
Header: cfg.Engine.OpenSearch.Client.Header,
CACert: cfg.Engine.OpenSearch.Client.CACert,
RetryOnStatus: cfg.Engine.OpenSearch.Client.RetryOnStatus,
DisableRetry: cfg.Engine.OpenSearch.Client.DisableRetry,
EnableRetryOnTimeout: cfg.Engine.OpenSearch.Client.EnableRetryOnTimeout,
MaxRetries: cfg.Engine.OpenSearch.Client.MaxRetries,
CompressRequestBody: cfg.Engine.OpenSearch.Client.CompressRequestBody,
DiscoverNodesOnStart: cfg.Engine.OpenSearch.Client.DiscoverNodesOnStart,
DiscoverNodesInterval: cfg.Engine.OpenSearch.Client.DiscoverNodesInterval,
EnableMetrics: cfg.Engine.OpenSearch.Client.EnableMetrics,
EnableDebugLogger: cfg.Engine.OpenSearch.Client.EnableDebugLogger,
},
})
if err != nil {
return fmt.Errorf("failed to create OpenSearch client: %w", err)
}

openSearchBackend, err := opensearch.NewBackend(cfg.Engine.OpenSearch.ResourceIndex.Name, client)
if err != nil {
return fmt.Errorf("failed to create OpenSearch backend: %w", err)
}

eng = openSearchBackend
default:
return fmt.Errorf("unknown search engine: %s", cfg.Engine.Type)
}

// initialize gateway selector
selector, err := pool.GatewaySelector(cfg.Reva.Address, pool.WithRegistry(registry.GetRegistry()), pool.WithTracerProvider(traceProvider))
if err != nil {
logger.Info().Err(err).Str("transport", "grpc").Msg("Failed to initialize server")
logger.Fatal().Err(err).Msg("could not get reva gateway selector")
return err
}

gr.Add(runner.NewGoMicroGrpcServerRunner(cfg.Service.Name+".grpc", grpcServer))
// initialize search content extractor
var extractor content.Extractor
switch cfg.Extractor.Type {
case "basic":
if extractor, err = content.NewBasicExtractor(logger); err != nil {
return err
}
case "tika":
if extractor, err = content.NewTikaExtractor(selector, logger, cfg); err != nil {
return err
}
default:
return fmt.Errorf("unknown search extractor: %s", cfg.Extractor.Type)
}

ss := search.NewService(selector, eng, extractor, mtrcs, logger, cfg)

debugServer, err := debug.Server(
debug.Logger(logger),
debug.Context(ctx),
debug.Config(cfg),
)
if err != nil {
logger.Info().Err(err).Str("transport", "debug").Msg("Failed to initialize server")
return err
// setup the servers
gr := runner.NewGroup()

if !cfg.GRPC.Disabled {
grpcServer, err := grpc.Server(
grpc.Config(cfg),
grpc.Logger(logger),
grpc.Name(cfg.Service.Name),
grpc.Context(ctx),
grpc.Metrics(mtrcs),
grpc.JWTSecret(cfg.TokenManager.JWTSecret),
grpc.TraceProvider(traceProvider),
grpc.GatewaySelector(selector),
grpc.Searcher(ss),
)
if err != nil {
logger.Error().Err(err).Str("transport", "grpc").Msg("Failed to initialize server")
return err
}

gr.Add(runner.NewGoMicroGrpcServerRunner(cfg.Service.Name+".grpc", grpcServer))
} else {
logger.Info().Msg("gRPC server disabled, not starting gRPC service")
}

gr.Add(runner.NewGolangHttpServerRunner(cfg.Service.Name+".debug", debugServer))
if !cfg.Events.Disabled {
connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus)
bus, err := raw.FromConfig(context.Background(), connName, raw.Config{
Endpoint: cfg.Events.Endpoint,
Cluster: cfg.Events.Cluster,
EnableTLS: cfg.Events.EnableTLS,
TLSInsecure: cfg.Events.TLSInsecure,
TLSRootCACertificate: cfg.Events.TLSRootCACertificate,
AuthUsername: cfg.Events.AuthUsername,
AuthPassword: cfg.Events.AuthPassword,
MaxAckPending: cfg.Events.MaxAckPending,
AckWait: cfg.Events.AckWait,
})

eventSvc, err := svcEvent.New(ctx, bus, logger, traceProvider, mtrcs, ss, cfg.Events.DebounceDuration, cfg.Events.NumConsumers, cfg.Events.AsyncUploads)
if err != nil {
logger.Error().Err(err).Str("transport", "event").Msg("Failed to initialize server")
return err
}

gr.Add(runner.New(cfg.Service.Name+".svc", func() error {
return eventSvc.Run()
}, func() {
eventSvc.Close()
}))
} else {
logger.Info().Msg("event listening disabled, not starting event service")
}

// always start a debug server
{
debugServer, err := debug.Server(
debug.Logger(logger),
debug.Context(ctx),
debug.Config(cfg),
)
if err != nil {
logger.Error().Err(err).Str("transport", "debug").Msg("Failed to initialize server")
return err
}

gr.Add(runner.NewGolangHttpServerRunner(cfg.Service.Name+".debug", debugServer))
}

grResults := gr.Run(ctx)

Expand Down
1 change: 1 addition & 0 deletions services/search/pkg/config/defaults/defaultconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func DefaultConfig() *config.Config {
Cluster: "opencloud-cluster",
DebounceDuration: 1000,
AsyncUploads: true,
NumConsumers: 1,
EnableTLS: false,
MaxAckPending: 1000,
AckWait: 1 * time.Minute,
Expand Down
1 change: 1 addition & 0 deletions services/search/pkg/config/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "github.com/opencloud-eu/opencloud/pkg/shared"

// GRPCConfig defines the available grpc configuration.
type GRPCConfig struct {
Disabled bool `yaml:"disabled" env:"SEARCH_GRPC_DISABLED" desc:"Disables the GRPC service. Set this to true if the service should only handle events." introductionVersion:"%%NEXT%%"`
Addr string `yaml:"addr" env:"SEARCH_GRPC_ADDR" desc:"The bind address of the GRPC service." introductionVersion:"1.0.0"`
Namespace string `yaml:"-"`
TLS *shared.GRPCServiceTLS `yaml:"tls"`
Expand Down
3 changes: 2 additions & 1 deletion services/search/pkg/config/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import "time"

// Events combines the configuration options for the event bus.
type Events struct {
Disabled bool `yaml:"disabled" env:"SEARCH_EVENTS_DISABLED" desc:"Disables listening for events. Set this to true if the service should only handle GRPC requests." introductionVersion:"%%NEXT%%"`
Endpoint string `yaml:"endpoint" env:"OC_EVENTS_ENDPOINT;SEARCH_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture." introductionVersion:"1.0.0"`
Cluster string `yaml:"cluster" env:"OC_EVENTS_CLUSTER;SEARCH_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system." introductionVersion:"1.0.0"`
AsyncUploads bool `yaml:"async_uploads" env:"OC_ASYNC_UPLOADS;SEARCH_EVENTS_ASYNC_UPLOADS" desc:"Enable asynchronous file uploads." introductionVersion:"1.0.0"`
NumConsumers int `yaml:"num_consumers" env:"SEARCH_EVENTS_NUM_CONSUMERS" desc:"The amount of concurrent event consumers to start. Event consumers are used for searching files. Multiple consumers increase parallelisation, but will also increase CPU and memory demands. The default value is 0." introductionVersion:"1.0.0"`
NumConsumers int `yaml:"num_consumers" env:"SEARCH_EVENTS_NUM_CONSUMERS" desc:"The amount of concurrent event consumers to start. Event consumers are used for searching files. Multiple consumers increase parallelisation, but will also increase CPU and memory demands." introductionVersion:"1.0.0"`
DebounceDuration int `yaml:"debounce_duration" env:"SEARCH_EVENTS_REINDEX_DEBOUNCE_DURATION" desc:"The duration in milliseconds the reindex debouncer waits before triggering a reindex of a space that was modified." introductionVersion:"1.0.0"`

TLSInsecure bool `yaml:"tls_insecure" env:"OC_INSECURE;SEARCH_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates." introductionVersion:"1.0.0"`
Expand Down
144 changes: 0 additions & 144 deletions services/search/pkg/search/events.go

This file was deleted.

Loading