Skip to content

Commit

Permalink
Merge pull request vitessio#1465 from alainjobart/resharding
Browse files Browse the repository at this point in the history
Adding a gRPC python client for vtgate / Vitess.
  • Loading branch information
alainjobart committed Jan 29, 2016
2 parents 6adb187 + 08c7fbd commit 86a1644
Show file tree
Hide file tree
Showing 12 changed files with 728 additions and 77 deletions.
536 changes: 536 additions & 0 deletions py/vtdb/grpc_vtgate_client.py

Large diffs are not rendered by default.

58 changes: 55 additions & 3 deletions py/vtdb/keyrange_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@
}


# (Eventually this will just go away, as keyspace.Keyspace will use
# the proto3 version directly).
def srv_keyspace_proto3_to_old(sk):
# (Eventually this will go away, as we will retire BSON)
def srv_keyspace_bson_proto3_to_old(sk):
"""Converts a bson-encoded proto3 SrvKeyspace.
Args:
Expand Down Expand Up @@ -74,3 +73,56 @@ def srv_keyspace_proto3_to_old(sk):
}
sk['Partitions'] = pmap
return sk


# (Eventually this will just go away, as keyspace.Keyspace will use
# the proto3 version directly).
def srv_keyspace_proto3_to_old(sk):
"""Converts a proto3 SrvKeyspace.
Args:
sk: proto3 SrvKeyspace.
Returns:
dict with converted values.
"""
result = {}

if sk.sharding_column_name:
result['ShardingColumnName'] = sk.sharding_column_name

if sk.sharding_column_type == 1:
result['ShardingColumnType'] = KIT_UINT64
elif sk.sharding_column_type == 2:
result['ShardingColumnType'] = KIT_BYTES

sfmap = {}
for sf in sk.served_from:
tt = PROTO3_TABLET_TYPE_TO_STRING[sf.tablet_type]
sfmap[tt] = sf.keyspace
result['ServedFrom'] = sfmap

if sk.partitions:
pmap = {}
for p in sk.partitions:
tt = PROTO3_TABLET_TYPE_TO_STRING[p.served_type]
srs = []
for sr in p.shard_references:
result_sr = {
'Name': sr.name,
}
if sr.key_range:
result_sr['KeyRange'] = {
'Start': sr.key_range.start,
'End': sr.key_range.end,
}
srs.append(result_sr)
pmap[tt] = {
'ShardReferences': srs,
}
result['Partitions'] = pmap

if sk.split_shard_count:
result['SplitShardCount'] = sk.split_shard_count

return result
11 changes: 11 additions & 0 deletions py/vtdb/vtgate_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,22 @@ def dial(self):
If successful, call close() to close the connection.
"""
raise NotImplemented('Child class needs to implement this')

def close(self):
"""Close the connection.
This object may be re-used again by calling dial().
"""
raise NotImplemented('Child class needs to implement this')

def is_closed(self):
"""Checks the connection status.
Returns:
True if this connection is closed.
"""
raise NotImplemented('Child class needs to implement this')

def cursor(self, *pargs, **kwargs):
"""Creates a cursor instance associated with this connection.
Expand Down Expand Up @@ -133,6 +136,7 @@ def begin(self, effective_caller_id=None):
this is probably an error in the code.
dbexceptions.FatalError: this query should not be retried.
"""
raise NotImplemented('Child class needs to implement this')

def commit(self):
"""Commits the current transaction.
Expand All @@ -150,6 +154,7 @@ def commit(self):
this is probably an error in the code.
dbexceptions.FatalError: this query should not be retried.
"""
raise NotImplemented('Child class needs to implement this')

def rollback(self):
"""Rolls the current transaction back.
Expand All @@ -167,6 +172,7 @@ def rollback(self):
this is probably an error in the code.
dbexceptions.FatalError: this query should not be retried.
"""
raise NotImplemented('Child class needs to implement this')

def _execute(self, sql, bind_variables, tablet_type,
keyspace=None,
Expand All @@ -178,6 +184,7 @@ def _execute(self, sql, bind_variables, tablet_type,
"""Executes the given sql.
FIXME(alainjobart): should take the session in.
FIXME(alainjobart): implementations have keyspace before tablet_type!
Args:
sql: query to execute.
Expand Down Expand Up @@ -225,6 +232,7 @@ def _execute(self, sql, bind_variables, tablet_type,
this is probably an error in the code.
dbexceptions.FatalError: this query should not be retried.
"""
raise NotImplemented('Child class needs to implement this')

def _execute_batch(
self, sql_list, bind_variables_list, tablet_type,
Expand Down Expand Up @@ -269,6 +277,7 @@ def _execute_batch(
this is probably an error in the code.
dbexceptions.FatalError: this query should not be retried.
"""
raise NotImplemented('Child class needs to implement this')

def _stream_execute(
self, sql, bind_variables, tablet_type, keyspace=None, shards=None,
Expand Down Expand Up @@ -311,6 +320,7 @@ def _stream_execute(
this is probably an error in the code.
dbexceptions.FatalError: this query should not be retried.
"""
raise NotImplemented('Child class needs to implement this')

def get_srv_keyspace(self, keyspace):
"""Returns a SrvKeyspace object.
Expand All @@ -324,3 +334,4 @@ def get_srv_keyspace(self, keyspace):
Raises:
TBD
"""
raise NotImplemented('Child class needs to implement this')
3 changes: 3 additions & 0 deletions py/vtdb/vtgate_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def convert_to_dbexception(self, args):
Returns:
An exception from dbexceptions.
"""
# FIXME(alainjobart): this is extremely confusing: self.message is only
# used for integrity errors, and nothing else. The other cases
# have to provide the message in the args.
if self.code == vtrpc_pb2.TRANSIENT_ERROR:
return dbexceptions.TransientError(args)
if self.code == vtrpc_pb2.INTEGRITY_ERROR:
Expand Down
11 changes: 9 additions & 2 deletions py/vtdb/vtgatev2.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
This currently supports both vtgatev2 and vtgatev3.
"""

# TODO(dumbunny): Rename module, class, and tests to vtgate_gorpc_client.
# TODO(dumbunny): Rename module, class, and tests to gorpc_vtgate_client.

from itertools import izip
import logging
Expand Down Expand Up @@ -273,6 +273,9 @@ def _execute(
The (results, rowcount, lastrowid, fields) tuple.
"""

# FIXME(alainjobart): keyspace should be in routing_kwargs,
# as it's not used for v3.

routing_kwargs = {}
exec_method = None
req = None
Expand Down Expand Up @@ -368,6 +371,10 @@ def _execute_batch(
dbexceptions.ProgrammingError: On bad input.
"""

# FIXME(alainjobart): this is very confusing: we have either
# ExecuteBatchShards or ExecuteBatchKeyspaceIds, so all queries
# have to be one style or another. It is *not* a per query choice.

def build_query_list():
"""Create a query dict list from parameters."""
query_list = []
Expand Down Expand Up @@ -575,7 +582,7 @@ def get_srv_keyspace(self, name):
# we need to make it back to what keyspace.Keyspace expects
return keyspace.Keyspace(
name,
keyrange_constants.srv_keyspace_proto3_to_old(response.reply))
keyrange_constants.srv_keyspace_bson_proto3_to_old(response.reply))
except gorpc.GoRpcError as e:
raise self._convert_exception(e, keyspace=name)
except:
Expand Down
5 changes: 3 additions & 2 deletions test/grpc_protocols_flavor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from vtctl import grpc_vtctl_client # pylint: disable=unused-import
from vtdb import grpc_update_stream # pylint: disable=unused-import
from vtdb import vtgatev2 # pylint: disable=unused-import
from vtdb import grpc_vtgate_client # pylint: disable=unused-import


class GRpcProtocolsFlavor(protocols_flavor.ProtocolsFlavor):
Expand Down Expand Up @@ -43,10 +44,10 @@ def vtgate_protocol(self):
return 'grpc'

def vtgate_python_protocol(self):
return 'gorpc'
return 'grpc'

def client_error_exception_type(self):
return face.RemoteError
return face.AbortionError

def rpc_timeout_message(self):
return 'context deadline exceeded'
Expand Down
18 changes: 10 additions & 8 deletions test/keyspace_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import environment
import tablet
import utils
from protocols_flavor import protocols_flavor

SHARDED_KEYSPACE = 'TEST_KEYSPACE_SHARDED'
UNSHARDED_KEYSPACE = 'TEST_KEYSPACE_UNSHARDED'
Expand Down Expand Up @@ -175,8 +174,7 @@ def setup_unsharded_keyspace():
class TestKeyspace(unittest.TestCase):

def _read_srv_keyspace(self, keyspace_name):
addr = utils.vtgate.rpc_endpoint()
protocol = protocols_flavor().vtgate_python_protocol()
protocol, addr = utils.vtgate.rpc_endpoint(python=True)
conn = vtgate_client.connect(protocol, addr, 30.0)
result = conn.get_srv_keyspace(keyspace_name)
conn.close()
Expand Down Expand Up @@ -346,18 +344,22 @@ def test_shard_names(self):
self.assertEqual(unsharded_ks.get_shard_names(db_type), ['0'])

def test_keyspace_id_to_shard_name(self):
# test all keyspace_id in a sharded keyspace go to the right shard
sharded_ks = self._read_srv_keyspace(SHARDED_KEYSPACE)
for _, sn in enumerate(shard_names):
for keyspace_id in shard_kid_map[sn]:
self.assertEqual(
sharded_ks.keyspace_id_to_shard_name_for_db_type(keyspace_id,
'master'), sn)

# take all keyspace_ids, make sure for unsharded they stay on'0'
unsharded_ks = self._read_srv_keyspace(UNSHARDED_KEYSPACE)
for keyspace_id in shard_kid_map[sn]:
self.assertEqual(
unsharded_ks.keyspace_id_to_shard_name_for_db_type(
keyspace_id, 'master'),
'0')
for _, sn in enumerate(shard_names):
for keyspace_id in shard_kid_map[sn]:
self.assertEqual(
unsharded_ks.keyspace_id_to_shard_name_for_db_type(
keyspace_id, 'master'),
'0')

def test_get_srv_keyspace_names(self):
stdout, _ = utils.run_vtctl(['GetSrvKeyspaceNames', 'test_nj'],
Expand Down
5 changes: 4 additions & 1 deletion test/python_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@ class TestPythonClientBase(unittest.TestCase):

def setUp(self):
super(TestPythonClientBase, self).setUp()
addr = 'localhost:%d' % vtgateclienttest_port
protocol = protocols_flavor().vtgate_python_protocol()
if protocol == 'grpc':
addr = 'localhost:%d' % vtgateclienttest_grpc_port
else:
addr = 'localhost:%d' % vtgateclienttest_port
self.conn = vtgate_client.connect(protocol, addr, 30.0)
logging.info(
'Start: %s, protocol %s.',
Expand Down
40 changes: 20 additions & 20 deletions test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,6 @@ def __init__(self, port=None):
self.port = port or environment.reserve_ports(1)
if protocols_flavor().vtgate_protocol() == 'grpc':
self.grpc_port = environment.reserve_ports(1)
self.secure_port = None
self.proc = None

def start(self, cell='test_nj', retry_delay=1, retry_count=2,
Expand Down Expand Up @@ -538,10 +537,7 @@ def start(self, cell='test_nj', retry_delay=1, retry_count=2,
args.extend(extra_args)

self.proc = run_bg(args)
if self.secure_port:
wait_for_vars('vtgate', self.port, 'SecureConnections')
else:
wait_for_vars('vtgate', self.port)
wait_for_vars('vtgate', self.port)

global vtgate
if not vtgate:
Expand Down Expand Up @@ -570,18 +566,18 @@ def kill(self):
vtgate = None

def addr(self):
"""Returns the address of the vtgate process."""
"""Returns the address of the vtgate process, for web access."""
return 'localhost:%d' % self.port

def secure_addr(self):
"""Returns the secure address of the vtgate process."""
return 'localhost:%d' % self.secure_port

def rpc_endpoint(self):
"""Returns the endpoint to use for RPCs."""
if protocols_flavor().vtgate_protocol() == 'grpc':
return 'localhost:%d' % self.grpc_port
return self.addr()
def rpc_endpoint(self, python=False):
"""Returns the protocol and endpoint to use for RPCs."""
if python:
protocol = protocols_flavor().vtgate_python_protocol()
else:
protocol = protocols_flavor().vtgate_protocol()
if protocol == 'grpc':
return protocol, 'localhost:%d' % self.grpc_port
return protocol, self.addr()

def get_status(self):
"""Returns the status page for this process."""
Expand All @@ -595,10 +591,11 @@ def vtclient(self, sql, keyspace=None, shard=None, tablet_type='master',
bindvars=None, streaming=False,
verbose=False, raise_on_error=True):
"""Uses the vtclient binary to send a query to vtgate."""
protocol, addr = self.rpc_endpoint()
args = environment.binary_args('vtclient') + [
'-server', self.rpc_endpoint(),
'-server', addr,
'-tablet_type', tablet_type,
'-vtgate_protocol', protocols_flavor().vtgate_protocol()]
'-vtgate_protocol', protocol]
if keyspace:
args.extend(['-keyspace', keyspace])
if shard:
Expand All @@ -617,8 +614,9 @@ def vtclient(self, sql, keyspace=None, shard=None, tablet_type='master',

def execute(self, sql, tablet_type='master', bindvars=None):
"""Uses 'vtctl VtGateExecute' to execute a command."""
_, addr = self.rpc_endpoint()
args = ['VtGateExecute',
'-server', self.rpc_endpoint(),
'-server', addr,
'-tablet_type', tablet_type]
if bindvars:
args.extend(['-bind_variables', json.dumps(bindvars)])
Expand All @@ -628,8 +626,9 @@ def execute(self, sql, tablet_type='master', bindvars=None):
def execute_shards(self, sql, keyspace, shards, tablet_type='master',
bindvars=None):
"""Uses 'vtctl VtGateExecuteShards' to execute a command."""
_, addr = self.rpc_endpoint()
args = ['VtGateExecuteShards',
'-server', self.rpc_endpoint(),
'-server', addr,
'-keyspace', keyspace,
'-shards', shards,
'-tablet_type', tablet_type]
Expand All @@ -640,8 +639,9 @@ def execute_shards(self, sql, keyspace, shards, tablet_type='master',

def split_query(self, sql, keyspace, split_count, bindvars=None):
"""Uses 'vtctl VtGateSplitQuery' to cut a query up in chunks."""
_, addr = self.rpc_endpoint()
args = ['VtGateSplitQuery',
'-server', self.rpc_endpoint(),
'-server', addr,
'-keyspace', keyspace,
'-split_count', str(split_count)]
if bindvars:
Expand Down
4 changes: 1 addition & 3 deletions test/vertical_split.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import environment
import tablet
import utils
from protocols_flavor import protocols_flavor

# source keyspace, with 4 tables
source_master = tablet.Tablet()
Expand Down Expand Up @@ -205,8 +204,7 @@ def _insert_initial_values(self):
self.moving1_first, 100)

def _vtdb_conn(self):
addr = utils.vtgate.rpc_endpoint()
protocol = protocols_flavor().vtgate_python_protocol()
protocol, addr = utils.vtgate.rpc_endpoint(python=True)
return vtgate_client.connect(protocol, addr, 30.0)

# insert some values in the source master db, return the first id used
Expand Down
Loading

0 comments on commit 86a1644

Please sign in to comment.