forked from MaterializeInc/materialize
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scenarios_concurrency.py
154 lines (122 loc) · 3.44 KB
/
scenarios_concurrency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from materialize.feature_benchmark.action import Action, TdAction
from materialize.feature_benchmark.measurement_source import MeasurementSource, Td
from materialize.feature_benchmark.scenario import Scenario
class Concurrency(Scenario):
    """Base scenario for feature benchmarks that exercise concurrency
    aspects of the system."""
class ParallelIngestion(Concurrency):
    """Measure the time it takes to ingest multiple sources concurrently."""

    # Number of identical Kafka sources created over the shared topic.
    SOURCES = 10

    def shared(self) -> Action:
        """Create the Kafka topic shared by all sources and ingest
        ``self.n()`` Avro records into it."""
        return TdAction(
            self.schema()
            + self.keyschema()
            + f"""
$ kafka-create-topic topic=kafka-parallel-ingestion partitions=4
$ kafka-ingest format=avro topic=kafka-parallel-ingestion key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

    def benchmark(self) -> MeasurementSource:
        """Drop, recreate and index SOURCES copies of the same Kafka source
        and wait until the final record is queryable from each of them.

        The measured interval runs from the ``/* A */`` marker (just before
        the indexes are created) to the ``/* B */`` marker (after every
        source has served the final record).
        """
        sources = range(1, ParallelIngestion.SOURCES + 1)

        drop_sources = "\n".join(
            f"""
> DROP SOURCE IF EXISTS s{s}
"""
            for s in sources
        )

        # The two connections are identical for every source, so create them
        # a single time up front rather than repeating the (no-op, thanks to
        # IF NOT EXISTS) CREATE CONNECTION statements once per source.
        create_connections = """
> CREATE CONNECTION IF NOT EXISTS csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${testdrive.schema-registry-url}';

> CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}');
"""

        create_sources = "\n".join(
            f"""
> CREATE SOURCE s{s}
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-parallel-ingestion-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
"""
            for s in sources
        )

        create_indexes = "\n".join(
            f"""
> CREATE DEFAULT INDEX ON s{s}
"""
            for s in sources
        )

        # A source is fully ingested once the highest value produced by
        # shared() -- namely self.n() - 1 -- is visible in it.
        selects = "\n".join(
            f"""
> SELECT * FROM s{s} WHERE f2 = {self.n()-1}
{self.n()-1}
"""
            for s in sources
        )

        return Td(
            self.schema()
            + f"""
{drop_sources}
{create_connections}
{create_sources}
> SELECT 1
  /* A */
1
{create_indexes}
{selects}
> SELECT 1
  /* B */
1
"""
        )
class ParallelDataflows(Concurrency):
    """Measure the time it takes to compute multiple parallel dataflows."""

    SCALE = 6
    VIEWS = 25

    def benchmark(self) -> MeasurementSource:
        """Time the creation and hydration of VIEWS materialized views.

        The measured interval runs from the ``/* A */`` marker, just before
        the views are created, to the ``/* B */`` marker, reached once every
        view has produced its single-row result.
        """
        view_numbers = range(1, ParallelDataflows.VIEWS + 1)

        # The "+ {v} - {v}" term is arithmetically a no-op; presumably it
        # keeps each view's definition textually distinct so every view gets
        # its own dataflow -- NOTE(review): confirm.
        create_views = "\n".join(
            f"""
> CREATE MATERIALIZED VIEW v{v} AS
  SELECT COUNT(DISTINCT generate_series) + {v} - {v} AS f1
  FROM generate_series(1,{self.n()})
"""
            for v in view_numbers
        )

        # Every view counts the same series, so each returns exactly self.n().
        selects = "\n".join(
            f"""
> SELECT * FROM v{v}
{self.n()}
"""
            for v in view_numbers
        )

        return Td(
            f"""
$ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
DROP SCHEMA public CASCADE;

> CREATE SCHEMA public;
> SELECT 1
  /* A */
1
{create_views}
{selects}
> SELECT 1
  /* B */
1
"""
        )