Skip to content

Commit 558cdb0

Browse files
committed
AvroSerializer/AvroDeserializer Serializer/Deserializer implementation
1 parent 677964e commit 558cdb0

21 files changed

+1859
-0
lines changed

confluent_kafka/avro/__init__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,28 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
119
"""
220
Avro schema registry module: Deals with encoding and decoding of messages with avro schemas
321
422
"""
523

24+
import warnings
25+
626
from confluent_kafka import Producer, Consumer
727
from confluent_kafka.avro.error import ClientError
828
from confluent_kafka.avro.load import load, loads # noqa
@@ -158,3 +178,11 @@ def poll(self, timeout=None):
158178
message.offset(),
159179
e))
160180
return message
181+
182+
183+
warnings.warn(
184+
"The Avro package has been deprecated."
185+
" Use the SerializingConsumer and SerializingProducer with"
186+
" AvroDeserializer and AvroSerializer instances instead."
187+
"`ssl.key.location` respectively",
188+
category=DeprecationWarning, stacklevel=2)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
from .config import SchemaRegistryConfig, RegistrySerializerConfig
20+
from .error import SchemaRegistryClientError
21+
from .schema_registry_client import SchemaRegistryClient, CompatibilityType
22+
from .serdes import RecordNameStrategy,\
23+
TopicNameStrategy, TopicRecordNameStrategy
24+
25+
__all__ = ['CompatibilityType',
26+
'SchemaRegistryClient', 'SchemaRegistryClientError',
27+
'SchemaRegistryConfig', 'RecordNameStrategy',
28+
'RegistrySerializerConfig', 'TopicNameStrategy', 'TopicRecordNameStrategy']
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2020 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
# Python 2 considers int an instance of str
20+
21+
try:
22+
string_type = basestring # noqa
23+
except NameError:
24+
string_type = str
25+
26+
from requests import utils
27+
28+
from ..config import Config
29+
from .schema_registry_client import SchemaRegistryClient, DefaultRestClient
30+
from .serdes import SubjectNameStrategy, TopicRecordNameStrategy, TopicNameStrategy, \
31+
RecordNameStrategy
32+
33+
"""
34+
REST Client
35+
"""
36+
URL = 'url'
37+
REST_CLIENT = 'rest.client'
38+
"""
39+
Serializer Config
40+
"""
41+
SUBJECT_NAME_STRATEGY = 'subject.name.strategy'
42+
AUTO_REGISTRATION = 'auto.register.schemas'
43+
44+
"""
45+
TLS configuration properties
46+
"""
47+
CA_LOCATION = 'ssl.ca.location'
48+
CERTIFICATE_LOCATION = 'ssl.certificate.location'
49+
KEY_LOCATION = 'ssl.key.location'
50+
51+
"""
52+
Basic Auth configuration properties
53+
"""
54+
BASIC_AUTH_CREDENTIALS_SOURCE = 'basic.auth.credentials.source'
55+
BASIC_AUTH_USER_INFO = 'basic.auth.user.info'
56+
SASL_USERNAME = 'sasl.username'
57+
SASL_PASSWORD = 'sasl.password'
58+
SASL_MECHANISM = 'sasl.mechanism'
59+
60+
61+
class BasicAuthCredentialSource(object):
62+
"""
63+
Specifies the configuration property(ies) that provide the basic
64+
authentication credentials.
65+
"""
66+
USERINFO = "USERINFO"
67+
SASL_INHERIT = "SASL_INHERIT"
68+
URL = "URL"
69+
70+
71+
class SchemaRegistryConfig(Config):
72+
"""
73+
74+
Keyword Args:
75+
- url (str): URL for Schema Registry instance.
76+
77+
- schema.registry.ca.location (str, optional):
78+
- schema.registry.certificate.location (str, optional):
79+
- schema.registry.key.location (str, optional):
80+
81+
- schema.registry.basic.auth.credentials.source (str, optional):
82+
Specifies the configuration property(ies) that provide the basic
83+
authentication credentials.
84+
85+
USER_INFO: Credentials are specified via the
86+
``schema.registry.basic.auth.user.info`` config property.
87+
88+
SASL_INHERIT: Credentials are specified via the ``sasl.username``
89+
and ``sasl.password`` configuration properties.
90+
91+
URL(default): Credentials are pulled from the URL if available.
92+
93+
- schema.registry.basic.auth.user.info (str, optional): Basic auth
94+
credentials in the form {username}:{password}.
95+
- schema.registry.sasl.username (str, optional): SASL username for use
96+
with the PLAIN mechanism.
97+
- schema.registry.sasl.password (str, optional): SASL password for use
98+
with the PLAIN mechanism.
99+
- schema.registry.sasl.mechanism (str, optional): SASL mechanism to
100+
use for authentication. Only ``PLAIN`` is supported by
101+
``basic.auth.credentials.source`` SASL_INHERIT.
102+
103+
"""
104+
properties = {
105+
URL: None,
106+
CA_LOCATION: None,
107+
CERTIFICATE_LOCATION: None,
108+
KEY_LOCATION: None,
109+
BASIC_AUTH_CREDENTIALS_SOURCE: BasicAuthCredentialSource.URL,
110+
BASIC_AUTH_USER_INFO: None,
111+
SASL_USERNAME: None,
112+
SASL_PASSWORD: None,
113+
SASL_MECHANISM: 'GSSAPI',
114+
REST_CLIENT: DefaultRestClient
115+
}
116+
117+
def _err_str(self, val):
118+
return self.prefix + val
119+
120+
def validate(self):
121+
# URL validation test requires a string method
122+
if not isinstance(self[URL], basestring):
123+
raise TypeError("{} must be a string".format(self._err_str(URL)))
124+
125+
if not self[URL] or not self[URL].startswith("http"):
126+
raise ValueError('Invalid URL {}'.format(self[URL]))
127+
128+
# Ensure we have both keys if set
129+
if bool(self[CERTIFICATE_LOCATION]) != bool(self[KEY_LOCATION]):
130+
raise ValueError("{} and {} must be set together".format(
131+
self._err_str(CERTIFICATE_LOCATION),
132+
self._err_str(KEY_LOCATION)))
133+
134+
if self[BASIC_AUTH_CREDENTIALS_SOURCE] == \
135+
BasicAuthCredentialSource.SASL_INHERIT:
136+
if bool(self[SASL_USERNAME]) != bool(self[SASL_PASSWORD]):
137+
raise ValueError("{} and {} must be set together".format(
138+
self._err_str(SASL_USERNAME),
139+
self._err_str(SASL_PASSWORD)))
140+
141+
if self[BASIC_AUTH_CREDENTIALS_SOURCE] == \
142+
BasicAuthCredentialSource.SASL_INHERIT and \
143+
'GSSAPI' == self[SASL_MECHANISM]:
144+
raise ValueError("{} does not support GSSAPI".format(
145+
self._err_str(BASIC_AUTH_CREDENTIALS_SOURCE)))
146+
147+
if self[BASIC_AUTH_CREDENTIALS_SOURCE] == \
148+
BasicAuthCredentialSource.USERINFO:
149+
150+
if self[BASIC_AUTH_USER_INFO] is None:
151+
raise ValueError("{} is required when {} is {}".format(
152+
self._err_str(BASIC_AUTH_USER_INFO),
153+
self._err_str(BASIC_AUTH_CREDENTIALS_SOURCE),
154+
BasicAuthCredentialSource.USERINFO))
155+
156+
@property
157+
def Url(self):
158+
return self[URL]
159+
160+
@property
161+
def SchemaRegistry(self):
162+
return SchemaRegistryClient(self)
163+
164+
@property
165+
def RestClient(self):
166+
return self[REST_CLIENT](self)
167+
168+
@property
169+
def CA(self):
170+
return self[CA_LOCATION]
171+
172+
@property
173+
def Certificate(self):
174+
if CERTIFICATE_LOCATION in self.data:
175+
return self[CERTIFICATE_LOCATION], self[KEY_LOCATION]
176+
return None
177+
178+
@property
179+
def Credentials(self):
180+
if self[BASIC_AUTH_CREDENTIALS_SOURCE] == BasicAuthCredentialSource.SASL_INHERIT:
181+
return self[SASL_USERNAME], self[SASL_PASSWORD]
182+
if self[BASIC_AUTH_CREDENTIALS_SOURCE] == BasicAuthCredentialSource.USERINFO:
183+
return self[BASIC_AUTH_USER_INFO].split(':')
184+
if self[BASIC_AUTH_CREDENTIALS_SOURCE] == BasicAuthCredentialSource.URL:
185+
return utils.get_auth_from_url(self[URL])
186+
187+
188+
class RegistrySerializerConfig(Config):
189+
include = [SchemaRegistryConfig]
190+
191+
properties = {
192+
SUBJECT_NAME_STRATEGY: SubjectNameStrategy.TopicNameStrategy,
193+
AUTO_REGISTRATION: True,
194+
}
195+
196+
def validate(self):
197+
if not isinstance(self.SubjectNameStrategy, SubjectNameStrategy):
198+
raise TypeError("{} must be an instance of {}".format(
199+
[SUBJECT_NAME_STRATEGY], str(SubjectNameStrategy)))
200+
if not isinstance(self[AUTO_REGISTRATION], bool):
201+
raise TypeError("{} must be an instance of {}".format(
202+
self.prefix + AUTO_REGISTRATION, bool))
203+
204+
@property
205+
def SchemaIDStrategy(self):
206+
return self[AUTO_REGISTRATION]
207+
208+
@property
209+
def SubjectNameStrategy(self):
210+
if self[SUBJECT_NAME_STRATEGY] \
211+
is SubjectNameStrategy.TopicNameStrategy:
212+
return TopicNameStrategy()
213+
if self[SUBJECT_NAME_STRATEGY] \
214+
is SubjectNameStrategy.TopicRecordNameStrategy:
215+
return TopicRecordNameStrategy()
216+
if self[SUBJECT_NAME_STRATEGY] \
217+
is SubjectNameStrategy.RecordNameStrategy:
218+
return RecordNameStrategy()
219+
raise TypeError("Invalid {}: {}".format(
220+
SUBJECT_NAME_STRATEGY, str(self[SUBJECT_NAME_STRATEGY])))
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright 2017 Confluent Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from fastavro.schema import SchemaParseException, UnknownType
19+
20+
__all__ = ['SchemaRegistryClientError', 'SchemaParseException', 'UnknownType']
21+
22+
23+
class SchemaRegistryClientError(BaseException):
24+
"""
25+
Error thrown by Schema Registry clients
26+
27+
Args:
28+
error_message (str) = description of th error
29+
30+
Keyword Args:
31+
code (int, optional) = Error code
32+
33+
See Also:
34+
https://docs.confluent.io/current/schema-registry/develop/api.html#errors
35+
36+
"""
37+
def __init__(self, error_message, code=None):
38+
self.error_message = error_message
39+
self.code = code
40+
print("{} {}".format(error_message, code))
41+
42+
def __repr__(self):
43+
return self.error_message
44+
45+
def __str__(self):
46+
return self.error_message

0 commit comments

Comments
 (0)