From 7dea2c23165dccd3cdc5b24d17aee821442d4535 Mon Sep 17 00:00:00 2001 From: Mike Cohen Date: Sat, 12 Sep 2020 16:37:03 +1000 Subject: [PATCH] Added a cache VQL plugin which stores data for a time. (#633) The cache function produces a scope lifetime cache. It takes 2 parameters, a func and a key. The key is used to index into the cache, while the func will be evaluated when the cache misses. The following is an example of how to use the cache: ``` LET Foo(X) = if(condition=log(message=format(format="I Ran with %v", args=X)), then=X + 5) SELECT cache(func=Foo(X=5), key=5), cache(func=Foo(X=10), key=5), cache(func=Foo(X=10), key=10) FROM scope() ``` This will produce a row with (10, 10, 15) - the middle item is cached because its key is the same as the first item. --- api/api.go | 13 +- .../definitions/Generic/System/Pstree.yaml | 25 ++- .../Windows/Events/ProcessCreation.yaml | 28 ++- .../server/testcases/functions.in.yaml | 8 + .../server/testcases/functions.out.yaml | 6 + bin/artifacts.go | 5 +- bin/golden.go | 6 +- flows/api.go | 7 +- flows/artifacts.go | 14 +- flows/artifacts_test.go | 10 +- flows/foreman.go | 7 +- flows/hunts.go | 17 +- flows/monitoring.go | 7 +- go.mod | 2 +- go.sum | 7 +- .../artifact/linechart-directive.js | 2 +- .../angular-components/core/api-service.js | 5 +- gui/static/package-lock.json | 6 +- gui/static/package.json | 2 +- result_sets/result_sets.go | 7 +- server/enroll.go | 7 +- server/server_test.go | 10 +- .../client_monitoring/client_monitoring.go | 24 +- services/frontend/frontend.go | 7 +- services/hunt_manager/hunt_manager.go | 34 ++- services/hunt_manager/hunt_manager_test.go | 59 ++++- services/interrogation/interrogation.go | 14 +- services/interrogation/interrogation_test.go | 12 +- services/interrogation/utils.go | 7 +- services/inventory/inventory_test.go | 9 +- services/journal.go | 9 +- services/journal/journal.go | 4 +- services/labels/labels.go | 13 +- services/launcher.go | 9 +- services/launcher/artifacts_test.go | 12 +- services/launcher/launcher_test.go | 30 ++- services/notifications/notifications.go | 20 +- services/repository/artifacts_test.go | 40 ++++ services/repository/manager.go | 8 +- services/repository/plugin.go | 8 +- services/repository/repository.go | 1 + services/server_artifacts/server_artifacts.go | 14 +- services/server_artifacts/server_uploader.go | 7 +- services/test_utils.go | 6 +- services/vfs_service/utils.go | 7 +- services/vfs_service/vfs_service_test.go | 7 +- startup/startup.go | 13 +- vql/common/cache.go | 212 ++++++++++++++++++ vql/functions/functions.go | 6 +- vql/server/artifacts.go | 7 +- vql/server/hunt.go | 2 +- vql/server/monitoring.go | 9 +- vql/server/repository.go | 8 +- vql/tools/collector.go | 10 +- 54 files changed, 712 insertions(+), 117 deletions(-) create mode 100644 vql/common/cache.go diff --git a/api/api.go b/api/api.go index eca102e1103..c5524464c1e 100644 --- a/api/api.go +++ b/api/api.go @@ -189,8 +189,12 @@ func (self *ApiServer) CollectArtifact( if err != nil { return nil, err } + launcher, err := services.GetLauncher() + if err != nil { + return nil, err + } - flow_id, err := services.GetLauncher().ScheduleArtifactCollection( + flow_id, err := launcher.ScheduleArtifactCollection( ctx, self.config, acl_manager, repository, in) if err != nil { return nil, err @@ -813,7 +817,12 @@ func (self *ApiServer) WriteEvent( // Only return the first row if true { - err := services.GetJournal().PushRowsToArtifact(self.config, + journal, err := services.GetJournal() + if err != nil { + return nil, err + } + + err = journal.PushRowsToArtifact(self.config, rows, in.Query.Name, peer_name, "") return &empty.Empty{}, err } diff --git a/artifacts/definitions/Generic/System/Pstree.yaml b/artifacts/definitions/Generic/System/Pstree.yaml index 082bb0d24c5..7770153ab56 100644 --- a/artifacts/definitions/Generic/System/Pstree.yaml +++ b/artifacts/definitions/Generic/System/Pstree.yaml @@ -24,8 +24,23 @@ sources: // through it many times. LET processes <= SELECT Name, Pid, Ppid FROM pslist() - // Recursive function to find process parent. - LET pstree(LookupPid) = SELECT * FROM foreach( + LET m <= memoize(query={ SELECT Name, Pid, Ppid FROM pslist()}, key="Pid", period=1) + + // Recursive function to find process parent (clients > 0.4.9). + LET pstree_memoized(LookupPid) = SELECT * FROM foreach( + row=get(item=m, field=LookupPid), + query={ + SELECT * FROM chain( + a={ + SELECT Name, Pid, Ppid FROM scope() + }, + b={ + SELECT Name, Pid, Ppid FROM pstree(LookupPid=Ppid) + }) + }) + + // Recursive function to find process parent (clients < 0.4.9). + LET pstree_legacy(LookupPid) = SELECT * FROM foreach( row={ SELECT Name, Pid, Ppid FROM processes WHERE Pid = LookupPid @@ -40,6 +55,12 @@ sources: }) }) + // Select which version we should use + LET pstree(LookupPid) = SELECT * FROM if( + condition=version(function="memoize") >= 0, + then=pstree_memoized, + else=pstree_legacy) + SELECT Name, Pid, Ppid, join(array=pstree(LookupPid=Pid).Name, sep=CallChainSep) AS CallChain FROM processes diff --git a/artifacts/definitions/Windows/Events/ProcessCreation.yaml b/artifacts/definitions/Windows/Events/ProcessCreation.yaml index 328d009d8f6..abcd34fa944 100644 --- a/artifacts/definitions/Windows/Events/ProcessCreation.yaml +++ b/artifacts/definitions/Windows/Events/ProcessCreation.yaml @@ -13,10 +13,29 @@ parameters: sources: - precondition: SELECT OS From info() where OS = 'windows' - queries: - - | + query: | + // Get information about the process + LET get_pid_query(Lpid) = SELECT Pid, Ppid, Name FROM if(condition=Lpid > 0, + then={ + SELECT Pid, Ppid, Name FROM pslist(pid=Lpid) + }) + + // Build the call chain. Cache the results for a short time. + LET pstree(LookupPid) = SELECT * FROM foreach( + row=cache(func=get_pid_query(Lpid=LookupPid), key=str(str=LookupPid)), + query={ + SELECT * FROM chain( + a={ + SELECT Pid, Ppid, Name FROM scope() + }, b={ + SELECT Pid, Ppid, Name FROM pstree(LookupPid=Ppid) + }) + }) + + LET call_chain(LookupPid) = SELECT Pid, Ppid, Name FROM pstree(LookupPid=LookupPid) + // Convert the timestamp from WinFileTime to Epoch. - SELECT timestamp(epoch=atoi(string=Parse.TIME_CREATED) / 10000000 - 11644473600 ) as Timestamp, + SELECT timestamp(winfiletime=atoi(string=Parse.TIME_CREATED)) as Timestamp, Parse.ParentProcessID as PPID, Parse.ProcessID as PID, Parse.ProcessName as Name, { @@ -24,7 +43,8 @@ sources: } AS CommandLine, { SELECT CommandLine FROM pslist(pid=Parse.ParentProcessID) - } AS ParentInfo + } AS ParentInfo, + join(array=call_chain(LookupPid=Parse.ProcessID).Name, sep=" <- ") AS CallChain FROM wmi_events( query=eventQuery, wait=5000000, // Do not time out. diff --git a/artifacts/testdata/server/testcases/functions.in.yaml b/artifacts/testdata/server/testcases/functions.in.yaml index 242dd1fe39d..5e03b4e2839 100644 --- a/artifacts/testdata/server/testcases/functions.in.yaml +++ b/artifacts/testdata/server/testcases/functions.in.yaml @@ -15,3 +15,11 @@ Queries: WHERE _value = 3 }) }) AS Filtered FROM scope() + + # Test cache functions (first 2 should be same value due to caching) + - LET Foo(X) = if(condition=log(message=format(format="I Ran with %v", args=X)), + then=X + 5) + - SELECT cache(func=Foo(X=5), key=5), + cache(func=Foo(X=10), key=5), + cache(func=Foo(X=10), key=10) + FROM scope() diff --git a/artifacts/testdata/server/testcases/functions.out.yaml b/artifacts/testdata/server/testcases/functions.out.yaml index b5ba5f5e6d6..e3ed480712f 100644 --- a/artifacts/testdata/server/testcases/functions.out.yaml +++ b/artifacts/testdata/server/testcases/functions.out.yaml @@ -11,4 +11,10 @@ LET rows <= SELECT * FROM scope()[]SELECT len(list=["a", "b"]), len(list="hello" "Z": 3 } } +]LET Foo(X) = if(condition=log(message=format(format="I Ran with %v", args=X)), then=X + 5)[]SELECT cache(func=Foo(X=5), key=5), cache(func=Foo(X=10), key=5), cache(func=Foo(X=10), key=10) FROM scope()[ + { + "cache(func=Foo(X=5), key=5)": 10, + "cache(func=Foo(X=10), key=5)": 10, + "cache(func=Foo(X=10), key=10)": 15 + } ] \ No newline at end of file diff --git a/bin/artifacts.go b/bin/artifacts.go index 9f4cf2fdd6d..7add1476cb9 100644 --- a/bin/artifacts.go +++ b/bin/artifacts.go @@ -242,7 +242,10 @@ func doArtifactList() { continue } - request, err := services.GetLauncher().CompileCollectorArgs( + launcher, err := services.GetLauncher() + kingpin.FatalIfError(err, "GetLauncher") + + request, err := launcher.CompileCollectorArgs( ctx, config_obj, vql_subsystem.NullACLManager{}, repository, &flows_proto.ArtifactCollectorArgs{ Artifacts: []string{artifact.Name}, diff --git a/bin/golden.go b/bin/golden.go index 619bc90a80b..4c961feb6d1 100644 --- a/bin/golden.go +++ b/bin/golden.go @@ -110,14 +110,16 @@ func makeCtxWithTimeout(duration int) (context.Context, func()) { p := pprof.Lookup("goroutines") if p != nil { - p.WriteTo(os.Stderr, 1) + p.WriteTo(os.Stdout, 1) } p = pprof.Lookup("mutex") if p != nil { - p.WriteTo(os.Stderr, 1) + p.WriteTo(os.Stdout, 1) } + os.Stdout.Close() + // Hard exit with an error. os.Exit(-1) } diff --git a/flows/api.go b/flows/api.go index 344ac3c6e72..bfed94b2e8e 100644 --- a/flows/api.go +++ b/flows/api.go @@ -266,9 +266,14 @@ func ArchiveFlow( Set("Timestamp", time.Now().UTC().Unix()). Set("Flow", collection_context) + journal, err := services.GetJournal() + if err != nil { + return nil, err + } + return &api_proto.StartFlowResponse{ FlowId: flow_id, - }, services.GetJournal().PushRowsToArtifact(config_obj, + }, journal.PushRowsToArtifact(config_obj, []*ordereddict.Dict{row}, "System.Flow.Archive", client_id, flow_id) } diff --git a/flows/artifacts.go b/flows/artifacts.go index 61bb72af830..bf017807367 100644 --- a/flows/artifacts.go +++ b/flows/artifacts.go @@ -125,7 +125,12 @@ func closeContext( Set("FlowId", collection_context.SessionId). Set("ClientId", collection_context.ClientId) - return services.GetJournal().PushRowsToArtifact(config_obj, + journal, err := services.GetJournal() + if err != nil { + return err + } + + return journal.PushRowsToArtifact(config_obj, []*ordereddict.Dict{row}, "System.Flow.Completion", collection_context.ClientId, collection_context.SessionId, @@ -540,7 +545,12 @@ func appendUploadDataToFile( Set("Size", file_buffer.Size). Set("UploadedSize", file_buffer.StoredSize) - return services.GetJournal().PushRowsToArtifact(config_obj, + journal, err := services.GetJournal() + if err != nil { + return err + } + + return journal.PushRowsToArtifact(config_obj, []*ordereddict.Dict{row}, "System.Upload.Completion", message.Source, collection_context.SessionId, ) diff --git a/flows/artifacts_test.go b/flows/artifacts_test.go index 479d0138f36..1e4b1b9447d 100644 --- a/flows/artifacts_test.go +++ b/flows/artifacts_test.go @@ -94,7 +94,10 @@ func (self *TestSuite) TestRetransmission() { // Schedule a new flow. ctx := context.Background() - flow_id, err := services.GetLauncher().ScheduleArtifactCollection( + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) + + flow_id, err := launcher.ScheduleArtifactCollection( ctx, self.config_obj, vql_subsystem.NullACLManager{}, repository, request) @@ -153,7 +156,10 @@ func (self *TestSuite) TestResourceLimits() { // Schedule a new flow. ctx := context.Background() - flow_id, err := services.GetLauncher().ScheduleArtifactCollection( + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) + + flow_id, err := launcher.ScheduleArtifactCollection( ctx, self.config_obj, vql_subsystem.NullACLManager{}, diff --git a/flows/foreman.go b/flows/foreman.go index a1a6c814df5..e30a253fbb6 100644 --- a/flows/foreman.go +++ b/flows/foreman.go @@ -99,8 +99,13 @@ func ForemanProcessMessage( return nil } + journal, err := services.GetJournal() + if err != nil { + return err + } + // Notify the hunt manager that we need to hunt this client. - err := services.GetJournal().PushRowsToArtifact(config_obj, + err = journal.PushRowsToArtifact(config_obj, []*ordereddict.Dict{ordereddict.NewDict(). Set("HuntId", hunt.HuntId). Set("ClientId", client_id). diff --git a/flows/hunts.go b/flows/hunts.go index 1f1b7ee4017..43fc4a1e662 100644 --- a/flows/hunts.go +++ b/flows/hunts.go @@ -126,9 +126,13 @@ func CreateHunt( // time. This ensures that if the artifact definition is // changed after this point, the hunt will continue to // schedule consistent VQL on the clients. - compiled, err := services.GetLauncher(). - CompileCollectorArgs( - ctx, config_obj, acl_manager, repository, hunt.StartRequest) + launcher, err := services.GetLauncher() + if err != nil { + return "", err + } + + compiled, err := launcher.CompileCollectorArgs( + ctx, config_obj, acl_manager, repository, hunt.StartRequest) if err != nil { return "", err } @@ -272,7 +276,12 @@ func ModifyHunt( Set("Hunt", hunt). Set("User", user) - err := services.GetJournal().PushRowsToArtifact(config_obj, + journal, err := services.GetJournal() + if err != nil { + return err + } + + err = journal.PushRowsToArtifact(config_obj, []*ordereddict.Dict{row}, "System.Hunt.Archive", "server", hunt_modification.HuntId) if err != nil { diff --git a/flows/monitoring.go b/flows/monitoring.go index 1c720ff0459..7428007da11 100644 --- a/flows/monitoring.go +++ b/flows/monitoring.go @@ -54,7 +54,12 @@ func MonitoringProcessMessage( for _, row := range rows { row.Set("ClientId", message.Source) } - return services.GetJournal().PushRowsToArtifact( + journal, err := services.GetJournal() + if err != nil { + return err + } + + return journal.PushRowsToArtifact( config_obj, rows, response.Query.Name, message.Source, message.SessionId) diff --git a/go.mod b/go.mod index 2151e4f1eba..69e1ab16e59 100644 --- a/go.mod +++ b/go.mod @@ -114,7 +114,7 @@ require ( www.velocidex.com/golang/go-prefetch v0.0.0-20200722101157-37e4751dd5ca www.velocidex.com/golang/oleparse v0.0.0-20190327031422-34195d413196 www.velocidex.com/golang/regparser v0.0.0-20190625082115-b02dc43c2500 - www.velocidex.com/golang/vfilter v0.0.0-20200903172553-c41828760309 + www.velocidex.com/golang/vfilter v0.0.0-20200911062452-cf0fe8a4de78 www.velocidex.com/golang/vtypes v0.0.0-20180924145839-b0d509f8925b ) diff --git a/go.sum b/go.sum index 3546201a69d..5999e74df7d 100644 --- a/go.sum +++ b/go.sum @@ -70,7 +70,6 @@ github.com/Velocidex/go-yara v1.1.10-0.20200414034554-457848df11f9 h1:zatH9PNN7I github.com/Velocidex/go-yara v1.1.10-0.20200414034554-457848df11f9/go.mod h1:N1A2MzBKorQm3WixuPUSm4gmzGA6i5sYrwHyUBvY5lg= github.com/Velocidex/json v0.0.0-20200724131328-8f5c7b0a25ec h1:ep1GlQfQtVZsn8yaUwILZxlK1Sw13dYhsOT3bEhZ8HE= github.com/Velocidex/json v0.0.0-20200724131328-8f5c7b0a25ec/go.mod h1:ukJBuruT9b24pdgZwWDvOaCYHeS03B7oQPCUWh25bwM= -github.com/Velocidex/ordereddict v0.0.0-20191103011020-3b5a5f6957d4/go.mod h1:pxJpvN5ISMtDwrdIdqnJ3ZrjIngCw+WT6gfNil6Zjvo= github.com/Velocidex/ordereddict v0.0.0-20191106020901-97c468e5e403/go.mod h1:pxJpvN5ISMtDwrdIdqnJ3ZrjIngCw+WT6gfNil6Zjvo= github.com/Velocidex/ordereddict v0.0.0-20200723153557-9460a6764ab8 h1:wRt5CLsj5hS3OpJjQYJMuUrmxKwRfUn76+4sqjfmNbA= github.com/Velocidex/ordereddict v0.0.0-20200723153557-9460a6764ab8/go.mod h1:pxJpvN5ISMtDwrdIdqnJ3ZrjIngCw+WT6gfNil6Zjvo= @@ -762,9 +761,7 @@ www.velocidex.com/golang/oleparse v0.0.0-20190327031422-34195d413196 h1:3oYZ7hPN www.velocidex.com/golang/oleparse v0.0.0-20190327031422-34195d413196/go.mod h1:i7M+d4Vxir8nmDACh+c6CsUU1r1Wcj00aRgNp8mXcPQ= www.velocidex.com/golang/regparser v0.0.0-20190625082115-b02dc43c2500 h1:XqZddiAbjPIsTZcEPbqqqABS/ZV5SB7j33eczNsqD60= www.velocidex.com/golang/regparser v0.0.0-20190625082115-b02dc43c2500/go.mod h1:DVzloLH8L+oF3zma1Jisaat5bGF+4VLggDcYlIp00ns= -www.velocidex.com/golang/vfilter v0.0.0-20200903081613-6b20dc963172 h1:tmKvMO1zsjMfThcdyqWXoHkoOdvfY/iHhzaa93GnBOw= -www.velocidex.com/golang/vfilter v0.0.0-20200903081613-6b20dc963172/go.mod h1:mABF6rGkfq9qwvo2SppBxYonhnd8OPSA6rxzzl75A4Y= -www.velocidex.com/golang/vfilter v0.0.0-20200903172553-c41828760309 h1:mq63LAXMGM41TZAm9YTkC+N8ic+U+kUkvJDSJwj6OHc= -www.velocidex.com/golang/vfilter v0.0.0-20200903172553-c41828760309/go.mod h1:XlUeViBwZxeefhxbkxW2oGUVcB/oQfxtBgnxL9jLryg= +www.velocidex.com/golang/vfilter v0.0.0-20200911062452-cf0fe8a4de78 h1:9M+BvRRVYm4TCpA4UVGI/bf5vBhsGHnzYklH2OgigtI= +www.velocidex.com/golang/vfilter v0.0.0-20200911062452-cf0fe8a4de78/go.mod h1:XlUeViBwZxeefhxbkxW2oGUVcB/oQfxtBgnxL9jLryg= www.velocidex.com/golang/vtypes v0.0.0-20180924145839-b0d509f8925b h1:z5v5o1dhtzaxvlWm6qSTYZ4OTr56Ol2JpM1Y5Wu9zQE= www.velocidex.com/golang/vtypes v0.0.0-20180924145839-b0d509f8925b/go.mod h1:tXxIx8UJuI81Hoxcv0DTq2a1Pi1H6l1uCf4dhqUSUkw= diff --git a/gui/static/angular-components/artifact/linechart-directive.js b/gui/static/angular-components/artifact/linechart-directive.js index 36465f86a1a..5284079df46 100644 --- a/gui/static/angular-components/artifact/linechart-directive.js +++ b/gui/static/angular-components/artifact/linechart-directive.js @@ -76,7 +76,7 @@ exports.LineChartDirective = function() { elem = $(elem); var plot = null; - var placeholder = $("
").appendTo(elem); + var placeholder = $("
").appendTo(elem); placeholder.bind("plotselected", function (event, ranges) { $.each(plot.getXAxes(), function(_, axis) { var opts = axis.options; diff --git a/gui/static/angular-components/core/api-service.js b/gui/static/angular-components/core/api-service.js index acd92a053b8..b4a7f9d6014 100644 --- a/gui/static/angular-components/core/api-service.js +++ b/gui/static/angular-components/core/api-service.js @@ -288,7 +288,10 @@ ApiService.prototype.poll = function(apiPath, intervalMs, opt_params, if (cancelled) { return; } - result.reject(response); + // Do not cancel the polling if there is a server + // error. Polling normally updates the display and + // intermittant failures will cause the display to break. + // result.reject(response); }.bind(this)).finally(function() { if (cancelled) { return; diff --git a/gui/static/package-lock.json b/gui/static/package-lock.json index 05973a3860a..bac3b15708b 100644 --- a/gui/static/package-lock.json +++ b/gui/static/package-lock.json @@ -2756,9 +2756,9 @@ "integrity": "sha512-lNaHNVymajmk0OJMBn8fVUAU1BtDeKIqKoVhk4xAALB57aALg6b4W0MfJ/cUE0g9YBXy5XhSlPIpYIJ7HaY/3Q==" }, "flot": { - "version": "4.2.1", - "resolved": "https://registry.npmjs.org/flot/-/flot-4.2.1.tgz", - "integrity": "sha512-tnZ4Pg1upBSfSs52VVRPZqF8FUxeVVI124srTV/XNcwJR5ygjaCZuwnpkAuZAWFLZMg2OCHb53pB6VoVB7YT4g==" + "version": "3.2.13", + "resolved": "https://registry.npmjs.org/flot/-/flot-3.2.13.tgz", + "integrity": "sha512-ZJl8zazqgbn79YCdyzt9JV1r38Gk7Dkt+tBb5Kx1sMEDvLVz+ViwF/QTWKcYjyaPO+UW58FP+fFWZFp6dXeMAA==" }, "flush-write-stream": { "version": "1.1.1", diff --git a/gui/static/package.json b/gui/static/package.json index d3a1bd6a9f0..8b7f7a2200d 100644 --- a/gui/static/package.json +++ b/gui/static/package.json @@ -20,7 +20,7 @@ "datatables.net-buttons": "^1.6.3", "datatables.net-colreorder": "^1.5.2", "del": "^5.1.0", - "flot": "^4.2.1", + "flot": "^3.2.10", "font-awesome": "^4.7.0", "gulp-cli": "^2.3.0", "gulp-uglify": "^3.0.2", diff --git a/result_sets/result_sets.go b/result_sets/result_sets.go index 168a0417494..e25a4432e00 100644 --- a/result_sets/result_sets.go +++ b/result_sets/result_sets.go @@ -16,6 +16,7 @@ package result_sets import ( + "errors" "fmt" "github.com/Velocidex/json" @@ -29,7 +30,11 @@ import ( ) func GetArtifactMode(config_obj *config_proto.Config, artifact_name string) (int, error) { - repository, _ := services.GetRepositoryManager().GetGlobalRepository(config_obj) + manager := services.GetRepositoryManager() + if manager == nil { + return 0, errors.New("No Repository Manager") + } + repository, _ := manager.GetGlobalRepository(config_obj) artifact, pres := repository.Get(config_obj, artifact_name) if !pres { diff --git a/server/enroll.go b/server/enroll.go index 41134c498a5..07012df00e0 100644 --- a/server/enroll.go +++ b/server/enroll.go @@ -44,7 +44,12 @@ func enroll( return err } - return services.GetJournal().PushRowsToArtifact(config_obj, + journal, err := services.GetJournal() + if err != nil { + return err + } + + return journal.PushRowsToArtifact(config_obj, []*ordereddict.Dict{ordereddict.NewDict().Set("ClientId", client_id)}, "Server.Internal.Enrollment", "server" /* client_id */, "", ) diff --git a/server/server_test.go b/server/server_test.go index 9c0f48b1f73..98265e4bae5 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -484,7 +484,10 @@ func (self *ServerTestSuite) TestScheduleCollection() { repository, err := services.GetRepositoryManager().GetGlobalRepository(self.config_obj) require.NoError(t, err) - flow_id, err := services.GetLauncher().ScheduleArtifactCollection( + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) + + flow_id, err := launcher.ScheduleArtifactCollection( context.Background(), self.config_obj, vql_subsystem.NullACLManager{}, @@ -515,7 +518,10 @@ func (self *ServerTestSuite) createArtifactCollection() (string, error) { require.NoError(self.T(), err) // Schedule a flow in the database. - flow_id, err := services.GetLauncher().ScheduleArtifactCollection( + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) + + flow_id, err := launcher.ScheduleArtifactCollection( context.Background(), self.config_obj, vql_subsystem.NullACLManager{}, diff --git a/services/client_monitoring/client_monitoring.go b/services/client_monitoring/client_monitoring.go index a836de62c07..1b8e2eb2e92 100644 --- a/services/client_monitoring/client_monitoring.go +++ b/services/client_monitoring/client_monitoring.go @@ -108,13 +108,16 @@ func (self *ClientEventTable) compileArtifactCollectorArgs( config_obj *config_proto.Config, artifact *flows_proto.ArtifactCollectorArgs) ( []*actions_proto.VQLCollectorArgs, error) { - // Make a local copy. result := []*actions_proto.VQLCollectorArgs{} - launcher := services.GetLauncher() - - // Compile each artifact separately into its own VQLCollectorArgs so they can be run in parallel. + launcher, err := services.GetLauncher() + if err != nil { + return nil, err + } + // Compile each artifact separately into its own + // VQLCollectorArgs so they can be run in parallel. for _, name := range artifact.Artifacts { + // Make a local copy. temp := *artifact temp.Artifacts = []string{name} compiled, err := launcher.CompileCollectorArgs( @@ -189,7 +192,12 @@ func (self *ClientEventTable) setClientMonitoringState( // Notify all the client monitoring tables that we got // updated. This should cause all frontends to refresh. - return services.GetJournal().PushRowsToArtifact(config_obj, + journal, err := services.GetJournal() + if err != nil { + return err + } + + return journal.PushRowsToArtifact(config_obj, []*ordereddict.Dict{ ordereddict.NewDict(). Set("setter", self.id). @@ -358,8 +366,12 @@ func StartClientMonitoringService( logger := logging.GetLogger(config_obj, &logging.FrontendComponent) logger.Info("Starting Client Monitoring Service") + journal, err := services.GetJournal() + if err != nil { + return err + } - events, cancel := services.GetJournal().Watch("Server.Internal.ArtifactModification") + events, cancel := journal.Watch("Server.Internal.ArtifactModification") wg.Add(1) go func() { diff --git a/services/frontend/frontend.go b/services/frontend/frontend.go index 86a6e8dd139..b0b05ce844a 100644 --- a/services/frontend/frontend.go +++ b/services/frontend/frontend.go @@ -193,7 +193,12 @@ func (self *FrontendManager) syncActiveFrontends() error { self.mu.Unlock() if self.sample%2 == 0 { - err = services.GetJournal().PushRowsToArtifact(self.config_obj, + journal, err := services.GetJournal() + if err != nil { + return err + } + + err = journal.PushRowsToArtifact(self.config_obj, []*ordereddict.Dict{ordereddict.NewDict(). Set("CPUPercent", total_metrics.CpuLoadPercent). Set("MemoryUse", total_metrics.ProcessResidentMemoryBytes). diff --git a/services/hunt_manager/hunt_manager.go b/services/hunt_manager/hunt_manager.go index f1a1d043f12..130123b631a 100644 --- a/services/hunt_manager/hunt_manager.go +++ b/services/hunt_manager/hunt_manager.go @@ -104,7 +104,11 @@ func (self *HuntManager) StartParticipation( Config: config_obj, Logger: logging.NewPlainLogger(config_obj, &logging.GenericComponent), }) - qm_chan, cancel := services.GetJournal().Watch("System.Hunt.Participation") + journal, err := services.GetJournal() + if err != nil { + return err + } + qm_chan, cancel := journal.Watch("System.Hunt.Participation") wg.Add(1) go func() { @@ -135,7 +139,12 @@ func (self *HuntManager) StartFlowCompletion( wg *sync.WaitGroup) error { scope := vfilter.NewScope() - qm_chan, cancel := services.GetJournal().Watch("System.Flow.Completion") + journal, err := services.GetJournal() + if err != nil { + return err + } + + qm_chan, cancel := journal.Watch("System.Flow.Completion") wg.Add(1) go func() { @@ -182,7 +191,12 @@ func (self *HuntManager) ProcessFlowCompletion( } path_manager := paths.NewHuntPathManager(hunt_id) - err = services.GetJournal().PushRows(config_obj, path_manager.ClientErrors(), + journal, err := services.GetJournal() + if err != nil { + return + } + + err = journal.PushRows(config_obj, path_manager.ClientErrors(), []*ordereddict.Dict{ordereddict.NewDict(). Set("ClientId", flow.ClientId). Set("FlowId", flow.SessionId). @@ -303,7 +317,12 @@ func (self *HuntManager) ProcessRow( return } - flow_id, err := services.GetLauncher().ScheduleArtifactCollection( + launcher, err := services.GetLauncher() + if err != nil { + return + } + + flow_id, err := launcher.ScheduleArtifactCollection( ctx, config_obj, vql_subsystem.NullACLManager{}, repository, request) if err != nil { scope.Log("hunt manager: %v", err) @@ -312,9 +331,14 @@ func (self *HuntManager) ProcessRow( row.Set("FlowId", flow_id) row.Set("Timestamp", time.Now().Unix()) + journal, err := services.GetJournal() + if err != nil { + scope.Log("hunt manager: %v", err) + return + } path_manager := paths.NewHuntPathManager(participation_row.HuntId) - err = services.GetJournal().PushRows(config_obj, + err = journal.PushRows(config_obj, path_manager.Clients(), []*ordereddict.Dict{row}) if err != nil { scope.Log("hunt manager: %v", err) diff --git a/services/hunt_manager/hunt_manager_test.go b/services/hunt_manager/hunt_manager_test.go index 5f2b1357eb4..6105cf3801a 100644 --- a/services/hunt_manager/hunt_manager_test.go +++ b/services/hunt_manager/hunt_manager_test.go @@ -69,7 +69,10 @@ func (self *HuntTestSuite) TearDownTest() { func (self *HuntTestSuite) TestHuntManager() { t := self.T() - services.GetLauncher().SetFlowIdForTests("F.1234") + launcher, err := services.GetLauncher() + assert.NoError(t, err) + + launcher.SetFlowIdForTests("F.1234") // The hunt will launch the Generic.Client.Info on the client. hunt_obj := &api_proto.Hunt{ @@ -90,7 +93,10 @@ func (self *HuntTestSuite) TestHuntManager() { services.GetHuntDispatcher().Refresh(self.config_obj) // Simulate a System.Hunt.Participation event - services.GetJournal().PushRowsToArtifact(self.config_obj, + journal, err := services.GetJournal() + assert.NoError(t, err) + + journal.PushRowsToArtifact(self.config_obj, []*ordereddict.Dict{ordereddict.NewDict(). Set("HuntId", self.hunt_id). Set("ClientId", self.client_id). @@ -119,7 +125,10 @@ func (self *HuntTestSuite) TestHuntManager() { func (self *HuntTestSuite) TestHuntWithLabelClientNoLabel() { t := self.T() - services.GetLauncher().SetFlowIdForTests("F.1234") + launcher, err := services.GetLauncher() + assert.NoError(t, err) + + launcher.SetFlowIdForTests("F.1234") // The hunt will launch the Generic.Client.Info on the client. hunt_obj := &api_proto.Hunt{ @@ -149,7 +158,10 @@ func (self *HuntTestSuite) TestHuntWithLabelClientNoLabel() { // Simulate a System.Hunt.Participation event path_manager := result_sets.NewArtifactPathManager(self.config_obj, self.client_id, "", "System.Hunt.Participation") - services.GetJournal().PushRows(self.config_obj, path_manager, + journal, err := services.GetJournal() + assert.NoError(t, err) + + journal.PushRows(self.config_obj, path_manager, []*ordereddict.Dict{ordereddict.NewDict(). Set("HuntId", self.hunt_id). Set("ClientId", self.client_id). @@ -171,8 +183,10 @@ func (self *HuntTestSuite) TestHuntWithLabelClientNoLabel() { func (self *HuntTestSuite) TestHuntWithLabelClientHasLabelDifferentCase() { t := self.T() + launcher, err := services.GetLauncher() + assert.NoError(t, err) - services.GetLauncher().SetFlowIdForTests("F.1234") + launcher.SetFlowIdForTests("F.1234") // The hunt will launch the Generic.Client.Info on the client. hunt_obj := &api_proto.Hunt{ @@ -207,7 +221,10 @@ func (self *HuntTestSuite) TestHuntWithLabelClientHasLabelDifferentCase() { // Simulate a System.Hunt.Participation event path_manager := result_sets.NewArtifactPathManager(self.config_obj, self.client_id, "", "System.Hunt.Participation") - services.GetJournal().PushRows(self.config_obj, path_manager, + journal, err := services.GetJournal() + assert.NoError(t, err) + + journal.PushRows(self.config_obj, path_manager, []*ordereddict.Dict{ordereddict.NewDict(). Set("HuntId", self.hunt_id). Set("ClientId", self.client_id). @@ -234,7 +251,10 @@ func (self *HuntTestSuite) TestHuntWithLabelClientHasLabelDifferentCase() { func (self *HuntTestSuite) TestHuntWithOverride() { t := self.T() - services.GetLauncher().SetFlowIdForTests("F.1234") + launcher, err := services.GetLauncher() + assert.NoError(t, err) + + launcher.SetFlowIdForTests("F.1234") // Hunt is paused so normally will not receive any clients. hunt_obj := &api_proto.Hunt{ @@ -257,7 +277,10 @@ func (self *HuntTestSuite) TestHuntWithOverride() { // Simulate a System.Hunt.Participation event path_manager := result_sets.NewArtifactPathManager(self.config_obj, self.client_id, "", "System.Hunt.Participation") - services.GetJournal().PushRows(self.config_obj, path_manager, + journal, err := services.GetJournal() + assert.NoError(t, err) + + journal.PushRows(self.config_obj, path_manager, []*ordereddict.Dict{ordereddict.NewDict(). Set("HuntId", self.hunt_id). Set("ClientId", self.client_id). @@ -286,7 +309,10 @@ func (self *HuntTestSuite) TestHuntWithOverride() { func (self *HuntTestSuite) TestHuntWithLabelClientHasLabel() { t := self.T() - services.GetLauncher().SetFlowIdForTests("F.1234") + launcher, err := services.GetLauncher() + assert.NoError(t, err) + + launcher.SetFlowIdForTests("F.1234") // The hunt will launch the Generic.Client.Info on the client. hunt_obj := &api_proto.Hunt{ @@ -320,7 +346,10 @@ func (self *HuntTestSuite) TestHuntWithLabelClientHasLabel() { // Simulate a System.Hunt.Participation event path_manager := result_sets.NewArtifactPathManager(self.config_obj, self.client_id, "", "System.Hunt.Participation") - services.GetJournal().PushRows(self.config_obj, path_manager, + journal, err := services.GetJournal() + assert.NoError(t, err) + + journal.PushRows(self.config_obj, path_manager, []*ordereddict.Dict{ordereddict.NewDict(). Set("HuntId", self.hunt_id). Set("ClientId", self.client_id). @@ -349,7 +378,10 @@ func (self *HuntTestSuite) TestHuntWithLabelClientHasLabel() { func (self *HuntTestSuite) TestHuntWithLabelClientHasExcludedLabel() { t := self.T() - services.GetLauncher().SetFlowIdForTests("F.1234") + launcher, err := services.GetLauncher() + assert.NoError(t, err) + + launcher.SetFlowIdForTests("F.1234") // The hunt will launch the Generic.Client.Info on the client. hunt_obj := &api_proto.Hunt{ @@ -391,7 +423,10 @@ func (self *HuntTestSuite) TestHuntWithLabelClientHasExcludedLabel() { // Simulate a System.Hunt.Participation event path_manager := result_sets.NewArtifactPathManager(self.config_obj, self.client_id, "", "System.Hunt.Participation") - services.GetJournal().PushRows(self.config_obj, path_manager, + journal, err := services.GetJournal() + assert.NoError(t, err) + + journal.PushRows(self.config_obj, path_manager, []*ordereddict.Dict{ordereddict.NewDict(). Set("HuntId", self.hunt_id). Set("ClientId", self.client_id). diff --git a/services/interrogation/interrogation.go b/services/interrogation/interrogation.go index 8d0f16992ee..07c2e267179 100644 --- a/services/interrogation/interrogation.go +++ b/services/interrogation/interrogation.go @@ -44,7 +44,12 @@ func (self *EnrollmentService) Start( config_obj *config_proto.Config, wg *sync.WaitGroup) error { - events, cancel := services.GetJournal().Watch("Server.Internal.Enrollment") + journal, err := services.GetJournal() + if err != nil { + return err + } + + events, cancel := journal.Watch("Server.Internal.Enrollment") wg.Add(1) go func() { @@ -111,7 +116,12 @@ func (self *EnrollmentService) ProcessRow( } // Issue the flow on the client. - flow_id, err := services.GetLauncher().ScheduleArtifactCollection( + launcher, err := services.GetLauncher() + if err != nil { + return err + } + + flow_id, err := launcher.ScheduleArtifactCollection( ctx, config_obj, vql_subsystem.NullACLManager{}, repository, &flows_proto.ArtifactCollectorArgs{ diff --git a/services/interrogation/interrogation_test.go b/services/interrogation/interrogation_test.go index ce4bc512547..7d9ec7bac5c 100644 --- a/services/interrogation/interrogation_test.go +++ b/services/interrogation/interrogation_test.go @@ -71,11 +71,14 @@ func (self *ServicesTestSuite) EmulateCollection( // Emulate a Generic.Client.Info collection: First write the // result set, then write the collection context. // Write a result set for this artifact. - services.GetJournal().PushRowsToArtifact(self.config_obj, + journal, err := services.GetJournal() + assert.NoError(self.T(), err) + + journal.PushRowsToArtifact(self.config_obj, rows, artifact, self.client_id, self.flow_id) // Emulate a flow completion message coming from the flow processor. - services.GetJournal().PushRowsToArtifact(self.config_obj, + journal.PushRowsToArtifact(self.config_obj, []*ordereddict.Dict{ordereddict.NewDict(). Set("ClientId", self.client_id). Set("FlowId", self.flow_id). @@ -144,7 +147,10 @@ func (self *ServicesTestSuite) TestEnrollService() { path_manager := result_sets.NewArtifactPathManager( self.config_obj, "server" /* client_id */, "", "Server.Internal.Enrollment") - err = services.GetJournal().PushRows(self.config_obj, + journal, err := services.GetJournal() + assert.NoError(self.T(), err) + + err = journal.PushRows(self.config_obj, path_manager, []*ordereddict.Dict{ enroll_message, enroll_message, enroll_message, enroll_message}) assert.NoError(self.T(), err) diff --git a/services/interrogation/utils.go b/services/interrogation/utils.go index 2c0075afd26..1fb4dc0f83b 100644 --- a/services/interrogation/utils.go +++ b/services/interrogation/utils.go @@ -25,7 +25,12 @@ func watchForFlowCompletion( handler func(ctx context.Context, scope *vfilter.Scope, row *ordereddict.Dict)) error { - events, cancel := services.GetJournal().Watch("System.Flow.Completion") + journal, err := services.GetJournal() + if err != nil { + return err + } + + events, cancel := journal.Watch("System.Flow.Completion") manager := services.GetRepositoryManager() wg.Add(1) diff --git a/services/inventory/inventory_test.go b/services/inventory/inventory_test.go index 3c6cabe0b41..4f3863d6388 100644 --- a/services/inventory/inventory_test.go +++ b/services/inventory/inventory_test.go @@ -183,7 +183,10 @@ tools: // Launch the artifact - this will result in the tool being // downloaded and the hash calculated on demand. - response, err := services.GetLauncher().CompileCollectorArgs( + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) + + response, err := launcher.CompileCollectorArgs( ctx, self.config_obj, vql_subsystem.NullACLManager{}, repository, &flows_proto.ArtifactCollectorArgs{ Artifacts: []string{"TestArtifact"}, @@ -269,8 +272,10 @@ tools: assert.NoError(self.T(), err) self.installGitHubMock() + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) - response, err := services.GetLauncher().CompileCollectorArgs( + response, err := launcher.CompileCollectorArgs( ctx, self.config_obj, vql_subsystem.NullACLManager{}, repository, &flows_proto.ArtifactCollectorArgs{ Artifacts: []string{"TestArtifact"}, diff --git a/services/journal.go b/services/journal.go index ca9c90ff210..74109751c09 100644 --- a/services/journal.go +++ b/services/journal.go @@ -15,6 +15,7 @@ package services // in real time from client event artifacts. import ( + "errors" "sync" "github.com/Velocidex/ordereddict" @@ -29,11 +30,15 @@ var ( GJournal JournalService ) -func GetJournal() JournalService { +func GetJournal() (JournalService, error) { journal_mu.Lock() defer journal_mu.Unlock() - return GJournal + if GJournal == nil { + return nil, errors.New("Journal service not ready") + } + + return GJournal, nil } func RegisterJournal(journal JournalService) { diff --git a/services/journal/journal.go b/services/journal/journal.go index c6ed15696fb..98623294e4f 100644 --- a/services/journal/journal.go +++ b/services/journal/journal.go @@ -69,8 +69,8 @@ func StartJournalService( // 1. Watchers will never be notified. // 2. PushRows() will fail with an error. service := &JournalService{} - old_service := services.GetJournal() - if old_service != nil { + old_service, err := services.GetJournal() + if err == nil { service.qm = old_service.(*JournalService).qm } diff --git a/services/labels/labels.go b/services/labels/labels.go index 4aa46233e33..171edb445f6 100644 --- a/services/labels/labels.go +++ b/services/labels/labels.go @@ -190,7 +190,12 @@ func (self *Labeler) notifyClient( config_obj *config_proto.Config, client_id, new_label, operation string) error { // Notify other frontends about this change. - return services.GetJournal().PushRowsToArtifact(config_obj, + journal, err := services.GetJournal() + if err != nil { + return err + } + + return journal.PushRowsToArtifact(config_obj, []*ordereddict.Dict{ ordereddict.NewDict(). Set("client_id", client_id). @@ -360,8 +365,12 @@ func (self *Labeler) Start(ctx context.Context, } self.lru = cache.NewLRUCache(expected_clients) + journal, err := services.GetJournal() + if err != nil { + return err + } - events, cancel := services.GetJournal().Watch("Server.Internal.Label") + events, cancel := journal.Watch("Server.Internal.Label") wg.Add(1) go func() { diff --git a/services/launcher.go b/services/launcher.go index 9efe0da3aef..cee8edf12b5 100644 --- a/services/launcher.go +++ b/services/launcher.go @@ -43,6 +43,7 @@ package services import ( "context" + "errors" "sync" actions_proto "www.velocidex.com/golang/velociraptor/actions/proto" @@ -57,11 +58,15 @@ var ( g_launcher Launcher = nil ) -func GetLauncher() Launcher { +func GetLauncher() (Launcher, error) { launcher_mu.Lock() defer launcher_mu.Unlock() - return g_launcher + if g_launcher == nil { + return nil, errors.New("Launcher not ready") + } + + return g_launcher, nil } func RegisterLauncher(l Launcher) { diff --git a/services/launcher/artifacts_test.go b/services/launcher/artifacts_test.go index 65d0fdaae6a..f73dc62cc6a 100644 --- a/services/launcher/artifacts_test.go +++ b/services/launcher/artifacts_test.go @@ -122,8 +122,10 @@ func (self *ArtifactTestSuite) TestUnknownArtifact() { Artifacts: []string{"Artifact5"}, } - launcher := services.GetLauncher() - _, err := launcher.CompileCollectorArgs(context.Background(), self.config_obj, + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) + + _, err = launcher.CompileCollectorArgs(context.Background(), self.config_obj, vql_subsystem.NullACLManager{}, self.repository, request) assert.Error(self.T(), err) @@ -138,7 +140,8 @@ func (self *ArtifactTestSuite) TestStackOverflow() { } // It should compile ok but overflow at runtime. - launcher := services.GetLauncher() + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) vql_request, err := launcher.CompileCollectorArgs(context.Background(), self.config_obj, vql_subsystem.NullACLManager{}, self.repository, request) @@ -162,8 +165,9 @@ func (self *ArtifactTestSuite) TestArtifactDependencies() { request := &flows_proto.ArtifactCollectorArgs{ Artifacts: []string{"Artifact6"}, } + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) - launcher := services.GetLauncher() vql_request, err := launcher.CompileCollectorArgs(context.Background(), self.config_obj, vql_subsystem.NullACLManager{}, self.repository, request) diff --git a/services/launcher/launcher_test.go b/services/launcher/launcher_test.go index d1be140a886..7f81babbee5 100644 --- a/services/launcher/launcher_test.go +++ b/services/launcher/launcher_test.go @@ -156,14 +156,17 @@ func (self *LauncherTestSuite) TestCompilingWithTools() { // collection can not be scheduled. The server needs to // download the file in order to calculate its hash - even // though it is not serving it to clients. - compiled, err := services.GetLauncher().CompileCollectorArgs(ctx, self.config_obj, + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) + + compiled, err := launcher.CompileCollectorArgs(ctx, self.config_obj, acl_manager, repository, request) assert.Error(self.T(), err) // Now make the tool download succeed. Compiling should work // and we should calculate the hash. status = 200 - compiled, err = services.GetLauncher().CompileCollectorArgs( + compiled, err = launcher.CompileCollectorArgs( ctx, self.config_obj, acl_manager, repository, request) assert.NoError(self.T(), err) @@ -171,7 +174,7 @@ func (self *LauncherTestSuite) TestCompilingWithTools() { // downloading the file ourselves - further compiles will work // automatically. status = 404 - compiled, err = services.GetLauncher().CompileCollectorArgs( + compiled, err = launcher.CompileCollectorArgs( ctx, self.config_obj, acl_manager, repository, request) assert.NoError(self.T(), err) @@ -200,7 +203,7 @@ func (self *LauncherTestSuite) TestCompilingWithTools() { assert.NoError(self.T(), err) status = 200 - compiled, err = services.GetLauncher().CompileCollectorArgs( + compiled, err = launcher.CompileCollectorArgs( ctx, self.config_obj, acl_manager, repository, request) assert.NoError(self.T(), err) @@ -243,7 +246,10 @@ func (self *LauncherTestSuite) TestCompiling() { ctx := context.Background() acl_manager := vql_subsystem.NullACLManager{} - compiled, err := services.GetLauncher().CompileCollectorArgs( + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) + + compiled, err := launcher.CompileCollectorArgs( ctx, self.config_obj, acl_manager, repository, request) assert.NoError(self.T(), err) @@ -283,7 +289,10 @@ func (self *LauncherTestSuite) TestCompilingObfuscation() { ctx := context.Background() acl_manager := vql_subsystem.NullACLManager{} - compiled, err := services.GetLauncher().CompileCollectorArgs( + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) + + compiled, err := launcher.CompileCollectorArgs( ctx, self.config_obj, acl_manager, repository, request) assert.NoError(self.T(), err) @@ -293,7 +302,7 @@ func (self *LauncherTestSuite) TestCompilingObfuscation() { // However when we obfuscate we remove descriptions. self.config_obj.Frontend.DoNotCompressArtifacts = false - compiled, err = services.GetLauncher().CompileCollectorArgs( + compiled, err = launcher.CompileCollectorArgs( ctx, self.config_obj, acl_manager, repository, request) assert.NoError(self.T(), err) @@ -320,7 +329,10 @@ func (self *LauncherTestSuite) TestCompilingPermissions() { acl_manager := vql_subsystem.NewServerACLManager(self.config_obj, "UserX") // Permission denied - the principal is not allowed to compile this artifact. - compiled, err := services.GetLauncher().CompileCollectorArgs( + launcher, err := services.GetLauncher() + assert.NoError(self.T(), err) + + compiled, err := launcher.CompileCollectorArgs( ctx, self.config_obj, acl_manager, repository, request) assert.Error(self.T(), err) assert.Contains(self.T(), err.Error(), "EXECVE") @@ -332,7 +344,7 @@ func (self *LauncherTestSuite) TestCompilingPermissions() { // Should be fine now. acl_manager = vql_subsystem.NewServerACLManager(self.config_obj, "UserX") - compiled, err = services.GetLauncher().CompileCollectorArgs( + compiled, err = launcher.CompileCollectorArgs( ctx, self.config_obj, acl_manager, repository, request) assert.NoError(self.T(), err) assert.Equal(self.T(), len(compiled.Query), 2) diff --git a/services/notifications/notifications.go b/services/notifications/notifications.go index c5052eb8cf2..458d07cde8f 100644 --- a/services/notifications/notifications.go +++ b/services/notifications/notifications.go @@ -55,7 +55,11 @@ func StartNotificationService( logger.Info("Starting the notification service.") // Watch the journal. - events, cancel := services.GetJournal().Watch("Server.Internal.Notifications") + journal, err := services.GetJournal() + if err != nil { + return err + } + events, cancel := journal.Watch("Server.Internal.Notifications") wg.Add(1) go func() { @@ -113,14 +117,24 @@ func (self *Notifier) ListenForNotification(client_id string) (chan bool, func() } func (self *Notifier) NotifyAllListeners(config_obj *config_proto.Config) error { - return services.GetJournal().PushRowsToArtifact(config_obj, + journal, err := services.GetJournal() + if err != nil { + return err + } + + return journal.PushRowsToArtifact(config_obj, []*ordereddict.Dict{ordereddict.NewDict().Set("Target", "All")}, "Server.Internal.Notifications", "server", "", ) } func (self *Notifier) NotifyListener(config_obj *config_proto.Config, id string) error { - return services.GetJournal().PushRowsToArtifact(config_obj, + journal, err := services.GetJournal() + if err != nil { + return err + } + + return journal.PushRowsToArtifact(config_obj, []*ordereddict.Dict{ordereddict.NewDict().Set("Target", id)}, "Server.Internal.Notifications", "server", "", ) diff --git a/services/repository/artifacts_test.go b/services/repository/artifacts_test.go index 6c4e6f18b95..e2f1368689b 100644 --- a/services/repository/artifacts_test.go +++ b/services/repository/artifacts_test.go @@ -19,6 +19,7 @@ package repository import ( "context" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -27,6 +28,7 @@ import ( "www.velocidex.com/golang/velociraptor/services/inventory" "www.velocidex.com/golang/velociraptor/services/journal" "www.velocidex.com/golang/velociraptor/services/notifications" + "www.velocidex.com/golang/velociraptor/utils" ) // Load all built in artifacts and make sure they validate syntax. @@ -55,3 +57,41 @@ func TestArtifactsSyntax(t *testing.T) { assert.NoError(t, err, "Error compiling "+artifact_name) } } + +var ( + artifact_definitions = []string{` +name: Test1 +sources: +- query: SELECT * FROM Artifact.Test1.Foobar() +`, ` +name: Test1.Foobar +sources: +- query: SELECT * FROM info() +`} +) + +func TestArtifactPlugin(t *testing.T) { + config_obj := config.GetDefaultConfig() + + sm := services.NewServiceManager(context.Background(), config_obj) + defer sm.Close() + + assert.NoError(t, sm.Start(journal.StartJournalService)) + assert.NoError(t, sm.Start(notifications.StartNotificationService)) + assert.NoError(t, sm.Start(inventory.StartInventoryService)) + assert.NoError(t, sm.Start(StartRepositoryManager)) + + manager := services.GetRepositoryManager() + repository := manager.NewRepository() + + for _, definition := range artifact_definitions { + artifact_definition, err := repository.LoadYaml(definition, false) + assert.NoError(t, err) + + utils.Debug(artifact_definition) + } + + wg := &sync.WaitGroup{} + p := NewArtifactRepositoryPlugin(wg, repository.(*Repository)).(*ArtifactRepositoryPlugin) + p.Print() +} diff --git a/services/repository/manager.go b/services/repository/manager.go index dabc2c6a8c6..0c77c25ddae 100644 --- a/services/repository/manager.go +++ b/services/repository/manager.go @@ -97,7 +97,12 @@ func (self *RepositoryManager) SetArtifactFile( return nil, err } - err = services.GetJournal().PushRowsToArtifact(config_obj, + journal, err := services.GetJournal() + if err != nil { + return nil, err + } + + err = journal.PushRowsToArtifact(config_obj, []*ordereddict.Dict{ ordereddict.NewDict().Set("artifact", artifact.Name). Set("op", "set"), @@ -189,6 +194,7 @@ func StartRepositoryManager(ctx context.Context, wg *sync.WaitGroup, } } logger.Info("Compiled all artifacts.") + grepository.Del("") }() wg.Add(1) diff --git a/services/repository/plugin.go b/services/repository/plugin.go index 640904a59cc..59b20ecf6be 100644 --- a/services/repository/plugin.go +++ b/services/repository/plugin.go @@ -133,7 +133,12 @@ func (self *ArtifactRepositoryPlugin) Call( acl_manager = vql_subsystem.NullACLManager{} } - request, err := services.GetLauncher().CompileCollectorArgs( + launcher, err := services.GetLauncher() + if err != nil { + return + } + + request, err := launcher.CompileCollectorArgs( ctx, config_obj, acl_manager, self.repository, &flows_proto.ArtifactCollectorArgs{ Artifacts: []string{artifact_name}, @@ -308,6 +313,7 @@ func (self _ArtifactRepositoryPluginAssociativeProtocol) GetMembers( func (self _ArtifactRepositoryPluginAssociativeProtocol) Associative( scope *vfilter.Scope, a vfilter.Any, b vfilter.Any) (vfilter.Any, bool) { + value := _getArtifactRepositoryPlugin(a) if value == nil { return nil, false diff --git a/services/repository/repository.go b/services/repository/repository.go index 89df86fec7e..0c68ad311b2 100644 --- a/services/repository/repository.go +++ b/services/repository/repository.go @@ -287,6 +287,7 @@ func (self *Repository) Del(name string) { defer self.mu.Unlock() delete(self.Data, name) + self.artifact_plugin = nil } func (self *Repository) List() []string { diff --git a/services/server_artifacts/server_artifacts.go b/services/server_artifacts/server_artifacts.go index 15d1e81d23a..a2222d6054b 100644 --- a/services/server_artifacts/server_artifacts.go +++ b/services/server_artifacts/server_artifacts.go @@ -94,7 +94,12 @@ type serverLogger struct { // need to be available immediately. func (self *serverLogger) Write(b []byte) (int, error) { msg := artifacts.DeobfuscateString(self.config_obj, string(b)) - err := services.GetJournal().PushRows(self.config_obj, + journal, err := services.GetJournal() + if err != nil { + return 0, err + } + + err = journal.PushRows(self.config_obj, self.path_manager, []*ordereddict.Dict{ ordereddict.NewDict(). Set("Timestamp", time.Now().UTC().UnixNano()/1000). @@ -174,7 +179,12 @@ func (self *ServerArtifactsRunner) processTask( if task.Cancel != nil { path_manager := paths.NewFlowPathManager("server", task.SessionId).Log() - err = services.GetJournal().PushRows(config_obj, path_manager, []*ordereddict.Dict{ + journal, err := services.GetJournal() + if err != nil { + return err + } + + err = journal.PushRows(config_obj, path_manager, []*ordereddict.Dict{ ordereddict.NewDict(). Set("Timestamp", time.Now().UTC().UnixNano()/1000). Set("time", time.Now().UTC().String()). diff --git a/services/server_artifacts/server_uploader.go b/services/server_artifacts/server_uploader.go index 2757cb3b888..9414ec2006a 100644 --- a/services/server_artifacts/server_uploader.go +++ b/services/server_artifacts/server_uploader.go @@ -43,7 +43,12 @@ func (self *ServerUploader) Upload( result, err := self.FileStoreUploader.Upload(ctx, scope, filename, accessor, store_as_name, expected_size, reader) if err == nil { - err = services.GetJournal().PushRows(self.config_obj, + journal, err := services.GetJournal() + if err != nil { + return nil, err + } + + err = journal.PushRows(self.config_obj, self.path_manager.UploadMetadata(), []*ordereddict.Dict{ordereddict.NewDict(). Set("Timestamp", time.Now().UTC().Unix()). diff --git a/services/test_utils.go b/services/test_utils.go index 2ef49d7246d..d4f5a5b3eba 100644 --- a/services/test_utils.go +++ b/services/test_utils.go @@ -24,7 +24,11 @@ func GetPublishedEvents( go func() { defer wg.Done() - events, cancel := GetJournal().Watch(artifact) + journal, err := GetJournal() + if err != nil { + return + } + events, cancel := journal.Watch(artifact) defer cancel() // Wait here until we are set up. diff --git a/services/vfs_service/utils.go b/services/vfs_service/utils.go index 4e50aeb1aca..0a1fc4bf388 100644 --- a/services/vfs_service/utils.go +++ b/services/vfs_service/utils.go @@ -26,7 +26,12 @@ func watchForFlowCompletion( config_obj *config_proto.Config, scope *vfilter.Scope, row *ordereddict.Dict)) error { - events, cancel := services.GetJournal().Watch("System.Flow.Completion") + journal, err := services.GetJournal() + if err != nil { + return err + } + + events, cancel := journal.Watch("System.Flow.Completion") wg.Add(1) go func() { diff --git a/services/vfs_service/vfs_service_test.go b/services/vfs_service/vfs_service_test.go index 82077cc0525..10e9532f992 100644 --- a/services/vfs_service/vfs_service_test.go +++ b/services/vfs_service/vfs_service_test.go @@ -88,13 +88,16 @@ func (self *VFSServiceTestSuite) EmulateCollection( self.config_obj, self.client_id, self.flow_id, artifact) // Write a result set for this artifact. - services.GetJournal().PushRows(self.config_obj, artifact_path_manager, rows) + journal, err := services.GetJournal() + assert.NoError(self.T(), err) + + journal.PushRows(self.config_obj, artifact_path_manager, rows) // Emulate a flow completion message coming from the flow processor. artifact_path_manager = result_sets.NewArtifactPathManager( self.config_obj, "server", "", "System.Flow.Completion") - services.GetJournal().PushRows(self.config_obj, artifact_path_manager, + journal.PushRows(self.config_obj, artifact_path_manager, []*ordereddict.Dict{ordereddict.NewDict(). Set("ClientId", self.client_id). Set("FlowId", self.flow_id). diff --git a/startup/startup.go b/startup/startup.go index 9937b276ba2..82b5161ddf1 100644 --- a/startup/startup.go +++ b/startup/startup.go @@ -56,7 +56,8 @@ func getServerServices(config_obj *config_proto.Config) *config_proto.ServerServ } func StartupEssentialServices(sm *services.Service) error { - if services.GetJournal() == nil { + j, _ := services.GetJournal() + if j == nil { err := sm.Start(journal.StartJournalService) if err != nil { return err @@ -84,7 +85,8 @@ func StartupEssentialServices(sm *services.Service) error { } } - if services.GetLauncher() == nil { + launcher_obj, _ := services.GetLauncher() + if launcher_obj == nil { err := sm.Start(launcher.StartLauncherService) if err != nil { return err @@ -186,7 +188,9 @@ func Reset() { // This function should not find any active services. Services // are responsible for unregistering themselves and holding // the service manager for the duration of their lifetime. - if services.GetJournal() != nil { + + journal, _ := services.GetJournal() + if journal != nil { fmt.Printf("Journal not reset.\n") } @@ -202,7 +206,8 @@ func Reset() { fmt.Printf("Repository Manager not reset.\n") } - if services.GetLauncher() != nil { + launcher, _ := services.GetLauncher() + if launcher != nil { fmt.Printf("Launcher not reset.\n") } diff --git a/vql/common/cache.go b/vql/common/cache.go new file mode 100644 index 00000000000..b9234e85e1d --- /dev/null +++ b/vql/common/cache.go @@ -0,0 +1,212 @@ +package common + +import ( + "context" + "sync" + "time" + + "github.com/Velocidex/ordereddict" + "www.velocidex.com/golang/velociraptor/json" + vql_subsystem "www.velocidex.com/golang/velociraptor/vql" + vfilter "www.velocidex.com/golang/vfilter" +) + +const ( + CACHE_KEY = "$cache_key" +) + +type _CacheObj struct { + mu sync.Mutex + expires time.Time + period time.Duration + expression vfilter.LazyExpr + scope *vfilter.Scope + ctx context.Context + key string + cache map[string]vfilter.Any +} + +func (self *_CacheObj) Get(key string) (vfilter.Any, bool) { + self.mu.Lock() + defer self.mu.Unlock() + + // Expire the entire cache when it gets too old. + if time.Now().After(self.expires) { + self.cache = make(map[string]vfilter.Any) + self.expires = time.Now().Add(self.period) + return nil, false + } + + value, pres := self.cache[key] + return value, pres +} + +func (self *_CacheObj) Set(key string, value vfilter.Any) { + self.mu.Lock() + defer self.mu.Unlock() + + self.cache[key] = value +} + +func (self *_CacheObj) Materialize() { + self.mu.Lock() + defer self.mu.Unlock() + + self.scope.Log("Materializing memoized query") + + self.cache = make(map[string]vfilter.Any) + stored_query := self.expression.ToStoredQuery(self.scope) + for row_item := range stored_query.Eval(self.ctx, self.scope) { + key, pres := self.scope.Associative(row_item, self.key) + if pres { + key_str := json.StringIndent(key) + self.cache[key_str] = row_item + } + } +} + +func NewCacheObj(ctx context.Context, scope *vfilter.Scope, key string) *_CacheObj { + return &_CacheObj{ + scope: scope, + ctx: ctx, + key: key, + cache: make(map[string]vfilter.Any), + } +} + +// Caches can be associative +type _CacheAssociative struct{} + +func (self _CacheAssociative) Applicable(a vfilter.Any, b vfilter.Any) bool { + _, ok := a.(*_CacheObj) + return ok +} + +// Associate object a with key b +func (self _CacheAssociative) Associative( + scope *vfilter.Scope, a vfilter.Any, b vfilter.Any) (vfilter.Any, bool) { + cache_obj, ok := a.(*_CacheObj) + if !ok { + return vfilter.Null{}, false + } + + lazy_b, ok := b.(*vfilter.LazyExpr) + if ok { + b = lazy_b.ReduceWithScope(scope) + } + + key := json.StringIndent(b) + + if time.Now().After(cache_obj.expires) { + cache_obj.Materialize() + cache_obj.expires = time.Now().Add(cache_obj.period) + } + + res, pres := cache_obj.cache[key] + if !pres { + return vfilter.Null{}, false + } + return res, true +} + +func (self _CacheAssociative) GetMembers(scope *vfilter.Scope, a vfilter.Any) []string { + return nil +} + +type _CacheFunctionArgs struct { + Func vfilter.LazyExpr `vfilter:"required,field=func,doc=A function to evaluate"` + Name string `vfilter:"optional,field=name,doc=The global name of this cache (needed when more than one)"` + Key string `vfilter:"required,field=key,doc=Cache key to use."` + Period int64 `vfilter:"optional,field=period,doc=The latest age of the cache."` +} + +type _CacheFunc struct{} + +func (self _CacheFunc) Info(scope *vfilter.Scope, type_map *vfilter.TypeMap) *vfilter.FunctionInfo { + return &vfilter.FunctionInfo{ + Name: "cache", + Doc: "Creates a cache object", + ArgType: type_map.AddType(scope, &_CacheFunctionArgs{}), + } +} + +func (self _CacheFunc) Call(ctx context.Context, scope *vfilter.Scope, + args *ordereddict.Dict) vfilter.Any { + + arg := &_CacheFunctionArgs{} + err := vfilter.ExtractArgs(scope, args, arg) + if err != nil { + scope.Log("cache: %s", err.Error()) + return vfilter.Null{} + } + + if arg.Name == "" { + arg.Name = CACHE_KEY + } + + cache_obj := vql_subsystem.CacheGet(scope, arg.Name) + if cache_obj == nil { + if arg.Period == 0 { + arg.Period = 60 + } + + new_cache_obj := NewCacheObj(ctx, scope, "") + new_cache_obj.period = time.Duration(arg.Period) * time.Second + cache_obj = new_cache_obj + } + defer vql_subsystem.CacheSet(scope, arg.Name, cache_obj) + + value, pres := cache_obj.(*_CacheObj).Get(arg.Key) + if !pres { + value = arg.Func.ReduceWithScope(scope) + if !vql_subsystem.IsNull(value) { + cache_obj.(*_CacheObj).Set(arg.Key, value) + } + } + + return value + +} + +type _MemoizeFunctionArgs struct { + Query vfilter.LazyExpr `vfilter:"required,field=query,doc=Query to expand into memory"` + Key string `vfilter:"required,field=key,doc=The name of the column to use as a key."` + Period int64 `vfilter:"optional,field=period,doc=The latest age of the cache."` +} + +type _MemoizeFunction struct{} + +func (self _MemoizeFunction) Info(scope *vfilter.Scope, type_map *vfilter.TypeMap) *vfilter.FunctionInfo { + return &vfilter.FunctionInfo{ + Name: "memoize", + Doc: "Memoize a query into memory.", + ArgType: type_map.AddType(scope, &_MemoizeFunctionArgs{}), + } +} + +func (self _MemoizeFunction) Call(ctx context.Context, scope *vfilter.Scope, + args *ordereddict.Dict) vfilter.Any { + + arg := &_MemoizeFunctionArgs{} + err := vfilter.ExtractArgs(scope, args, arg) + if err != nil { + scope.Log("memoize: %s", err.Error()) + return vfilter.Null{} + } + + if arg.Period == 0 { + arg.Period = 60 + } + + result := NewCacheObj(ctx, scope, arg.Key) + result.expression = arg.Query + result.period = time.Duration(arg.Period) * time.Second + + return result +} + +func init() { + vql_subsystem.RegisterProtocol(&_CacheAssociative{}) + vql_subsystem.RegisterFunction(&_CacheFunc{}) + vql_subsystem.RegisterFunction(&_MemoizeFunction{}) +} diff --git a/vql/functions/functions.go b/vql/functions/functions.go index cf9c7c87017..ab03ea1321a 100644 --- a/vql/functions/functions.go +++ b/vql/functions/functions.go @@ -324,7 +324,7 @@ func (self _Scope) Info(scope *vfilter.Scope, type_map *vfilter.TypeMap) *vfilte type _GetFunctionArgs struct { Item vfilter.Any `vfilter:"optional,field=item"` Member string `vfilter:"optional,field=member"` - Field string `vfilter:"optional,field=field"` + Field vfilter.Any `vfilter:"optional,field=field"` Default vfilter.Any `vfilter:"optional,field=default"` } @@ -360,12 +360,12 @@ func (self _GetFunction) Call( var pres bool - if arg.Field == "" && arg.Member == "" { + if arg.Field == nil && arg.Member == "" { scope.Log("get: either Field or Member should be specified.") return vfilter.Null{} } - if arg.Field != "" { + if arg.Field != nil { result, pres = scope.Associative(result, arg.Field) if !pres { return arg.Default diff --git a/vql/server/artifacts.go b/vql/server/artifacts.go index d7718f67588..38baddf6f37 100644 --- a/vql/server/artifacts.go +++ b/vql/server/artifacts.go @@ -112,7 +112,12 @@ func (self *ScheduleCollectionFunction) Call(ctx context.Context, return vfilter.Null{} } - flow_id, err := services.GetLauncher().ScheduleArtifactCollection( + launcher, err := services.GetLauncher() + if err != nil { + return vfilter.Null{} + } + + flow_id, err := launcher.ScheduleArtifactCollection( ctx, config_obj, acl_manager, repository, request) if err != nil { scope.Log("collect_client: %v", err) diff --git a/vql/server/hunt.go b/vql/server/hunt.go index b245577b392..893e2a14277 100644 --- a/vql/server/hunt.go +++ b/vql/server/hunt.go @@ -149,7 +149,7 @@ func (self *AddToHuntFunction) Call(ctx context.Context, return vfilter.Null{} } - journal := services.GetJournal() + journal, _ := services.GetJournal() if journal == nil { return vfilter.Null{} } diff --git a/vql/server/monitoring.go b/vql/server/monitoring.go index f81111a74c0..c4ccd8527ff 100644 --- a/vql/server/monitoring.go +++ b/vql/server/monitoring.go @@ -113,7 +113,12 @@ func (self WatchMonitoringPlugin) Call( return } - if services.GetJournal() == nil { + journal, _ := services.GetJournal() + if err != nil { + return + } + + if journal == nil { scope.Log("watch_monitoring: can only run on the server via the API") return } @@ -147,7 +152,7 @@ func (self WatchMonitoringPlugin) Call( } // Ask the journal service to watch the event queue for us. - qm_chan, cancel := services.GetJournal().Watch(arg.Artifact) + qm_chan, cancel := journal.Watch(arg.Artifact) defer cancel() for { diff --git a/vql/server/repository.go b/vql/server/repository.go index 5eaa7778433..58db4e0a299 100644 --- a/vql/server/repository.go +++ b/vql/server/repository.go @@ -152,7 +152,13 @@ func (self ArtifactsPlugin) Call( acl_manager := vql_subsystem.NullACLManager{} - request, err := services.GetLauncher().CompileCollectorArgs( + launcher, err := services.GetLauncher() + if err != nil { + scope.Log("artifact_definitions: %v", err) + return + } + + request, err := launcher.CompileCollectorArgs( ctx, config_obj, acl_manager, repository, &flows_proto.ArtifactCollectorArgs{ Artifacts: arg.Names, diff --git a/vql/tools/collector.go b/vql/tools/collector.go index 6761eeb001e..1d64aa65480 100644 --- a/vql/tools/collector.go +++ b/vql/tools/collector.go @@ -153,7 +153,13 @@ func (self CollectPlugin) Call( } - err = services.GetLauncher().EnsureToolsDeclared( + launcher, err := services.GetLauncher() + if err != nil { + scope.Log("collect: %v", err) + return + } + + err = launcher.EnsureToolsDeclared( ctx, config_obj, artifact) if err != nil { scope.Log("collect: %v %v", name, err) @@ -166,7 +172,7 @@ func (self CollectPlugin) Call( acl_manager = vql_subsystem.NullACLManager{} } - request, err := services.GetLauncher().CompileCollectorArgs( + request, err := launcher.CompileCollectorArgs( ctx, config_obj, acl_manager, repository, &flows_proto.ArtifactCollectorArgs{ Artifacts: []string{artifact.Name},