Skip to content

Commit

Permalink
Refactored server services to their own modules. (Velocidex#531)
Browse files Browse the repository at this point in the history
Server services are now broken into interfaces.  A service manager ensures all services are stopped when quitting.
  • Loading branch information
scudette authored Aug 3, 2020
1 parent 097e1d6 commit f0a9d04
Show file tree
Hide file tree
Showing 53 changed files with 2,277 additions and 1,712 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (self *ApiServer) CollectArtifact(
return nil, err
}

flow_id, err := services.ScheduleArtifactCollection(
flow_id, err := services.GetLauncher().ScheduleArtifactCollection(
ctx, self.config, in.Creator, repository, in)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion api/client_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ func getClientMonitoringState(config_obj *config_proto.Config) (
func setClientMonitoringState(
config_obj *config_proto.Config,
args *flows_proto.ArtifactCollectorArgs) error {
return services.UpdateClientEventTable(config_obj, args)
return services.ClientEventManager().UpdateClientEventTable(
config_obj, args)
}
2 changes: 1 addition & 1 deletion api/server_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func setServerMonitoringState(
return err
}

err = services.GlobalEventTable.Update(config_obj, args)
err = services.GetServerEventManager().Update(config_obj, args)
if err != nil {
return err
}
Expand Down
12 changes: 8 additions & 4 deletions bin/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
logging "www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/server"
"www.velocidex.com/golang/velociraptor/services"
vql_subsystem "www.velocidex.com/golang/velociraptor/vql"
)

Expand Down Expand Up @@ -153,10 +154,6 @@ func doArtifactCollect() {
_, err = getRepository(config_obj)
kingpin.FatalIfError(err, "Loading extra artifacts")

wg, _, cancel := startEssentialServices(config_obj)
defer wg.Wait()
defer cancel()

now := time.Now()
defer func() {
logging.GetLogger(config_obj, &logging.ToolComponent).
Expand Down Expand Up @@ -196,6 +193,13 @@ func doArtifactCollect() {
&logging.ToolComponent)
}

ctx := InstallSignalHandler(scope)
sm := services.NewServiceManager(ctx, config_obj)
defer sm.Close()

err = startEssentialServices(config_obj, sm)
kingpin.FatalIfError(err, "Starting services.")

query := `
SELECT * FROM collect(artifacts=Artifacts, output=Output, report=Report,
password=Password, args=Args, format=Format)`
Expand Down
48 changes: 10 additions & 38 deletions bin/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,14 @@
package main

import (
"context"
"sync"

"github.com/sirupsen/logrus"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"www.velocidex.com/golang/velociraptor/api"
"www.velocidex.com/golang/velociraptor/frontend"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/gui/assets"
"www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/server"
"www.velocidex.com/golang/velociraptor/services"

config_proto "www.velocidex.com/golang/velociraptor/config/proto"
)

var (
Expand All @@ -49,24 +44,21 @@ func doFrontend() {
WithRequiredLogging().LoadAndValidate()
kingpin.FatalIfError(err, "Unable to load config file")

// Use both context and WaitGroup to control life time of
// services.
wg := &sync.WaitGroup{}
ctx, cancel := install_sig_handler()
defer cancel()

server, err := startFrontend(ctx, wg, config_obj)
sm := services.NewServiceManager(ctx, config_obj)
defer sm.Close()

server, err := startFrontend(sm, config_obj)
kingpin.FatalIfError(err, "startFrontend")
defer server.Close()

// Wait here until everything is done.
wg.Wait()
sm.Wg.Wait()
}

// Start the frontend service.
func startFrontend(
ctx context.Context,
wg *sync.WaitGroup,
func startFrontend(sm *services.Service,
config_obj *config_proto.Config) (*api.Builder, error) {

logger := logging.GetLogger(config_obj, &logging.FrontendComponent)
Expand All @@ -91,28 +83,8 @@ func startFrontend(
// Increase resource limits.
server.IncreaseLimits(config_obj)

// Start critical services first.
err = services.StartJournalService(config_obj)
if err != nil {
return nil, err
}

// Start the frontend service if needed.
err = frontend.StartFrontendService(ctx, config_obj, *frontend_node)
if err != nil {
return nil, err
}

// These services must start on all frontends
err = services.StartFrontendServices(ctx, wg, config_obj)
if err != nil {
logger.Error("Failed starting services: ", err)
return nil, err
}

// Start all services that are supposed to run on this
// frontend.
err = services.StartServices(ctx, wg, config_obj)
err = server.StartFrontendServices(config_obj, sm, *frontend_node)
if err != nil {
logger.Error("Failed starting services: ", err)
return nil, err
Expand All @@ -125,13 +97,13 @@ func startFrontend(

// Start the gRPC API server.
if config_obj.Frontend.ServerServices.ApiServer {
err = server_builder.WithAPIServer(ctx, wg)
err = server_builder.WithAPIServer(sm.Ctx, sm.Wg)
if err != nil {
return nil, err
}
}

return server_builder, server_builder.StartServer(ctx, wg)
return server_builder, server_builder.StartServer(sm.Ctx, sm.Wg)
}

func init() {
Expand Down
31 changes: 12 additions & 19 deletions bin/golden.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
artifacts "www.velocidex.com/golang/velociraptor/artifacts"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/constants"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
logging "www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/reporting"
"www.velocidex.com/golang/velociraptor/services"
Expand Down Expand Up @@ -65,31 +64,29 @@ type testFixture struct {
func vqlCollectorArgsFromFixture(
config_obj *config_proto.Config,
fixture *testFixture) *actions_proto.VQLCollectorArgs {
artifact_collector_args := &flows_proto.ArtifactCollectorArgs{
Parameters: &flows_proto.ArtifactParameters{},
}

vql_collector_args := &actions_proto.VQLCollectorArgs{}
for k, v := range fixture.Parameters {
artifact_collector_args.Parameters.Env = append(
artifact_collector_args.Parameters.Env,
vql_collector_args.Env = append(vql_collector_args.Env,
&actions_proto.VQLEnv{Key: k, Value: v})
}

vql_collector_args := &actions_proto.VQLCollectorArgs{}
err := services.AddArtifactCollectorArgs(
config_obj,
vql_collector_args,
artifact_collector_args)
kingpin.FatalIfError(err, "vqlCollectorArgsFromFixture")

return vql_collector_args
}

func runTest(fixture *testFixture,
config_obj *config_proto.Config) (string, error) {

err := services.StartJournalService(config_obj)
kingpin.FatalIfError(err, "Unable to start services")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()

sm := services.NewServiceManager(ctx, config_obj)
defer sm.Close()

err := startEssentialServices(config_obj, sm)
if err != nil {
return "", err
}

// Create an output container.
tmpfile, err := ioutil.TempFile("", "golden")
Expand Down Expand Up @@ -139,10 +136,6 @@ func runTest(fixture *testFixture,
return "", err
}

ctx, cancel := context.WithTimeout(
context.Background(), 60*time.Second)
defer cancel()

result_chan := vfilter.GetResponseChannel(
vql, ctx, scope,
vql_subsystem.MarshalJsonIndent(scope),
Expand Down
46 changes: 31 additions & 15 deletions bin/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"io"
"log"
"os"
"sync"

"github.com/Velocidex/ordereddict"
kingpin "gopkg.in/alecthomas/kingpin.v2"
Expand All @@ -36,6 +35,9 @@ import (
logging "www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/reporting"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/services/inventory"
"www.velocidex.com/golang/velociraptor/services/journal"
"www.velocidex.com/golang/velociraptor/services/launcher"
"www.velocidex.com/golang/velociraptor/uploads"
"www.velocidex.com/golang/velociraptor/utils"
vql_subsystem "www.velocidex.com/golang/velociraptor/vql"
Expand Down Expand Up @@ -190,18 +192,30 @@ func doRemoteQuery(
}
}

func startEssentialServices(config_obj *config_proto.Config) (
wg *sync.WaitGroup, ctx context.Context, cancel func()) {

wg = &sync.WaitGroup{}
ctx, cancel = context.WithCancel(context.Background())
func startEssentialServices(config_obj *config_proto.Config, sm *services.Service) error {
err := sm.Start(launcher.StartLauncherService)
if err != nil {
return err
}

if config_obj.Datastore != nil {
_ = services.StartJournalService(config_obj)
_ = services.StartNotificationService(ctx, wg, config_obj)
_ = services.StartInventoryService(ctx, wg, config_obj)
err = sm.Start(journal.StartJournalService)
if err != nil {
return err
}
err = sm.Start(services.StartNotificationService)
if err != nil {
return err
}

err = sm.Start(inventory.StartInventoryService)
if err != nil {
return err
}

}
return wg, ctx, cancel

return nil
}

func doQuery() {
Expand All @@ -220,10 +234,6 @@ func doQuery() {
return
}

wg, ctx, cancel := startEssentialServices(config_obj)
defer wg.Wait()
defer cancel()

repository, err := artifacts.GetGlobalRepository(config_obj)
kingpin.FatalIfError(err, "Artifact GetGlobalRepository ")

Expand Down Expand Up @@ -261,7 +271,13 @@ func doQuery() {
// Install throttler into the scope.
vfilter.InstallThrottler(scope, vfilter.NewTimeThrottler(float64(*rate)))

ctx = InstallSignalHandler(scope)
ctx := InstallSignalHandler(scope)

sm := services.NewServiceManager(ctx, config_obj)
defer sm.Close()

err = startEssentialServices(config_obj, sm)
kingpin.FatalIfError(err, "Starting services.")

if *trace_vql_flag {
scope.Tracer = log.New(os.Stderr, "VQL Trace: ", 0)
Expand Down
10 changes: 6 additions & 4 deletions bin/server_service_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"

"context"
Expand All @@ -38,6 +37,7 @@ import (
kingpin "gopkg.in/alecthomas/kingpin.v2"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
logging "www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/utils"
)

Expand Down Expand Up @@ -446,20 +446,22 @@ func NewVelociraptorServerService(name string) (
continue
}

wg := &sync.WaitGroup{}
ctx, cancel := install_sig_handler()
defer cancel()

sm := services.NewServiceManager(ctx, config_obj)
defer sm.Close()

elog.Info(1, fmt.Sprintf("%s service started", name))
server, err := startFrontend(ctx, wg, config_obj)
server, err := startFrontend(sm, config_obj)
if err != nil {
elog.Info(1, fmt.Sprintf("%s service error", err))
return
}
defer server.Close()

// Wait here until everything is done.
wg.Wait()
sm.Wg.Wait()

return
}
Expand Down
Loading

0 comments on commit f0a9d04

Please sign in to comment.