forked from MaterializeInc/materialize
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mzcompose.py
62 lines (57 loc) · 1.8 KB
/
mzcompose.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import time
from materialize.mzcompose import Composition
from materialize.mzcompose.services import (
Kafka,
Materialized,
SchemaRegistry,
Testdrive,
Zookeeper,
)
# Services available to this composition: one Zookeeper, three Kafka brokers
# (kafka1..kafka3, broker ids 1..3), a schema registry that is given all three
# brokers, Materialized, and a Testdrive container with a custom entrypoint.
SERVICES = [
    Zookeeper(),
    # Each broker is named after its broker id; all share the same offsets
    # topic replication factor.
    *(
        Kafka(
            name=f"kafka{broker_id}",
            broker_id=broker_id,
            offsets_topic_replication_factor=2,
        )
        for broker_id in (1, 2, 3)
    ),
    SchemaRegistry(
        kafka_servers=[(f"kafka{broker_id}", "9092") for broker_id in (1, 2, 3)]
    ),
    Materialized(),
    Testdrive(
        # No --kafka-addr here: the workflow passes it on every run so that
        # each step can pick which broker(s) testdrive connects to.
        entrypoint=[
            "testdrive",
            "--schema-registry-url=http://schema-registry:8081",
            "--materialized-url=postgres://materialize@materialized:6875",
            "--kafka-option=acks=all",
            # NOTE(review): the seed is pinned, presumably so that the later
            # --no-reset runs see the same generated names -- confirm.
            "--seed=1",
        ]
    ),
]
def workflow_default(c: Composition) -> None:
    """Run the testdrive scripts while the kafka1 broker leaves and rejoins.

    Steps, in order:

    1. Start every backing service and wait for it to accept TCP connections.
    2. Run ``01-init.td`` with testdrive pointed at kafka2.
    3. Kill kafka1, then run ``02-after-leave.td`` against kafka2 and kafka3.
    4. Bring kafka1 back up, then run ``03-after-join.td`` against kafka1.

    A fixed pause follows the init script, the kill, and the restart.
    NOTE(review): the pauses presumably give the cluster time to settle after
    each membership change -- confirm; nothing here polls for readiness.

    Args:
        c: The composition used to start, kill, and run services.
    """
    pause_seconds = 10
    testdrive = "testdrive-svc"
    # Later scripts are run with --no-reset.
    # NOTE(review): presumably so they keep the state left by earlier
    # scripts -- confirm against the testdrive documentation.
    keep_state = "--no-reset"

    brokers = ["kafka1", "kafka2", "kafka3"]
    c.start_and_wait_for_tcp(
        services=["zookeeper", *brokers, "schema-registry", "materialized"]
    )

    # Initial run, while all three brokers are up.
    c.run(testdrive, "--kafka-addr=kafka2", "01-init.td")
    time.sleep(pause_seconds)

    # kafka1 leaves; testdrive is given only the two remaining brokers.
    c.kill("kafka1")
    time.sleep(pause_seconds)
    c.run(testdrive, "--kafka-addr=kafka2,kafka3", keep_state, "02-after-leave.td")

    # kafka1 rejoins; testdrive connects through the restarted broker.
    c.up("kafka1")
    time.sleep(pause_seconds)
    c.run(testdrive, "--kafka-addr=kafka1", keep_state, "03-after-join.td")