Skip to content

Commit dbe73f5

Browse files
authored
Eagerly deserialize SQL columns of type OBJECT [API-1964] (#619)
* Eagerly deserialize SQL columns of type OBJECT

  Due to lazy deserialization, Compact-serialized objects did not work well with SQL: it was possible to get deserialization errors that could not be easily understood by the user. As we did for the other APIs, we have removed lazy deserialization from this API as well.

  To do that efficiently, I have modified the protocol code generator so that a `to_object` function can be passed to the `SqlPageCodec`, which deserializes values while filling rows and columns, so that we do not have to iterate over the result set a second time.

* Check non-existence with `is None`
1 parent a76d653 commit dbe73f5

File tree

8 files changed

+70
-72
lines changed

8 files changed

+70
-72
lines changed

docs/serialization.rst

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -267,18 +267,6 @@ depending on the object in that method might result in an undefined behavior.
267267
Additionally, evolved serializers must have the same type name with the
268268
initial version of the serializer.
269269
270-
Limitations
271-
~~~~~~~~~~~
272-
273-
Currently, the following APIs are not fully supported with the Compact
274-
serialization format. They may or may not work, depending on whether the
275-
schema is available on the client or not.
276-
277-
All of these APIs will work with the Compact serialization format, once it is
278-
promoted to the stable status.
279-
280-
- Reading OBJECT columns of the SQL results
281-
282270
IdentifiedDataSerializable Serialization
283271
----------------------------------------
284272

hazelcast/protocol/builtin.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,7 @@ def decode(msg):
730730

731731
class SqlPageCodec:
732732
@staticmethod
733-
def decode(msg):
733+
def decode(msg, to_object_fn):
734734
from hazelcast.sql import SqlColumnType, _SqlPage
735735

736736
# begin frame
@@ -744,7 +744,7 @@ def decode(msg):
744744
column_count = len(column_type_ids)
745745

746746
# read columns
747-
columns = [None] * column_count
747+
columns: typing.List = [None] * column_count
748748

749749
for i in range(column_count):
750750
column_type_id = column_type_ids[i]
@@ -783,7 +783,11 @@ def decode(msg):
783783
column = [None for _ in range(size)]
784784
columns[i] = column
785785
elif column_type_id == SqlColumnType.OBJECT:
786-
columns[i] = ListMultiFrameCodec.decode_contains_nullable(msg, DataCodec.decode)
786+
787+
def decode(m):
788+
return to_object_fn(DataCodec.decode(m))
789+
790+
columns[i] = ListMultiFrameCodec.decode_contains_nullable(msg, decode)
787791
elif column_type_id == SqlColumnType.JSON:
788792
columns[i] = ListMultiFrameCodec.decode_contains_nullable(
789793
msg, HazelcastJsonValueCodec.decode

hazelcast/protocol/codec/sql_execute_codec.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ def encode_request(sql, parameters, timeout_millis, cursor_buffer_size, schema,
3636
return OutboundMessage(buf, False, True)
3737

3838

39-
def decode_response(msg):
39+
def decode_response(msg, to_object_fn):
4040
initial_frame = msg.next_frame()
4141
response = dict()
4242
response["update_count"] = FixSizedTypesCodec.decode_long(initial_frame.buf, _RESPONSE_UPDATE_COUNT_OFFSET)
4343
response["row_metadata"] = ListMultiFrameCodec.decode_nullable(msg, SqlColumnMetadataCodec.decode)
44-
response["row_page"] = CodecUtil.decode_nullable(msg, SqlPageCodec.decode)
44+
response["row_page"] = CodecUtil.decode_nullable(msg, lambda m: SqlPageCodec.decode(m, to_object_fn))
4545
response["error"] = CodecUtil.decode_nullable(msg, SqlErrorCodec.decode)
4646
return response

hazelcast/protocol/codec/sql_fetch_codec.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ def encode_request(query_id, cursor_buffer_size):
2222
return OutboundMessage(buf, False)
2323

2424

25-
def decode_response(msg):
25+
def decode_response(msg, to_object_fn):
2626
msg.next_frame()
2727
response = dict()
28-
response["row_page"] = CodecUtil.decode_nullable(msg, SqlPageCodec.decode)
28+
response["row_page"] = CodecUtil.decode_nullable(msg, lambda m: SqlPageCodec.decode(m, to_object_fn))
2929
response["error"] = CodecUtil.decode_nullable(msg, SqlErrorCodec.decode)
3030
return response

hazelcast/serialization/portable/serializer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,18 @@ def find_portable_version(self, factory_id, class_id, portable):
5353
return current_version
5454

5555
def create_new_portable_instance(self, factory_id, class_id):
56-
try:
57-
portable_factory = self._portable_factories[factory_id]
58-
except KeyError:
56+
portable_factory = self._portable_factories.get(factory_id)
57+
if portable_factory is None:
5958
raise HazelcastSerializationError(
6059
"Could not find portable_factory for factory-id: %s" % factory_id
6160
)
6261

63-
portable = portable_factory[class_id]
62+
portable = portable_factory.get(class_id)
6463
if portable is None:
6564
raise HazelcastSerializationError(
6665
"Could not create Portable for class-id: %s" % class_id
6766
)
67+
6868
return portable()
6969

7070
def create_reader(self, inp, factory_id, class_id, version, portable_version):

hazelcast/sql.py

Lines changed: 28 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from hazelcast.errors import HazelcastError
88
from hazelcast.future import Future, ImmediateFuture, ImmediateExceptionFuture
99
from hazelcast.invocation import Invocation
10-
from hazelcast.serialization.compact import SchemaNotReplicatedError
10+
from hazelcast.serialization.compact import SchemaNotReplicatedError, SchemaNotFoundError
1111
from hazelcast.util import (
1212
UUIDUtil,
1313
to_millis,
@@ -544,18 +544,17 @@ class SqlRow:
544544
If an integer value is passed to the ``[]`` operator, it will implicitly
545545
call the :func:`get_object_with_index` and return the result.
546546
547-
For any other type passed into the the ``[]`` operator, :func:`get_object`
547+
For any other type passed into the ``[]`` operator, :func:`get_object`
548548
will be called. Note that, :func:`get_object` expects ``str`` values.
549549
Hence, the ``[]`` operator will raise error for any type other than integer
550550
and string.
551551
"""
552552

553-
__slots__ = ("_row_metadata", "_row", "_deserialize_fn")
553+
__slots__ = ("_row_metadata", "_row")
554554

555-
def __init__(self, row_metadata, row, deserialize_fn):
555+
def __init__(self, row_metadata, row):
556556
self._row_metadata = row_metadata
557557
self._row = row
558-
self._deserialize_fn = deserialize_fn
559558

560559
def get_object(self, column_name: str) -> typing.Any:
561560
"""Gets the value in the column indicated by the column name.
@@ -567,13 +566,6 @@ def get_object(self, column_name: str) -> typing.Any:
567566
The type of the returned value depends on the SQL type of the column.
568567
No implicit conversions are performed on the value.
569568
570-
Warnings:
571-
572-
Each call to this method might result in a deserialization if the
573-
column type for this object is :const:`SqlColumnType.OBJECT`.
574-
It is advised to assign the result of this method call to some
575-
variable and reuse it.
576-
577569
Args:
578570
column_name: The column name.
579571
@@ -583,7 +575,6 @@ def get_object(self, column_name: str) -> typing.Any:
583575
Raises:
584576
ValueError: If a column with the given name does not exist.
585577
AssertionError: If the column name is not a string.
586-
HazelcastSqlError: If the object cannot be deserialized.
587578
588579
See Also:
589580
:attr:`metadata`
@@ -597,21 +588,14 @@ def get_object(self, column_name: str) -> typing.Any:
597588
index = self._row_metadata.find_column(column_name)
598589
if index == SqlRowMetadata.COLUMN_NOT_FOUND:
599590
raise ValueError("Column '%s' doesn't exist" % column_name)
600-
return self._deserialize_fn(self._row[index])
591+
return self._row[index]
601592

602593
def get_object_with_index(self, column_index: int) -> typing.Any:
603594
"""Gets the value of the column by index.
604595
605596
The class of the returned value depends on the SQL type of the column.
606597
No implicit conversions are performed on the value.
607598
608-
Warnings:
609-
610-
Each call to this method might result in a deserialization if the
611-
column type for this object is :const:`SqlColumnType.OBJECT`.
612-
It is advised to assign the result of this method call to some
613-
variable and reuse it.
614-
615599
Args:
616600
column_index: Zero-based column index.
617601
@@ -621,15 +605,14 @@ def get_object_with_index(self, column_index: int) -> typing.Any:
621605
Raises:
622606
IndexError: If the column index is out of bounds.
623607
AssertionError: If the column index is not an integer.
624-
HazelcastSqlError: If the object cannot be deserialized.
625608
626609
See Also:
627610
:attr:`metadata`
628611
629612
:attr:`SqlColumnMetadata.type`
630613
"""
631614
check_is_int(column_index, "Column index must be an integer")
632-
return self._deserialize_fn(self._row[column_index])
615+
return self._row[column_index]
633616

634617
@property
635618
def metadata(self) -> SqlRowMetadata:
@@ -680,23 +663,19 @@ class _IteratorBase:
680663
__slots__ = (
681664
"row_metadata",
682665
"fetch_fn",
683-
"deserialize_fn",
684666
"page",
685667
"row_count",
686668
"position",
687669
"is_last",
688670
)
689671

690-
def __init__(self, row_metadata, fetch_fn, deserialize_fn):
672+
def __init__(self, row_metadata, fetch_fn):
691673
self.row_metadata = row_metadata
692674
"""SqlRowMetadata: Row metadata."""
693675

694676
self.fetch_fn = fetch_fn
695677
"""function: Fetches the next page. It produces a Future[_SqlPage]."""
696678

697-
self.deserialize_fn = deserialize_fn
698-
"""function: Deserializes the value."""
699-
700679
self.page = None
701680
"""_SqlPage: Current page."""
702681

@@ -729,7 +708,6 @@ def _get_current_row(self):
729708
list: The row pointed by the current position.
730709
"""
731710

732-
# Deserialization happens lazily while getting the object.
733711
return [self.page.get_value(i, self.position) for i in range(self.page.column_count)]
734712

735713

@@ -766,7 +744,7 @@ def _has_next_continuation(self, future):
766744

767745
row = self._get_current_row()
768746
self.position += 1
769-
return SqlRow(self.row_metadata, row, self.deserialize_fn)
747+
return SqlRow(self.row_metadata, row)
770748

771749
def _has_next(self):
772750
"""Returns a Future indicating whether there are more rows
@@ -826,7 +804,7 @@ def __next__(self):
826804

827805
row = self._get_current_row()
828806
self.position += 1
829-
return SqlRow(self.row_metadata, row, self.deserialize_fn)
807+
return SqlRow(self.row_metadata, row)
830808

831809
def _has_next(self):
832810
while self.position == self.row_count:
@@ -1065,13 +1043,11 @@ def _get_iterator(self, should_get_blocking):
10651043
iterator = _BlockingIterator(
10661044
response.row_metadata,
10671045
self._fetch_next_page,
1068-
self._sql_service.deserialize_object,
10691046
)
10701047
else:
10711048
iterator = _FutureProducingIterator(
10721049
response.row_metadata,
10731050
self._fetch_next_page,
1074-
self._sql_service.deserialize_object,
10751051
)
10761052

10771053
# Pass the first page information to the iterator
@@ -1275,7 +1251,9 @@ def execute(self, sql, params, cursor_buffer_size, timeout, expected_result_type
12751251
)
12761252

12771253
invocation = Invocation(
1278-
request, connection=connection, response_handler=sql_execute_codec.decode_response
1254+
request,
1255+
connection=connection,
1256+
response_handler=lambda m: sql_execute_codec.decode_response(m, self._to_object),
12791257
)
12801258
self._invocation_service.invoke(invocation)
12811259
return invocation.future.continue_with(
@@ -1290,17 +1268,6 @@ def execute(self, sql, params, cursor_buffer_size, timeout, expected_result_type
12901268
except Exception as e:
12911269
return ImmediateExceptionFuture(self.re_raise(e, connection))
12921270

1293-
def deserialize_object(self, obj):
1294-
try:
1295-
return self._serialization_service.to_object(obj)
1296-
except Exception as e:
1297-
raise HazelcastSqlError(
1298-
self.get_client_id(),
1299-
_SqlErrorCode.GENERIC,
1300-
"Failed to deserialize query result value: %s" % try_to_get_error_message(e),
1301-
e,
1302-
)
1303-
13041271
def fetch(self, connection, query_id, cursor_buffer_size):
13051272
"""Fetches the next page of the query execution.
13061273
@@ -1317,7 +1284,9 @@ def fetch(self, connection, query_id, cursor_buffer_size):
13171284
"""
13181285
request = sql_fetch_codec.encode_request(query_id, cursor_buffer_size)
13191286
invocation = Invocation(
1320-
request, connection=connection, response_handler=sql_fetch_codec.decode_response
1287+
request,
1288+
connection=connection,
1289+
response_handler=lambda m: sql_fetch_codec.decode_response(m, self._to_object),
13211290
)
13221291
self._invocation_service.invoke(invocation)
13231292
return invocation.future
@@ -1376,6 +1345,19 @@ def close(self, connection, query_id):
13761345
self._invocation_service.invoke(invocation)
13771346
return invocation.future
13781347

1348+
def _to_object(self, data):
1349+
try:
1350+
return self._serialization_service.to_object(data)
1351+
except SchemaNotFoundError as e:
1352+
raise e
1353+
except Exception as e:
1354+
raise HazelcastSqlError(
1355+
self.get_client_id(),
1356+
_SqlErrorCode.GENERIC,
1357+
"Failed to deserialize query result value: %s" % try_to_get_error_message(e),
1358+
e,
1359+
)
1360+
13791361
def _get_query_connection(self):
13801362
try:
13811363
connection = self._connection_manager.get_random_connection_for_sql()

tests/integration/backward_compatible/serialization/compact_compatibility/compact_compatibility_test.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1847,14 +1847,14 @@ def tearDown(self) -> None:
18471847
super().tearDown()
18481848

18491849
def test_sql(self):
1850-
self._put_from_another_client(1, INNER_COMPACT_INSTANCE)
1850+
self._put_from_another_client(1, OUTER_COMPACT_INSTANCE)
18511851
result = self.client.sql.execute(
18521852
f'SELECT this FROM "{self.map_name}" WHERE ? IS NOT NULL',
1853-
OUTER_COMPACT_INSTANCE,
1853+
INNER_COMPACT_INSTANCE,
18541854
).result()
18551855

18561856
rows = [row["this"] for row in result]
1857-
self.assertEqual([INNER_COMPACT_INSTANCE], rows)
1857+
self.assertEqual([OUTER_COMPACT_INSTANCE], rows)
18581858

18591859
def _put_from_another_client(self, key, value):
18601860
other_client = self.create_client(self.client_config)

tests/integration/backward_compatible/sql_test.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
skip_if_server_version_older_than,
2020
skip_if_server_version_newer_than_or_equal,
2121
skip_if_client_version_older_than,
22+
skip_if_client_version_newer_than_or_equal,
2223
)
2324

2425
try:
@@ -582,6 +583,9 @@ def test_with_statement_when_iteration_throws(self):
582583

583584
def test_lazy_deserialization(self):
584585
skip_if_client_version_older_than(self, "5.0")
586+
# client no longer performs lazy deserialization starting from 5.2
587+
# to be compatible with Compact serialization
588+
skip_if_client_version_newer_than_or_equal(self, "5.2")
585589

586590
# Using a Portable that is not defined on the client-side.
587591
self._create_mapping_for_portable(666, 1, {})
@@ -609,6 +613,26 @@ def test_lazy_deserialization(self):
609613
with self.assertRaises(HazelcastSqlError):
610614
row.get_object("this")
611615

616+
def test_deserialization_error(self):
617+
skip_if_client_version_older_than(self, "5.2")
618+
619+
# Using a Portable that is not defined on the client-side.
620+
self._create_mapping_for_portable(666, 1, {})
621+
622+
script = (
623+
"""
624+
var m = instance_0.getMap("%s");
625+
m.put(1, new com.hazelcast.client.test.Employee(1, "Joe"));
626+
"""
627+
% self.map_name
628+
)
629+
630+
res = self.rc.executeOnController(self.cluster.id, script, Lang.JAVASCRIPT)
631+
self.assertTrue(res.success)
632+
633+
with self.assertRaisesRegex(HazelcastSqlError, "Failed to deserialize query result value"):
634+
self.execute('SELECT __key, this FROM "%s"' % self.map_name)
635+
612636
def test_rows_as_dict_or_list(self):
613637
skip_if_client_version_older_than(self, "5.0")
614638

0 commit comments

Comments
 (0)