Skip to content

Commit f84ea35

Browse files
committed
clean up bash scripts, add url type checking
1 parent f01dce4 commit f84ea35

File tree

10 files changed

+51
-48
lines changed

10 files changed

+51
-48
lines changed

confluent_kafka/avro/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def __init__(self, config, default_key_schema=None,
3030
default_value_schema=None, schema_registry=None):
3131

3232
sr_conf = {key.replace("schema.registry.", ""): value
33-
for key, value in config.items() if key.startswith("schema.registry.")}
33+
for key, value in config.items() if key.startswith("schema.registry")}
3434

3535
if config.get("schema.registry.basic.auth.credentials.source") == 'SASL_INHERIT':
3636
sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')
@@ -103,7 +103,7 @@ class AvroConsumer(Consumer):
103103
def __init__(self, config, schema_registry=None):
104104

105105
sr_conf = {key.replace("schema.registry.", ""): value
106-
for key, value in config.items() if key.startswith("schema.registry.")}
106+
for key, value in config.items() if key.startswith("schema.registry")}
107107

108108
if config.get("schema.registry.basic.auth.credentials.source") == 'SASL_INHERIT':
109109
sr_conf['sasl.mechanisms'] = config.get('sasl.mechanisms', '')

confluent_kafka/avro/cached_schema_registry_client.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,16 @@
2929
from .error import ClientError
3030
from . import loads
3131

32+
import sys
33+
34+
PY3 = sys.version_info[0] == 3
35+
36+
# Python 2 considers int an instance of str
37+
if PY3:
38+
string_types = str,
39+
else:
40+
string_types = basestring,
41+
3242
VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
3343
VALID_METHODS = ['GET', 'POST', 'PUT', 'DELETE']
3444
VALID_AUTH_PROVIDERS = ['URL', 'USER_INFO', 'SASL_INHERIT']
@@ -62,6 +72,7 @@ def __init__(self, url, max_schemas_per_subject=1000, ca_location=None, cert_loc
6272
# In order to maintain compatibility the url(conf in future versions) param has been preserved for now.
6373
conf = url
6474
if not isinstance(url, dict):
75+
6576
conf = {
6677
'url': url,
6778
'ssl.ca.location': ca_location,
@@ -79,7 +90,8 @@ def __init__(self, url, max_schemas_per_subject=1000, ca_location=None, cert_loc
7990
"""Construct a Schema Registry client"""
8091

8192
# Ensure URL valid scheme is included; http[s]
82-
if not conf.get('url', '').startswith('http'):
93+
url = conf.get('url', '')
94+
if not isinstance(url, string_types) or not url.startswith('http'):
8395
raise ValueError("Invalid URL provided for Schema Registry")
8496

8597
# subj => { schema => id }
@@ -98,7 +110,7 @@ def __init__(self, url, max_schemas_per_subject=1000, ca_location=None, cert_loc
98110
self._session = s
99111

100112
if len(conf) > 0:
101-
raise ValueError("Unrecognized configuration key(s): {}".format(conf.keys()))
113+
raise ValueError("Unrecognized configuration properties: {}".format(conf.keys()))
102114

103115
def __del__(self):
104116
self.close()

docker/bin/certify.sh

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,15 @@
1-
#!/usr/bin/env bash
2-
set -eu
1+
#!/usr/bin/env bash -eu
32

43
DOCKER_BIN="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
5-
export PASS="abcdefgh"
4+
PASS="abcdefgh"
5+
66
source ${DOCKER_BIN}/../.env
77

88
if [[ -f ${TLS}/ca-cert ]]; then
99
echo "${TLS}/ca-cert found; skipping certificate generation.."
1010
exit 0
1111
fi
1212

13-
# Clean up old certs, warning this is
14-
#for file in $(ls ${TLS});do
15-
# rm ${TLS}/${file}
16-
#done
17-
1813
HOST=$(hostname -f)
1914

2015
echo "Creating ca-cert..."

docker/bin/cluster_up.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ await_http() {
1111
local exit_code
1212
local attempt=0
1313

14-
until [[ $(curl ${2}) ]] || [[ ${attempt} -gt 5 ]]; do
14+
until [[ `curl ${2}` ]] || [[ ${attempt} -gt 5 ]]; do
1515
echo "awaiting $1..."
1616
let "attempt+=1"
1717
sleep 6
@@ -25,10 +25,10 @@ await_http() {
2525
exit 1
2626
}
2727

28-
echo "Configure Environment..."
28+
echo "Configuring Environment..."
2929
source ${DOCKER_SOURCE}/.env
3030

31-
echo "Generate SSL certs..."
31+
echo "Generating SSL certs..."
3232
${DOCKER_BIN}/certify.sh
3333

3434
echo "Deploying cluster..."

docker/bin/gen-ssl-certs.sh

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
#!/usr/bin/env bash
2-
set -eu
1+
#!/usr/bin/env bash -eu
32
#
43
#
54
# This scripts generates:
@@ -10,9 +9,6 @@ set -eu
109
# https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
1110
#
1211

13-
DOCKER_BIN="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd )"
14-
15-
1612
if [[ "$1" == "-k" ]]; then
1713
USE_KEYTOOL=1
1814
shift

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
if sys.version_info[0] < 3:
1212
avro = 'avro'
13-
INSTALL_REQUIRES.extend(['futures', 'enum34'])
13+
INSTALL_REQUIRES.extend(['futures', 'enum34', 'requests'])
1414
else:
1515
avro = 'avro-python3'
1616

tests/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ To run the entire test suite:
4848
- ```./tests/run.sh tox [options]```
4949

5050
- Without tox (will run against current interpreter)
51-
- ```./tests/run_all.sh```
51+
- ```./tests/run.sh```
5252

5353
To run a specific `mode` or set of `modes` use the following syntax
5454

tests/avro/test_cached_client.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,17 @@ def test_empty_url(self):
163163
'url': ''
164164
})
165165

166+
def test_invalid_type_url(self):
167+
with self.assertRaises(ValueError):
168+
self.client = CachedSchemaRegistryClient(
169+
url=1)
170+
171+
def test_invalid_type_url_dict(self):
172+
with self.assertRaises(ValueError):
173+
self.client = CachedSchemaRegistryClient({
174+
"url": 1
175+
})
176+
166177
def test_invalid_url(self):
167178
with self.assertRaises(ValueError):
168179
self.client = CachedSchemaRegistryClient({

tests/integration/integration_test.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,10 +1050,7 @@ def verify_avro():
10501050
'api.version.request': api_version_request,
10511051
'api.version.fallback.ms': 0,
10521052
'broker.version.fallback': '0.11.0.0',
1053-
'schema.registry.url': schema_registry_url,
1054-
'default.topic.config': {
1055-
'produce.offset.report': True
1056-
}}
1053+
'schema.registry.url': schema_registry_url}
10571054

10581055
consumer_conf = dict(base_conf, **{
10591056
'group.id': 'test.py',
@@ -1071,20 +1068,16 @@ def verify_avro_https(mode_conf):
10711068
if mode_conf is None:
10721069
abort_on_missing_configuration('avro-https')
10731070

1074-
base_conf = dict({'bootstrap.servers': bootstrap_servers,
1075-
'error_cb': error_cb,
1076-
'api.version.request': api_version_request},
1077-
**mode_conf)
1071+
base_conf = dict(mode_conf, **{'bootstrap.servers': bootstrap_servers,
1072+
'error_cb': error_cb,
1073+
'api.version.request': api_version_request})
10781074

1079-
consumer_conf = dict({
1080-
'group.id': generate_group_id(),
1081-
'session.timeout.ms': 6000,
1082-
'enable.auto.commit': False,
1083-
'api.version.request': api_version_request,
1084-
'on_commit': print_commit_result,
1085-
'default.topic.config': {
1086-
'auto.offset.reset': 'earliest'
1087-
}}, **base_conf)
1075+
consumer_conf = dict(base_conf, **{'group.id': generate_group_id(),
1076+
'session.timeout.ms': 6000,
1077+
'enable.auto.commit': False,
1078+
'api.version.request': api_version_request,
1079+
'on_commit': print_commit_result,
1080+
'auto.offset.reset': 'earliest'})
10881081

10891082
run_avro_loop(base_conf, consumer_conf)
10901083

@@ -1171,7 +1164,7 @@ def run_avro_loop(producer_conf, consumer_conf):
11711164

11721165
msgcount = 0
11731166
while msgcount < len(combinations):
1174-
msg = c.poll(0)
1167+
msg = c.poll(100)
11751168

11761169
if msg is None or msg.error():
11771170
continue
@@ -1530,8 +1523,8 @@ def print_usage(exitcode, reason=None):
15301523
modes = test_modes
15311524

15321525
if bootstrap_servers is None or topic is None:
1533-
print_usage(1, "Properties bootstrap.servers and topic must be set. "
1534-
"Use {} as a template when creating a new conf file.".format(testconf_file))
1526+
print_usage(1, "Missing property bootstrap.servers."
1527+
"Ensure {} includes a valid bootstrap.servers property.".format(testconf_file))
15351528

15361529
print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
15371530
print('Using librdkafka version %s (0x%x)' % confluent_kafka.libversion())

tests/integration/run.sh

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
#!/usr/bin/env bash
2-
set -eu
1+
#!/usr/bin/env bash -eu
32

43
cleanup() {
54
${DOCKER_BIN}/cluster_down.sh
@@ -18,17 +17,14 @@ fi
1817
#start cluster
1918
${DOCKER_BIN}/cluster_up.sh
2019

21-
if [[ $? -ne 0 ]]; then
22-
exit 1
23-
fi
24-
2520
run_tox() {
2621
echo "Executing tox $@"
2722
cd ${TEST_SOURCE}/..
2823
tox -r "$@"
2924
}
3025

3126
run_native() {
27+
python setup.py install
3228
for mode in "$@"; do
3329
modes="${modes:-} --${mode}"
3430
done

0 commit comments

Comments
 (0)