Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions examples/sql/sql_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import hazelcast
from hazelcast.serialization.api import Portable
from hazelcast.sql import SqlStatement


class Customer(Portable):
Expand Down Expand Up @@ -93,16 +92,13 @@ def __repr__(self):

print(name, age, is_active)

# Construct a statement object to control the properties of the query
# Special keywords __key and this can be used to refer to key and value.
# Also, a placeholder parameters can be specified
statement = SqlStatement("SELECT __key, age FROM customers WHERE name LIKE ?")
query = "SELECT __key, age FROM customers WHERE name LIKE ?"

# Parameters will replace the placeholders on the server side
statement.add_parameter("Jo%")
statement.timeout = 5

with client.sql.execute_statement(statement).result() as result:
# Parameters will replace the placeholders on the server side.
# Properties of the query can be configured with keyword arguments.
with client.sql.execute(query, "Jo%", timeout=5).result() as result:
# Row metadata can also be retrieved from the result
row_metadata = result.get_row_metadata()

Expand All @@ -128,3 +124,5 @@ def __repr__(self):

# Query can be closed explicitly
result.close().result()

client.shutdown()
261 changes: 80 additions & 181 deletions hazelcast/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,41 +81,77 @@ class SqlService(object):
def __init__(self, internal_sql_service):
self._service = internal_sql_service

def execute(self, sql, *params):
"""Convenient method to execute a distributed query with the given
parameters.

Converts passed SQL string and parameters into an :class:`SqlStatement`
object and invokes :func:`execute_statement`.
def execute(self, sql, *params, **kwargs):
"""Executes an SQL statement.

Args:
sql (str): SQL string.
*params: Query parameters that will be passed to
:func:`SqlStatement.add_parameter`.
*params: Query parameters that will replace the placeholders at
the server-side. You may define parameter placeholders in the
query with the ``?`` character. For every placeholder, a
parameter value must be provided.

Returns:
hazelcast.future.Future[SqlResult]: The execution result.
Keyword Args:
cursor_buffer_size (int): The cursor buffer size measured in the
number of rows.

Raises:
HazelcastSqlError: In case of execution error.
AssertionError: If the SQL parameter is not a string.
ValueError: If the SQL parameter is an empty string.
"""
return self._service.execute(sql, *params)
When a statement is submitted for execution, a
:class:`SqlResult` is returned as a result. When rows are ready
to be consumed, they are put into an internal buffer of the
cursor. This parameter defines the maximum number of rows in
that buffer. When the threshold is reached, the backpressure
mechanism will slow down the execution, possibly to a complete
halt, to prevent out-of-memory.

def execute_statement(self, statement):
"""Executes an SQL statement.
Only positive values are allowed.

Args:
statement (SqlStatement): Statement to be executed
The default value is expected to work well for most workloads.
A bigger buffer size may give you a slight performance boost
for queries with large result sets at the cost of increased
memory consumption.

Defaults to ``4096``.
timeout (float or int): The execution timeout in seconds.

If the timeout is reached for a running statement, it will be
cancelled forcefully.

Zero value means no timeout. ``-1`` means that the value from
the server-side config will be used. Other negative values are
prohibited.

Defaults to ``-1``.
expected_result_type (SqlExpectedResultType): The expected result
type.
schema (str or None): The schema name.

The engine will try to resolve the non-qualified object
identifiers from the statement in the given schema. If not
found, the default search path will be used.

The schema name is case sensitive. For example, ``foo`` and
``Foo`` are different schemas.

The default value is ``None`` meaning only the default search
path is used.

Returns:
hazelcast.future.Future[SqlResult]: The execution result.

Raises:
HazelcastSqlError: In case of execution error.
AssertionError: If the ``sql`` parameter is not a string, the
``schema`` is not a string or ``None``, the ``timeout`` is not
an integer or float, or the ``cursor_buffer_size`` is not an
integer.
ValueError: If the ``sql`` parameter is an empty string, the
``timeout`` is negative and not equal to ``-1``, the
``cursor_buffer_size`` is not positive.
TypeError: If the ``expected_result_type`` does not equal to one of
the values or names of the members of the
:class:`SqlExpectedResultType`.
"""
return self._service.execute_statement(statement)
return self._service.execute(sql, params, kwargs)


class _SqlQueryId(object):
Expand Down Expand Up @@ -1167,32 +1203,18 @@ def __init__(self, connection_manager, serialization_service, invocation_service
self._serialization_service = serialization_service
self._invocation_service = invocation_service

def execute(self, sql, *params):
def execute(self, sql, params, kwargs):
"""Constructs a statement and executes it.

Args:
sql (str): SQL string.
*params: Query parameters.

Returns:
hazelcast.future.Future[SqlResult]: The execution result.
"""
statement = SqlStatement(sql)

for param in params:
statement.add_parameter(param)

return self.execute_statement(statement)

def execute_statement(self, statement):
"""Executes the given statement.

Args:
statement (SqlStatement): The statement to execute.
params (tuple): Query parameters.
kwargs (dict): Arguments to customize the query.

Returns:
hazelcast.future.Future[SqlResult]: The execution result.
"""
statement = _SqlStatement(sql, params, **kwargs)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the reviewers, I construct the statement outside of the try-except block below so that, for validation errors, we immediately throw the exception not ImmediateFutureException


connection = None
try:
Expand Down Expand Up @@ -1393,39 +1415,31 @@ class SqlExpectedResultType(object):
"""


class SqlStatement(object):
"""Definition of an SQL statement.

This object is mutable. Properties are read once before the execution
is started. Changes to properties do not affect the behavior of already
running statements.
"""

TIMEOUT_NOT_SET = -1

TIMEOUT_DISABLED = 0
_TIMEOUT_NOT_SET = -1
_DEFAULT_CURSOR_BUFFER_SIZE = 4096

DEFAULT_TIMEOUT = TIMEOUT_NOT_SET

DEFAULT_CURSOR_BUFFER_SIZE = 4096
class _SqlStatement(object):
"""Definition of an SQL statement."""

def __init__(self, sql):
def __init__(
self,
sql,
parameters,
timeout=_TIMEOUT_NOT_SET,
cursor_buffer_size=_DEFAULT_CURSOR_BUFFER_SIZE,
schema=None,
expected_result_type=SqlExpectedResultType.ANY,
):
self.sql = sql
self._parameters = []
self._timeout = SqlStatement.DEFAULT_TIMEOUT
self._cursor_buffer_size = SqlStatement.DEFAULT_CURSOR_BUFFER_SIZE
self._schema = None
self._expected_result_type = SqlExpectedResultType.ANY
self.parameters = parameters
self.timeout = timeout
self.cursor_buffer_size = cursor_buffer_size
self.schema = schema
self.expected_result_type = expected_result_type

@property
def sql(self):
"""str: The SQL string to be executed.

The setter raises:

- **AssertionError**: If the SQL parameter is not a string.
- **ValueError**: If the SQL parameter is an empty string.
"""
return self._sql

@sql.setter
Expand All @@ -1439,22 +1453,6 @@ def sql(self, sql):

@property
def schema(self):
"""str: The schema name. The engine will try to resolve the
non-qualified object identifiers from the statement in the given
schema. If not found, the default search path will be used, which
looks for objects in the predefined schemas ``partitioned`` and
``public``.

The schema name is case sensitive. For example, ``foo`` and ``Foo``
are different schemas.

The default value is ``None`` meaning only the default search path is
used.

The setter raises:

- **AssertionError**: If the schema is not a string or ``None``.
"""
return self._schema

@schema.setter
Expand All @@ -1465,80 +1463,20 @@ def schema(self, schema):
)
self._schema = schema

@property
def parameters(self):
"""list: Sets the statement parameters.

You may define parameter placeholders in the statement with the ``?``
character. For every placeholder, a parameter value must be provided.

When the setter is called, the content of the parameters list is copied.
Subsequent changes to the original list don't change the statement parameters.

The setter raises:

- **AssertionError**: If the parameter is not a list.
"""
return self._parameters

@parameters.setter
def parameters(self, parameters):
check_true(isinstance(parameters, list), "Parameters must be a list")
self._parameters = list(parameters)

@property
def timeout(self):
"""float or int: The execution timeout in seconds.

If the timeout is reached for a running statement, it will be
cancelled forcefully.

Zero value means no timeout. :const:`TIMEOUT_NOT_SET` means that
the value from the server-side config will be used. Other negative
values are prohibited.

Defaults to :const:`TIMEOUT_NOT_SET`.

The setter raises:

- **AssertionError**: If the timeout is not an integer or float.
- **ValueError**: If the timeout is negative and not equal to
:const:`TIMEOUT_NOT_SET`.
"""
return self._timeout

@timeout.setter
def timeout(self, timeout):
check_is_number(timeout, "Timeout must be an integer or float")
if timeout < 0 and timeout != SqlStatement.TIMEOUT_NOT_SET:
if timeout < 0 and timeout != _TIMEOUT_NOT_SET:
raise ValueError("Timeout must be non-negative or -1, not %s" % timeout)

self._timeout = timeout

@property
def cursor_buffer_size(self):
"""int: The cursor buffer size (measured in the number of rows).

When a statement is submitted for execution, a :class:`SqlResult`
is returned as a result. When rows are ready to be consumed,
they are put into an internal buffer of the cursor. This parameter
defines the maximum number of rows in that buffer. When the threshold
is reached, the backpressure mechanism will slow down the execution,
possibly to a complete halt, to prevent out-of-memory.

Only positive values are allowed.

The default value is expected to work well for most workloads. A bigger
buffer size may give you a slight performance boost for queries with
large result sets at the cost of increased memory consumption.

Defaults to :const:`DEFAULT_CURSOR_BUFFER_SIZE`.

The setter raises:

- **AssertionError**: If the cursor buffer size is not an integer.
- **ValueError**: If the cursor buffer size is not positive.
"""
return self._cursor_buffer_size

@cursor_buffer_size.setter
Expand All @@ -1550,14 +1488,6 @@ def cursor_buffer_size(self, cursor_buffer_size):

@property
def expected_result_type(self):
"""SqlExpectedResultType: The expected result type.

The setter raises:

- **TypeError**: If the expected result type does not equal to one of
the values or names of the members of the
:class:`SqlExpectedResultType`.
"""
return self._expected_result_type

@expected_result_type.setter
Expand All @@ -1566,37 +1496,6 @@ def expected_result_type(self, expected_result_type):
expected_result_type, SqlExpectedResultType
)

def add_parameter(self, parameter):
"""Adds a single parameter to the end of the parameters list.

Args:
parameter: The parameter.

See Also:
:attr:`parameters`

:func:`clear_parameters`
"""
self._parameters.append(parameter)

def clear_parameters(self):
"""Clears statement parameters."""
self._parameters = []

def copy(self):
"""Creates a copy of this instance.

Returns:
SqlStatement: The new copy.
"""
copied = SqlStatement(self.sql)
copied.parameters = list(self.parameters)
copied.timeout = self.timeout
copied.cursor_buffer_size = self.cursor_buffer_size
copied.schema = self.schema
copied.expected_result_type = self.expected_result_type
return copied

def __repr__(self):
return (
"SqlStatement(schema=%s, sql=%s, parameters=%s, timeout=%s,"
Expand Down
2 changes: 1 addition & 1 deletion start_rc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sys
from os.path import isfile

SERVER_VERSION = "5.0-SNAPSHOT"
SERVER_VERSION = "5.0"
RC_VERSION = "0.8-SNAPSHOT"

RELEASE_REPO = "http://repo1.maven.apache.org/maven2"
Expand Down
Loading