-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathstart-load-test-redis.py
executable file
·67 lines (55 loc) · 2.31 KB
/
start-load-test-redis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python
from kombu import Exchange, Queue
from spylunking.log.setup_logging import build_colorized_logger
from celery_connectors.utils import ev
from celery_connectors.utils import build_sample_msgs
from celery_connectors.build_ssl_options import build_ssl_options
from celery_connectors.run_publisher import run_publisher
# Credits and inspirations from these great sources:
#
# https://github.com/celery/kombu/blob/master/examples/rpc-tut6/rpc_server.py
# https://gist.github.com/oubiwann/3843016
# https://gist.github.com/eavictor/ee7856581619ac60643b57987b7ed580#file-mq_kombu_rpc_server-py
# https://github.com/Skablam/kombu-examples
# https://gist.github.com/mlavin/6671079
# ---------------------------------------------------------------------------
# Load-test publisher: generate a batch of sample messages and push them to
# the configured broker in one run_publisher() call.
#
# Every setting below goes through ev(key, default) from
# celery_connectors.utils -- presumably an environment-variable lookup with a
# fallback value; confirm against that helper.
# ---------------------------------------------------------------------------

# Application name, also used as the logger name.
name = ev("APP_NAME", "robopub")
log = build_colorized_logger(name=name)

# Broker location and routing settings.
broker_url = ev("PUB_BROKER_URL", "redis://localhost:6379/0")
exchange_name = ev("PUBLISH_EXCHANGE", "")
exchange_type = ev("PUBLISH_EXCHANGE_TYPE", "")
routing_key = ev("PUBLISH_ROUTING_KEY", "reporting.accounts")
queue_name = ev("PUBLISH_QUEUE", "reporting.accounts")

# kombu objects built from the settings above.
use_exchange = Exchange(exchange_name, type=exchange_type)
use_routing_key = routing_key
use_queue = Queue(
    queue_name,
    exchange=use_exchange,
    routing_key=routing_key,
)
task_queues = [use_queue]

# The "high" priority label is mapped onto the publish queue; the same label
# is handed to run_publisher() below.
priority_routing = {"high": queue_name}

# Connection extras passed straight through to run_publisher().
ssl_options = build_ssl_options()
transport_options = {}

# float() runs before int() so values such as "200000.0" or "2e5" parse too.
num_msgs_to_send = int(float(ev("NUM_MSG_TO_PUBLISH", "200000")))

log.info("Generating messages={}".format(num_msgs_to_send))

# Alternative payload kept for reference (the key names suggest it makes the
# consumers simulate processing lag -- confirm before enabling):
#
#   relay_task_lag = 0.0
#   worker_task_lag = 0.0
#   processing_lag_data = {"relay_simulate_processing_lag": worker_task_lag,
#                          "simulate_processing_lag": worker_task_lag}
#   msgs = build_sample_msgs(num=num_msgs_to_send,
#                            data=processing_lag_data)
msgs = build_sample_msgs(num=num_msgs_to_send, data={})

log.info("Publishing messages={}".format(len(msgs)))

run_publisher(
    broker_url=broker_url,
    exchange=use_exchange,          # kombu.Exchange object
    routing_key=use_routing_key,    # string
    msgs=msgs,
    ssl_options=ssl_options,
    transport_options=transport_options,
    priority="high",
    priority_routing=priority_routing,
    # NOTE(review): both flags presumably suppress per-message logging during
    # the load test -- verify in celery_connectors.run_publisher.
    silent=True,
    publish_silent=True,
)

log.info("Done Publishing")