Skip to content

Commit

Permalink
Add support for downloading osquery results with GrrFlowCollector (lo…
Browse files Browse the repository at this point in the history
…g2timeline#907)

* Merge osquery results

* Add check for OSqueryCollectedFile

* Update grr_hunt.py

* Update grr_hosts.py

* Fix

* Fixes

* whitespace...

* Add unit test

* Update test
  • Loading branch information
sydp authored Aug 22, 2024
1 parent 63a6e8a commit 7853b9e
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 14 deletions.
91 changes: 77 additions & 14 deletions dftimewolf/lib/collectors/grr_hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,59 @@ def _DownloadTimeline(
timeline.WriteToFile(final_bodyfile_path)
return final_bodyfile_path

def _DownloadOsquery(
self,
client: Client,
flow_id: str,
flow_output_dir: str
) -> Optional[str]:
"""Download osquery results as a CSV file.
Args:
client: the GRR Client.
flow_id: the Osquery flow ID to download results from.
flow_output_dir: the directory to store the downloaded timeline.
Returns:
str: the path to the CSV file or None if there are no results.
"""
grr_flow = client.Flow(flow_id)
list_results = list(grr_flow.ListResults())

if not list_results:
self.logger.warning(f"No results returned for flow ID {flow_id}")
return None

results = []
for result in list_results:
payload = result.payload
if isinstance(payload, osquery_flows.OsqueryCollectedFile):
# We don't do anything with any collected files for now as we are just
# interested in the osquery results.
self.logger.info(
f'Skipping collected file - {payload.stat_entry.path_spec}.')
continue
if not isinstance(payload, osquery_flows.OsqueryResult):
self.logger.error(f'Incorrect results format from flow ID {flow_id}')
continue

headers = [column.name for column in payload.table.header.columns]
data = []
for row in payload.table.rows:
data.append(row.values)
data_frame = pd.DataFrame.from_records(data, columns=headers)
results.append(data_frame)

fqdn = client.data.os_info.fqdn.lower()
output_file_path = os.path.join(
flow_output_dir,
'.'.join(str(val) for val in (fqdn, flow_id, 'csv')))
with open(output_file_path, mode='w') as fd:
merged_data_frame = pd.concat(results)
merged_data_frame.to_csv(fd)

return output_file_path

def _DownloadFiles(self, client: Client, flow_id: str) -> Optional[str]:
"""Download files/results from the specified flow.
Expand All @@ -445,11 +498,16 @@ def _DownloadFiles(self, client: Client, flow_id: str) -> Optional[str]:
os.makedirs(flow_output_dir, exist_ok=True)

flow_name = flow_handle.data.name
if flow_name == "TimelineFlow":
self.logger.debug("Downloading timeline from GRR")
if flow_name == 'TimelineFlow':
self.logger.debug('Downloading timeline from GRR')
self._DownloadTimeline(client, flow_handle, flow_output_dir)
return flow_output_dir

if flow_name == 'OsqueryFlow':
self.logger.debug('Downloading osquery results from GRR')
self._DownloadOsquery(client, flow_id, flow_output_dir)
return flow_output_dir

payloads = []
for r in flow_handle.ListResults():
payloads.append(r.payload)
Expand Down Expand Up @@ -1130,6 +1188,11 @@ def _DownloadResults(self,
results = []
for result in list_results:
payload = result.payload
if isinstance(payload, osquery_flows.OsqueryCollectedFile):
# We don't do anything with any collected files for now as we are just
# interested in the osquery results.
self.logger.info(f'File collected - {payload.stat_entry.path_spec}.')
continue
if not isinstance(payload, osquery_flows.OsqueryResult):
self.logger.error(f'Incorrect results format from flow ID {grr_flow}')
continue
Expand Down Expand Up @@ -1192,20 +1255,20 @@ def _ProcessQuery(
self.state.StoreContainer(results_container)
return

for data_frame in results:
self.logger.info(
f'{str(flow_id)} ({hostname}): {len(data_frame)} rows collected')
merged_results = pd.concat(results)
self.logger.info(
f'{str(flow_id)} ({hostname}): {len(merged_results)} rows collected')

dataframe_container = containers.OsqueryResult(
name=name,
description=description,
query=query,
hostname=hostname,
data_frame=data_frame,
flow_identifier=flow_identifier,
client_identifier=client_identifier)
dataframe_container = containers.OsqueryResult(
name=name,
description=description,
query=query,
hostname=hostname,
data_frame=merged_results,
flow_identifier=flow_identifier,
client_identifier=client_identifier)

self.state.StoreContainer(dataframe_container)
self.state.StoreContainer(dataframe_container)

def Process(self, container: containers.Host
) -> None: # pytype: disable=signature-mismatch
Expand Down
6 changes: 6 additions & 0 deletions dftimewolf/lib/collectors/grr_hunt.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,12 @@ def _GetAndWriteResults(
grr_client = list(self.grr_api.SearchClients(result.client.client_id))[0]
client_hostname = grr_client.data.os_info.fqdn.lower()

if isinstance(payload, osquery_flows.OsqueryCollectedFile):
# We don't do anything with any collected files for now as we are just
# interested in the osquery results.
self.logger.info(f'File collected - {payload.stat_entry.path_spec}.')
continue

if not isinstance(payload, osquery_flows.OsqueryResult):
self.ModuleError(
f'Incorrect results format from {result.client.client_id} '
Expand Down
12 changes: 12 additions & 0 deletions tests/lib/collectors/grr_hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@ def testDownloadTimelineBodyForFlow(
"/tmp/random/tomchop/F:12345", exist_ok=True
)

@mock.patch("builtins.open")
@mock.patch('grr_api_client.flow.FlowBase.ListResults')
def testDownloadOsqueryForFlow(self, mock_ListResults, unused_mock_open):
"""Tests if Osquery results are downloaded in the correct directories."""
mock_flowresult = mock.MagicMock()
mock_flowresult.payload = osquery_pb2.OsqueryResult()
mock_ListResults.return_value = [
mock_flowresult
]
return_value = self.grr_flow_module._DownloadOsquery(
mock_grr_hosts.MOCK_CLIENT, 'F:12345', '/tmp/random')
self.assertEqual(return_value, '/tmp/random/tomchop.F:12345.csv')

class GRRArtifactCollectorTest(unittest.TestCase):
"""Tests for the GRR artifact collector."""
Expand Down

0 comments on commit 7853b9e

Please sign in to comment.