Skip to content

Commit

Permalink
Merge pull request vitessio#1494 from dumbunny/py_keyspace
Browse files Browse the repository at this point in the history
Py keyspace
  • Loading branch information
dumbunny committed Feb 10, 2016
2 parents a029ff8 + cb1ec0e commit 6a16a56
Show file tree
Hide file tree
Showing 13 changed files with 381 additions and 316 deletions.
15 changes: 9 additions & 6 deletions examples/demo/cgi-bin/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
from vtdb import grpc_vtgate_client # pylint: disable=unused-import


def exec_query(conn, title, query, response, keyspace=None, kr=None):
def exec_query(conn, title, query, response, keyspace=None, kr=None): # pylint: disable=missing-docstring
if kr:
# v2 cursor to address individual shards directly, for debug display
cursor = conn.cursor(keyspace, "master", keyranges=[keyrange.KeyRange(kr)])
cursor = conn.cursor(
tablet_type="master", keyspace=keyspace,
keyranges=[keyrange.KeyRange(kr)])
else:
# v3 cursor is automated
cursor = conn.cursor(keyspace, "master", writable=True)
cursor = conn.cursor(
tablet_type="master", keyspace=keyspace, writable=True)

try:
if not query or query == "undefined":
Expand All @@ -44,14 +47,14 @@ def exec_query(conn, title, query, response, keyspace=None, kr=None):
"results": cursor.results,
}
cursor.close()
except Exception as e:
except Exception as e: # pylint: disable=broad-except
response[title] = {
"title": title,
"error": str(e),
}


def capture_log(port, queries):
def capture_log(port, queries): # pylint: disable=missing-docstring
p = subprocess.Popen(
["curl", "-s", "-N", "http://localhost:%d/debug/querylog" % port],
stdout=subprocess.PIPE)
Expand Down Expand Up @@ -131,7 +134,7 @@ def main():
keyspace="lookup", kr="-")

print json.dumps(response)
except Exception as e:
except Exception as e: # pylint: disable=broad-except
print json.dumps({"error": str(e)})


Expand Down
58 changes: 40 additions & 18 deletions examples/kubernetes/guestbook/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Main python file."""

import os
import time
import json
Expand All @@ -10,14 +12,15 @@
from vtdb import vtgate_client

# Register gRPC protocol.
from vtdb import grpc_vtgate_client # pylint: disable=unused-import
from vtdb import grpc_vtgate_client # pylint: disable=unused-import

# conn is the connection to vtgate.
conn = None

# When using the "uint64" keyspace_id type, Vitess expects big-endian encoding.
uint64 = struct.Struct('!Q')


def get_keyspace_id(page):
"""Compute the keyspace_id for a given page number.
Expand All @@ -31,72 +34,91 @@ def get_keyspace_id(page):
For more about keyspace_id, see these references:
- http://vitess.io/overview/concepts.html#keyspace-id
- http://vitess.io/user-guide/sharding.html
Args:
page: Int page number.
Returns:
8-byte md5 of packed page number.
"""
m = hashlib.md5()
m.update(uint64.pack(page))
return m.digest()[:8]


def unpack_keyspace_id(keyspace_id):
"""Return the corresponding 64-bit unsigned integer for a keyspace_id."""
return uint64.unpack(keyspace_id)[0]

@app.route("/")

@app.route('/')
def index():
return app.send_static_file('index.html')

@app.route("/page/<int:page>")

@app.route('/page/<int:page>')
def view(page):
_ = page
return app.send_static_file('index.html')

@app.route("/lrange/guestbook/<int:page>")

@app.route('/lrange/guestbook/<int:page>')
def list_guestbook(page):
# Read the list from a replica.
"""Read the list from a replica."""
keyspace_id = get_keyspace_id(page)
cursor = conn.cursor('test_keyspace', 'replica', keyspace_ids=[keyspace_id])
cursor = conn.cursor(
tablet_type='replica', keyspace='test_keyspace',
keyspace_ids=[keyspace_id])

cursor.execute(
'SELECT message FROM messages WHERE page=%(page)s ORDER BY time_created_ns',
'SELECT message FROM messages WHERE page=%(page)s'
' ORDER BY time_created_ns',
{'page': page})
entries = [row[0] for row in cursor.fetchall()]
cursor.close()

return json.dumps(entries)

@app.route("/rpush/guestbook/<int:page>/<value>")

@app.route('/rpush/guestbook/<int:page>/<value>')
def add_entry(page, value):
# Insert a row on the master.
"""Insert a row on the master."""
keyspace_id = get_keyspace_id(page)
keyspace_id_int = unpack_keyspace_id(keyspace_id)
cursor = conn.cursor('test_keyspace', 'master', keyspace_ids=[keyspace_id], writable=True)
cursor = conn.cursor(
tablet_type='master', keyspace='test_keyspace',
keyspace_ids=[keyspace_id], writable=True)

cursor.begin()
cursor.execute(
'INSERT INTO messages (page, time_created_ns, keyspace_id, message)'
' VALUES (%(page)s, %(time_created_ns)s, %(keyspace_id)s, %(message)s)',
{
'page': page,
'time_created_ns': int(time.time() * 1e9),
'keyspace_id': keyspace_id_int,
'message': value,
'page': page,
'time_created_ns': int(time.time() * 1e9),
'keyspace_id': keyspace_id_int,
'message': value,
})
cursor.commit()

# Read the list back from master (critical read) because it's
# important that the user sees his own addition immediately.
cursor.execute(
'SELECT message FROM messages WHERE page=%(page)s ORDER BY time_created_ns',
'SELECT message FROM messages WHERE page=%(page)s'
' ORDER BY time_created_ns',
{'page': page})
entries = [row[0] for row in cursor.fetchall()]
cursor.close()

return json.dumps(entries)

@app.route("/env")

@app.route('/env')
def env():
return json.dumps(dict(os.environ))

if __name__ == "__main__":
timeout = 10 # connect timeout in seconds
if __name__ == '__main__':
timeout = 10 # connect timeout in seconds

# Get vtgate service address from Kubernetes DNS.
addr = 'vtgate-test:15991'
Expand Down
9 changes: 5 additions & 4 deletions examples/local/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
try:
# Insert something.
print 'Inserting into master...'
cursor = conn.cursor('test_keyspace', 'master',
keyranges=UNSHARDED, writable=True)
cursor = conn.cursor(
tablet_type='master', keyspace='test_keyspace',
keyranges=UNSHARDED, writable=True)
cursor.begin()
cursor.execute(
'INSERT INTO test_table (msg) VALUES (%(msg)s)',
Expand All @@ -54,8 +55,8 @@
# Read from a replica.
# Note that this may be behind master due to replication lag.
print 'Reading from replica...'
cursor = conn.cursor('test_keyspace', 'replica',
keyranges=UNSHARDED)
cursor = conn.cursor(
tablet_type='replica', keyspace='test_keyspace', keyranges=UNSHARDED)
cursor.execute('SELECT * FROM test_table', {})
for row in cursor.fetchall():
print row
Expand Down
13 changes: 6 additions & 7 deletions py/vtdb/grpc_vtgate_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,11 @@ def rollback(self):
self.effective_caller_id = None

@vtgate_utils.exponential_backoff_retry((dbexceptions.TransientError))
def _execute(self, sql, bind_variables, keyspace_name, tablet_type,
shards=None,
keyspace_ids=None,
keyranges=None,
entity_keyspace_id_map=None, entity_column_name=None,
not_in_transaction=False, effective_caller_id=None, **kwargs):
def _execute(
self, sql, bind_variables, tablet_type, keyspace_name=None,
shards=None, keyspace_ids=None, keyranges=None,
entity_keyspace_id_map=None, entity_column_name=None,
not_in_transaction=False, effective_caller_id=None, **kwargs):

# FIXME(alainjobart): keyspace should be in routing_kwargs,
# as it's not used for v3.
Expand Down Expand Up @@ -289,7 +288,7 @@ def _execute_batch(

@vtgate_utils.exponential_backoff_retry((dbexceptions.TransientError))
def _stream_execute(
self, sql, bind_variables, keyspace_name, tablet_type,
self, sql, bind_variables, tablet_type, keyspace_name=None,
shards=None, keyspace_ids=None, keyranges=None,
not_in_transaction=False, effective_caller_id=None,
**kwargs):
Expand Down
4 changes: 2 additions & 2 deletions py/vtdb/vtgate_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def rollback(self):
raise NotImplementedError('Child class needs to implement this')

def _execute(self, sql, bind_variables, tablet_type,
keyspace=None,
keyspace_name=None,
shards=None,
keyspace_ids=None,
keyranges=None,
Expand All @@ -195,7 +195,7 @@ def _execute(self, sql, bind_variables, tablet_type,
sql: query to execute.
bind_variables: map of bind variables for the query.
tablet_type: the (string) version of the tablet type.
keyspace: if specified, the keyspace to send the query to.
keyspace_name: if specified, the keyspace to send the query to.
Required if any of the routing parameters is used.
Not required only if using vtgate v3 API.
shards: if specified, use this list of shards names to route the query.
Expand Down
18 changes: 10 additions & 8 deletions py/vtdb/vtgate_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@ class VTGateCursor(base_cursor.BaseListCursor, VTGateCursorMixin):
"""

def __init__(
self, connection, keyspace, tablet_type, shards=None, keyspace_ids=None,
keyranges=None, writable=False, as_transaction=False):
self, connection, tablet_type, keyspace=None,
shards=None, keyspace_ids=None, keyranges=None,
writable=False, as_transaction=False):
"""Init VTGateCursor.
Args:
connection: A PEP0249 connection object.
keyspace: Str keyspace or None if batch API will be used.
tablet_type: Str tablet_type.
keyspace: Str keyspace or None if batch API will be used.
shards: List of strings.
keyspace_ids: Struct('!Q').packed keyspace IDs.
keyranges: Str keyranges.
Expand Down Expand Up @@ -89,8 +90,8 @@ def execute(self, sql, bind_variables, **kwargs):
self.connection._execute( # pylint: disable=protected-access
sql,
bind_variables,
self.keyspace,
self.tablet_type,
tablet_type=self.tablet_type,
keyspace_name=self.keyspace,
shards=self.shards,
keyspace_ids=self.keyspace_ids,
keyranges=self.keyranges,
Expand Down Expand Up @@ -215,7 +216,8 @@ class StreamVTGateCursor(base_cursor.BaseStreamCursor, VTGateCursorMixin):
"""

def __init__(
self, connection, keyspace, tablet_type, shards=None, keyspace_ids=None,
self, connection, tablet_type, keyspace=None,
shards=None, keyspace_ids=None,
keyranges=None, writable=False):
super(StreamVTGateCursor, self).__init__()
self._conn = connection
Expand All @@ -240,8 +242,8 @@ def execute(self, sql, bind_variables, **kwargs):
self.generator, self.description = self.connection._stream_execute( # pylint: disable=protected-access
sql,
bind_variables,
self.keyspace,
self.tablet_type,
tablet_type=self.tablet_type,
keyspace_name=self.keyspace,
shards=self.shards,
keyspace_ids=self.keyspace_ids,
keyranges=self.keyranges,
Expand Down
10 changes: 5 additions & 5 deletions py/vtdb/vtgatev2.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def _get_rowset_from_query_result(self, query_result):

@vtgate_utils.exponential_backoff_retry((dbexceptions.TransientError))
def _execute(
self, sql, bind_variables, keyspace_name, tablet_type,
self, sql, bind_variables, tablet_type, keyspace_name=None,
shards=None, keyspace_ids=None, keyranges=None,
entity_keyspace_id_map=None, entity_column_name=None,
not_in_transaction=False, effective_caller_id=None, **kwargs):
Expand All @@ -283,8 +283,8 @@ def _execute(
sql: The sql text, with %(format)s-style tokens.
bind_variables: (str: value) dict of bind variables corresponding
to sql %(format)s tokens.
keyspace_name: Str name of keyspace.
tablet_type: Str tablet type (e.g. master, rdonly, replica).
keyspace_name: Str name of keyspace.
shards: list of shard names as strings.
keyspace_ids: bytes list of keyspace ID lists.
keyranges: KeyRange objects.
Expand Down Expand Up @@ -495,7 +495,7 @@ def merge_rowsets(query_list, keyspace_ids_rowsets, shards_rowsets):

@vtgate_utils.exponential_backoff_retry((dbexceptions.TransientError))
def _stream_execute(
self, sql, bind_variables, keyspace_name, tablet_type,
self, sql, bind_variables, tablet_type, keyspace_name=None,
shards=None, keyspace_ids=None, keyranges=None,
not_in_transaction=False, effective_caller_id=None,
**kwargs):
Expand All @@ -507,8 +507,8 @@ def _stream_execute(
Args:
sql: Str sql.
bind_variables: A (str: value) dict.
keyspace_name: Str keyspace name.
tablet_type: Str tablet_type.
keyspace_name: Str keyspace name.
shards: List of strings.
keyspace_ids: List of uint64 or bytes keyspace_ids.
keyranges: KeyRange objects.
Expand Down Expand Up @@ -734,7 +734,7 @@ def connect(vtgate_addrs, timeout, user=None, password=None):
conn = VTGateConnection(**db_params)
conn.dial()
return conn
except Exception as e:
except Exception as e: # pylint: disable=broad-except
db_exception = e
logging.warning('db connection failed: %s, %s', host_addr, e)

Expand Down
4 changes: 2 additions & 2 deletions test/custom_sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def _insert_data(self, shard, start, count, table='data'):
sql = 'insert into ' + table + '(id, name) values (%(id)s, %(name)s)'
conn = self._vtdb_conn()
cursor = conn.cursor(
'test_keyspace', 'master',
tablet_type='master', keyspace='test_keyspace',
shards=[shard],
writable=True)
for x in xrange(count):
Expand All @@ -91,7 +91,7 @@ def _check_data(self, shard, start, count, table='data'):
sql = 'select name from ' + table + ' where id=%(id)s'
conn = self._vtdb_conn()
cursor = conn.cursor(
'test_keyspace', 'master',
tablet_type='master', keyspace='test_keyspace',
shards=[shard])
for x in xrange(count):
bindvars = {
Expand Down
Loading

0 comments on commit 6a16a56

Please sign in to comment.