Skip to content

Commit

Permalink
test: Add blue/green deployment test
Browse files Browse the repository at this point in the history
  • Loading branch information
def- committed Dec 29, 2023
1 parent 37415b3 commit e8f6247
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 2 deletions.
88 changes: 88 additions & 0 deletions test/cluster/blue-green-deployment/deploy.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Build the "green" (staging) environment in a fresh schema and cluster, both
# named prod_deploy, mirroring the objects setup.td created in prod.
# Deliberate differences that simulate a changed deployment:
#   - counter_mv gains an extra column ('some new value')
#   - tpch_mv filters on a 100-day cutoff instead of 90
> CREATE SCHEMA prod_deploy
> CREATE SOURCE prod_deploy.counter IN CLUSTER prod_deploy FROM LOAD GENERATOR counter (TICK INTERVAL '1s')
> CREATE MATERIALIZED VIEW prod_deploy.counter_mv IN CLUSTER prod_deploy AS SELECT count(*), 'some new value' FROM prod_deploy.counter
> CREATE DEFAULT INDEX IN CLUSTER prod_deploy ON prod_deploy.counter
> CREATE DEFAULT INDEX IN CLUSTER prod_deploy ON prod_deploy.counter_mv
> CREATE SOURCE prod_deploy.tpch
IN CLUSTER prod_deploy
FROM LOAD GENERATOR TPCH (SCALE FACTOR 1)
FOR ALL TABLES
> CREATE MATERIALIZED VIEW prod_deploy.tpch_mv
IN CLUSTER prod_deploy AS
SELECT
l_returnflag,
l_linestatus,
sum(l_quantity) AS sum_qty,
sum(l_extendedprice) AS sum_base_price,
sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
avg(l_quantity) AS avg_qty,
avg(l_extendedprice) AS avg_price,
avg(l_discount) AS avg_disc,
count(*) AS count_order
FROM
prod_deploy.lineitem
WHERE
l_shipdate <= date '1998-12-01' - interval '100' day
GROUP BY
l_returnflag,
l_linestatus
ORDER BY
l_returnflag,
l_linestatus
> CREATE DEFAULT INDEX IN CLUSTER prod_deploy ON prod_deploy.tpch_mv

# Block until every dataflow on the prod_deploy cluster is ready: testdrive
# retries this query until it returns the expected empty result, i.e. until
# no dataflow remains in pending_dataflows.
> WITH
dataflows AS (
SELECT mz_indexes.id
FROM mz_indexes
JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id
WHERE mz_clusters.name = 'prod_deploy'
UNION ALL
SELECT mz_materialized_views.id
FROM mz_materialized_views
JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id
WHERE mz_clusters.name = 'prod_deploy'
),
-- Collect ready dataflows.
-- For a dataflow to be ready it must be hydrated and caught up.
-- We define a dataflow to be caught up if its local lag is less than 1 second.
ready_dataflows AS (
SELECT id
FROM dataflows d
JOIN mz_internal.mz_compute_hydration_statuses h ON (h.object_id = d.id)
-- Left join because some dataflows don't have dependencies and therefore
-- don't have lag either.
LEFT JOIN mz_internal.mz_materialization_lag l ON (l.object_id = d.id)
WHERE
h.hydrated AND
(l.local_lag < '1s' OR l.local_lag IS NULL)
),
-- Collect dataflows that are not yet ready.
pending_dataflows AS (
SELECT id FROM dataflows
EXCEPT
SELECT id FROM ready_dataflows
)
SELECT * FROM pending_dataflows

# Cut over atomically: swapping both the schema and the cluster in a single
# transaction means clients referencing prod.* switch to the new objects with
# no window in which the names are missing or mismatched.
> BEGIN
> ALTER SCHEMA prod SWAP WITH prod_deploy
> ALTER CLUSTER prod SWAP WITH prod_deploy
> COMMIT

# Give worker a chance to finish current query
> SELECT mz_unsafe.mz_sleep(10)
<null>

# After the swap, prod_deploy holds the old (blue) environment; tear it down.
> DROP CLUSTER prod_deploy CASCADE
> DROP SCHEMA prod_deploy CASCADE
95 changes: 95 additions & 0 deletions test/cluster/blue-green-deployment/setup.td
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Allow unmanaged cluster replicas so we can point each cluster at an
# externally started clusterd container below.
$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
ALTER SYSTEM SET enable_unmanaged_cluster_replicas = true;

# Start from a clean slate, then create the "blue" cluster (prod, backed by
# clusterd1) and the "green" cluster (prod_deploy, backed by clusterd2).
> DROP CLUSTER IF EXISTS prod CASCADE
> DROP CLUSTER IF EXISTS prod_deploy CASCADE
> CREATE CLUSTER prod REPLICAS (replica1 (
STORAGECTL ADDRESSES ['clusterd1:2100'],
STORAGE ADDRESSES ['clusterd1:2103'],
COMPUTECTL ADDRESSES ['clusterd1:2101'],
COMPUTE ADDRESSES ['clusterd1:2102'],
WORKERS 1))
> CREATE CLUSTER prod_deploy REPLICAS (replica1 (
STORAGECTL ADDRESSES ['clusterd2:2100'],
STORAGE ADDRESSES ['clusterd2:2103'],
COMPUTECTL ADDRESSES ['clusterd2:2101'],
COMPUTE ADDRESSES ['clusterd2:2102'],
WORKERS 1))
> DROP SCHEMA IF EXISTS prod CASCADE
> DROP SCHEMA IF EXISTS prod_deploy CASCADE

# Build the live ("blue") environment in the prod schema on the prod cluster:
# a counter source plus materialized views and indexes that the workflow's
# query threads read while the deployment is in flight.
> CREATE SCHEMA prod
> CREATE SOURCE prod.counter IN CLUSTER prod FROM LOAD GENERATOR counter (TICK INTERVAL '1s')
> CREATE MATERIALIZED VIEW prod.counter_mv IN CLUSTER prod AS SELECT count(*) FROM prod.counter
> CREATE DEFAULT INDEX IN CLUSTER prod ON prod.counter
> CREATE SOURCE prod.tpch
IN CLUSTER prod
FROM LOAD GENERATOR TPCH (SCALE FACTOR 1)
FOR ALL TABLES
> CREATE MATERIALIZED VIEW prod.tpch_mv
IN CLUSTER prod AS
SELECT
l_returnflag,
l_linestatus,
sum(l_quantity) AS sum_qty,
sum(l_extendedprice) AS sum_base_price,
sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
avg(l_quantity) AS avg_qty,
avg(l_extendedprice) AS avg_price,
avg(l_discount) AS avg_disc,
count(*) AS count_order
FROM
prod.lineitem
WHERE
l_shipdate <= date '1998-12-01' - interval '90' day
GROUP BY
l_returnflag,
l_linestatus
ORDER BY
l_returnflag,
l_linestatus
> CREATE DEFAULT INDEX IN CLUSTER prod ON prod.tpch_mv

# Block until every dataflow on the prod cluster is ready: testdrive retries
# this query until it returns the expected empty result.
> WITH
dataflows AS (
SELECT mz_indexes.id
FROM mz_indexes
JOIN mz_clusters ON mz_indexes.cluster_id = mz_clusters.id
WHERE mz_clusters.name = 'prod'
UNION ALL
SELECT mz_materialized_views.id
FROM mz_materialized_views
JOIN mz_clusters ON mz_materialized_views.cluster_id = mz_clusters.id
WHERE mz_clusters.name = 'prod'
),
-- Collect ready dataflows.
-- For a dataflow to be ready it must be hydrated and caught up.
-- We define a dataflow to be caught up if its local lag is less than 1 second.
ready_dataflows AS (
SELECT id
FROM dataflows d
JOIN mz_internal.mz_compute_hydration_statuses h ON (h.object_id = d.id)
-- Left join because some dataflows don't have dependencies and therefore
-- don't have lag either.
LEFT JOIN mz_internal.mz_materialization_lag l ON (l.object_id = d.id)
WHERE
h.hydrated AND
(l.local_lag < '1s' OR l.local_lag IS NULL)
),
-- Collect dataflows that are not yet ready.
pending_dataflows AS (
SELECT id FROM dataflows
EXCEPT
SELECT id FROM ready_dataflows
)
SELECT * FROM pending_dataflows
87 changes: 85 additions & 2 deletions test/cluster/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def workflow_test_github_12251(c: Composition) -> None:
c.down(destroy_volumes=True)
c.up("materialized")

start_time = time.process_time()
start_time = time.time()
try:
c.sql(
"""
Expand All @@ -222,7 +222,7 @@ def workflow_test_github_12251(c: Composition) -> None:
assert "statement timeout" in e.args[0]["M"], e
# Ensure the statement_timeout setting is ~honored
assert (
time.process_time() - start_time < 2
time.time() - start_time < 2
), "idle_in_transaction_session_timeout not respected"
else:
assert False, "unexpected success in test_github_12251"
Expand Down Expand Up @@ -2951,3 +2951,86 @@ def workflow_statement_logging(c: Composition, parser: WorkflowArgumentParser) -
)

c.run("testdrive", "statement-logging/statement-logging.td")


class PropagatingThread(Thread):
    """Thread that records its target's return value and re-raises any
    exception from the target in the thread that calls ``join()``.

    Plain ``threading.Thread`` silently swallows exceptions raised in the
    target; this subclass propagates them so worker failures fail the test.
    """

    # Class-level defaults so join() is safe even if called before run()
    # has executed (e.g. the thread never started, or join timed out).
    exc = None
    ret = None

    def run(self):
        self.exc = None
        try:
            # Thread's private attributes hold the (target, args, kwargs)
            # passed to __init__; run() is overridden so they are not deleted.
            self.ret = self._target(*self._args, **self._kwargs)  # type: ignore
        except BaseException as e:
            self.exc = e

    def join(self, timeout=None):
        """Wait for the thread, then re-raise its exception or return its result."""
        super().join(timeout)
        if self.exc:
            raise self.exc
        return self.ret


def workflow_blue_green_deployment(
    c: Composition, parser: WorkflowArgumentParser
) -> None:
    """Blue/Green Deployment testing, see https://www.notion.so/materialize/Testing-Plan-Blue-Green-Deployments-01528a1eec3b42c3a25d5faaff7a9bf9#f53b51b110b044859bf954afc771c63a

    Runs setup.td to build the live (blue) environment, then executes
    deploy.td (build green, swap, tear down blue) while background threads
    continuously query and SUBSCRIBE to prod.* objects, asserting that the
    cutover never blocks or breaks readers.
    """
    c.down(destroy_volumes=True)

    # Shared stop flag read by the closures below; flipped to False once the
    # deploy script has finished so both worker threads exit their loops.
    running = True

    def selects():
        # Repeatedly query the prod objects; each individual query must stay
        # fast (< 5s) throughout the schema/cluster swap.
        runtimes = []
        try:
            with c.sql_cursor() as cursor:
                while running:
                    total_runtime = 0
                    queries = [
                        "SELECT * FROM prod.counter_mv",
                        "SELECT max(counter) FROM prod.counter",
                        "SELECT count(*) FROM prod.tpch_mv",
                    ]

                    for i, query in enumerate(queries):
                        start_time = time.time()
                        cursor.execute(query)
                        # Every query above returns a positive count/max in
                        # its first column once the sources are running.
                        assert int(cursor.fetchone()[0]) > 0
                        runtime = time.time() - start_time
                        assert runtime < 5, f"runtime: {runtime}"
                        total_runtime += runtime
                    runtimes.append(total_runtime)
        finally:
            # Always log timings, even when an assertion above fails.
            print(f"Query runtimes: {runtimes}")

    def subscribe():
        # Repeatedly SUBSCRIBE to counter_mv inside a transaction; FETCH ALL
        # with a timeout bounds how long each iteration can block.
        cursor = c.sql_cursor()
        while running:
            cursor.execute("BEGIN")
            cursor.execute(
                "DECLARE subscribe CURSOR FOR SUBSCRIBE (SELECT * FROM prod.counter_mv)"
            )
            cursor.execute("FETCH ALL subscribe WITH (timeout='15s')")
            # Third column of the last update is the count(*) value; it must
            # have progressed past zero.
            assert int(cursor.fetchall()[-1][2]) > 0
            cursor.execute("CLOSE subscribe")
            cursor.execute("ROLLBACK")

    with c.override(
        Testdrive(
            no_reset=True, default_timeout="300s"
        ),  # pending dataflows can take a while
        # Two externally managed clusterds back the blue and green clusters
        # created by setup.td.
        Clusterd(name="clusterd1"),
        Clusterd(name="clusterd2"),
        Materialized(),
    ):
        c.up("materialized")
        c.up("clusterd1")
        c.up("clusterd2")
        c.run("testdrive", "blue-green-deployment/setup.td")

        threads = [PropagatingThread(target=fn) for fn in (selects, subscribe)]
        for thread in threads:
            thread.start()
        time.sleep(3)  # some time to make sure the queries run fine
        try:
            c.run("testdrive", "blue-green-deployment/deploy.td")
        finally:
            # Stop the workers; PropagatingThread.join re-raises any assertion
            # failure that happened in a worker, failing the workflow.
            running = False
            for thread in threads:
                thread.join()

0 comments on commit e8f6247

Please sign in to comment.