启动ZK连接
调用方可以如下创建一个zk连接:
代码语言:javascript复制zookeeper = KazooClient(hosts=get_address(address), timeout=timeout,
client_id=client_id, handler=handler,
default_acl=default_acl, auth_data=auth_data,
read_only=read_only,
randomize_hosts=randomize_hosts,
connection_retry=con_retry,
command_retry=command_retry, logger=logger)
try:
scheme, credential = kazooACL.get_auth_acl()
zookeeper.start(timeout=90)
zookeeper.add_auth(scheme, credential)
zookeeper.default_acl = (kazooACL.get_set_acl_val(),)
return zookeeper
except Exception:
zookeeper.stop()
zookeeper.close()
raise
KazooClient类
上面首先实例化了一个KazooClient对象, 实例化KazooClient对象的时候,生成了一个ConnectionHandler实例
代码语言:javascript复制self._connection = ConnectionHandler(
self, self._conn_retry.copy(), logger=self.logger)
初始化了这个连接对象,进入该连接对象定义:kazoo.protocol.connection.py
class ConnectionHandler(object):
    """Zookeeper connection handler.

    Keeps the per-connection state for one client: the event objects
    used to flag closed / stopped / ping-outstanding, the sockets, and
    the read-only bookkeeping fields.
    """

    def __init__(self, client, retry_sleeper, logger=None):
        """Set up a handler that starts out closed and stopped.

        :param client: Owning client; its ``handler`` attribute is kept
                       and used to create the event objects.
        :param retry_sleeper: Retry object stored as ``retry_sleeper``.
        :param logger: Logger to use; the module-level ``log`` is used
                       when this is falsy.
        """
        handler = client.handler
        new_event = handler.event_object

        self.client = client
        self.handler = handler
        self.retry_sleeper = retry_sleeper
        self.logger = logger if logger else log

        # Event objects. Both "closed" and "stopped" begin in the set
        # state; "ping outstanding" begins cleared.
        closed_event = new_event()
        closed_event.set()
        self.connection_closed = closed_event

        stopped_event = new_event()
        stopped_event.set()
        self.connection_stopped = stopped_event

        self.ping_outstanding = new_event()

        # Sockets and request bookkeeping, all unset until a connection
        # is started.
        self._read_sock = None
        self._write_sock = None
        self._socket = None
        self._xid = None

        # Read-only / read-write server tracking.
        self._rw_server = None
        self._ro_mode = False
        self._ro = False

        self._connection_routine = None
        self.sasl_cli = None
启动zk连接
在启动ZKClient的时候,实际上就是调用了该ConnectionHandler对象的start方法:
def start(self):
    """Start the connection up.

    When the connection is flagged as closed, a new socket pair is
    requested from the handler and the closed flag is cleared. The
    ``zk_loop`` method is then handed to the handler's ``spawn``.

    :raises Exception: If a connection routine is already recorded.
    """
    if self.connection_closed.is_set():
        read_end, write_end = self.handler.create_socket_pair()
        self._read_sock = read_end
        self._write_sock = write_end
        self.connection_closed.clear()

    if not self._connection_routine:
        self._connection_routine = self.handler.spawn(self.zk_loop)
        return

    raise Exception("Unable to start, connection routine already "
                    "active.")
这里通过handler.spawn新起了一个协程来执行轮询:
def zk_loop(self):
    """Main Zookeeper handling loop.

    Repeatedly runs ``_connect_loop`` through a copy of the retry
    object until the client's ``_stopped`` event is set or the connect
    loop reports ``STOP_CONNECTING``. On the way out, in every case,
    the stopped event is set and the client is told the session is
    CLOSED.
    """
    self.logger.log(BLATHER, 'ZK loop started')
    self.connection_stopped.clear()

    retry = self.retry_sleeper.copy()
    try:
        while not self.client._stopped.is_set():
            outcome = retry(self._connect_loop, retry)
            # STOP_CONNECTING from the connect loop means: stop retrying
            if outcome is STOP_CONNECTING:
                break
    except RetryFailedError:
        self.logger.warning("Failed connecting to Zookeeper "
                            "within the connection retry policy.")
    finally:
        self.connection_stopped.set()
        self.client._session_callback(KeeperState.CLOSED)
        self.logger.log(BLATHER, 'Connection stopped')
def _connect_loop(self, retry):
    """Walk the expanded host list once, attempting each host in turn.

    :param retry: Retry object passed through to ``_connect_attempt``.
    :returns: ``STOP_CONNECTING`` when no hosts were resolved, when the
              client's ``_stopped`` event is set, or when an attempt
              itself returned ``STOP_CONNECTING``.
    :raises ForceRetryError: When the full cycle of hosts finished
              without a stop. ``zk_loop`` calls this method through its
              retry object, which presumably treats this as "try again".
    """
    candidates = self._expand_client_hosts()

    # An empty host list indicates that nothing resolved
    if not len(candidates):
        return STOP_CONNECTING

    # Go through the hosts a full cycle before starting over
    for host, port in candidates:
        if self.client._stopped.is_set():
            return STOP_CONNECTING
        if self._connect_attempt(host, port, retry) is STOP_CONNECTING:
            return STOP_CONNECTING

    raise ForceRetryError('Reconnecting')
连接状态变更
def _connect_attempt(self, host, port, retry):
    """Connect to one server and service that connection until it ends.

    :param host: Host to connect to. Replaced by the recorded
                 read/write server when ``self._rw_server`` is set.
    :param port: Port to connect to (replaced together with ``host``).
    :param retry: Retry object; it is reset once the connection is
                  established and again on auth failure, session expiry
                  and read/write-server discovery.
    :returns: ``STOP_CONNECTING`` after a close response or an auth
              failure. After a dropped connection, a timeout, an
              expired session or a read/write-server switch the method
              falls through and returns ``None``.
    :raises Exception: Any other exception is logged and re-raised.

    The current socket, if one was created, is always closed on exit.
    """
    client = self.client
    KazooTimeoutError = self.handler.timeout_exception
    close_connection = False

    self._socket = None

    # Were we given a r/w server? If so, use that instead
    if self._rw_server:
        # Switch to the r/w server *before* logging so the message
        # reports the server that will actually be used, not the host
        # that was originally passed in.
        host, port = self._rw_server
        self._rw_server = None
        self.logger.log(BLATHER,
                        "Found r/w server to use, %s:%s", host, port)

    if client._state != KeeperState.CONNECTING:
        client._session_callback(KeeperState.CONNECTING)

    try:
        self._xid = 0
        read_timeout, connect_timeout = self._connect(host, port)
        # Both timeouts are divided by 1000.0 before being used below
        read_timeout = read_timeout / 1000.0
        connect_timeout = connect_timeout / 1000.0
        retry.reset()
        self.ping_outstanding.clear()
        with self._socket_error_handling():
            while not close_connection:
                # Watch for something to read or send
                jitter_time = random.randint(0, 40) / 100.0
                # Ensure our timeout is positive
                timeout = max([read_timeout / 2.0 - jitter_time,
                               jitter_time])
                s = self.handler.select([self._socket, self._read_sock],
                                        [], [], timeout)[0]

                if not s:
                    # Nothing became readable within the timeout: if the
                    # previous ping is still outstanding the connection
                    # is treated as dropped, otherwise send a new ping.
                    if self.ping_outstanding.is_set():
                        self.ping_outstanding.clear()
                        raise ConnectionDropped(
                            "outstanding heartbeat ping not received")
                    self._send_ping(connect_timeout)
                elif s[0] == self._socket:
                    # The server socket is readable
                    response = self._read_socket(read_timeout)
                    close_connection = response == CLOSE_RESPONSE
                else:
                    # The internal read socket is readable
                    self._send_request(read_timeout, connect_timeout)
        self.logger.info('Closing connection to %s:%s', host, port)
        client._session_callback(KeeperState.CLOSED)
        return STOP_CONNECTING
    except (ConnectionDropped, KazooTimeoutError) as e:
        if isinstance(e, ConnectionDropped):
            self.logger.warning('Connection dropped: %s', e)
        else:
            self.logger.warning('Connection time-out: %s', e)
        if client._state != KeeperState.CONNECTING:
            self.logger.warning("Transition to CONNECTING")
            client._session_callback(KeeperState.CONNECTING)
    except AuthFailedError:
        retry.reset()
        self.logger.warning('AUTH_FAILED closing')
        client._session_callback(KeeperState.AUTH_FAILED)
        return STOP_CONNECTING
    except SessionExpiredError:
        retry.reset()
        self.logger.warning('Session has expired')
        client._session_callback(KeeperState.EXPIRED_SESSION)
    except RWServerAvailable:
        retry.reset()
        self.logger.warning('Found a RW server, dropping connection')
        client._session_callback(KeeperState.CONNECTING)
    except Exception:
        self.logger.exception('Unhandled exception in connection loop')
        raise
    finally:
        if self._socket is not None:
            self._socket.close()
当连接的状态变化的时候,都会调用client._session_callback回调方法
监听器
zk的客户端持有一个监听器的集合属性:
代码语言:javascript复制class KazooClient
self.state_listeners = set()
增加listener
def add_listener(self, listener):
    """Register a callable to be invoked on connection state changes.

    The callable receives a
    :class:`~kazoo.protocol.states.KazooState` instance describing the
    new connection state each time the state transitions.

    .. warning::

        The callable must not block. If it may need data or a value
        that could block, it should hand the work to
        :meth:`~kazoo.interfaces.IHandler.spawn` and return right away.

    :param listener: The callable to register.
    :raises ConfigurationError: If ``listener`` is falsy or not
                                callable.
    """
    if not listener or not callable(listener):
        raise ConfigurationError("listener must be callable")

    self.state_listeners.add(listener)
执行listener
从上面的分析我们知道,在client跟zk建立连接之后,client会监控session的状态
def _session_callback(self, state):
    """Record a new session state and propagate the resulting change.

    Does nothing when ``state`` equals the current ``_state``.
    Otherwise ``_state`` is updated and, depending on the new value,
    ``_live`` is set or cleared and ``_make_state_change`` is called
    with CONNECTED, LOST or SUSPENDED.

    :param state: The new session state; it is compared against
                  ``KeeperState`` members and ``LOST_STATES``.
    """
    if state == self._state:
        return

    # Note that we don't check self.state == LOST since that's also
    # the client's initial state
    # Whether the *previous* state was a lost one must be captured
    # before ``_state`` is overwritten on the next line.
    dead_state = self._state in LOST_STATES
    self._state = state

    # If we were previously closed or had an expired session, and
    # are now connecting, don't bother with the rest of the
    # transitions since they only apply after
    # we've established a connection
    if dead_state and state == KeeperState.CONNECTING:
        self.logger.log(BLATHER, "Skipping state change")
        return

    if state in (KeeperState.CONNECTED, KeeperState.CONNECTED_RO):
        self.logger.info("Zookeeper connection established, "
                         "state: %s", state)
        self._live.set()
        self._make_state_change(KazooState.CONNECTED)
    elif state in LOST_STATES:
        self.logger.info("Zookeeper session lost, state: %s", state)
        self._live.clear()
        # In this branch listeners hear LOST before _notify_pending
        # and _reset run.
        self._make_state_change(KazooState.LOST)
        self._notify_pending(state)
        self._reset()
    else:
        self.logger.info("Zookeeper connection lost")
        # Connection lost
        self._live.clear()
        # In this branch _notify_pending runs before listeners hear
        # SUSPENDED, the reverse of the LOST branch above.
        self._notify_pending(state)
        self._make_state_change(KazooState.SUSPENDED)
        self._reset_watchers()
每个状态的变更都调用了_make_state_change方法
def _make_state_change(self, state):
    """Store a new connection state and notify every state listener.

    Nothing happens when ``state`` equals the current ``self.state``.
    A listener that returns exactly ``True`` is removed afterwards.
    An exception raised while handling a listener is logged and does
    not stop the remaining listeners from being called.

    :param state: The new state, passed unchanged to each listener.
    """
    already_current = self.state == state
    if already_current:
        return

    self.state = state

    # Iterate over a snapshot so a listener may remove itself
    snapshot = list(self.state_listeners)
    for callback in snapshot:
        try:
            wants_removal = callback(state)
            if wants_removal is True:
                self.remove_listener(callback)
        except Exception:
            self.logger.exception("Error in connection state listener")
该方法遍历所有的listener,然后依次执行每个回调方法,比如DataWatch在初始化时注册的_session_watcher方法
Watcher
DataWatch
class DataWatch(object):
    """Watcher bound to one client and one node path."""

    def __init__(self, client, path, func=None, *args, **kwargs):
        """Create a data watcher for a path.

        :param client: A zookeeper client.
        :type client: :class:`~kazoo.client.KazooClient`
        :param path: The path to watch for data changes on.
        :type path: str
        :param func: Function to call initially and every time the
                     node changes. `func` will be called with a
                     tuple, the value of the node and a
                     :class:`~kazoo.client.ZnodeStat` instance.
        :type func: callable

        Any extra positional or keyword arguments are accepted but
        only trigger a :class:`DeprecationWarning`.
        """
        handler = client.handler

        self._client = client
        self._path = path
        self._func = func
        self._stopped = False
        self._run_lock = handler.lock_object()
        self._version = None
        self._retry = KazooRetry(max_tries=None,
                                 sleep_func=handler.sleep_func)
        self._include_event = None
        self._ever_called = False
        self._used = False

        extras_given = bool(args or kwargs)
        if extras_given:
            warnings.warn('Passing additional arguments to DataWatch is'
                          ' deprecated. ignore_missing_node is now assumed '
                          ' to be True by default, and the event will be '
                          ' sent if the function can handle receiving it',
                          DeprecationWarning, stacklevel=2)

        # Without a function there is nothing more to set up here
        if func is None:
            return

        # Register the session listener, then run the first fetch
        self._used = True
        self._client.add_listener(self._session_watcher)
        self._get_data()
在实例化该watch的时候(func不为None时),会先把_session_watcher注册为client的listener,然后立即调用一次_get_data方法:该方法通过client.get读取节点数据,同时把self._watcher作为watcher传入
增加watcher
代码语言:javascript复制@_ignore_closed
def _get_data(self, event=None):
# Ensure this runs one at a time, possible because the session
# watcher may trigger a run
with self._run_lock:
if self._stopped:
return
initial_version = self._version
try:
data, stat = self._retry(self._client.get,
self._path, self._watcher)
except NoNodeError:
data = None
这里调用了KazooClient的get方法,进而调用get_async
def get_async(self, path, watch=None):
    """Asynchronously get the value of a node.

    Accepts the same arguments as :meth:`get`. The path is prefixed
    with the client's chroot, wrapped together with ``watch`` in a
    ``GetData`` request, and handed to ``_call``.

    :param path: Node path; must be a string.
    :param watch: Optional callable to pass along with the request.
    :raises TypeError: If ``path`` is not a string, or ``watch`` is
                       truthy but not callable.
    :rtype: :class:`~kazoo.interfaces.IAsyncResult`
    """
    path_is_string = isinstance(path, string_types)
    if not path_is_string:
        raise TypeError("Invalid type for 'path' (string expected)")

    if watch:
        if not callable(watch):
            raise TypeError("Invalid type for 'watch' (must be a callable)")

    async_result = self.handler.async_result()
    request = GetData(_prefix_root(self.chroot, path), watch)
    self._call(request, async_result)
    return async_result
这里将path、watch封装成了一个GetData对象,也就是下面_call方法中的request参数:
def _call(self, request, async_object):
"""Ensure there's an active connection and put the request in
the queue if there is.
Returns False if the call short circuits due to AUTH_FAILED,
CLOSED, EXPIRED_SESSION or CONNECTING state.
"""
if self._state == KeeperState.AUTH_FAILED:
async_object.set_exception(AuthFailedError())
return False
elif self._state == KeeperState.CLOSED:
async_object.set_exception(ConnectionClosedError(
"Connection has been closed"))
return False
elif self._state in (KeeperState.EXPIRED_SESSION,
KeeperState.CONNECTING):
async_object.set_exception(SessionExpiredError())
return False
self._queue.append((request, async_object))
# wake the connection, guarding against a race with close()
write_sock = self._connection._write_sock
if write_sock is None:
async_object.set_exception(ConnectionClosedError(
"Connection has been closed"))
try:
write_sock.send(b'