Merge pull request sripathikrishnan#109 from hashedin/expiry_in_csv
Adding expiry to memory CSV

This adds a new column, expiry, to the memory CSV report. The column contains the key's expiry timestamp in ISO 8601 format, in UTC. For keys that don't have an expiry, the column is blank.

When analysing memory, it is useful to know whether a key is ephemeral or permanent. Until now, this information was missing. With the new column, keys can be aggregated on the basis of expiry: for example, you can now see that 50% of the memory used is resident (never expires), 30% is set to expire within the next hour, and so on.
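As a rough sketch of that kind of aggregation (assuming a memory.csv report generated beforehand, e.g. with rdb -c memory dump.rdb -f memory.csv; the file name and the one-hour bucket are illustrative):

import csv
from datetime import datetime, timedelta

def parse_expiry(value):
    # The expiry column is a naive UTC timestamp. isoformat() omits the
    # microsecond part when it is zero, so try both layouts.
    for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            pass
    raise ValueError('unrecognised expiry: %r' % value)

buckets = {'never': 0, 'next_hour': 0, 'later': 0}
now = datetime.utcnow()

with open('memory.csv') as f:
    for row in csv.DictReader(f):
        size = int(row['size_in_bytes'])
        if not row['expiry']:
            buckets['never'] += size  # resident, never expires
        elif parse_expiry(row['expiry']) <= now + timedelta(hours=1):
            buckets['next_hour'] += size
        else:
            buckets['later'] += size

total = sum(buckets.values()) or 1
for name, size in sorted(buckets.items()):
    print('%-10s %5.1f%%' % (name, 100.0 * size / total))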
sripathikrishnan authored Jan 16, 2018
2 parents a01af99 + 07f7866 commit c02b410
Showing 4 changed files with 79 additions and 24 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -9,4 +9,7 @@ tests/dumps/dump_dealers_vins.rdb
tests/dumps/dump_random_lists.rdb
tests/dumps/dump_sorted_sets.rdb

.idea/*
.idea/*
venv2.6/
venv2.7/
venv3/
42 changes: 24 additions & 18 deletions rdbtools/memprofiler.py
@@ -18,7 +18,7 @@
ZSKIPLIST_P=0.25
REDIS_SHARED_INTEGERS = 10000

MemoryRecord = namedtuple('MemoryRecord', ['database', 'type', 'key', 'bytes', 'encoding','size', 'len_largest_element'])
MemoryRecord = namedtuple('MemoryRecord', ['database', 'type', 'key', 'bytes', 'encoding','size', 'len_largest_element', 'expiry'])

class StatsAggregator(object):
def __init__(self, key_groupings = None):
@@ -83,8 +83,8 @@ def __init__(self, out, bytes, largest):
self._bytes = bytes
self._largest = largest
self._out = out
headers = "%s,%s,%s,%s,%s,%s,%s\n" % (
"database", "type", "key", "size_in_bytes", "encoding", "num_elements", "len_largest_element")
headers = "%s,%s,%s,%s,%s,%s,%s,%s\n" % (
"database", "type", "key", "size_in_bytes", "encoding", "num_elements", "len_largest_element", "expiry")
self._out.write(codecs.encode(headers, 'latin-1'))

if self._largest is not None:
@@ -95,9 +95,10 @@ def next_record(self, record) :
return # some records are not keys (e.g. dict)
if self._largest is None:
if self._bytes is None or record.bytes >= int(self._bytes):
rec_str = "%d,%s,%s,%d,%s,%d,%d\n" % (
rec_str = "%d,%s,%s,%d,%s,%d,%d,%s\n" % (
record.database, record.type, record.key, record.bytes, record.encoding, record.size,
record.len_largest_element)
record.len_largest_element,
record.expiry.isoformat() if record.expiry else '')
self._out.write(codecs.encode(rec_str, 'latin-1'))
else:
heappush(self._heap, (record.bytes, record))
@@ -131,6 +132,7 @@ def __init__(self, stream, architecture, redis_version='3.2', string_escape=None
self._current_encoding = None
self._current_length = 0
self._len_largest_element = 0
self._key_expiry = None
self._db_keys = 0
self._db_expires = 0
self._aux_used_mem = None
@@ -147,10 +149,10 @@ def __init__(self, stream, architecture, redis_version='3.2', string_escape=None
self._long_size = 4
self._architecture = 32

def emit_record(self, record_type, key, byte_count, encoding, size, largest_el):
def emit_record(self, record_type, key, byte_count, encoding, size, largest_el, expiry):
if key is not None:
key = bytes_to_unicode(key, self._escape, skip_printable=True)
record = MemoryRecord(self._dbnum, record_type, key, byte_count, encoding, size, largest_el)
record = MemoryRecord(self._dbnum, record_type, key, byte_count, encoding, size, largest_el, expiry)
self._stream.next_record(record)

def start_rdb(self):
@@ -171,8 +173,8 @@ def start_database(self, db_number):
self._db_expires = 0

def end_database(self, db_number):
self.emit_record("dict", None, self.hashtable_overhead(self._db_keys), None, None, None)
self.emit_record("dict", None, self.hashtable_overhead(self._db_expires), None, None, None)
self.emit_record("dict", None, self.hashtable_overhead(self._db_keys), None, None, None, None)
self.emit_record("dict", None, self.hashtable_overhead(self._db_expires), None, None, None, None)
if hasattr(self._stream, 'end_database'):
self._stream.end_database(db_number)

@@ -184,14 +186,14 @@ def end_rdb(self):
def set(self, key, value, expiry, info):
self._current_encoding = info['encoding']
size = self.top_level_object_overhead(key, expiry) + self.sizeof_string(value)

length = self.element_length(value)
self.emit_record("string", key, size, self._current_encoding, length, length)
self.emit_record("string", key, size, self._current_encoding, length, length, expiry)
self.end_key()

def start_hash(self, key, length, expiry, info):
self._current_encoding = info['encoding']
self._current_length = length
self._current_length = length
self._key_expiry = expiry
size = self.top_level_object_overhead(key, expiry)

if 'sizeof_value' in info:
@@ -217,7 +219,7 @@ def hset(self, key, field, value):

def end_hash(self, key):
self.emit_record("hash", key, self._current_size, self._current_encoding, self._current_length,
self._len_largest_element)
self._len_largest_element, self._key_expiry)
self.end_key()

def start_set(self, key, cardinality, expiry, info):
@@ -236,7 +238,7 @@ def sadd(self, key, member):

def end_set(self, key):
self.emit_record("set", key, self._current_size, self._current_encoding, self._current_length,
self._len_largest_element)
self._len_largest_element, self._key_expiry)
self.end_key()

def start_list(self, key, expiry, info):
@@ -245,6 +247,7 @@ def start_list(self, key, expiry, info):
self._list_items_zipped_size = 0
self._current_encoding = info['encoding']
size = self.top_level_object_overhead(key, expiry)
self._key_expiry = expiry

# ignore the encoding in the rdb, and predict the encoding that will be used at the target redis version
if self._redis_version >= StrictVersion('3.2'):
@@ -297,10 +300,11 @@ def end_list(self, key, info):
self._current_size += self.robj_overhead() * self._current_length
self._current_size += self._list_items_size
self.emit_record("list", key, self._current_size, self._current_encoding, self._current_length,
self._len_largest_element)
self._len_largest_element, self._key_expiry)
self.end_key()

def start_module(self, key, module_id, expiry):
self._key_expiry = expiry
self._current_encoding = module_id
self._current_size = self.top_level_object_overhead(key, expiry)
self._current_size += 8 + 1 # add the module id length and EOF byte
@@ -309,14 +313,15 @@ def start_module(self, key, module_id, expiry):

def end_module(self, key, buffer_size, buffer=None):
size = self._current_size + buffer_size
self.emit_record("module", key, size, self._current_encoding, 1, size)
self.emit_record("module", key, size, self._current_encoding, 1, size, self._key_expiry)
self.end_key()

def start_sorted_set(self, key, length, expiry, info):
self._current_length = length
self._current_encoding = info['encoding']
size = self.top_level_object_overhead(key, expiry)

self._key_expiry = expiry

if 'sizeof_value' in info:
size += info['sizeof_value']
elif 'encoding' in info and info['encoding'] == 'skiplist':
@@ -338,14 +343,15 @@ def zadd(self, key, score, member):

def end_sorted_set(self, key):
self.emit_record("sortedset", key, self._current_size, self._current_encoding, self._current_length,
self._len_largest_element)
self._len_largest_element, self._key_expiry)
self.end_key()

def end_key(self):
self._db_keys += 1
self._current_encoding = None
self._current_size = 0
self._len_largest_element = 0
self._key_expiry = None

def sizeof_string(self, string):
# https://github.com/antirez/redis/blob/unstable/src/sds.h
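The expiry.isoformat() if record.expiry else '' expression in PrintAllKeys above is what yields either an ISO 8601 timestamp or a blank column. A quick standalone illustration (the sample value matches the test fixtures below):

from datetime import datetime

expiry = datetime(2022, 12, 25, 10, 11, 12, 573000)  # key with a TTL
print(expiry.isoformat() if expiry else '')  # 2022-12-25T10:11:12.573000

expiry = None  # persistent key
print(expiry.isoformat() if expiry else '')  # empty string -> blank CSV column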
2 changes: 0 additions & 2 deletions rdbtools/parser.py
@@ -1026,5 +1026,3 @@ def end_database(self, db_number):

def end_rdb(self):
print(']')


54 changes: 51 additions & 3 deletions tests/memprofiler_tests.py
@@ -1,11 +1,27 @@
import sys
import os
from io import BytesIO

import unittest

from rdbtools import RdbParser
from rdbtools import MemoryCallback
import os

from rdbtools.memprofiler import MemoryRecord

from rdbtools.memprofiler import MemoryRecord, PrintAllKeys

CSV_WITH_EXPIRY = """database,type,key,size_in_bytes,encoding,num_elements,len_largest_element,expiry
0,string,expires_ms_precision,128,string,27,27,2022-12-25T10:11:12.573000
"""

CSV_WITHOUT_EXPIRY = """database,type,key,size_in_bytes,encoding,num_elements,len_largest_element,expiry
0,list,ziplist_compresses_easily,301,quicklist,6,36,
"""

CSV_WITH_MODULE = """database,type,key,size_in_bytes,encoding,num_elements,len_largest_element,expiry
0,string,simplekey,72,string,7,7,
0,module,foo,101,ReJSON-RL,1,101,
"""

class Stats(object):
def __init__(self):
@@ -22,11 +38,43 @@ def get_stats(file_name):
parser.parse(os.path.join(os.path.dirname(__file__), 'dumps', file_name))
return stats.records

def get_csv(dump_file_name):
buff = BytesIO()
callback = MemoryCallback(PrintAllKeys(buff, None, None), 64)
parser = RdbParser(callback)
parser.parse(os.path.join(os.path.dirname(__file__),
'dumps', dump_file_name))
csv = buff.getvalue().decode()
return csv

class MemoryCallbackTestCase(unittest.TestCase):
def setUp(self):
pass

def test_csv_with_expiry(self):
csv = get_csv('keys_with_expiry.rdb')
self.assertEquals(csv, CSV_WITH_EXPIRY)

def test_csv_without_expiry(self):
csv = get_csv('ziplist_that_compresses_easily.rdb')
self.assertEquals(csv, CSV_WITHOUT_EXPIRY)

def test_csv_with_module(self):
csv = get_csv('redis_40_with_module.rdb')
self.assertEquals(csv, CSV_WITH_MODULE)

def test_expiry(self):
stats = get_stats('keys_with_expiry.rdb')

expiry = stats['expires_ms_precision'].expiry
self.assertEquals(expiry.year, 2022)
self.assertEquals(expiry.month, 12)
self.assertEquals(expiry.day, 25)
self.assertEquals(expiry.hour, 10)
self.assertEquals(expiry.minute, 11)
self.assertEquals(expiry.second, 12)
self.assertEquals(expiry.microsecond, 573000)

def test_len_largest_element(self):
stats = get_stats('ziplist_that_compresses_easily.rdb')

@@ -39,5 +87,5 @@ def test_rdb_with_module(self):
self.assertTrue('foo' in stats)
expected_record = MemoryRecord(database=0, type='module', key='foo',
bytes=101, encoding='ReJSON-RL', size=1,
len_largest_element=101)
len_largest_element=101, expiry=None)
self.assertEquals(stats['foo'], expected_record)
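The new field is also visible to any stream object passed to MemoryCallback, not just to the CSV writer. A minimal sketch in the spirit of the Stats helper above (the dump path is illustrative; the stream only needs a next_record method):

from rdbtools import RdbParser, MemoryCallback

class EphemeralKeys(object):
    # Collect only records for keys that carry an expiry.
    def __init__(self):
        self.records = []

    def next_record(self, record):
        if record.key is not None and record.expiry is not None:
            self.records.append(record)

stream = EphemeralKeys()
parser = RdbParser(MemoryCallback(stream, 64))
parser.parse('dump.rdb')
for record in stream.records:
    print(record.key, record.bytes, record.expiry.isoformat())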
