Skip to content

Commit ef640ff

Browse files
committed
wip
1 parent a4b1787 commit ef640ff

22 files changed

+3280
-35
lines changed

B.py

Whitespace-only changes.

docs/api/modules.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ API Documentation
1414
predicate
1515
proxy/modules
1616
serialization
17+
sql
1718
transaction
1819
util
1920

docs/api/sql.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
SQL
2+
===========
3+
4+
.. automodule:: hazelcast.sql

hazelcast/client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
)
3737
from hazelcast.reactor import AsyncoreReactor
3838
from hazelcast.serialization import SerializationServiceV1
39+
from hazelcast.sql import _InternalSqlService, SqlService
3940
from hazelcast.statistics import Statistics
4041
from hazelcast.transaction import TWO_PHASE, TransactionManager
4142
from hazelcast.util import AtomicInteger, RoundRobinLB
@@ -388,6 +389,10 @@ def __init__(self, **kwargs):
388389
self._invocation_service.init(
389390
self._internal_partition_service, self._connection_manager, self._listener_service
390391
)
392+
self._internal_sql_service = _InternalSqlService(
393+
self._connection_manager, self._serialization_service, self._invocation_service
394+
)
395+
self.sql = SqlService(self._internal_sql_service)
391396
self._init_context()
392397
self._start()
393398

hazelcast/connection.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,17 +146,20 @@ def add_listener(self, on_connection_opened=None, on_connection_closed=None):
146146
def get_connection(self, member_uuid):
147147
return self.active_connections.get(member_uuid, None)
148148

149-
def get_random_connection(self):
149+
def get_random_connection(self, should_get_data_member=False):
150150
if self._smart_routing_enabled:
151-
member = self._load_balancer.next()
152-
if member:
153-
connection = self.get_connection(member.uuid)
154-
if connection:
155-
return connection
151+
connection = self._get_connection_from_load_balancer(should_get_data_member)
152+
if connection:
153+
return connection
156154

157155
# We should not get to this point under normal circumstances.
158156
# Therefore, copying the list should be OK.
159-
for connection in list(six.itervalues(self.active_connections)):
157+
for member_uuid, connection in list(six.iteritems(self.active_connections)):
158+
if should_get_data_member:
159+
member = self._cluster_service.get_member(member_uuid)
160+
if not member or member.lite_member:
161+
continue
162+
160163
return connection
161164

162165
return None
@@ -256,6 +259,21 @@ def check_invocation_allowed(self):
256259
else:
257260
raise IOError("No connection found to cluster")
258261

262+
def _get_connection_from_load_balancer(self, should_get_data_member):
263+
load_balancer = self._load_balancer
264+
if should_get_data_member:
265+
if load_balancer.can_get_next_data_member():
266+
member = load_balancer.next_data_member()
267+
else:
268+
member = None
269+
else:
270+
member = load_balancer.next()
271+
272+
if not member:
273+
return None
274+
275+
return self.get_connection(member.uuid)
276+
259277
def _get_or_connect_to_address(self, address):
260278
for connection in list(six.itervalues(self.active_connections)):
261279
if connection.remote_address == address:

hazelcast/protocol/builtin.py

Lines changed: 270 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import uuid
2+
from datetime import date, time, datetime, timedelta
3+
from decimal import Decimal
24

35
from hazelcast import six
46
from hazelcast.six.moves import range
@@ -11,7 +13,7 @@
1113
NULL_FINAL_FRAME_BUF,
1214
END_FINAL_FRAME_BUF,
1315
)
14-
from hazelcast.serialization import (
16+
from hazelcast.serialization.bits import (
1517
LONG_SIZE_IN_BYTES,
1618
UUID_SIZE_IN_BYTES,
1719
LE_INT,
@@ -23,8 +25,21 @@
2325
LE_INT8,
2426
UUID_MSB_SHIFT,
2527
UUID_LSB_MASK,
28+
BYTE_SIZE_IN_BYTES,
29+
SHORT_SIZE_IN_BYTES,
30+
LE_INT16,
31+
FLOAT_SIZE_IN_BYTES,
32+
LE_FLOAT,
33+
LE_DOUBLE,
34+
DOUBLE_SIZE_IN_BYTES,
2635
)
2736
from hazelcast.serialization.data import Data
37+
from hazelcast.util import int_from_bytes, timezone
38+
39+
LOCAL_DATE_SIZE_IN_BYTES = SHORT_SIZE_IN_BYTES + BYTE_SIZE_IN_BYTES * 2
40+
LOCAL_TIME_SIZE_IN_BYTES = BYTE_SIZE_IN_BYTES * 3 + INT_SIZE_IN_BYTES
41+
LOCAL_DATE_TIME_SIZE_IN_BYTES = LOCAL_DATE_SIZE_IN_BYTES + LOCAL_TIME_SIZE_IN_BYTES
42+
OFFSET_DATE_TIME_SIZE_IN_BYTES = LOCAL_DATE_TIME_SIZE_IN_BYTES + INT_SIZE_IN_BYTES
2843

2944

3045
class CodecUtil(object):
@@ -274,6 +289,49 @@ def decode_uuid(buf, offset):
274289
)
275290
return uuid.UUID(bytes=bytes(b))
276291

292+
@staticmethod
293+
def decode_short(buf, offset):
294+
return LE_INT16.unpack_from(buf, offset)[0]
295+
296+
@staticmethod
297+
def decode_float(buf, offset):
298+
return LE_FLOAT.unpack_from(buf, offset)[0]
299+
300+
@staticmethod
301+
def decode_double(buf, offset):
302+
return LE_DOUBLE.unpack_from(buf, offset)[0]
303+
304+
@staticmethod
305+
def decode_local_date(buf, offset):
306+
year = FixSizedTypesCodec.decode_short(buf, offset)
307+
month = FixSizedTypesCodec.decode_byte(buf, offset + SHORT_SIZE_IN_BYTES)
308+
day = FixSizedTypesCodec.decode_byte(buf, offset + SHORT_SIZE_IN_BYTES + BYTE_SIZE_IN_BYTES)
309+
310+
return date(year, month, day)
311+
312+
@staticmethod
313+
def decode_local_time(buf, offset):
314+
hour = FixSizedTypesCodec.decode_byte(buf, offset)
315+
minute = FixSizedTypesCodec.decode_byte(buf, offset + BYTE_SIZE_IN_BYTES)
316+
second = FixSizedTypesCodec.decode_byte(buf, offset + BYTE_SIZE_IN_BYTES * 2)
317+
nano = FixSizedTypesCodec.decode_int(buf, offset + BYTE_SIZE_IN_BYTES * 3)
318+
319+
return time(hour, minute, second, int(nano / 1000.0))
320+
321+
@staticmethod
322+
def decode_local_date_time(buf, offset):
323+
date_value = FixSizedTypesCodec.decode_local_date(buf, offset)
324+
time_value = FixSizedTypesCodec.decode_local_time(buf, offset + LOCAL_DATE_SIZE_IN_BYTES)
325+
326+
return datetime.combine(date_value, time_value)
327+
328+
@staticmethod
329+
def decode_offset_date_time(buf, offset):
330+
datetime_value = FixSizedTypesCodec.decode_local_date_time(buf, offset)
331+
offset_seconds = FixSizedTypesCodec.decode_int(buf, offset + LOCAL_DATE_TIME_SIZE_IN_BYTES)
332+
333+
return datetime_value.replace(tzinfo=timezone(timedelta(seconds=offset_seconds)))
334+
277335

278336
class ListIntegerCodec(object):
279337
@staticmethod
@@ -496,3 +554,214 @@ def encode(buf, value, is_final=False):
496554
@staticmethod
497555
def decode(msg):
498556
return msg.next_frame().buf.decode("utf-8")
557+
558+
559+
class ListCNFixedSizeCodec(object):
560+
_TYPE_NULL_ONLY = 1
561+
_TYPE_NOT_NULL_ONLY = 2
562+
_TYPE_MIXED = 3
563+
564+
_ITEMS_PER_BITMASK = 8
565+
566+
_HEADER_SIZE = BYTE_SIZE_IN_BYTES + INT_SIZE_IN_BYTES
567+
568+
@staticmethod
569+
def decode(msg, item_size, decoder):
570+
frame = msg.next_frame()
571+
type = FixSizedTypesCodec.decode_byte(frame.buf, 0)
572+
count = FixSizedTypesCodec.decode_int(frame.buf, 1)
573+
574+
response = []
575+
if type == ListCNFixedSizeCodec._TYPE_NULL_ONLY:
576+
for _ in range(count):
577+
response.append(None)
578+
elif type == ListCNFixedSizeCodec._TYPE_NOT_NULL_ONLY:
579+
for i in range(count):
580+
response.append(
581+
decoder(frame.buf, ListCNFixedSizeCodec._HEADER_SIZE + i * item_size)
582+
)
583+
else:
584+
position = ListCNFixedSizeCodec._HEADER_SIZE
585+
read_count = 0
586+
587+
while read_count < count:
588+
bitmask = FixSizedTypesCodec.decode_byte(frame.buf, position)
589+
position += 1
590+
591+
i = 0
592+
while i < ListCNFixedSizeCodec._ITEMS_PER_BITMASK and read_count < count:
593+
mask = 1 << i
594+
if (bitmask & mask) == mask:
595+
response.append(decoder(frame.buf, position))
596+
position += item_size
597+
else:
598+
response.append(None)
599+
read_count += 1
600+
601+
i += 1
602+
603+
return response
604+
605+
606+
class ListCNBooleanCodec(object):
607+
@staticmethod
608+
def decode(msg):
609+
return ListCNFixedSizeCodec.decode(
610+
msg, BOOLEAN_SIZE_IN_BYTES, FixSizedTypesCodec.decode_boolean
611+
)
612+
613+
614+
class ListCNByteCodec(object):
615+
@staticmethod
616+
def decode(msg):
617+
return ListCNFixedSizeCodec.decode(msg, BYTE_SIZE_IN_BYTES, FixSizedTypesCodec.decode_byte)
618+
619+
620+
class ListCNShortCodec(object):
621+
@staticmethod
622+
def decode(msg):
623+
return ListCNFixedSizeCodec.decode(
624+
msg, SHORT_SIZE_IN_BYTES, FixSizedTypesCodec.decode_short
625+
)
626+
627+
628+
class ListCNIntegerCodec(object):
629+
@staticmethod
630+
def decode(msg):
631+
return ListCNFixedSizeCodec.decode(msg, INT_SIZE_IN_BYTES, FixSizedTypesCodec.decode_int)
632+
633+
634+
class ListCNLongCodec(object):
635+
@staticmethod
636+
def decode(msg):
637+
return ListCNFixedSizeCodec.decode(msg, LONG_SIZE_IN_BYTES, FixSizedTypesCodec.decode_long)
638+
639+
640+
class ListCNFloatCodec(object):
641+
@staticmethod
642+
def decode(msg):
643+
return ListCNFixedSizeCodec.decode(
644+
msg, FLOAT_SIZE_IN_BYTES, FixSizedTypesCodec.decode_float
645+
)
646+
647+
648+
class ListCNDoubleCodec(object):
649+
@staticmethod
650+
def decode(msg):
651+
return ListCNFixedSizeCodec.decode(
652+
msg, DOUBLE_SIZE_IN_BYTES, FixSizedTypesCodec.decode_double
653+
)
654+
655+
656+
class ListCNLocalDateCodec(object):
657+
@staticmethod
658+
def decode(msg):
659+
return ListCNFixedSizeCodec.decode(
660+
msg, LOCAL_DATE_SIZE_IN_BYTES, FixSizedTypesCodec.decode_local_date
661+
)
662+
663+
664+
class ListCNLocalTimeCodec(object):
665+
@staticmethod
666+
def decode(msg):
667+
return ListCNFixedSizeCodec.decode(
668+
msg, LOCAL_TIME_SIZE_IN_BYTES, FixSizedTypesCodec.decode_local_time
669+
)
670+
671+
672+
class ListCNLocalDateTimeCodec(object):
673+
@staticmethod
674+
def decode(msg):
675+
return ListCNFixedSizeCodec.decode(
676+
msg, LOCAL_DATE_TIME_SIZE_IN_BYTES, FixSizedTypesCodec.decode_local_date_time
677+
)
678+
679+
680+
class ListCNOffsetDateTimeCodec(object):
681+
@staticmethod
682+
def decode(msg):
683+
return ListCNFixedSizeCodec.decode(
684+
msg, OFFSET_DATE_TIME_SIZE_IN_BYTES, FixSizedTypesCodec.decode_offset_date_time
685+
)
686+
687+
688+
class BigDecimalCodec(object):
689+
@staticmethod
690+
def decode(msg):
691+
buf = msg.next_frame().buf
692+
size = FixSizedTypesCodec.decode_int(buf, 0)
693+
unscaled_value = int_from_bytes(buf[INT_SIZE_IN_BYTES : INT_SIZE_IN_BYTES + size])
694+
scale = FixSizedTypesCodec.decode_int(buf, INT_SIZE_IN_BYTES + size)
695+
sign = 0 if unscaled_value >= 0 else 1
696+
return Decimal((sign, tuple(map(int, str(abs(unscaled_value)))), -1 * scale))
697+
698+
@staticmethod
699+
def decode_nullable(msg):
700+
if CodecUtil.next_frame_is_null_frame(msg):
701+
return None
702+
else:
703+
return BigDecimalCodec.decode(msg)
704+
705+
706+
class SqlPageCodec(object):
707+
@staticmethod
708+
def decode(msg):
709+
from hazelcast.sql import SqlColumnType, _SqlPage
710+
711+
# begin frame
712+
msg.next_frame()
713+
714+
# read the "last" flag
715+
is_last = LE_INT8.unpack_from(msg.next_frame().buf, 0)[0] == 1
716+
717+
# read column types
718+
column_type_ids = ListIntegerCodec.decode(msg)
719+
column_types = []
720+
721+
# read columns
722+
columns = []
723+
724+
for column_type_id in column_type_ids:
725+
column_types.append(column_type_id)
726+
727+
if column_type_id == SqlColumnType.VARCHAR:
728+
columns.append(
729+
ListMultiFrameCodec.decode_contains_nullable(msg, StringCodec.decode)
730+
)
731+
elif column_type_id == SqlColumnType.BOOLEAN:
732+
columns.append(ListCNBooleanCodec.decode(msg))
733+
elif column_type_id == SqlColumnType.TINYINT:
734+
columns.append(ListCNByteCodec.decode(msg))
735+
elif column_type_id == SqlColumnType.SMALLINT:
736+
columns.append(ListCNShortCodec.decode(msg))
737+
elif column_type_id == SqlColumnType.INTEGER:
738+
columns.append(ListCNIntegerCodec.decode(msg))
739+
elif column_type_id == SqlColumnType.BIGINT:
740+
columns.append(ListCNLongCodec.decode(msg))
741+
elif column_type_id == SqlColumnType.REAL:
742+
columns.append(ListCNFloatCodec.decode(msg))
743+
elif column_type_id == SqlColumnType.DOUBLE:
744+
columns.append(ListCNDoubleCodec.decode(msg))
745+
elif column_type_id == SqlColumnType.DATE:
746+
columns.append(ListCNLocalDateCodec.decode(msg))
747+
elif column_type_id == SqlColumnType.TIME:
748+
columns.append(ListCNLocalTimeCodec.decode(msg))
749+
elif column_type_id == SqlColumnType.TIMESTAMP:
750+
columns.append(ListCNLocalDateTimeCodec.decode(msg))
751+
elif column_type_id == SqlColumnType.TIMESTAMP_WITH_TIME_ZONE:
752+
columns.append(ListCNOffsetDateTimeCodec.decode(msg))
753+
elif column_type_id == SqlColumnType.DECIMAL:
754+
columns.append(ListMultiFrameCodec.decode(msg, BigDecimalCodec.decode_nullable))
755+
elif column_type_id == SqlColumnType.NULL:
756+
frame = msg.next_frame()
757+
size = FixSizedTypesCodec.decode_int(frame.buf, 0)
758+
column = [None for _ in range(size)]
759+
columns.append(column)
760+
elif column_type_id == SqlColumnType.OBJECT:
761+
columns.append(ListMultiFrameCodec.decode(msg, DataCodec.decode_nullable))
762+
else:
763+
raise ValueError("Unknown type %s" % column_type_id)
764+
765+
CodecUtil.fast_forward_to_end_frame(msg)
766+
767+
return _SqlPage(column_types, columns, is_last)

0 commit comments

Comments
 (0)