Can't call commands such as `exists`, `create`, etc. from a listener function #405

Open
tekpig opened this issue Sep 14, 2016 · 9 comments

Comments

@tekpig

tekpig commented Sep 14, 2016

I implemented my listener as follows:

```python
import logging

from kazoo.protocol.states import KazooState

logger = logging.getLogger(__name__)


def my_listener(state):
    global zk
    global child_path
    global child_value

    if state == KazooState.LOST:
        logger.warning("lost connection to zookeeper server")
    elif state == KazooState.SUSPENDED:
        logger.warning("connection has been lost but may be recovered")
    else:
        logger.info("connect/reconnect to zookeeper server")
        if zk is not None and child_path is not None:
            if not zk.exists(child_path):
                try:
                    zk.create(child_path, child_value.encode('utf-8'), ephemeral=True)
                except Exception as e:
                    logger.exception(e)
```

But when execution reaches the call to `exists`, the client gets stuck.

I added logging to track what happened and found that, in the `_connect` function, the call `client._session_callback(KeeperState.CONNECTED)` never returns.

Digging further, I traced the hang to `remove = listener(state)` in `client.py`.
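To make that call chain concrete, here is a simplified model of the dispatch pattern, assuming (as the `remove = listener(state)` line suggests) that listeners are called one after another on whichever thread reports the state change, and that a listener returning `True` is unregistered. `ListenerRegistry` and `record_thread` are hypothetical names; this is a sketch, not kazoo's actual code.

```python
import threading


class ListenerRegistry:
    """Hypothetical model: listeners run inline on the notifying thread."""

    def __init__(self):
        self.listeners = []

    def add_listener(self, listener):
        self.listeners.append(listener)

    def notify(self, state):
        # Iterate over a copy so listeners can be removed during the loop.
        for listener in list(self.listeners):
            remove = listener(state)  # the caller is blocked until this returns
            if remove is True:
                self.listeners.remove(listener)


calls = []


def record_thread(state):
    # Record which thread the listener actually ran on.
    calls.append((state, threading.current_thread().name))


registry = ListenerRegistry()
registry.add_listener(record_thread)
registry.add_listener(lambda state: True)  # asks to be removed

worker = threading.Thread(target=registry.notify, args=("CONNECTED",),
                          name="connection-thread")
worker.start()
worker.join()

print(calls)                    # [('CONNECTED', 'connection-thread')]
print(len(registry.listeners))  # 1
```

The listener runs on the notifying thread itself, so anything the listener waits for must be done by some *other* thread.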

The documentation says: "creating ephemeral nodes, its highly recommended to add a state listener so that your program can properly deal with connection interruptions or a Zookeeper session loss."

So what should I do when the connection comes back?

@luofeilong
Contributor

Maybe you should put the `try` before the `if not zk.exists(child_path)` check, because every KazooClient method raises an exception when something goes wrong. For example:

```python
def my_listener(state):
    if state == KazooState.LOST:
        logger.warning("lost connection to zookeeper server")
    elif state == KazooState.SUSPENDED:
        logger.warning("connection has been lost but may be recovered")
    else:
        logger.info("connect/reconnect to zookeeper server")
        if zk is not None and child_path is not None:
            try:
                # exists() can raise too, so it belongs inside the try.
                if not zk.exists(child_path):
                    zk.create(child_path, child_value.encode('utf-8'), ephemeral=True)
            except Exception as e:
                logger.exception(e)
```

@tekpig
Author

tekpig commented Sep 15, 2016

@luofeilong That's not the cause; I have already tried it.

You can reproduce it easily.

  1. Take all ZooKeeper servers down.
  2. Customize the listener function and add it to the KazooClient.
    In the listener function, perform some ZooKeeper operations, such as `ensure_path` or `exists`, in the `state == KazooState.CONNECTED` branch.
  3. A few minutes later, bring all ZooKeeper servers back up.
  4. There is no response from the ZooKeeper server. If you dig into it, you will find that the request issued in the `state == KazooState.CONNECTED` branch has not been sent, even though the client is in the connected state.

I suspect the problem is in `zk_loop`.

@luofeilong
Contributor

Can you show your test code, and which kazoo version your project uses?

@tekpig
Author

tekpig commented Sep 19, 2016

@luofeilong The version is kazoo 2.2.1. Here is my test code:

```python
# -*- coding: utf-8 -*-

import sys
import logging
import signal
import re
import subprocess
import time

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
from kazoo.protocol.states import KazooState
from kazoo.retry import KazooRetry

logging.basicConfig(level=logging.NOTSET)
logger = logging.getLogger()


def is_running(process):
    # Return True if any running process's command line matches `process`.
    s = subprocess.Popen(['ps', 'axw'], stdout=subprocess.PIPE,
                         universal_newlines=True)
    for x in s.stdout:
        if re.search(process, x):
            return True
    return False


class RedisNotUpException(Exception):
    def __init__(self, arg):
        self.args = (arg,)


class NotConnectToZkServers(Exception):
    def __init__(self, arg):
        self.args = (arg,)


zk = None
parent_path = None
child_path = None
child_value = None


def my_listener(state):
    global zk
    global child_path
    global child_value

    if state == KazooState.LOST:
        logger.warning("lost connection to zookeeper server")
    elif state == KazooState.SUSPENDED:
        logger.warning("connection has been lost but may be recovered")
    else:
        logger.info("connect/reconnect to zookeeper server")
        if zk is not None and child_path is not None:
            try:
                if not zk.exists(child_path):
                    zk.create(child_path, child_value.encode('utf-8'), ephemeral=True)
            except Exception as e:
                logger.exception(e)


def signal_handler(signum, frame):
    global zk
    logger.info("receive signal %s" % signum)
    zk.remove_listener(my_listener)
    zk.stop()
    zk.close()
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

if __name__ == '__main__':
    if len(sys.argv) != 4:
        logger.error("besides file name, 3 parameters are needed. "
                     "they're address of zk servers, service name and instance id.")
        sys.exit(1)
    # sys.argv entries are already strings, so no conversion is needed.
    zksrvs, service, instId = sys.argv[1], sys.argv[2], sys.argv[3]

    # Retry the connection forever, backing off from 0.5 s up to 30 s.
    kz_retry = KazooRetry(max_tries=-1, delay=0.5, max_delay=30)

    zk = KazooClient(hosts=zksrvs, connection_retry=kz_retry)
    try:
        zk.start(timeout=30)
    except Exception as e:
        logger.exception(e)

    if zk.state != KazooState.CONNECTED:
        raise NotConnectToZkServers("oops! connection can't be established!")

    if True:  # is_running('redis-server'):
        parent_path = '/REDIS/' + service.upper()
        child_path = parent_path + '/' + str(instId)
        # child_value = redis_ip + ':' + redis_port + '*' + redis_db
        child_value = '127.0.0.1:9999*0'
        zk.ensure_path(parent_path)
        try:
            zk.create(child_path, child_value.encode('utf-8'), ephemeral=True)
        except NodeExistsError:
            zk.set(child_path, child_value.encode('utf-8'))
        except Exception as e:
            logger.exception(e)
    else:
        raise RedisNotUpException("oops! redis can't get up!")

    zk.add_listener(my_listener)

    while True:  # is_running('redis-server')
        time.sleep(1)
```

@luofeilong
Contributor

OK, I found the problem.
If you break in your listener at the point where it calls a kazoo method, you can see that it is a deadlock. Here is how it happens:

In `connection.py`:

  • reconnect
    `_connect_attempt()` calls `_connect()` to retry the connection to ZooKeeper. When it succeeds, `_connect()` calls `client._session_callback(KeeperState.CONNECTED)`, which calls back into your listener function, and your listener then waits for the result of its request.
  • deadlock
    After `_connect()` returns inside `_connect_attempt()`, the worker thread would go on to poll the socket and process the requests in the queue. But because your listener function has not returned, nothing services the request queue.

Your listener function is waiting for the worker thread to process your request,
but the worker thread is waiting for your listener function to return.
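The cycle can be reproduced without ZooKeeper. The sketch below is a simplified stand-in, not kazoo's code: a `queue.Queue` plays the role of the pending-request queue, `connection_thread` mimics the reconnect path notifying the listener before it services that queue, and `listener` mimics a blocking `zk.exists()`. The one-second timeout is an assumption added only so the demo reports the deadlock instead of hanging.

```python
import queue
import threading

pending = queue.Queue()  # stand-in for the client's outgoing request queue


def listener(state):
    # Stand-in for zk.exists(): queue a request, then block on the reply.
    reply = queue.Queue()
    pending.put(("exists", "/REDIS/SVC/1", reply))
    try:
        return reply.get(timeout=1.0)  # timeout only so the demo terminates
    except queue.Empty:
        return "deadlock: no reply"


def connection_thread(results):
    # Step 1: the reconnect succeeded, so listeners are notified inline,
    # as described above for _session_callback(KeeperState.CONNECTED).
    results.append(listener("CONNECTED"))
    # Step 2: only after the listener returns does the loop start
    # servicing the request queue, too late for the listener above.
    while not pending.empty():
        op, path, reply = pending.get()
        reply.put("%s %s -> ok" % (op, path))


results = []
worker = threading.Thread(target=connection_thread, args=(results,))
worker.start()
worker.join()
print(results)  # ['deadlock: no reply']
```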

@tekpig
Author

tekpig commented Sep 19, 2016

@luofeilong

Yes, you're right.

So, if I want to do what my listener function does, how can I do that?
From my perspective, it's reasonable to do something when the connection comes back, especially to restore the ephemeral nodes I created.

@luofeilong
Contributor

Maybe you can use another thread to handle the event.
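A minimal sketch of that suggestion, assuming `zk` is a started `KazooClient`: the listener only starts a daemon thread and returns immediately, and the `exists`/`create` calls run on that thread, where waiting for the server does not block the connection loop. `make_listener` and `restore_node` are hypothetical helper names, and `StubClient` is a stand-in so the example runs without a ZooKeeper server.

```python
import logging
import threading

try:
    from kazoo.protocol.states import KazooState
    CONNECTED = KazooState.CONNECTED
except ImportError:  # lets the sketch run without kazoo installed
    CONNECTED = "CONNECTED"

logger = logging.getLogger(__name__)


def make_listener(zk, path, value):
    """Build a state listener that never blocks the connection thread."""

    def restore_node():
        # Runs on its own thread, so waiting for the server is safe here.
        try:
            if not zk.exists(path):
                zk.create(path, value.encode("utf-8"), ephemeral=True)
        except Exception:
            logger.exception("could not restore %s", path)

    def listener(state):
        if state == CONNECTED:
            helper = threading.Thread(target=restore_node, name="restore-node")
            helper.daemon = True
            helper.start()
        # Returning None (not True) keeps the listener registered.

    return listener


class StubClient:
    """Hypothetical stand-in for KazooClient with just exists()/create()."""

    def __init__(self):
        self.nodes = {}
        self.created = threading.Event()

    def exists(self, path):
        return self.nodes.get(path)

    def create(self, path, value, ephemeral=False):
        self.nodes[path] = value
        self.created.set()


stub = StubClient()
on_state = make_listener(stub, "/REDIS/SVC/1", "127.0.0.1:9999*0")
on_state(CONNECTED)           # returns immediately
stub.created.wait(timeout=2)  # the node is created on the helper thread
print(stub.nodes)  # {'/REDIS/SVC/1': b'127.0.0.1:9999*0'}
```

With a real client, registration would look like `zk.add_listener(make_listener(zk, child_path, child_value))`. The `exists` check keeps the helper harmless after a `SUSPENDED` to `CONNECTED` transition, when the session (and its ephemeral node) survived.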

@tekpig
Author

tekpig commented Sep 19, 2016

So this is a drawback of the library; I think it should be improved.
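One possible shape for such an improvement, sketched as a hypothetical design rather than anything kazoo provides: the connection thread only enqueues the new state and returns, and a single dedicated thread delivers states to listeners in order, so a listener may block on a request without stalling the connection loop. `AsyncListenerDispatcher` and its methods are illustrative names.

```python
import logging
import queue
import threading

logger = logging.getLogger(__name__)


class AsyncListenerDispatcher:
    """Deliver state changes to listeners on a dedicated thread."""

    def __init__(self):
        self.listeners = []
        self.events = queue.Queue()
        self.thread = threading.Thread(target=self._run, name="listener-dispatch")
        self.thread.daemon = True
        self.thread.start()

    def add_listener(self, listener):
        self.listeners.append(listener)

    def notify(self, state):
        # Called from the connection thread: enqueue and return at once.
        self.events.put(state)

    def stop(self):
        self.events.put(None)  # sentinel: deliver pending states, then exit
        self.thread.join()

    def _run(self):
        while True:
            state = self.events.get()
            if state is None:
                return
            # A single thread preserves the order in which states occurred.
            for listener in list(self.listeners):
                try:
                    if listener(state) is True:
                        self.listeners.remove(listener)
                except Exception:
                    logger.exception("error in state listener")


seen = []
released = threading.Event()


def slow_listener(state):
    released.wait(timeout=2)  # would stall the connection loop if run inline
    seen.append(state)


dispatcher = AsyncListenerDispatcher()
dispatcher.add_listener(slow_listener)
dispatcher.notify("SUSPENDED")
dispatcher.notify("CONNECTED")  # returns even though the listener is blocked
before = list(seen)
released.set()
dispatcher.stop()
print(before, seen)  # [] ['SUSPENDED', 'CONNECTED']
```

The trade-off is that listeners observe each state slightly after the connection thread has moved on, so a listener should not assume the state it receives is still current.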

@luofeilong
Contributor

Yes, you can do it.
