Skip to content

Handle ts collision pushing to redistimeseries #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions redis_benchmarks_specification/__common__/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,39 +495,47 @@ def push_data_to_redistimeseries(rts, time_series_dict: dict, expire_msecs=0):
)
for timeseries_name, time_series in time_series_dict.items():
exporter_create_ts(rts, time_series, timeseries_name)
for timestamp, value in time_series["data"].items():
try:
if timestamp is None:
logging.warning("The provided timestamp is null. Using auto-ts")
rts.ts().add(
timeseries_name,
value,
duplicate_policy="last",
)
else:
for orig_timestamp, value in time_series["data"].items():
if orig_timestamp is None:
logging.warning("The provided timestamp is null. Using auto-ts")
timestamp = "*"
else:
timestamp = orig_timestamp

try_to_insert = True
retry_count = 0
while try_to_insert and retry_count < 100:
# (try to) insert the datapoint in given timestamp
try_to_insert = False

try:
rts.ts().add(
timeseries_name,
timestamp,
value,
duplicate_policy="last",
duplicate_policy="block",
)
datapoint_inserts += 1
except redis.exceptions.DataError:
logging.warning(
"Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
timestamp, value, timeseries_name
datapoint_inserts += 1
except redis.exceptions.DataError:
logging.warning(
"Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
timestamp, value, timeseries_name
)
)
)
datapoint_errors += 1
pass
except redis.exceptions.ResponseError:
logging.warning(
"Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
timestamp, value, timeseries_name
)
)
datapoint_errors += 1
pass
datapoint_errors += 1
except redis.exceptions.ResponseError as e:
if "DUPLICATE_POLICY" in e.__str__():
# duplicate timestamp: try to insert again, but in the next millisecond
timestamp += 1
try_to_insert = True
retry_count += 1
else:
logging.warning(
"Error while inserting datapoint ({} : {}) in timeseries named {}. ".format(
timestamp, value, timeseries_name
)
)
datapoint_errors += 1
if expire_msecs > 0:
rts.pexpire(timeseries_name, expire_msecs)
progress.update()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,7 @@ def process_self_contained_coordinator_stream(
stdout=True, stderr=True
)
)
redis_container.stop()
redis_container.remove()
except docker.errors.NotFound:
logging.info(
Expand Down
127 changes: 127 additions & 0 deletions utils/tests/test_self_contained_coordinator_memtier.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import yaml
from pathlib import Path
import logging
import datetime

from redisbench_admin.utils.benchmark_config import get_defaults
from redisbench_admin.utils.remote import get_overall_dashboard_keynames
Expand Down Expand Up @@ -1283,3 +1284,129 @@ def test_self_contained_coordinator_blocking_read_valkey():

except redis.exceptions.ConnectionError:
pass


def test_self_contained_coordinator_duplicated_ts():
    """Verify timestamp-collision handling when pushing to RedisTimeSeries.

    Submits two benchmark stream requests that share the same git timestamp,
    processes both, and asserts that the resulting time series holds two
    datapoints: the first at the original timestamp and the second bumped by
    one (the collision-retry logic re-inserts at timestamp + 1).

    The test is a no-op when the dockerhub coordinator tests are disabled or
    the datasink Redis is unreachable (ConnectionError is swallowed).
    """
    try:
        if run_coordinator_tests_dockerhub():
            db_port = int(os.getenv("DATASINK_PORT", "6379"))
            conn = redis.StrictRedis(port=db_port)
            conn.ping()
            conn.flushall()

            # renamed from `id` to avoid shadowing the builtin
            run_id = "dockerhub"
            redis_version = "7.4.0"
            run_image = f"redis:{redis_version}"
            build_arch = "amd64"
            testDetails = {}
            build_os = "test_build_os"

            # generate 2 stream requests with the same timestamp
            # (seconds-resolution epoch; passed as git_timestamp_ms below and
            # asserted against the series' first/last timestamps — TODO confirm
            # the intended unit with generate_benchmark_stream_request)
            timestamp = int(datetime.datetime.now().timestamp())
            for _ in range(2):
                build_stream_fields, result = generate_benchmark_stream_request(
                    run_id,
                    conn,
                    run_image,
                    build_arch,
                    testDetails,
                    build_os,
                    git_timestamp_ms=timestamp,
                    use_git_timestamp=True,
                )
                build_stream_fields["mnt_point"] = ""
                if result is True:
                    benchmark_stream_id = conn.xadd(
                        STREAM_KEYNAME_NEW_BUILD_EVENTS, build_stream_fields
                    )
                    # lazy %-args: no formatting cost when INFO is disabled
                    logging.info(
                        "successfully requested a new run %s. Stream id: %s",
                        build_stream_fields,
                        benchmark_stream_id,
                    )

            assert conn.exists(STREAM_KEYNAME_NEW_BUILD_EVENTS)
            assert conn.xlen(STREAM_KEYNAME_NEW_BUILD_EVENTS) == 2

            running_platform = "fco-ThinkPad-T490"

            # process the 2 stream requests
            for _ in range(2):
                build_runners_consumer_group_create(conn, running_platform, "0")
                datasink_conn = redis.StrictRedis(port=db_port)
                docker_client = docker.from_env()
                home = str(Path.home())
                stream_id = ">"
                topologies_map = get_topologies(
                    "./redis_benchmarks_specification/setups/topologies/topologies.yml"
                )
                # we use a benchmark spec with smaller CPU limit for client given github machines only contain 2 cores
                # and we need 1 core for DB and another for CLIENT
                testsuite_spec_files = [
                    "./utils/tests/test_data/test-suites/test-memtier-dockerhub.yml"
                ]
                defaults_filename = "./utils/tests/test_data/test-suites/defaults.yml"
                (
                    _,
                    _,
                    default_metrics,
                    _,
                    _,
                    _,
                ) = get_defaults(defaults_filename)

                (
                    result,
                    stream_id,
                    number_processed_streams,
                    num_process_test_suites,
                ) = self_contained_coordinator_blocking_read(
                    conn,
                    True,
                    docker_client,
                    home,
                    stream_id,
                    datasink_conn,
                    testsuite_spec_files,
                    topologies_map,
                    running_platform,
                    False,
                    [],
                    "",
                    0,
                    6399,
                    1,
                    False,
                    5,
                    default_metrics,
                    "amd64",
                    None,
                    0,
                    10000,
                    "unstable",
                    "",
                    True,
                    False,
                )
                # identity check: the coordinator returns the bool True on success
                assert result is True
                assert number_processed_streams == 1
                assert num_process_test_suites == 1

            stat_key = f"ci.benchmarks.redislabs/by.version/ci/redis/redis/memtier_benchmark-1Mkeys-load-string-with-10B-values/dockerhub/{running_platform}/oss-standalone/{redis_version}/ALL_STATS.Totals.Ops/sec"
            assert datasink_conn.exists(stat_key)
            rts = datasink_conn.ts()

            rts_info = rts.info(stat_key)

            # we have two datapoints
            assert rts_info.total_samples == 2

            # first was inserted on the original timestamp
            assert rts_info.first_timestamp == timestamp

            # the second has clashed, so it was resolved by adding 1ms to the timestamp
            assert rts_info.last_timestamp == timestamp + 1

    except redis.exceptions.ConnectionError:
        pass