Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions docs/serialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -267,18 +267,6 @@ depending on the object in that method might result in an undefined behavior.
Additionally, evolved serializers must have the same type name with the
initial version of the serializer.

Limitations
~~~~~~~~~~~

Currently, the following APIs are not fully supported with the Compact
serialization format. They may or may not work, depending on whether the
schema is available on the client or not.

All of these APIs will work with the Compact serialization format, once it is
promoted to the stable status.

- Reading OBJECT columns of the SQL results

IdentifiedDataSerializable Serialization
----------------------------------------

Expand Down
11 changes: 8 additions & 3 deletions hazelcast/protocol/builtin.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import typing
import uuid
from datetime import date, time, datetime, timedelta, timezone
from decimal import Decimal
Expand Down Expand Up @@ -701,7 +702,7 @@ def decode(msg):

class SqlPageCodec:
@staticmethod
def decode(msg):
def decode(msg, to_object_fn):
from hazelcast.sql import SqlColumnType, _SqlPage

# begin frame
Expand All @@ -715,7 +716,7 @@ def decode(msg):
column_count = len(column_type_ids)

# read columns
columns = [None] * column_count
columns: typing.List = [None] * column_count

for i in range(column_count):
column_type_id = column_type_ids[i]
Expand Down Expand Up @@ -754,7 +755,11 @@ def decode(msg):
column = [None for _ in range(size)]
columns[i] = column
elif column_type_id == SqlColumnType.OBJECT:
columns[i] = ListMultiFrameCodec.decode_contains_nullable(msg, DataCodec.decode)

def decode(m):
return to_object_fn(DataCodec.decode(m))

columns[i] = ListMultiFrameCodec.decode_contains_nullable(msg, decode)
elif column_type_id == SqlColumnType.JSON:
columns[i] = ListMultiFrameCodec.decode_contains_nullable(
msg, HazelcastJsonValueCodec.decode
Expand Down
4 changes: 2 additions & 2 deletions hazelcast/protocol/codec/sql_execute_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ def encode_request(sql, parameters, timeout_millis, cursor_buffer_size, schema,
return OutboundMessage(buf, False)


def decode_response(msg):
def decode_response(msg, to_object_fn):
initial_frame = msg.next_frame()
response = dict()
response["update_count"] = FixSizedTypesCodec.decode_long(initial_frame.buf, _RESPONSE_UPDATE_COUNT_OFFSET)
response["row_metadata"] = ListMultiFrameCodec.decode_nullable(msg, SqlColumnMetadataCodec.decode)
response["row_page"] = CodecUtil.decode_nullable(msg, SqlPageCodec.decode)
response["row_page"] = CodecUtil.decode_nullable(msg, lambda m: SqlPageCodec.decode(m, to_object_fn))
response["error"] = CodecUtil.decode_nullable(msg, SqlErrorCodec.decode)
return response
4 changes: 2 additions & 2 deletions hazelcast/protocol/codec/sql_fetch_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ def encode_request(query_id, cursor_buffer_size):
return OutboundMessage(buf, False)


def decode_response(msg):
def decode_response(msg, to_object_fn):
msg.next_frame()
response = dict()
response["row_page"] = CodecUtil.decode_nullable(msg, SqlPageCodec.decode)
response["row_page"] = CodecUtil.decode_nullable(msg, lambda m: SqlPageCodec.decode(m, to_object_fn))
response["error"] = CodecUtil.decode_nullable(msg, SqlErrorCodec.decode)
return response
8 changes: 4 additions & 4 deletions hazelcast/serialization/portable/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,18 @@ def find_portable_version(self, factory_id, class_id, portable):
return current_version

def create_new_portable_instance(self, factory_id, class_id):
try:
portable_factory = self._portable_factories[factory_id]
except KeyError:
portable_factory = self._portable_factories.get(factory_id)
if portable_factory is None:
raise HazelcastSerializationError(
"Could not find portable_factory for factory-id: %s" % factory_id
)

portable = portable_factory[class_id]
portable = portable_factory.get(class_id)
if portable is None:
raise HazelcastSerializationError(
"Could not create Portable for class-id: %s" % class_id
)

return portable()

def create_reader(self, inp, factory_id, class_id, version, portable_version):
Expand Down
74 changes: 28 additions & 46 deletions hazelcast/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from hazelcast.errors import HazelcastError
from hazelcast.future import Future, ImmediateFuture, ImmediateExceptionFuture
from hazelcast.invocation import Invocation
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.serialization.compact import SchemaNotReplicatedError, SchemaNotFoundError
from hazelcast.util import (
UUIDUtil,
to_millis,
Expand Down Expand Up @@ -544,18 +544,17 @@ class SqlRow:
If an integer value is passed to the ``[]`` operator, it will implicitly
call the :func:`get_object_with_index` and return the result.

For any other type passed into the the ``[]`` operator, :func:`get_object`
For any other type passed into the ``[]`` operator, :func:`get_object`
will be called. Note that, :func:`get_object` expects ``str`` values.
Hence, the ``[]`` operator will raise error for any type other than integer
and string.
"""

__slots__ = ("_row_metadata", "_row", "_deserialize_fn")
__slots__ = ("_row_metadata", "_row")

def __init__(self, row_metadata, row, deserialize_fn):
def __init__(self, row_metadata, row):
self._row_metadata = row_metadata
self._row = row
self._deserialize_fn = deserialize_fn

def get_object(self, column_name: str) -> typing.Any:
"""Gets the value in the column indicated by the column name.
Expand All @@ -567,13 +566,6 @@ def get_object(self, column_name: str) -> typing.Any:
The type of the returned value depends on the SQL type of the column.
No implicit conversions are performed on the value.

Warnings:

Each call to this method might result in a deserialization if the
column type for this object is :const:`SqlColumnType.OBJECT`.
It is advised to assign the result of this method call to some
variable and reuse it.

Args:
column_name: The column name.

Expand All @@ -583,7 +575,6 @@ def get_object(self, column_name: str) -> typing.Any:
Raises:
ValueError: If a column with the given name does not exist.
AssertionError: If the column name is not a string.
HazelcastSqlError: If the object cannot be deserialized.

See Also:
:attr:`metadata`
Expand All @@ -597,21 +588,14 @@ def get_object(self, column_name: str) -> typing.Any:
index = self._row_metadata.find_column(column_name)
if index == SqlRowMetadata.COLUMN_NOT_FOUND:
raise ValueError("Column '%s' doesn't exist" % column_name)
return self._deserialize_fn(self._row[index])
return self._row[index]

def get_object_with_index(self, column_index: int) -> typing.Any:
"""Gets the value of the column by index.

The class of the returned value depends on the SQL type of the column.
No implicit conversions are performed on the value.

Warnings:

Each call to this method might result in a deserialization if the
column type for this object is :const:`SqlColumnType.OBJECT`.
It is advised to assign the result of this method call to some
variable and reuse it.

Args:
column_index: Zero-based column index.

Expand All @@ -621,15 +605,14 @@ def get_object_with_index(self, column_index: int) -> typing.Any:
Raises:
IndexError: If the column index is out of bounds.
AssertionError: If the column index is not an integer.
HazelcastSqlError: If the object cannot be deserialized.

See Also:
:attr:`metadata`

:attr:`SqlColumnMetadata.type`
"""
check_is_int(column_index, "Column index must be an integer")
return self._deserialize_fn(self._row[column_index])
return self._row[column_index]

@property
def metadata(self) -> SqlRowMetadata:
Expand Down Expand Up @@ -680,23 +663,19 @@ class _IteratorBase:
__slots__ = (
"row_metadata",
"fetch_fn",
"deserialize_fn",
"page",
"row_count",
"position",
"is_last",
)

def __init__(self, row_metadata, fetch_fn, deserialize_fn):
def __init__(self, row_metadata, fetch_fn):
self.row_metadata = row_metadata
"""SqlRowMetadata: Row metadata."""

self.fetch_fn = fetch_fn
"""function: Fetches the next page. It produces a Future[_SqlPage]."""

self.deserialize_fn = deserialize_fn
"""function: Deserializes the value."""

self.page = None
"""_SqlPage: Current page."""

Expand Down Expand Up @@ -729,7 +708,6 @@ def _get_current_row(self):
list: The row pointed by the current position.
"""

# Deserialization happens lazily while getting the object.
return [self.page.get_value(i, self.position) for i in range(self.page.column_count)]


Expand Down Expand Up @@ -766,7 +744,7 @@ def _has_next_continuation(self, future):

row = self._get_current_row()
self.position += 1
return SqlRow(self.row_metadata, row, self.deserialize_fn)
return SqlRow(self.row_metadata, row)

def _has_next(self):
"""Returns a Future indicating whether there are more rows
Expand Down Expand Up @@ -826,7 +804,7 @@ def __next__(self):

row = self._get_current_row()
self.position += 1
return SqlRow(self.row_metadata, row, self.deserialize_fn)
return SqlRow(self.row_metadata, row)

def _has_next(self):
while self.position == self.row_count:
Expand Down Expand Up @@ -1065,13 +1043,11 @@ def _get_iterator(self, should_get_blocking):
iterator = _BlockingIterator(
response.row_metadata,
self._fetch_next_page,
self._sql_service.deserialize_object,
)
else:
iterator = _FutureProducingIterator(
response.row_metadata,
self._fetch_next_page,
self._sql_service.deserialize_object,
)

# Pass the first page information to the iterator
Expand Down Expand Up @@ -1275,7 +1251,9 @@ def execute(self, sql, params, cursor_buffer_size, timeout, expected_result_type
)

invocation = Invocation(
request, connection=connection, response_handler=sql_execute_codec.decode_response
request,
connection=connection,
response_handler=lambda m: sql_execute_codec.decode_response(m, self._to_object),
)
self._invocation_service.invoke(invocation)
return invocation.future.continue_with(
Expand All @@ -1290,17 +1268,6 @@ def execute(self, sql, params, cursor_buffer_size, timeout, expected_result_type
except Exception as e:
return ImmediateExceptionFuture(self.re_raise(e, connection))

def deserialize_object(self, obj):
try:
return self._serialization_service.to_object(obj)
except Exception as e:
raise HazelcastSqlError(
self.get_client_id(),
_SqlErrorCode.GENERIC,
"Failed to deserialize query result value: %s" % try_to_get_error_message(e),
e,
)

def fetch(self, connection, query_id, cursor_buffer_size):
"""Fetches the next page of the query execution.

Expand All @@ -1317,7 +1284,9 @@ def fetch(self, connection, query_id, cursor_buffer_size):
"""
request = sql_fetch_codec.encode_request(query_id, cursor_buffer_size)
invocation = Invocation(
request, connection=connection, response_handler=sql_fetch_codec.decode_response
request,
connection=connection,
response_handler=lambda m: sql_fetch_codec.decode_response(m, self._to_object),
)
self._invocation_service.invoke(invocation)
return invocation.future
Expand Down Expand Up @@ -1376,6 +1345,19 @@ def close(self, connection, query_id):
self._invocation_service.invoke(invocation)
return invocation.future

def _to_object(self, data):
try:
return self._serialization_service.to_object(data)
except SchemaNotFoundError as e:
raise e
except Exception as e:
raise HazelcastSqlError(
self.get_client_id(),
_SqlErrorCode.GENERIC,
"Failed to deserialize query result value: %s" % try_to_get_error_message(e),
e,
)

def _get_query_connection(self):
try:
connection = self._connection_manager.get_random_connection_for_sql()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1847,14 +1847,14 @@ def tearDown(self) -> None:
super().tearDown()

def test_sql(self):
self._put_from_another_client(1, INNER_COMPACT_INSTANCE)
self._put_from_another_client(1, OUTER_COMPACT_INSTANCE)
result = self.client.sql.execute(
f'SELECT this FROM "{self.map_name}" WHERE ? IS NOT NULL',
OUTER_COMPACT_INSTANCE,
INNER_COMPACT_INSTANCE,
).result()

rows = [row["this"] for row in result]
self.assertEqual([INNER_COMPACT_INSTANCE], rows)
self.assertEqual([OUTER_COMPACT_INSTANCE], rows)

def _put_from_another_client(self, key, value):
other_client = self.create_client(self.client_config)
Expand Down
24 changes: 24 additions & 0 deletions tests/integration/backward_compatible/sql_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
skip_if_server_version_older_than,
skip_if_server_version_newer_than_or_equal,
skip_if_client_version_older_than,
skip_if_client_version_newer_than_or_equal,
)

try:
Expand Down Expand Up @@ -582,6 +583,9 @@ def test_with_statement_when_iteration_throws(self):

def test_lazy_deserialization(self):
skip_if_client_version_older_than(self, "5.0")
# client no longer performs lazy deserialization starting from 5.2
# to be compatible with Compact serialization
skip_if_client_version_newer_than_or_equal(self, "5.2")

# Using a Portable that is not defined on the client-side.
self._create_mapping_for_portable(666, 1, {})
Expand Down Expand Up @@ -609,6 +613,26 @@ def test_lazy_deserialization(self):
with self.assertRaises(HazelcastSqlError):
row.get_object("this")

def test_deserialization_error(self):
skip_if_client_version_older_than(self, "5.2")

# Using a Portable that is not defined on the client-side.
self._create_mapping_for_portable(666, 1, {})

script = (
"""
var m = instance_0.getMap("%s");
m.put(1, new com.hazelcast.client.test.Employee(1, "Joe"));
"""
% self.map_name
)

res = self.rc.executeOnController(self.cluster.id, script, Lang.JAVASCRIPT)
self.assertTrue(res.success)

with self.assertRaisesRegex(HazelcastSqlError, "Failed to deserialize query result value"):
self.execute('SELECT __key, this FROM "%s"' % self.map_name)

def test_rows_as_dict_or_list(self):
skip_if_client_version_older_than(self, "5.0")

Expand Down