Commit b26a0ff

Handle ts collision pushing to redistimeseries (#295)
* Handle timestamp collision while pushing data to redistimeseries: if a datapoint already exists in that millisecond, try to add it in the next one
* Stop container before removing it to prevent a Docker exception
* Add test for duplicated timestamps handling
1 parent 2908019 commit b26a0ff
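
At its core, the change wraps TS.ADD in a bounded retry loop: writes use duplicate_policy="block" so a colliding write fails loudly, and the exporter retries one millisecond later, up to 100 times. Below is a minimal standalone sketch of that approach, not the repo's code: the helper name, key, and port are illustrative, and it assumes a local Redis with the RedisTimeSeries module loaded.

# Minimal sketch of the retry-on-collision approach this commit adopts.
# Assumes a local Redis with RedisTimeSeries; "demo_ts" and the port are
# illustrative, not taken from the repo.
import redis

def add_with_collision_retry(ts_api, key, timestamp, value, max_retries=100):
    """Try TS.ADD; on a duplicate-timestamp rejection, retry 1 ms later."""
    for _ in range(max_retries):
        try:
            # duplicate_policy="block" makes the server reject writes into an
            # already-populated millisecond instead of overwriting silently
            ts_api.add(key, timestamp, value, duplicate_policy="block")
            return timestamp
        except redis.exceptions.ResponseError as e:
            if "DUPLICATE_POLICY" in str(e):
                timestamp += 1  # slot taken: try the next millisecond
            else:
                raise
    return None  # gave up after max_retries consecutive collisions

r = redis.Redis(port=6379)
add_with_collision_retry(r.ts(), "demo_ts", 1700000000000, 42.0)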

File tree

3 files changed: +163 -27 lines changed


redis_benchmarks_specification/__common__/timeseries.py

Lines changed: 35 additions & 27 deletions
@@ -495,39 +495,47 @@ def push_data_to_redistimeseries(rts, time_series_dict: dict, expire_msecs=0):
     )
     for timeseries_name, time_series in time_series_dict.items():
         exporter_create_ts(rts, time_series, timeseries_name)
-        for timestamp, value in time_series["data"].items():
-            try:
-                if timestamp is None:
-                    logging.warning("The provided timestamp is null. Using auto-ts")
-                    rts.ts().add(
-                        timeseries_name,
-                        value,
-                        duplicate_policy="last",
-                    )
-                else:
+        for orig_timestamp, value in time_series["data"].items():
+            if orig_timestamp is None:
+                logging.warning("The provided timestamp is null. Using auto-ts")
+                timestamp = "*"
+            else:
+                timestamp = orig_timestamp
+
+            try_to_insert = True
+            retry_count = 0
+            while try_to_insert and retry_count < 100:
+                # (try to) insert the datapoint at the given timestamp
+                try_to_insert = False
+
+                try:
                     rts.ts().add(
                         timeseries_name,
                         timestamp,
                         value,
-                        duplicate_policy="last",
+                        duplicate_policy="block",
                     )
-                datapoint_inserts += 1
-            except redis.exceptions.DataError:
-                logging.warning(
-                    "Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
-                        timestamp, value, timeseries_name
+                    datapoint_inserts += 1
+                except redis.exceptions.DataError:
+                    logging.warning(
+                        "Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
+                            timestamp, value, timeseries_name
+                        )
                     )
-                )
-                datapoint_errors += 1
-                pass
-            except redis.exceptions.ResponseError:
-                logging.warning(
-                    "Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
-                        timestamp, value, timeseries_name
-                    )
-                )
-                datapoint_errors += 1
-                pass
+                    datapoint_errors += 1
+                except redis.exceptions.ResponseError as e:
+                    if "DUPLICATE_POLICY" in e.__str__():
+                        # duplicate timestamp: try to insert again, but in the next millisecond
+                        timestamp += 1
+                        try_to_insert = True
+                        retry_count += 1
+                    else:
+                        logging.warning(
+                            "Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
+                                timestamp, value, timeseries_name
+                            )
+                        )
+                        datapoint_errors += 1
     if expire_msecs > 0:
         rts.pexpire(timeseries_name, expire_msecs)
     progress.update()
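
The policy switch in this hunk is what makes collisions observable at all: under the old duplicate_policy="last", a second write into an occupied millisecond silently overwrote the existing sample, so a benchmark datapoint could vanish without any error. A hedged sketch of that pre-fix behavior, assuming a local Redis with RedisTimeSeries loaded (the key "demo_overwrite" is illustrative):

# Sketch of the pre-fix behavior: duplicate_policy="last" silently
# overwrites, so one of the two datapoints is lost without an error.
import redis

r = redis.Redis(port=6379)
r.ts().add("demo_overwrite", 1000, 1.0, duplicate_policy="last")
r.ts().add("demo_overwrite", 1000, 2.0, duplicate_policy="last")  # overwrites, no error
assert r.ts().info("demo_overwrite").total_samples == 1  # first datapoint is gone

With "block", the same second write raises a ResponseError whose message mentions DUPLICATE_POLICY, which is exactly the marker the new retry loop matches on.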

redis_benchmarks_specification/__self_contained_coordinator__/self_contained_coordinator.py

Lines changed: 1 addition & 0 deletions
@@ -1296,6 +1296,7 @@ def process_self_contained_coordinator_stream(
                         stdout=True, stderr=True
                     )
                 )
+                redis_container.stop()
                 redis_container.remove()
             except docker.errors.NotFound:
                 logging.info(
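
For context on the one-line coordinator change: docker-py refuses to remove a running container, raising docker.errors.APIError unless remove(force=True) is used, which is presumably the Docker exception the commit message mentions. A minimal sketch of the stop-then-remove pattern, assuming a local Docker daemon (the image tag is illustrative):

# Sketch: removing a running container fails, so stop it first.
import docker

client = docker.from_env()
container = client.containers.run("redis:7.4.0", detach=True)
try:
    container.stop()    # SIGTERM, then SIGKILL after Docker's default timeout
    container.remove()  # succeeds now that the container is stopped
except docker.errors.NotFound:
    pass  # container already cleaned up elsewhere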

utils/tests/test_self_contained_coordinator_memtier.py

Lines changed: 127 additions & 0 deletions
@@ -5,6 +5,7 @@
 import yaml
 from pathlib import Path
 import logging
+import datetime
 
 from redisbench_admin.utils.benchmark_config import get_defaults
 from redisbench_admin.utils.remote import get_overall_dashboard_keynames
@@ -1283,3 +1284,129 @@ def test_self_contained_coordinator_blocking_read_valkey():
 
     except redis.exceptions.ConnectionError:
         pass
+
+
+def test_self_contained_coordinator_duplicated_ts():
+    try:
+        if run_coordinator_tests_dockerhub():
+            db_port = int(os.getenv("DATASINK_PORT", "6379"))
+            conn = redis.StrictRedis(port=db_port)
+            conn.ping()
+            conn.flushall()
+
+            id = "dockerhub"
+            redis_version = "7.4.0"
+            run_image = f"redis:{redis_version}"
+            build_arch = "amd64"
+            testDetails = {}
+            build_os = "test_build_os"
+
+            # generate 2 stream requests with the same timestamp
+            timestamp = int(datetime.datetime.now().timestamp())
+            for _ in range(0, 2):
+                build_stream_fields, result = generate_benchmark_stream_request(
+                    id,
+                    conn,
+                    run_image,
+                    build_arch,
+                    testDetails,
+                    build_os,
+                    git_timestamp_ms=timestamp,
+                    use_git_timestamp=True,
+                )
+                build_stream_fields["mnt_point"] = ""
+                if result is True:
+                    benchmark_stream_id = conn.xadd(
+                        STREAM_KEYNAME_NEW_BUILD_EVENTS, build_stream_fields
+                    )
+                    logging.info(
+                        "successfully requested a new run {}. Stream id: {}".format(
+                            build_stream_fields, benchmark_stream_id
+                        )
+                    )
+
+            assert conn.exists(STREAM_KEYNAME_NEW_BUILD_EVENTS)
+            assert conn.xlen(STREAM_KEYNAME_NEW_BUILD_EVENTS) == 2
+
+            running_platform = "fco-ThinkPad-T490"
+
+            # process the 2 stream requests
+            for _ in range(0, 2):
+
+                build_runners_consumer_group_create(conn, running_platform, "0")
+                datasink_conn = redis.StrictRedis(port=db_port)
+                docker_client = docker.from_env()
+                home = str(Path.home())
+                stream_id = ">"
+                topologies_map = get_topologies(
+                    "./redis_benchmarks_specification/setups/topologies/topologies.yml"
+                )
+                # we use a benchmark spec with smaller CPU limit for client given github machines only contain 2 cores
+                # and we need 1 core for DB and another for CLIENT
+                testsuite_spec_files = [
+                    "./utils/tests/test_data/test-suites/test-memtier-dockerhub.yml"
+                ]
+                defaults_filename = "./utils/tests/test_data/test-suites/defaults.yml"
+                (
+                    _,
+                    _,
+                    default_metrics,
+                    _,
+                    _,
+                    _,
+                ) = get_defaults(defaults_filename)
+
+                (
+                    result,
+                    stream_id,
+                    number_processed_streams,
+                    num_process_test_suites,
+                ) = self_contained_coordinator_blocking_read(
+                    conn,
+                    True,
+                    docker_client,
+                    home,
+                    stream_id,
+                    datasink_conn,
+                    testsuite_spec_files,
+                    topologies_map,
+                    running_platform,
+                    False,
+                    [],
+                    "",
+                    0,
+                    6399,
+                    1,
+                    False,
+                    5,
+                    default_metrics,
+                    "amd64",
+                    None,
+                    0,
+                    10000,
+                    "unstable",
+                    "",
+                    True,
+                    False,
+                )
+                assert result == True
+                assert number_processed_streams == 1
+                assert num_process_test_suites == 1
+
+            stat_key = f"ci.benchmarks.redislabs/by.version/ci/redis/redis/memtier_benchmark-1Mkeys-load-string-with-10B-values/dockerhub/{running_platform}/oss-standalone/{redis_version}/ALL_STATS.Totals.Ops/sec"
+            assert datasink_conn.exists(stat_key)
+            rts = datasink_conn.ts()
+
+            rts_info = rts.info(stat_key)
+
+            # we have two datapoints
+            assert rts_info.total_samples == 2
+
+            # the first was inserted at the original timestamp
+            assert rts_info.first_timestamp == timestamp
+
+            # the second clashed, so it was resolved by adding 1 ms to the timestamp
+            assert rts_info.last_timestamp == timestamp + 1
+
+    except redis.exceptions.ConnectionError:
+        pass
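
The closing assertions read series metadata via TS.INFO instead of fetching samples back. A condensed sketch of that assertion pattern, assuming a local Redis with RedisTimeSeries loaded (the key name and fixed timestamp are illustrative):

# Sketch of the TS.INFO-based assertions used in the test above.
import redis

r = redis.Redis(port=6379)
ts = r.ts()
base = 1700000000000  # an arbitrary millisecond timestamp

ts.add("demo_ts", base, 1.0, duplicate_policy="block")
ts.add("demo_ts", base + 1, 2.0, duplicate_policy="block")  # collision resolved at +1 ms

info = ts.info("demo_ts")
assert info.total_samples == 2          # both datapoints landed
assert info.first_timestamp == base     # first kept its original timestamp
assert info.last_timestamp == base + 1  # second shifted by one millisecond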
