Large refactor to use a path manager (Velocidex#331)
In the process of adding support for the MySQL data store it has become clear that, in order to utilize the best intrinsic features of each data store technology, we cannot rely on a pure filesystem-like abstraction. It is useful to hint to the different implementations about the likely access pattern so that better optimizations can be undertaken.

Previously the layout of the different objects stored in the filestore was hard coded throughout the code in various path construction statements. This change introduces a path manager - an object responsible for mapping a particular entity into the filestore. The path manager is the perfect place to hint to the data store how to treat different objects.

For example, the FlowPathManager is responsible for storing various flow-related items, so we can use it to build paths like flow_path_manager.Log() for the result set storing the flow's logs, etc. A path manager is responsible for accessing a result set in the filestore (similar to a path, but smarter). In the future all file store operations will be made using path managers.
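
As a rough sketch (not part of this change itself), call sites now look something like the following. The constructors and methods mirror the ones used in this diff; the wrapping function, the client/flow ids and the artifact name are placeholder assumptions:

    // Sketch only: the constructors below appear in this diff, while the
    // wrapping function, the ids and the artifact name are placeholders.
    package example

    import (
        config_proto "www.velocidex.com/golang/velociraptor/config/proto"
        "www.velocidex.com/golang/velociraptor/paths"
        "www.velocidex.com/golang/velociraptor/result_sets"
    )

    func examplePathManagers(config_obj *config_proto.Config) {
        // All flow related objects hang off a single FlowPathManager.
        flow_path_manager := paths.NewFlowPathManager("C.1234", "F.1234")

        // Result set storing the flow's log messages.
        log_rs := flow_path_manager.Log()

        // Result set describing the files uploaded by the flow.
        uploads_rs := flow_path_manager.UploadMetadata()

        // Artifact results get their own path manager, which can also hint
        // the data store about the likely access pattern.
        artifact_rs := result_sets.NewArtifactPathManager(
            config_obj, "C.1234", "F.1234", "Generic.Client.Info")

        _, _, _ = log_rs, uploads_rs, artifact_rs
    }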

The filestore is now also responsible for storing and reading result sets (i.e. rows) as well as just bulk files. This makes it easier for other code to use because we no longer need to do CSV serialization/deserialization by hand.

Filestore's interface is:

* PushRows() -> pushes rows into the result set specified by the path manager. Callers do not actually care exactly where they are stored.
* GetTimeRange() -> gets the rows that were inserted in the specified time range (or all rows if no range is given).

Therefore callers do not need to worry about managing the data themselves, and a specific datastore implementation can now use e.g. timestamp indexes to read data more efficiently (see the sketch below).
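
For example, writing and then reading a result set looks roughly like this. The PushRows, GetTimeRange and ParseJsonToDicts calls mirror those used in this diff; the wrapping function and the literal row data are placeholder assumptions:

    // Sketch only: PushRows, GetTimeRange and ParseJsonToDicts mirror the
    // calls used in this diff; the function itself and the row data are
    // placeholders.
    package example

    import (
        "context"

        config_proto "www.velocidex.com/golang/velociraptor/config/proto"
        file_store "www.velocidex.com/golang/velociraptor/file_store"
        "www.velocidex.com/golang/velociraptor/file_store/api"
        "www.velocidex.com/golang/velociraptor/services"
        "www.velocidex.com/golang/velociraptor/utils"
    )

    func exampleResultSet(
        ctx context.Context,
        config_obj *config_proto.Config,
        path_manager api.PathManager) error {

        // Rows are plain dicts; the path manager decides where they live.
        rows, err := utils.ParseJsonToDicts([]byte(`{"Foo": 1}`))
        if err != nil {
            return err
        }

        // Push the rows; the caller never sees the concrete storage path.
        err = services.GetJournal().PushRows(path_manager, rows)
        if err != nil {
            return err
        }

        // A start and end time of 0 reads back all rows; a smarter data
        // store can use timestamp indexes to narrow the scan.
        row_chan, err := file_store.GetTimeRange(ctx, config_obj, path_manager, 0, 0)
        if err != nil {
            return err
        }
        for row := range row_chan {
            _ = row // process each row
        }
        return nil
    }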

Queuing is also moved to the file store:
* Watch() -> registers interest in a queue name (usually an artifact name) and watches for new events. Filestores may implement any message-passing technique to make this work.

The directory-based file store is designed to work in a single process, so it uses an internal thread-based notification mechanism (extremely low latency).
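
Purely as an illustration of the intended pattern (the actual Watch() signature is not part of the diff shown here, so the interface below is a hypothetical stand-in):

    package example

    // Event stands in for whatever row type the file store delivers.
    type Event map[string]interface{}

    // QueueManager is a hypothetical sketch of the Watch() contract described
    // above, not the real Velociraptor interface.
    type QueueManager interface {
        // Watch registers interest in a queue name (usually an artifact
        // name) and returns a channel of new events plus a cancel function.
        Watch(queue_name string) (<-chan Event, func())
    }

    // consume shows the usage pattern: a directory based file store can back
    // this with an in-process, thread based notification mechanism, while
    // other implementations may use any message passing technique.
    func consume(qm QueueManager) {
        events, cancel := qm.Watch("Windows.Events.ProcessCreation")
        defer cancel()

        for event := range events {
            _ = event // handle each event as it arrives
        }
    }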

This is a somewhat breaking change in that some of the layout of objects in the file store has changed. Velociraptor now uses line-delimited JSON internally instead of CSV, although we try to read CSV files as a fallback for backward compatibility. If you upgrade you might need to reset the file store.
scudette authored Apr 28, 2020
1 parent fd01346 commit e4f5971
Showing 85 changed files with 2,700 additions and 1,680 deletions.
21 changes: 15 additions & 6 deletions api/api.go
@@ -52,6 +52,7 @@ import (
"www.velocidex.com/golang/velociraptor/grpc_client"
"www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/result_sets"
"www.velocidex.com/golang/velociraptor/server"
"www.velocidex.com/golang/velociraptor/services"
users "www.velocidex.com/golang/velociraptor/users"
@@ -180,7 +181,7 @@ func (self *ApiServer) CollectArtifact(

result.FlowId = flow_id

err = services.NotifyClient(in.ClientId)
err = services.NotifyClient(self.config, in.ClientId)
if err != nil {
return nil, err
}
@@ -395,10 +396,10 @@ func (self *ApiServer) NotifyClients(

if in.NotifyAll {
self.server_obj.Info("sending notification to everyone")
services.NotifyAll()
services.NotifyAll(self.config)
} else if in.ClientId != "" {
self.server_obj.Info("sending notification to %s", in.ClientId)
services.NotifyClient(in.ClientId)
services.NotifyClient(self.config, in.ClientId)
} else {
return nil, status.Error(codes.InvalidArgument,
"client id should be specified")
@@ -638,7 +639,7 @@ func (self *ApiServer) GetTable(
"User is not allowed to view results.")
}

result, err := getTable(self.config, in)
result, err := getTable(ctx, self.config, in)
if err != nil {
return &api_proto.GetTableResponse{}, nil
}
@@ -793,8 +794,16 @@ func (self *ApiServer) WriteEvent(
"Permission denied: PUBLISH "+peer_name+" to "+in.Query.Name)
}

return &empty.Empty{}, services.GetJournal().Push(
in.Query.Name, peer_name, 0, []byte(in.Response))
rows, err := utils.ParseJsonToDicts([]byte(in.Response))
if err != nil {
return nil, err
}

path_manager := result_sets.NewArtifactPathManager(self.config,
peer_name, "", in.Query.Name)

return &empty.Empty{}, services.GetJournal().PushRows(
path_manager, rows)
}

return nil, status.Error(codes.InvalidArgument, "no peer certs?")
59 changes: 31 additions & 28 deletions api/artifacts.go
@@ -19,6 +19,7 @@ package api

import (
"errors"
"os"
"path"
"regexp"
"strings"
@@ -35,6 +36,7 @@ import (
file_store "www.velocidex.com/golang/velociraptor/file_store"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/result_sets"
users "www.velocidex.com/golang/velociraptor/users"
)

@@ -246,39 +248,40 @@ func (self *ApiServer) ListAvailableEventResults(
"User is not allowed to view results.")
}

result := &api_proto.ListAvailableEventResultsResponse{}

root_path := "server_artifacts"

if in.ClientId != "" {
root_path = "clients/" + in.ClientId + "/monitoring"
}

path_manager := result_sets.NewMonitoringArtifactPathManager(in.ClientId)
file_store_factory := file_store.GetFileStore(self.config)
dir_list, err := file_store_factory.ListDirectory(root_path)
if err != nil {
return nil, err
}

for _, dirname := range dir_list {
available_event := &api_proto.AvailableEvent{
Artifact: dirname.Name(),
}
result.Logs = append(result.Logs, available_event)

timestamps, err := file_store_factory.ListDirectory(
path.Join(root_path, dirname.Name()))
if err == nil {
for _, filename := range timestamps {
timestamp := paths.DayNameToTimestamp(
filename.Name())
if timestamp > 0 {
available_event.Timestamps = append(
available_event.Timestamps,
seen := make(map[string]*api_proto.AvailableEvent)
err = file_store_factory.Walk(path_manager.Path(),
func(full_path string, info os.FileInfo, err error) error {
if !info.IsDir() && info.Size() > 0 {
relative_path := strings.TrimPrefix(full_path, path_manager.Path())
artifact_name := strings.TrimLeft(path.Dir(relative_path), "/")
date_name := path.Base(relative_path)
timestamp := paths.DayNameToTimestamp(date_name)

if timestamp != 0 {
event, pres := seen[artifact_name]
if !pres {
event = &api_proto.AvailableEvent{
Artifact: artifact_name,
}
}

event.Timestamps = append(event.Timestamps,
int32(timestamp))
seen[artifact_name] = event
}
}
}
return nil
})
if err != nil {
return nil, err
}

result := &api_proto.ListAvailableEventResultsResponse{}
for _, item := range seen {
result.Logs = append(result.Logs, item)
}

return result, nil
9 changes: 5 additions & 4 deletions api/clients.go
@@ -52,7 +52,7 @@ func GetApiClient(
return nil, errors.New("client_id must start with C")
}

client_urn := paths.GetClientMetadataPath(client_id)
client_path_manager := paths.NewClientPathManager(client_id)
db, err := datastore.GetDB(config_obj)
if err != nil {
return nil, err
@@ -68,7 +68,8 @@
}

client_info := &actions_proto.ClientInfo{}
err = db.GetSubject(config_obj, client_urn, client_info)
err = db.GetSubject(config_obj,
client_path_manager.Path(), client_info)
if err != nil {
return nil, err
}
@@ -87,15 +88,15 @@
}

public_key_info := &crypto_proto.PublicKey{}
err = db.GetSubject(config_obj, paths.GetClientKeyPath(client_id),
err = db.GetSubject(config_obj, client_path_manager.Key().Path(),
public_key_info)
if err != nil {
return nil, err
}

result.FirstSeenAt = public_key_info.EnrollTime

err = db.GetSubject(config_obj, paths.GetClientPingPath(client_id),
err = db.GetSubject(config_obj, client_path_manager.Ping().Path(),
client_info)
if err != nil {
return nil, err
71 changes: 46 additions & 25 deletions api/csv.go
@@ -18,53 +18,74 @@
package api

import (
"io"

context "golang.org/x/net/context"
file_store "www.velocidex.com/golang/velociraptor/file_store"
"www.velocidex.com/golang/velociraptor/file_store/api"
"www.velocidex.com/golang/velociraptor/file_store/csv"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/result_sets"

api_proto "www.velocidex.com/golang/velociraptor/api/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
)

func getTable(config_obj *config_proto.Config, in *api_proto.GetTableRequest) (
func getTable(
ctx context.Context,
config_obj *config_proto.Config,
in *api_proto.GetTableRequest) (
*api_proto.GetTableResponse, error) {

rows := uint64(0)
if in.Rows == 0 {
in.Rows = 500
}

fd, err := file_store.GetFileStore(config_obj).ReadFile(in.Path)
if err != nil {
return nil, err
}
var path_manager api.PathManager

if in.FlowId != "" && in.Artifact != "" {
path_manager = result_sets.NewArtifactPathManager(
config_obj, in.ClientId, in.FlowId, in.Artifact)

csv_reader := csv.NewReader(fd)
headers, err := csv_reader.Read()
if err != nil {
if err == io.EOF {
return &api_proto.GetTableResponse{}, nil
} else if in.FlowId != "" && in.Type != "" {
flow_path_manager := paths.NewFlowPathManager(
in.ClientId, in.FlowId)
switch in.Type {
case "log":
path_manager = flow_path_manager.Log()
case "uploads":
path_manager = flow_path_manager.UploadMetadata()
}
return nil, err
} else if in.HuntId != "" && in.Type == "clients" {
path_manager = paths.NewHuntPathManager(in.HuntId).Clients()
}

result := &api_proto.GetTableResponse{
Columns: headers,
}
result := &api_proto.GetTableResponse{}

for {
row_data, err := csv_reader.Read()
if path_manager != nil {
row_chan, err := file_store.GetTimeRange(ctx, config_obj,
path_manager, 0, 0)
if err != nil {
break
return nil, err
}
result.Rows = append(result.Rows, &api_proto.Row{
Cell: row_data,
})

rows += 1
if rows > in.Rows {
break
for row := range row_chan {
if result.Columns == nil {
result.Columns = row.Keys()
}

row_data := make([]string, 0, len(result.Columns))
for _, key := range row.Keys() {
value, _ := row.Get(key)
row_data = append(row_data, csv.AnyToString(value))
}
result.Rows = append(result.Rows, &api_proto.Row{
Cell: row_data,
})

rows += 1
if rows > in.Rows {
break
}
}
}

39 changes: 22 additions & 17 deletions api/download.go
@@ -15,8 +15,9 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
// Implement downloads. For now we do not use gRPC for this but
// implement it directly in the API.

// Implement downloads. We do not use gRPC for this but implement it
// directly in the API.
package api

import (
@@ -42,6 +43,7 @@ import (
"www.velocidex.com/golang/velociraptor/flows"
"www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/result_sets"
"www.velocidex.com/golang/velociraptor/utils"
)

@@ -93,17 +95,20 @@ func downloadFlowToZip(
return err
}

flow_path_manager := paths.NewFlowPathManager(client_id, flow_id)

// Copy the flow's logs.
copier(path.Join(flow_details.Context.Urn, "logs"))

// Copy CSV files
for _, artifacts_with_results := range flow_details.Context.ArtifactsWithResults {
artifact_name, source := paths.SplitFullSourceName(artifacts_with_results)
csv_path := paths.GetCSVPath(
flow_details.Context.Request.ClientId, "*",
flow_details.Context.SessionId,
artifact_name, source, paths.MODE_CLIENT)
copier(csv_path)
copier(flow_path_manager.Log().Path())

// Copy result sets
for _, artifact_with_results := range flow_details.Context.ArtifactsWithResults {
path_manager := result_sets.NewArtifactPathManager(config_obj,
flow_details.Context.Request.ClientId,
flow_details.Context.SessionId, artifact_with_results)
rs_path, err := path_manager.GetPathForWriting()
if err == nil {
copier(rs_path)
}
}

// Get all file uploads
Expand All @@ -117,14 +122,13 @@ func downloadFlowToZip(
// are. Users can do their own post processing.

// File uploads are stored in their own CSV file.
file_path := paths.GetUploadsMetadata(client_id, flow_id)
fd, err := file_store_factory.ReadFile(file_path)
row_chan, err := file_store.GetTimeRange(
ctx, config_obj, flow_path_manager.UploadMetadata(), 0, 0)
if err != nil {
return err
}
defer fd.Close()

for row := range csv.GetCSVReader(fd) {
for row := range row_chan {
vfs_path_any, pres := row.Get("vfs_path")
if pres {
err = copier(vfs_path_any.(string))
Expand All @@ -140,7 +144,8 @@ func createDownloadFile(config_obj *config_proto.Config,
return errors.New("Client Id and Flow Id should be specified.")
}

download_file := paths.GetDownloadsFile(client_id, flow_id)
flow_path_manager := paths.NewFlowPathManager(client_id, flow_id)
download_file := flow_path_manager.GetDownloadsFile().Path()

logger := logging.GetLogger(config_obj, &logging.GUIComponent)
logger.WithFields(logrus.Fields{