Skip to content

Commit

Permalink
Changes to have file parsers use binary data format super class (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
joachimmetz authored and ydkhatri committed Feb 19, 2019
1 parent 70d020a commit b228967
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 117 deletions.
78 changes: 78 additions & 0 deletions UnifiedLog/data_format.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
'''Shared functionality for parsing binary data formats.'''

from __future__ import unicode_literals

import datetime
import struct

from UnifiedLog import logger


class BinaryDataFormat(object):
    '''Binary data format.

    Base class with helpers shared by the binary file parsers: timestamp
    conversion, C string extraction and Windows SID decoding.
    '''

    def _ReadAPFSTime(self, mac_apfs_time):
        '''Converts a Mac APFS timestamp into a datetime object.

        A Mac APFS timestamp is the number of nanoseconds since 1970/1/1.

        Args:
          mac_apfs_time (int|float|str|bytes): timestamp; a string value is
              converted with float() first.

        Returns:
          datetime.datetime: converted timestamp, or an empty string when the
              value is 0, None, empty or cannot be converted.
        '''
        if mac_apfs_time in (0, None, '', b''):
            return ''

        try:
            # type(u'') is unicode on Python 2 and str on Python 3; the bare
            # name "unicode" raises NameError on Python 3.
            if isinstance(mac_apfs_time, (bytes, type(u''))):
                mac_apfs_time = float(mac_apfs_time)
            return datetime.datetime(1970, 1, 1) + datetime.timedelta(
                seconds=mac_apfs_time / 1000000000.)
        except (OverflowError, TypeError, ValueError) as ex:
            logger.error(
                "ReadAPFSTime() Failed to convert timestamp from value " +
                str(mac_apfs_time) + " Error was: " + str(ex))
        return ''

    def _ReadCString(self, data, max_len=1024):
        '''Returns a C utf8 string (excluding terminating null).

        Args:
          data (bytes): buffer that starts with the string.
          max_len (int): number of leading bytes searched for the terminator.

        Returns:
          str: decoded string, or an empty string if decoding failed.
        '''
        return self._ReadCStringAndEndPos(data, max_len=max_len)[0]

    def _ReadCStringAndEndPos(self, data, max_len=1024):
        '''Returns a C utf8 string (excluding terminating null) and its end.

        Args:
          data (bytes): buffer that starts with the string.
          max_len (int): number of leading bytes searched for the terminator.

        Returns:
          tuple[str, int]: decoded string and the offset of the terminating
              null in data. The offset is -1 when no terminator was found
              within max_len bytes; in that case the whole buffer is decoded.
              The string is empty if decoding failed.
        '''
        max_len = min(len(data), max_len)
        string = ''
        null_pos = -1
        try:
            null_pos = data.find(b'\x00', 0, max_len)
            if null_pos == -1:
                logger.warning("Possible corrupted string encountered")
                string = data.decode('utf8')
            else:
                string = data[0:null_pos].decode('utf8')
        except Exception:  # best effort: log and fall back to ''
            logger.exception('Error reading C-String')
        return string, null_pos

    def _ReadNtSid(self, data):
        '''Reads a windows SID from its raw binary form.

        Args:
          data (bytes): raw SID: revision (1 byte), sub authority count
              (1 byte), authority (big-endian, read from bytes 4-8) followed
              by little-endian 32-bit sub authorities.

        Returns:
          str: SID in "S-revision-authority-sub1-sub2..." form, or an empty
              string if the buffer is too small.
        '''
        size = len(data)
        if size < 8:
            logger.error('Not a windows sid')
            return ''

        # unpack_from works on both Python 2 str and Python 3 bytes, where
        # indexing a single byte yields an int that struct cannot unpack.
        rev, num_sub_auth = struct.unpack_from('<BB', data, 0)
        # NOTE(review): bytes 2-3 of the header are not examined, as before.
        authority = struct.unpack_from('>I', data, 4)[0]

        if size < (8 + (num_sub_auth * 4)):
            logger.error('buffer too small or truncated - cant fit all sub_auth')
            return ''

        # Each sub authority is 4 bytes and they start right after the header.
        sub_authorities = struct.unpack_from(
            '<{}I'.format(num_sub_auth), data, 8)

        parts = ['S', rev, authority]
        parts.extend(sub_authorities)
        return '-'.join(str(part) for part in parts)
77 changes: 39 additions & 38 deletions UnifiedLog/dsc_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,27 @@
import struct
import uuid

from UnifiedLog import data_format
from UnifiedLog import logger


class Dsc(object):
class Dsc(data_format.BinaryDataFormat):
'''Shared-Cache strings (dsc) file parser.
Attributes:
range_entries (list[tuple[int, int, int, int]]): range entries.
uuid_entries (list[tuple[int, int, uuid.UUID, str, str]]): UUID entries.
'''

def __init__(self, v_file):
'''Initializes a shared-Cache strings (dsc) file parser.
Args:
v_file (VirtualFile): a virtual file.
'''
super(Dsc, self).__init__()
self.file = v_file
self.version = 0
self.num_range_entries = 0
self.num_uuid_entries = 0
self._file = v_file
self._format_version = None
self.range_entries = [] # [ [uuid_index, v_off, data_offset, data_len], [..], ..] # data_offset is absolute in file
self.uuid_entries = [] # [ [v_off, size, uuid, lib_path, lib_name], [..], ..] # v_off is virt offset

Expand All @@ -44,10 +55,12 @@ def _ParseFileObject(self, file_object):
'(hcsd)').format(signature_base16))
return False

self.version, self.num_range_entries, self.num_uuid_entries = (
struct.unpack("<III", file_header_data[4:16]))
major_version, minor_version, num_range_entries, num_uuid_entries = (
struct.unpack("<HHII", file_header_data[4:16]))

self._format_version = '{0:d}.{1:d}'.format(major_version, minor_version)

while len(self.range_entries) < self.num_range_entries:
while len(self.range_entries) < num_range_entries:
range_entry_data = file_object.read(16)

uuid_index, v_off, data_offset, data_len = struct.unpack(
Expand All @@ -56,7 +69,7 @@ def _ParseFileObject(self, file_object):
self.range_entries.append(range_entry)

uuid_entry_offset = file_object.tell()
while len(self.uuid_entries) < self.num_uuid_entries:
while len(self.uuid_entries) < num_uuid_entries:
file_object.seek(uuid_entry_offset, os.SEEK_SET)
uuid_entry_data = file_object.read(28)

Expand All @@ -73,24 +86,6 @@ def _ParseFileObject(self, file_object):

return True

# TODO: move this into a shared DataFormat class.
def _ReadCString(self, data, max_len=1024):
'''Returns a C utf8 string (excluding terminating null)'''
pos = 0
max_len = min(len(data), max_len)
string = ''
try:
null_pos = data.find(b'\x00', 0, max_len)
if null_pos == -1:
logger.warning("Possible corrupted string encountered")
string = data.decode('utf8')
else:
string = data[0:null_pos].decode('utf8')
except:
logger.exception('Error reading C-String')

return string

def FindVirtualOffsetEntries(self, v_offset):
'''Return tuple (range_entry, uuid_entry) where range_entry[xx].size <= v_offset'''
ret_range_entry = None
Expand Down Expand Up @@ -126,9 +121,9 @@ def ReadFmtStringAndEntriesFromVirtualOffset(self, v_offset):
v_offset))

rel_offset = v_offset - range_entry[1]
f = self.file.file_pointer
f.seek(range_entry[2] + rel_offset)
cstring_data = f.read(range_entry[3] - rel_offset)
file_object = self._file.file_pointer
file_object.seek(range_entry[2] + rel_offset)
cstring_data = file_object.read(range_entry[3] - rel_offset)
cstring = self._ReadCString(cstring_data)
return cstring, range_entry, uuid_entry

Expand All @@ -143,25 +138,31 @@ def GetUuidEntryFromVirtualOffset(self, v_offset):
return None

def DebugPrintDsc(self):
    '''Logs the format version, file name and all parsed entries at debug level.'''
    # !s instead of :s because _format_version is None until a file is parsed.
    logger.debug("DSC version={0!s} file={1:s}".format(
        self._format_version, self._file.filename))

    logger.debug("Range entry values")
    for range_entry in self.range_entries:
        # uuid_index, v_off, data_offset, data_len
        logger.debug("{0:d} {1:d} {2:d} {3:d}".format(
            range_entry[0], range_entry[1], range_entry[2], range_entry[3]))

    logger.debug("Uuid entry values")
    for uuid_entry in self.uuid_entries:
        # v_off, size, uuid, lib_path, lib_name
        logger.debug("{0:d} {1:d} {2!s} {3:s} {4:s}".format(
            uuid_entry[0], uuid_entry[1], uuid_entry[2], uuid_entry[3],
            uuid_entry[4]))

def Parse(self):
'''Parses a dsc file.
self.file.is_valid is set to False if this method encounters issues
self._file.is_valid is set to False if this method encounters issues
parsing the file.
Returns:
bool: True if the dsc file-like object was successfully parsed,
False otherwise.
'''
file_object = self.file.open()
file_object = self._file.open()
if not file_object:
return False

Expand All @@ -172,6 +173,6 @@ def Parse(self):
result = False

if not result:
self.file.is_valid = False
self._file.is_valid = False

return result
74 changes: 4 additions & 70 deletions UnifiedLog/tracev3_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@

import biplist

from UnifiedLog import data_format
from UnifiedLog import logger
from UnifiedLog import resources


class TraceV3(object):
class TraceV3(data_format.BinaryDataFormat):
'''Tracev3 file parser.'''

def __init__(self, v_fs, v_file, ts_list, uuidtext_folder_path, cached_files=None):
'''
Input params:
Expand Down Expand Up @@ -128,75 +131,6 @@ def _FindClosestTimesyncItemInList(self, ts_items, continuousTime):
closest_tsi = item
return closest_tsi

# TODO: move this into a shared DataFormat class.
def _ReadAPFSTime(self, mac_apfs_time): # Mac APFS timestamp is nano second time epoch beginning 1970/1/1
'''Returns datetime object, or empty string upon error'''
if mac_apfs_time not in ( 0, None, ''):
try:
if type(mac_apfs_time) in (str, unicode):
mac_apfs_time = float(mac_apfs_time)
return datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=mac_apfs_time/1000000000.)
except Exception as ex:
logger.error("ReadAPFSTime() Failed to convert timestamp from value " + str(mac_apfs_time) + " Error was: " + str(ex))
return ''

# TODO: move this into a shared DataFormat class.
def _ReadCString(self, data, max_len=1024):
'''Returns a C utf8 string (excluding terminating null)'''
pos = 0
max_len = min(len(data), max_len)
string = ''
try:
null_pos = data.find(b'\x00', 0, max_len)
if null_pos == -1:
logger.warning("Possible corrupted string encountered")
string = data.decode('utf8')
else:
string = data[0:null_pos].decode('utf8')
except:
logger.exception('Error reading C-String')

return string

# TODO: move this into a shared DataFormat class.
def _ReadCStringAndEndPos(self, data, max_len=1024):
'''Returns a tuple containing a C utf8 string (excluding terminating null)
and the end position in the data
("utf8-string", pos)
'''
pos = 0
max_len = min(len(data), max_len)
string = ''
null_pos = -1
try:
null_pos = data.find(b'\x00', 0, max_len)
if null_pos == -1:
logger.warning("Possible corrupted string encountered")
string = data.decode('utf8')
else:
string = data[0:null_pos].decode('utf8')
except:
logger.exception('Error reading C-String')
return string, null_pos

# TODO: move this into a shared DataFormat class.
def _ReadNtSid(self, data):
'''Reads a windows SID from its raw binary form'''
sid = ''
size = len(data)
if size < 8:
logger.error('Not a windows sid')
rev = struct.unpack("<B", data[0])[0]
num_sub_auth = struct.unpack("<B", data[1])[0]
authority = struct.unpack(">I", data[4:8])[0]

if size < (8 + (num_sub_auth * 4)):
logger.error('buffer too small or truncated - cant fit all sub_auth')
return ''
sub_authorities = struct.unpack('<{}I'.format(num_sub_auth), data[8:8*num_sub_auth])
sid = 'S-{}-{}-'.format(rev, authority) + '-'.join([str(sa) for sa in sub_authorities])
return sid

def _Read_CLClientManagerStateTrackerState(self, data):
''' size=0x8 int, bool '''
locationServicesEnabledStatus, locationRestricted = struct.unpack('<ii', data[0:8])
Expand Down
5 changes: 4 additions & 1 deletion UnifiedLog/uuidtext_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

import os

from UnifiedLog import data_format
from UnifiedLog import logger


class Uuidtext(object):
class Uuidtext(data_format.BinaryDataFormat):
'''Uuidtext file parser.'''

def __init__(self, v_file, uuid):
super(Uuidtext, self).__init__()
self.file = v_file
Expand Down
45 changes: 45 additions & 0 deletions tests/data_format.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''Tests for the shared binary data format functionality.'''

from __future__ import unicode_literals

import unittest

from UnifiedLog import data_format

from tests import test_lib


class BinaryDataFormat(test_lib.BaseTestCase):
    '''Unit tests for the helper methods of the binary data format class.'''

    # TODO: add tests for _ReadAPFSTime

    def testReadCString(self):
        '''Checks _ReadCString with terminated and undecodable input.'''
        parser = data_format.BinaryDataFormat()

        # Everything from the first null byte onwards is dropped.
        self.assertEqual(parser._ReadCString(b'test\0bogus'), 'test')

        # Input that is not valid UTF-8 yields an empty string.
        self.assertEqual(parser._ReadCString(b'\xff\xff\xff'), '')

    def testReadCStringAndEndPos(self):
        '''Checks _ReadCStringAndEndPos with terminated and undecodable input.'''
        parser = data_format.BinaryDataFormat()

        # The end position is the offset of the terminating null byte.
        result = parser._ReadCStringAndEndPos(b'test\0bogus')
        self.assertEqual(result[0], 'test')
        self.assertEqual(result[1], 4)

        # No terminator and invalid UTF-8: empty string and position -1.
        result = parser._ReadCStringAndEndPos(b'\xff\xff\xff')
        self.assertEqual(result[0], '')
        self.assertEqual(result[1], -1)

    # TODO: add tests for _ReadNtSid


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit b228967

Please sign in to comment.