Skip to content

Recovery from connection loss or server-side errors #144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions happybase/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ def send(self):
logger.debug("Sending batch for '%s' (%d mutations on %d rows)",
self._table.name, self._mutation_count, len(bms))
if self._timestamp is None:
self._table.connection.client.mutateRows(self._table.name, bms, {})
self._table.connection.client.mutateRows(self._table.name, bms, {}, no_retry=True)
else:
self._table.connection.client.mutateRowsTs(
self._table.name, bms, self._timestamp, {})
self._table.name, bms, self._timestamp, {}, no_retry=True)

self._reset_mutations()

Expand Down
46 changes: 46 additions & 0 deletions happybase/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
from thriftpy.thrift import TClient
from thriftpy.thrift import TApplicationException
from thriftpy.transport import TTransportException
from socket import error as socket_error
from collections import deque
from time import sleep
import logging


logger = logging.getLogger(__name__)


class RecoveringClient(TClient):
def __init__(self, *args, **kwargs):
self._connection = kwargs.pop("connection", None)
self._retries = kwargs.pop("retries", (0, 5, 30))
super(RecoveringClient, self).__init__(*args, **kwargs)

def _req(self, _api, *args, **kwargs):
no_retry = kwargs.pop("no_retry", False)
retries = deque(self._retries)
interval = 0
client = super(RecoveringClient, self)
while True:
try:
return client._req(_api, *args, **kwargs)
except (TApplicationException, socket_error, TTransportException) as exc:
logger.exception("Got exception")
while True:
interval = retries.popleft() if retries else interval
logger.info("Sleeping for %d seconds", interval)
sleep(interval)
logger.info("Trying to reconnect")
try:
self._connection._refresh_thrift_client()
self._connection.open()
client = super(RecoveringClient, self._connection.client)
logger.debug("New client is initialized")
except TTransportException:
logger.exception("Got exception, while trying to reconnect. Continuing")
pass
else:
break
if no_retry:
raise exc
16 changes: 8 additions & 8 deletions happybase/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import logging

import six
from thriftpy.thrift import TClient
from thriftpy.transport import TBufferedTransport, TFramedTransport, TSocket
from thriftpy.protocol import TBinaryProtocol, TCompactProtocol

from Hbase_thrift import Hbase, ColumnDescriptor

from .client import RecoveringClient
from .table import Table
from .util import ensure_bytes, pep8_to_camel_case

Expand Down Expand Up @@ -157,7 +157,7 @@ def _refresh_thrift_client(self):

self.transport = self._transport_class(socket)
protocol = self._protocol_class(self.transport, decode_response=False)
self.client = TClient(Hbase, protocol)
self.client = RecoveringClient(Hbase, protocol, connection=self)

def _table_name(self, name):
"""Construct a table name by optionally adding a table name prefix."""
Expand Down Expand Up @@ -308,7 +308,7 @@ def create_table(self, name, families):

column_descriptors.append(ColumnDescriptor(**kwargs))

self.client.createTable(name, column_descriptors)
self.client.createTable(name, column_descriptors, no_retry=True)

def delete_table(self, name, disable=False):
"""Delete the specified table.
Expand All @@ -327,23 +327,23 @@ def delete_table(self, name, disable=False):
self.disable_table(name)

name = self._table_name(name)
self.client.deleteTable(name)
self.client.deleteTable(name, no_retry=True)

def enable_table(self, name):
"""Enable the specified table.

:param str name: The table name
"""
name = self._table_name(name)
self.client.enableTable(name)
self.client.enableTable(name, no_retry=True)

def disable_table(self, name):
"""Disable the specified table.

:param str name: The table name
"""
name = self._table_name(name)
self.client.disableTable(name)
self.client.disableTable(name, no_retry=True)

def is_table_enabled(self, name):
"""Return whether the specified table is enabled.
Expand All @@ -364,6 +364,6 @@ def compact_table(self, name, major=False):
"""
name = self._table_name(name)
if major:
self.client.majorCompact(name)
self.client.majorCompact(name, no_retry=True)
else:
self.client.compact(name)
self.client.compact(name, no_retry=True)
2 changes: 1 addition & 1 deletion happybase/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ def counter_inc(self, row, column, value=1):
:rtype: int
"""
return self.connection.client.atomicIncrement(
self.name, row, column, value)
self.name, row, column, value, no_retry=True)

def counter_dec(self, row, column, value=1):
"""Atomically decrement (or increments) a counter column.
Expand Down