Skip to content

Commit

Permalink
chore(tests): sync client verification tests (#1046)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche authored Dec 18, 2024
1 parent f974823 commit 7a6d0c5
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 15 deletions.
2 changes: 2 additions & 0 deletions .cross_sync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ Generation can be initiated using `nox -s generate_sync`
from the root of the project. This will find all classes with the `__CROSS_SYNC_OUTPUT__ = "path/to/output"`
annotation, and generate a sync version of classes marked with `@CrossSync.convert_sync` at the output path.

There is a unit test at `tests/unit/data/test_sync_up_to_date.py` that verifies that the generated code is up to date

## Architecture

CrossSync is made up of two parts:
Expand Down
12 changes: 11 additions & 1 deletion .github/workflows/conformance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ jobs:
matrix:
test-version: [ "v0.0.2" ]
py-version: [ 3.8 ]
client-type: [ "async", "legacy" ]
client-type: [ "async", "sync", "legacy" ]
include:
- client-type: "sync"
# sync client does not support concurrent streams
test_args: "-skip _Generic_MultiStream"
- client-type: "legacy"
# legacy client is synchronous and does not support concurrent streams
# legacy client does not expose mutate_row. Disable those tests
test_args: "-skip _Generic_MultiStream -skip TestMutateRow_"
fail-fast: false
name: "${{ matrix.client-type }} client / python ${{ matrix.py-version }} / test tag ${{ matrix.test-version }}"
steps:
Expand All @@ -53,4 +61,6 @@ jobs:
env:
CLIENT_TYPE: ${{ matrix.client-type }}
PYTHONUNBUFFERED: 1
TEST_ARGS: ${{ matrix.test_args }}
PROXY_PORT: 9999

10 changes: 1 addition & 9 deletions .kokoro/conformance.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,7 @@ set -eo pipefail
## cd to the parent directory, i.e. the root of the git repo
cd $(dirname $0)/..

PROXY_ARGS=""
TEST_ARGS=""
if [[ "${CLIENT_TYPE^^}" == "LEGACY" ]]; then
echo "Using legacy client"
# legacy client does not expose mutate_row. Disable those tests
TEST_ARGS="-skip TestMutateRow_"
fi

# Build and start the proxy in a separate process
PROXY_PORT=9999
pushd test_proxy
nohup python test_proxy.py --port $PROXY_PORT --client_type=$CLIENT_TYPE &
proxyPID=$!
Expand All @@ -42,6 +33,7 @@ function cleanup() {
trap cleanup EXIT

# Run the conformance test
echo "running tests with args: $TEST_ARGS"
pushd cloud-bigtable-clients-test/tests
eval "go test -v -proxy_addr=:$PROXY_PORT $TEST_ARGS"
RETURN_CODE=$?
Expand Down
2 changes: 1 addition & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def system_emulated(session):


@nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS)
@nox.parametrize("client_type", ["async"])
@nox.parametrize("client_type", ["async", "sync", "legacy"])
def conformance(session, client_type):
# install dependencies
constraints_path = str(
Expand Down
2 changes: 1 addition & 1 deletion test_proxy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ python test_proxy.py --port 8080
```

By default, the test_proxy targets the async client. You can change this by passing in the `--client_type` flag.
Valid options are `async` and `legacy`.
Valid options are `async`, `sync`, and `legacy`.

```
python test_proxy.py --client_type=legacy
Expand Down
185 changes: 185 additions & 0 deletions test_proxy/handlers/client_handler_data_sync_autogen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file is automatically generated by CrossSync. Do not edit manually.

"""
This module contains the client handler process for proxy_server.py.
"""
import os
from google.cloud.environment_vars import BIGTABLE_EMULATOR
from google.cloud.bigtable.data._cross_sync import CrossSync
from client_handler_data_async import error_safe


class TestProxyClientHandler:
"""
Implements the same methods as the grpc server, but handles the client
library side of the request.
Requests received in TestProxyGrpcServer are converted to a dictionary,
and supplied to the TestProxyClientHandler methods as kwargs.
The client response is then returned back to the TestProxyGrpcServer
"""

def __init__(
self,
data_target=None,
project_id=None,
instance_id=None,
app_profile_id=None,
per_operation_timeout=None,
**kwargs
):
self.closed = False
os.environ[BIGTABLE_EMULATOR] = data_target
self.client = CrossSync._Sync_Impl.DataClient(project=project_id)
self.instance_id = instance_id
self.app_profile_id = app_profile_id
self.per_operation_timeout = per_operation_timeout

def close(self):
self.closed = True

@error_safe
async def ReadRows(self, request, **kwargs):
table_id = request.pop("table_name").split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
result_list = table.read_rows(request, **kwargs)
serialized_response = [row._to_dict() for row in result_list]
return serialized_response

@error_safe
async def ReadRow(self, row_key, **kwargs):
table_id = kwargs.pop("table_name").split("/")[-1]
app_profile_id = self.app_profile_id or kwargs.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
result_row = table.read_row(row_key, **kwargs)
if result_row:
return result_row._to_dict()
else:
return "None"

@error_safe
async def MutateRow(self, request, **kwargs):
from google.cloud.bigtable.data.mutations import Mutation

table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
row_key = request["row_key"]
mutations = [Mutation._from_dict(d) for d in request["mutations"]]
table.mutate_row(row_key, mutations, **kwargs)
return "OK"

@error_safe
async def BulkMutateRows(self, request, **kwargs):
from google.cloud.bigtable.data.mutations import RowMutationEntry

table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
entry_list = [
RowMutationEntry._from_dict(entry) for entry in request["entries"]
]
table.bulk_mutate_rows(entry_list, **kwargs)
return "OK"

@error_safe
async def CheckAndMutateRow(self, request, **kwargs):
from google.cloud.bigtable.data.mutations import Mutation, SetCell

table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
row_key = request["row_key"]
true_mutations = []
for mut_dict in request.get("true_mutations", []):
try:
true_mutations.append(Mutation._from_dict(mut_dict))
except ValueError:
mutation = SetCell("", "", "", 0)
true_mutations.append(mutation)
false_mutations = []
for mut_dict in request.get("false_mutations", []):
try:
false_mutations.append(Mutation._from_dict(mut_dict))
except ValueError:
false_mutations.append(SetCell("", "", "", 0))
predicate_filter = request.get("predicate_filter", None)
result = table.check_and_mutate_row(
row_key,
predicate_filter,
true_case_mutations=true_mutations,
false_case_mutations=false_mutations,
**kwargs
)
return result

@error_safe
async def ReadModifyWriteRow(self, request, **kwargs):
from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule
from google.cloud.bigtable.data.read_modify_write_rules import AppendValueRule

table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
row_key = request["row_key"]
rules = []
for rule_dict in request.get("rules", []):
qualifier = rule_dict["column_qualifier"]
if "append_value" in rule_dict:
new_rule = AppendValueRule(
rule_dict["family_name"], qualifier, rule_dict["append_value"]
)
else:
new_rule = IncrementRule(
rule_dict["family_name"], qualifier, rule_dict["increment_amount"]
)
rules.append(new_rule)
result = table.read_modify_write_row(row_key, rules, **kwargs)
if result:
return result._to_dict()
else:
return "None"

@error_safe
async def SampleRowKeys(self, request, **kwargs):
table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
result = table.sample_row_keys(**kwargs)
return result
17 changes: 15 additions & 2 deletions test_proxy/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fi
SCRIPT_DIR=$(realpath $(dirname "$0"))
cd $SCRIPT_DIR

export PROXY_SERVER_PORT=50055
export PROXY_SERVER_PORT=$(shuf -i 50000-60000 -n 1)

# download test suite
if [ ! -d "cloud-bigtable-clients-test" ]; then
Expand All @@ -43,6 +43,19 @@ function finish {
}
trap finish EXIT

if [[ $CLIENT_TYPE == "legacy" ]]; then
echo "Using legacy client"
# legacy client does not expose mutate_row. Disable those tests
TEST_ARGS="-skip TestMutateRow_"
fi

if [[ $CLIENT_TYPE != "async" ]]; then
echo "Using legacy client"
# sync and legacy client do not support concurrent streams
TEST_ARGS="$TEST_ARGS -skip _Generic_MultiStream "
fi

# run tests
pushd cloud-bigtable-clients-test/tests
go test -v -proxy_addr=:$PROXY_SERVER_PORT
echo "Running with $TEST_ARGS"
go test -v -proxy_addr=:$PROXY_SERVER_PORT $TEST_ARGS
5 changes: 4 additions & 1 deletion test_proxy/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ def format_dict(input_obj):
if client_type == "legacy":
import client_handler_legacy
client = client_handler_legacy.LegacyTestProxyClientHandler(**json_data)
elif client_type == "sync":
import client_handler_data_sync_autogen
client = client_handler_data_sync_autogen.TestProxyClientHandler(**json_data)
else:
client = client_handler_data_async.TestProxyClientHandlerAsync(**json_data)
client_map[client_id] = client
Expand Down Expand Up @@ -150,7 +153,7 @@ def client_handler_process(request_q, queue_pool, client_type="async"):

p = argparse.ArgumentParser()
p.add_argument("--port", dest='port', default="50055")
p.add_argument("--client_type", dest='client_type', default="async", choices=["async", "legacy"])
p.add_argument("--client_type", dest='client_type', default="async", choices=["async", "sync", "legacy"])

if __name__ == "__main__":
port = p.parse_args().port
Expand Down
Loading

0 comments on commit 7a6d0c5

Please sign in to comment.