Skip to content

k3zkutil

Action-CI Documentation Status Package

Helper functions for ZooKeeper with kazoo. Provides distributed locking, configuration management, ACL utilities, and CAS operations.

k3zkutil is a component of the pykit3 project, a Python 3 toolkit set.

Installation

pip install k3zkutil

Quick Start

import k3zkutil

# Create a distributed lock
with k3zkutil.ZKLock('my_lock', zkclient=zk_client):
    # Do something with exclusive access
    pass

API Reference

k3zkutil

Some helper functions to make life easier with zookeeper.

CachedReader

Bases: dict

Source code in k3zkutil/cached_reader.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class CachedReader(dict):
    """
    A `dict` that mirrors the value of one zookeeper node.

    The node value is loaded once by the constructor and loaded again every
    time the watch registered on the node fires, as long as the reader has not
    been stopped. Any connection state change stops the reader.

    NOTE(review): the node value is passed to `dict.update()`, so it is
    presumably already decoded into a `dict` by the client returned from
    `zkconf.kazoo_client_ext` -- confirm.

    E.g.::

        # the node `bar` holds: {'jobs': {'num': 10}}
        cr = CachedReader('127.0.0.1:2181', 'bar')
        for i in range(cr['jobs']['num']):
            doit()
    """

    def __init__(self, zk, path, callback=None):
        """
        Set up the zk client, register the connection listener and load the
        node value for the first time.

        :param zk: is the connection argument, which can be:

        -   Comma separated host list, such as
            `"127.0.0.1:2181,127.0.0.2:2181"`.

        -   A `zkutil.ZKConf` instance specifying connection argument with
            other config.

        -   A plain `dict` to create a `zkutil.ZKConf` instance.
        :param path: the path of the node in zookeeper.
        :param callback: called after the node changed. Defaults to `None`.
        It is called with 3 arguments `(path, old_dict, new_dict)`.
        """
        super().__init__()

        self.zke, self.owning_zk = zkconf.kazoo_client_ext(zk)
        self.path = path
        self.callback = callback

        # set whenever `watch()` should wake up: node changed or reader stopped
        self.available_ev = threading.Event()
        self.stopped = False

        # [previous value, current value]
        self.val = [None, None]

        # guards refreshing the dict content
        self.lock = threading.RLock()

        def _forward_conn_state(state):
            self._on_conn_change(state)

        def _forward_node_event(event):
            self._on_node_change(event)

        # Keep references to the wrappers: the very same objects are needed
        # later for `remove_listener()` and for re-registering the watch.
        self.conn_change_cb = _forward_conn_state
        self.node_change_cb = _forward_node_event

        self.zke.add_listener(self.conn_change_cb)
        self._update()

    def watch(self, timeout=None):
        """
        Block until the node changes or the reader is stopped.

        :param timeout: the time (in seconds) to wait.
        A falsy value (`None` by default) means to wait for a year.
        :return: `None` if the reader has been stopped, e.g. by `close()`.
        Otherwise the list `[old_dict, new_dict]`.
        :raise ZKWaitTimeout: if nothing happened within `timeout` seconds.
        """
        self.available_ev.clear()

        wait_secs = timeout or 86400 * 365

        if not self.available_ev.wait(wait_secs):
            raise exceptions.ZKWaitTimeout("timeout {t} sec".format(t=wait_secs))

        return None if self.stopped else self.val

    def close(self):
        """
        Stop the reader: wake up a pending `watch()` and disable the callback.

        The zk client is closed too if it was created by this reader.

        :return: nothing
        """
        self.stopped = True
        self.available_ev.set()

        self.zke.remove_listener(self.conn_change_cb)
        if not self.owning_zk:
            return

        zkutil.close_zk(self.zke)

    def _on_conn_change(self, state):
        # Any connection state change stops the reader and wakes up `watch()`.
        logger.info("state changed: {state}".format(state=state))
        self.stopped = True
        self.available_ev.set()

    def _on_node_change(self, event):
        # Watch handler: reload the value, wake up `watch()`, then notify the
        # user callback. Nothing is done once the reader is stopped.
        logger.info("node state changed:{ev}".format(ev=event))

        if self.stopped:
            return

        self._update()
        self.available_ev.set()

        if self.callback is not None:
            self.callback(self.path, self.val[0], self.val[1])

    def _update(self):
        # Read the node (registering the watch again), shift the current value
        # to "previous", and make the dict content equal to the new value.
        with self.lock:
            latest, _ = self.zke.get(self.path, watch=self.node_change_cb)
            self.val = [self.val[1], latest]
            self.update(latest)

            # drop the keys that no longer exist in the node value
            stale_keys = [key for key in self.keys() if key not in latest]
            for key in stale_keys:
                del self[key]

__init__(zk, path, callback=None)

:param zk: is the connection argument, which can be:

  • A comma-separated host list, such as "127.0.0.1:2181,127.0.0.2:2181".

  • A zkutil.ZKConf instance specifying the connection argument along with other config.

  • A plain dict used to create a zkutil.ZKConf instance.

:param path: the path of the node in zookeeper.

:param callback: a callback invoked when the node changes. Defaults to None. It takes 3 arguments: (path, old_dict, new_dict).

Source code in k3zkutil/cached_reader.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def __init__(self, zk, path, callback=None):
    """
    Set up the zk client, register the connection listener and load the node
    value for the first time.

    :param zk: is the connection argument, which can be:

    -   Comma separated host list, such as
        `"127.0.0.1:2181,127.0.0.2:2181"`.

    -   A `zkutil.ZKConf` instance specifying connection argument with other
        config.

    -   A plain `dict` to create a `zkutil.ZKConf` instance.
    :param path: the path of the node in zookeeper.
    :param callback: called after the node changed. Defaults to `None`.
    It is called with 3 arguments `(path, old_dict, new_dict)`.
    """
    super(CachedReader, self).__init__()

    self.zke, self.owning_zk = zkconf.kazoo_client_ext(zk)
    self.path = path
    self.callback = callback

    # set whenever `watch()` should wake up: node changed or reader stopped
    self.available_ev = threading.Event()
    self.stopped = False

    # [previous value, current value]
    self.val = [None, None]

    # guards refreshing the dict content
    self.lock = threading.RLock()

    def _forward_conn_state(state):
        self._on_conn_change(state)

    def _forward_node_event(event):
        self._on_node_change(event)

    # Keep references to the wrappers: the very same objects are needed later
    # for `remove_listener()` and for re-registering the watch.
    self.conn_change_cb = _forward_conn_state
    self.node_change_cb = _forward_node_event

    self.zke.add_listener(self.conn_change_cb)
    self._update()

close()

Stop the pending zkutil.CachedReader.watch call and the callback.

:return: nothing

Source code in k3zkutil/cached_reader.py
83
84
85
86
87
88
89
90
91
92
93
def close(self):
    """
    Stop the reader: wake up a pending `zkutil.CachedReader.watch` and disable
    the callback.

    The zk client is closed too if it was created by this reader.

    :return: nothing
    """
    # Flag first, then wake up the waiter, so that it sees `stopped` set.
    self.stopped = True
    self.available_ev.set()

    client = self.zke
    client.remove_listener(self.conn_change_cb)

    if not self.owning_zk:
        return

    zkutil.close_zk(client)

watch(timeout=None)

Wait until the node changes and return a list [old_dict, new_dict]. On timeout, raise a ZKWaitTimeout.

:param timeout: specifies the time (in seconds) to wait. By default it is None, which means to wait for a year.

:return: None if the CachedReader is closed by zkutil.CachedReader.close(); a list [old_dict, new_dict] if the node changes.

Source code in k3zkutil/cached_reader.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def watch(self, timeout=None):
    """
    Block until the node changes or the reader is stopped.

    :param timeout: the time (in seconds) to wait.
    A falsy value (`None` by default) means to wait for a year.
    :return: `None` if the `CachedReader` has been stopped, e.g. by
    `zkutil.CachedReader.close()`. Otherwise the list `[old_dict, new_dict]`.
    :raise ZKWaitTimeout: if nothing happened within `timeout` seconds.
    """
    self.available_ev.clear()

    wait_secs = timeout or 86400 * 365

    if not self.available_ev.wait(wait_secs):
        raise exceptions.ZKWaitTimeout("timeout {t} sec".format(t=wait_secs))

    return None if self.stopped else self.val

ZKConf

Bases: object

It is a config wrapper, providing several methods for accessing config. If one of the config fields is not specified when initializing this class, it falls back to using config.zk_<field>.

Classes in this module rely on this class to access config.

E.g.:

Source code in k3zkutil/zkconf.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class ZKConf(object):
    """
    Config wrapper giving method-style access to zookeeper related settings.

    A field left as `None` when initializing this class falls back to the
    `zk_<field>` attribute of the module-level `conf` object.

    Classes in this module rely on this class to access config.

    E.g.::

        # conf.zk_lock_dir = "lock/"
        c = ZKConf(hosts="127.0.0.1:9999")
        c.hosts()     # "127.0.0.1:9999", specified explicitly
        c.lock_dir()  # "lock/", falls back to `conf.zk_lock_dir`

    zk dir:
        <prefix>/record/<key>
        <prefix>/lock/<key>
        <prefix>/tx/
                    alive/0000000001
                    journal/0000000001
                    state/0000000001
                    journal_id_set
                    txid_maker
        <prefix>/seq/<key>

    alive:      Contains `ephemeral` node each of which represents a transaction.
                Its modifications is used.

    journal:    Contains journal transaction modifications. Each of them is a complete transaction.

    journal_id_set: Committed and Purged journal id.
    """

    def __init__(
        self, hosts=None, tx_dir=None, seq_dir=None, record_dir=None, lock_dir=None, node_id=None, auth=None, acl=None
    ):
        # `None` marks a field as "not specified", see `_get_config()`.
        self.conf = dict(
            hosts=hosts,
            tx_dir=tx_dir,
            seq_dir=seq_dir,
            record_dir=record_dir,
            lock_dir=lock_dir,
            node_id=node_id,
            auth=auth,
            acl=acl,
        )

    # --- plain config fields ---

    def hosts(self):
        return self._get_config("hosts")

    def record_dir(self):
        return self._get_config("record_dir")

    def lock_dir(self):
        return self._get_config("lock_dir")

    def node_id(self):
        return self._get_config("node_id")

    def auth(self):
        return self._get_config("auth")

    def acl(self):
        return self._get_config("acl")

    def seq_dir(self):
        return self._get_config("seq_dir")

    def tx_dir(self):
        return self._get_config("tx_dir")

    # --- node paths, built by appending to one of the configured dirs ---

    def lock(self, key=""):
        return self.lock_dir() + _dump_txid(key)

    def record(self, key=""):
        return self.record_dir() + key

    def seq(self, key=""):
        return self.seq_dir() + key

    def tx_alive(self, txid=""):
        return self.tx_dir() + "alive/" + _dump_txid(txid)

    def tx_state(self, txid=""):
        return self.tx_dir() + "state/" + _dump_txid(txid)

    def journal(self, journal_id=""):
        return self.tx_dir() + "journal/" + _dump_journal_id(journal_id)

    def journal_id_set(self):
        return self.tx_dir() + "journal_id_set"

    def txid_maker(self):
        return self.tx_dir() + "txid_maker"

    # --- values converted for kazoo ---

    def kazoo_digest_acl(self):
        """
        :return: `None` if no acl is configured, otherwise the result of
        `zkutil.make_kazoo_digest_acl()` on the configured acl.
        """
        acl = self.acl()
        return None if acl is None else zkutil.make_kazoo_digest_acl(acl)

    def kazoo_auth(self):
        """
        :return: `None` if no auth is configured, otherwise a 2-element tuple:
        the 1st element of the configured auth, and the 2nd and 3rd elements
        joined by `":"`.
        """
        auth = self.auth()
        if auth is None:
            return None

        return auth[0], auth[1] + ":" + auth[2]

    def _get_config(self, name):
        # An explicitly specified value wins; otherwise use `conf.zk_<name>`.
        explicit = self.conf[name]
        if explicit is not None:
            return explicit

        return getattr(conf, "zk_" + name)

ZKLock

Bases: object

ZKLock implements a zookeeper based distributed lock.

Source code in k3zkutil/zklock.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
class ZKLock(object):
    """
    ZKLock implements a zookeeper based distributed lock.

    The lock is the zk node `zkconf.lock(lock_name)`; the node value is the
    json dumped identifier (a dict with the keys `id` and `val`) of the holder.
    The lock is held by this instance when the `id` stored in the node equals
    the `id` of this instance's identifier.

    It can be used as a context manager, which acquires on enter and releases
    on exit::

        with ZKLock("foo", zkclient=zk) as lock:
            ...
    """

    def __init__(
        self, lock_name, zkconf=None, zkclient=None, on_lost=None, identifier=None, ephemeral=True, timeout=10
    ):
        """
        :param lock_name: name of the lock. The lock node path is
        `zkconf.lock(lock_name)`.
        :param zkconf: a `ZKConf` instance, or a `dict` of arguments to create
        one. `None` means a `ZKConf` with all fields unspecified.
        :param zkclient: the zk client to use. If it is `None`, a client is
        created from `zkconf.hosts()` and `zkconf.auth()`; it is owned by this
        lock and closed by `close()`.
        :param on_lost: a callable taking no argument. It is called on every
        connection state change, and on a lock node change while this instance
        holds the lock. It is required when `zkclient` is `None`.
        :param identifier: identifier of this lock. `None` means to generate
        one with `zkutil.lock_id(zkconf.node_id())`. A non-dict value is
        converted with `make_identifier(identifier, None)`. The resulting dict
        must have exactly the keys `id` and `val`.
        :param ephemeral: passed to zk `create` when creating the lock node.
        :param timeout: the default timeout in seconds for acquiring.
        :raise ValueError: if both `zkclient` and `on_lost` are `None`.
        """
        if zkconf is None:
            zkconf = ZKConf()
        if isinstance(zkconf, dict):
            zkconf = ZKConf(**zkconf)
        self.zkconf = zkconf

        if zkclient is None:
            # If user does not pass a zkclient instance,
            # we need to create one for lock to use.
            # This zkclient will be closed after lock is released

            if on_lost is None:
                raise ValueError("on_lost must be specified to watch zk connection issue if no zkclient specified")

            zkclient = make_owning_zkclient(zkconf.hosts(), zkconf.auth())
            self.owning_client = True
        else:
            self.owning_client = False

        if identifier is None:
            identifier = zkutil.lock_id(zkconf.node_id())

        if not isinstance(identifier, dict):
            identifier = make_identifier(identifier, None)

        assert sorted(["id", "val"]) == sorted(list(identifier.keys()))

        # a copy of hosts for debugging and tracking
        self._hosts = ",".join(["{0}:{1}".format(*x) for x in zkclient.hosts])

        self.zkclient = zkclient
        # unwrap the extended client and work with the client it holds
        if isinstance(self.zkclient, KazooClientExt):
            self.zkclient = self.zkclient._zk

        self.on_lost = on_lost

        self.lock_name = lock_name
        self.lock_path = zkconf.lock(self.lock_name)
        self.identifier = identifier
        self.ephemeral = ephemeral
        self.timeout = timeout

        # guards `lock_holder` and `maybe_available`
        self.mutex = threading.RLock()
        # set when it is worth trying to acquire (again)
        self.maybe_available = threading.Event()
        self.maybe_available.set()
        # `None`, or the tuple (holder identifier, zk node version) read last time
        self.lock_holder = None

        logger.info("adding event listener: {s}".format(s=self))
        self.zkclient.add_listener(self.on_connection_change)

    def on_node_change(self, watchevent):
        """
        Watch handler of the lock node: allow the acquiring loop to retry, and
        call `on_lost` if this instance holds the lock.
        """
        # Must be locked first.
        # Or there is a chance on_node_change is triggered before
        #         self.maybe_available.clear()
        with self.mutex:
            self.maybe_available.set()

            # If locked. the node change is treated as losing a lock
            if self.is_locked():
                if self.on_lost is not None:
                    self.on_lost()

        logger.info("node state changed:{ev}, lock might be released: {s}".format(ev=watchevent, s=str(self)))

    def on_connection_change(self, state):
        """
        Connection listener: allow the acquiring loop to retry and call
        `on_lost`, whatever the new `state` is.
        """
        # notify zklock to re-do acquiring procedure, to trigger Connection Error
        with self.mutex:
            self.maybe_available.set()

        if self.on_lost is not None:
            self.on_lost()

    def acquire_loop(self, timeout=None):
        """
        A generator that keeps trying to acquire the lock.

        It yields a tuple `(holder identifier, holder version)` each time the
        lock is found to be held by another one, and finishes when the lock is
        acquired.

        :param timeout: seconds to wait. `None` means to use the `timeout`
        given to the constructor.
        :raise LockTimeout: if the lock is not acquired in time.
        """
        if timeout is None:
            timeout = self.timeout

        expire_at = time.time() + timeout

        while True:
            # Even if timeout is smaller than 0, try-loop continue on until
            # maybe_available is not ready.
            #
            # There is a chance that:
            #  - Failed to create lock node(lock is occupied by other)
            #  - Failed to get lock node(just deleted)
            #  - Failed to create lock node(lock is occupied by other)
            #  - Failed to get lock node(just deleted)
            #  - ...
            if not self.maybe_available.wait(timeout=expire_at - time.time()):
                logger.debug("lock is still held by others: " + str(self))

                if time.time() > expire_at:
                    raise LockTimeout("lock: " + str(self.lock_path))

            # Always proceed the "get" phase, in order to add a watch handler.
            # To watch node change event.

            self._create()
            self._acquire_by_get()
            if self.is_locked():
                return

            # If it is possible to acquire the lock in next retry, do not yield
            if self.maybe_available.is_set():
                continue
            else:
                yield self.lock_holder[0], self.lock_holder[1]

    def acquire(self, timeout=None):
        """
        Block until the lock is acquired.

        :param timeout: seconds to wait. `None` means to use the `timeout`
        given to the constructor.
        :raise LockTimeout: if the lock is not acquired in time.
        :return: nothing
        """
        for holder, ver in self.acquire_loop(timeout=timeout):
            continue

    def try_acquire(self):
        """
        Try to acquire the lock and return result.
        It never blocks.
        :return: a tuple of result, lock holder and lock holder version.
        Such as `(True, "aa-xx-bb", -1)` or `(False, "aa-xx-cc", 12)`
        """
        # If lock is acquired:
        # - the 1st element is `True`,
        # - the 2nd is identifier of this lock,
        # - the 3rd is `-1`.
        #
        # If lock is not acquired:
        # - the 1st element is `False`,
        # - the 2nd is identifier of the lock holder,
        # - the 3rd is a non-negative integer, which is the version of the zk node.
        try:
            self.acquire(timeout=-1)
        except LockTimeout:
            pass

        # if_locked, lock holder identifier, holder version
        return self.is_locked(), self.lock_holder[0], self.lock_holder[1]

    def try_release(self):
        """
        Release lock if current lock holder is this lock.
        It never blocks.
        :return: a tuple of result, lock holder and lock holder version.
        Such as `(True, "aa-xx-bb", -1)` or `(False, "aa-xx-cc", 12)`
        """
        # If lock holder is this `ZKLock`(checked by identifier), or lock is not
        # acquired by anyone:
        #
        # - the 1st element is `True`,
        # - the 2nd is identifier of this lock,
        # - the 3rd is `-1` or `zstat.version` of the lock zknode.
        #
        # Otherwise:
        #
        # - the 1st element is `False`,
        # - the 2nd is identifier of the lock holder,
        # - the 3rd is a non-negative integer, which is the version of the zk node.
        logger.debug("try to release if I am locker holder: {s}".format(s=str(self)))

        try:
            holder, zstat = self.zkclient.get(self.lock_path)
            holder = k3utfjson.load(holder)

            self.lock_holder = (holder, zstat.version)

            logger.debug("got lock holder: {s}".format(s=str(self)))

            if self.cmp_identifier(holder, self.identifier):
                self.zkclient.remove_listener(self.on_connection_change)

                try:
                    # delete only the version just read
                    self.zkclient.delete(self.lock_path, version=zstat.version)
                except NoNodeError as e:
                    logger.info(repr(e) + " while delete lock: " + str(self))

                self.lock_holder = None

                return True, holder, zstat.version
            else:
                return False, holder, zstat.version

        except NoNodeError as e:
            logger.info(repr(e) + " while try_release: {s}".format(s=str(self)))
            return True, self.identifier, -1

    def release(self):
        """
        Release the lock if it has been locked.
        Otherwise return silently.

        If this lock initiated a connection by itself, it will be closed.

        :return: Nothing
        """
        with self.mutex:
            if self.is_locked():
                # remove listener to avoid useless event triggering
                self.zkclient.remove_listener(self.on_connection_change)

                try:
                    self.zkclient.delete(self.lock_path)
                except NoNodeError as e:
                    logger.info(repr(e) + " while delete lock: " + str(self))

                self.lock_holder = None

                logger.info("RELEASED: {s}".format(s=str(self)))
            else:
                logger.info("not acquired, do not need to release")

        self.close()

    def close(self):
        """
        Remove the connection listener, and close the zk client if it was
        created by this lock.

        :return: nothing
        """
        self.zkclient.remove_listener(self.on_connection_change)

        if self.owning_client:
            logger.info("zk client is made by me, close it")
            zkutil.close_zk(self.zkclient)

    def is_locked(self):
        """
        :return: `True` if the holder read last time is this instance
        (compared by identifier `id`), otherwise `False`.
        """
        holder = self.lock_holder
        if holder is None:
            return False

        return self.cmp_identifier(holder[0], self.identifier)

    def _create(self):
        # Try to create the lock node holding the identifier of this instance.
        logger.debug("to creaet: {s}".format(s=str(self)))

        try:
            self.zkclient.create(
                self.lock_path,
                k3utfjson.dump(self.identifier).encode("utf-8"),
                ephemeral=self.ephemeral,
                acl=self.zkconf.kazoo_digest_acl(),
            )

        except NodeExistsError as e:
            # NOTE Success create on server side might also results in failure
            # on client side due to network issue.
            # 'get' after 'create' to check if existent node belongs to this
            # client.

            logger.debug(repr(e) + " while create lock: {s}".format(s=str(self)))
            self.lock_holder = None
            return

        logger.info("CREATE OK: {s}".format(s=str(self)))

    def set_lock_val(self, val, version=-1):
        """
        Set the `val` of the identifier and write the identifier to the lock
        node. The lock is acquired first with `try_acquire()`.

        :param val: the new value of the `val` field of the identifier.
        :param version: passed to zk `set` as the expected node version.
        :raise ZKUtilError: if the lock can not be acquired.
        :return: the version of the lock node after it is set.
        """
        locked, holder, ver = self.try_acquire()
        if not locked:
            raise ZKUtilError("set non-locked: {k}".format(k=self.lock_name))

        self.identifier["val"] = val

        value = k3utfjson.dump(self.identifier).encode("utf-8")
        st = self.zkclient.set(self.lock_path, value, version=version)

        return st.version

    def get_lock_val(self):
        """
        Read the lock node.

        :return: a tuple of the `val` of the stored identifier and the version
        of the lock node.
        """
        holder, zstat = self.zkclient.get(self.lock_path)
        holder = k3utfjson.load(holder)

        return holder["val"], zstat.version

    def cmp_identifier(self, ia, ib):
        """
        :return: `True` if the two identifiers have the same `id`.
        """
        return ia["id"] == ib["id"]

    def _acquire_by_get(self):
        # Read the lock node, registering `on_node_change` as its watch, and
        # record the holder. If another one holds the lock, clear
        # `maybe_available` so that the acquiring loop waits for a change.
        logger.debug("to get: {s}".format(s=str(self)))

        try:
            with self.mutex:
                holder, zstat = self.zkclient.get(self.lock_path, watch=self.on_node_change)
                holder = k3utfjson.load(holder)

                self.lock_holder = (holder, zstat.version)

                logger.debug("got lock holder: {s}".format(s=str(self)))

                if self.cmp_identifier(holder, self.identifier):
                    logger.info("ACQUIRED: {s}".format(s=str(self)))
                    return

                logger.debug("other holds: {s}".format(s=str(self)))
                self.maybe_available.clear()

        except NoNodeError as e:
            # create failed but when getting it, it has been deleted
            logger.info(repr(e) + " while get lock: {s}".format(s=str(self)))
            with self.mutex:
                self.lock_holder = None
                self.maybe_available.set()

    def __str__(self):
        return "<id={id} {l}:[{holder}] on {h}>".format(
            id=self.identifier["id"],
            l=self.lock_path,
            holder=(self.lock_holder or ""),
            h=str(self._hosts),
        )

    def __enter__(self):
        self.acquire()
        # Return the lock itself so that `with ZKLock(...) as lock:` binds the
        # instance instead of `None`.
        return self

    def __exit__(self, tp, value, tb):
        # Returns `None`: an exception raised in the `with` body propagates.
        self.release()

release()

Release the lock if it has been locked. Otherwise return silently.

If this lock initiated a connection by itself, it will be closed.

:return: Nothing

Source code in k3zkutil/zklock.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def release(self):
    """
    Release the lock if this instance holds it, otherwise do nothing but log.

    In both cases `close()` is called afterwards, thus a connection initiated
    by this lock itself is closed.

    :return: Nothing
    """
    with self.mutex:
        if not self.is_locked():
            logger.info("not acquired, do not need to release")
        else:
            # remove listener to avoid useless event triggering
            self.zkclient.remove_listener(self.on_connection_change)

            try:
                self.zkclient.delete(self.lock_path)
            except NoNodeError as err:
                # the lock node is already gone: nothing left to delete
                logger.info(repr(err) + " while delete lock: " + str(self))

            self.lock_holder = None

            logger.info("RELEASED: {s}".format(s=str(self)))

    self.close()

try_acquire()

Try to acquire the lock and return result. It never blocks. :return: a tuple of result, lock holder and lock holder version. Such as (True, "aa-xx-bb", -1) or (False, "aa-xx-cc", 12)

Source code in k3zkutil/zklock.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def try_acquire(self):
    """
    Make one non-blocking attempt to acquire the lock.

    :return: a tuple of result, lock holder and lock holder version.
    Such as `(True, "aa-xx-bb", -1)` or `(False, "aa-xx-cc", 12)`

    -   Acquired: `True`, the identifier of this lock, and `-1`.

    -   Not acquired: `False`, the identifier of the lock holder, and the
        version of the zk node, which is a non-negative integer.
    """
    try:
        # A negative timeout makes `acquire()` give up instead of waiting for
        # the holder; failing to acquire in time is an expected outcome here.
        self.acquire(timeout=-1)
    except LockTimeout:
        pass

    locked = self.is_locked()
    return locked, self.lock_holder[0], self.lock_holder[1]

try_release()

Release lock if current lock holder is this lock. It never blocks. :return: a tuple of result, lock holder and lock holder version. Such as (True, "aa-xx-bb", -1) or (False, "aa-xx-cc", 12)

Source code in k3zkutil/zklock.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def try_release(self):
    """
    Release the lock if the current lock holder is this lock.
    It never blocks.

    :return: a tuple of result, lock holder and lock holder version.
    Such as `(True, "aa-xx-bb", -1)` or `(False, "aa-xx-cc", 12)`

    -   The holder is this `ZKLock` (checked by identifier), or nobody holds
        the lock: `True`, the identifier of this lock, and `zstat.version` of
        the lock zknode or `-1` if there is no lock zknode.

    -   Otherwise: `False`, the identifier of the lock holder, and the version
        of the zk node, which is a non-negative integer.
    """
    logger.debug("try to release if I am locker holder: {s}".format(s=str(self)))

    try:
        raw, zstat = self.zkclient.get(self.lock_path)
        holder = k3utfjson.load(raw)

        self.lock_holder = (holder, zstat.version)

        logger.debug("got lock holder: {s}".format(s=str(self)))

        if not self.cmp_identifier(holder, self.identifier):
            # held by another one: leave it alone
            return False, holder, zstat.version

        self.zkclient.remove_listener(self.on_connection_change)

        try:
            # delete only the version just read
            self.zkclient.delete(self.lock_path, version=zstat.version)
        except NoNodeError as err:
            logger.info(repr(err) + " while delete lock: " + str(self))

        self.lock_holder = None

        return True, holder, zstat.version

    except NoNodeError as err:
        # no lock node at all: nobody holds the lock
        logger.info(repr(err) + " while try_release: {s}".format(s=str(self)))
        return True, self.identifier, -1

cas_loop(zkclient, path, json=True)

A helper generator for doing CAS(check and set or compare and swap) on zk. See CAS

A general CAS loop looks like the following (the version is checked when updating):

    while True:
        curr_val, zstat = zkclient.get(path)
        new_val = curr_val + ':foo'
        try:
            zkclient.set(path, new_val, version=zstat.version)
        except BadVersionError as e:
            continue
        else:
            return

:param zkclient: is a KazooClient instance connected to zk. It can also be a string, in which case it is treated as a comma-separated hosts list, and a zkclient is created with default settings. It can also be a dict or an instance of ZKConf, in which case a zkclient is created with the settings defined by ZKConf.

:param path: is the zk-node path to get and set.

:param json: whether to do a json load after reading the value from zk and a json dump before updating the value to zk.

:return: a generator that yields a tuple of 2 elements.

Source code in k3zkutil/zkacid.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def cas_loop(zkclient, path, json=True):
    """
    A helper generator for doing CAS(check and set or compare and swap) on zk.
    See [CAS](https://en.wikipedia.org/wiki/Compare-and-swap)

    A general CAS loop is like following(check the version when update)::

        while True:
            curr_val, zstat = zkclient.get(path)
            new_val = curr_val + ':foo'
            try:
                zkclient.set(path, new_val, version=zstat.version)
            except BadVersionError as e:
                continue
            else:
                return

    :param zkclient: is a `KazooClient` instance connected to zk.
    It can also be a string, in which case it is treated as a comma separated
    hosts list, and a `zkclient` is created with default setting.
    It can also be a `dict` or an instance of `ZKConf`, in which case it create
    a `zkclient` with `ZKConf` defined setting.
    :param path: is the zk-node path to get and set.
    :param json: whether to do a json load after reading the value from zk and to do a json dump
    before updating the value to zk.
    :return: a generator yielding whatever `k3txutil.cas_loop` yields, a
    `tuple` of 2 element.
    NOTE(review): the meaning of the 2 elements is defined by
    `k3txutil.cas_loop`, which is not visible here -- confirm and document.
    """
    client, owns_client = kazoo_client_ext(zkclient, json=json)

    def _set_same_version(path, val, zstat):
        # the update applies only to the version that has been read
        client.set(path, val, version=zstat.version)

    try:
        attempts = k3txutil.cas_loop(client.get, _set_same_version, args=(path,), conflicterror=BadVersionError)
        for attempt in attempts:
            yield attempt
    finally:
        # close the client only if it was created for this loop
        if owns_client:
            zkutil.close_zk(client)

close_zk(zk)

Stop and close a zk client. :param zk: a KazooClient or KazooClientExt instance; otherwise a TypeError is raised. :return: nothing

Source code in k3zkutil/zkutil.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def close_zk(zk):
    """
    Stop a zk client, then close it.

    A `KazooException` raised by `stop()` and any `Exception` raised by
    `close()` are logged and not propagated.

    :param zk: a `KazooClient` or `KazooClientExt` instance.
    :raise TypeError: if `zk` is not an instance of `KazooClient`.
    :return: nothing
    """
    if not isinstance(zk, KazooClient):
        msg = "expect KazooClient or KazooClientExt, but got {t}".format(t=type(zk))
        raise TypeError(msg)

    try:
        zk.stop()
    except KazooException as err:
        logger.exception(repr(err) + " while stop zk client")

    # `close()` is attempted even if `stop()` failed
    try:
        zk.close()
    except Exception as err:
        logger.exception(repr(err) + " while close zk client")

export_hierarchy(zkcli, zkpath)

Export a zookeeper node as a tree structure; the exported data can be imported back into zookeeper using zkutil.init_hierarchy. :param zkcli: is a KazooClient instance connected to zk. :param zkpath: is the zookeeper root path that you want to export. :return: the exported node tree.

Source code in k3zkutil/zkutil.py
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
def export_hierarchy(zkcli, zkpath):
    """
    Export the zookeeper node at `zkpath` as a tree structure.

    The exported data can be imported back into zookeeper with
    `zkutil.init_hierarchy`.

    :param zkcli: is a `KazooClient` instance connected to zk.
    :param zkpath: is the zookeeper root path to export. It must be absolute;
    trailing slashes are dropped unless the path is exactly `"/"`.
    :return: whatever `_export_hierarchy` builds for the normalized path.
    :raises ZkPathError: if the normalized path does not start with `"/"`.
    """
    # The bare root is kept untouched; every other path loses its trailing "/".
    root_path = zkpath if zkpath == "/" else zkpath.rstrip("/")

    if root_path.startswith("/"):
        return _export_hierarchy(zkcli, root_path)

    raise ZkPathError("zkpath: {0} Error, Should be absolute path".format(root_path))

get_next(zkclient, path, version=-1)

Wait until zk-node path version becomes greater than version then return node value and zstat. :param version: the version that path version must be greater than. :return: zk node value and zstat.

Source code in k3zkutil/zkutil.py
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
@make_conditioned_getter
def get_next(zkclient, path, version=-1):
    """
    Wait until zk-node `path` version becomes greater than `version` then return
    node value and `zstat`.

    This function body is a generator; the waiting itself is not done here.
    NOTE(review): presumably `make_conditioned_getter` drives the generator and
    re-runs it whenever `set_available` has been called -- confirm against the
    decorator.

    :param zkclient: the client whose `get(path, watch=...)` is used to read the node.
    :param path: the zk-node path to read.
    :param version: the version that `path` version must be greater than.
    :return: zk node value and `zstat`.
    """
    # The first value sent into the generator is a pair: a marker to yield when
    # the condition is not satisfied yet, and a callback to signal a change.
    NeedWait, set_available = yield

    # Read the node and register a watch that calls `set_available()` when fired.
    val, zstat = zkclient.get(path, watch=lambda watchevent: set_available())
    if zstat.version > version:
        # Condition satisfied: hand the value and stat to the driver.
        yield val, zstat

    # Condition not satisfied: tell the driver that waiting is needed.
    yield NeedWait

init_hierarchy(hosts, hierarchy, users, auth)

It initializes a zookeeper cluster, including initializing the tree structure and setting the value and permissions for each node. :param hosts: comma-separated list of hosts to connect to, such as '127.0.0.1:2181,127.0.0.1:2182,[::1]:2183'. :param hierarchy: a dict of zk node structure, with an optional value and acl for each node (see the example in the source below).

Source code in k3zkutil/zkutil.py
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
def init_hierarchy(hosts, hierarchy, users, auth):
    """
    Initialize a zookeeper cluster: create the node tree described by
    `hierarchy` and set the value and permissions of each node.

    :param hosts: comma-separated list of hosts to connect to, such as
    `'127.0.0.1:2181,127.0.0.1:2182,[::1]:2183'`.
    :param hierarchy: a dict of zk node structure with value or acl optional for each node.
    For example::

        node1:
          __val__: "json, optional, by default a '{}'"
          __acl__: # optional, same with parent if not given
            user_1: "cdrwa"
            user_2: "rw"
            ...
        node2:
          node21: {...}

    As shown above, each node has two attributes `__val__` and `__acl__` which
    are used to set the corresponding node. Every other key is a child node.
    :param users: a dict in form `{<username>: <password>}` containing all users in `hierarchy`.
    :param auth: a tuple in form `(<scheme>, <user>, <password>)`.
    It is the authorization info to connect to zookeeper which is used to
    initialize the zookeeper cluster.
    :return: None.
    """
    zkcli = KazooClient(hosts)

    def _init_hierarchy(subtree, parent_path):
        # Create every node of `subtree` under `parent_path`, depth first.
        if not subtree:
            return

        for node, attr_children in subtree.items():
            # The node value is stored as utf-8 encoded json; `{}` when absent.
            val = attr_children.get("__val__", {})
            val = k3utfjson.dump(val).encode("utf-8")
            acl = attr_children.get("__acl__")

            path = _init_node(zkcli, parent_path, node, val, acl, users)
            children = {k: v for k, v in attr_children.items() if k not in ("__val__", "__acl__")}

            _init_hierarchy(children, path)

    # Always release the client, even when connecting, authenticating or
    # creating a node fails; otherwise the connection would leak.
    try:
        zkcli.start()

        scheme, name, passw = auth
        zkcli.add_auth(scheme, name + ":" + passw)

        _init_hierarchy(hierarchy, "/")
    finally:
        close_zk(zkcli)

is_backward_locking(locked_keys, key)

Check if the operation of locking key is a backward-locking. :param locked_keys: is a collection of the already locked keys. :param key: is the key to lock. :return: a bool indicating whether locking key would be a backward-locking.

Source code in k3zkutil/zkutil.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def is_backward_locking(locked_keys, key):
    """
    Tell whether acquiring `key` now would be a backward-locking.

    :param locked_keys: an iterable of the already locked keys.
    :param key: is the key to lock; it must not be locked already.
    :return: `True` if `key` sorts before the greatest locked key, `False`
    otherwise (including when nothing is locked yet).
    """
    # Naive deadlock detection
    # ------------------------
    # Locks are expected to be acquired in a fixed order (e.g. alphabetic),
    # from left to right.  Acquiring a lock that is >= the right-most lock
    # already held is a *forward* locking; anything else is a *backward*
    # locking.
    #
    # If every process only does forward locking there can be no deadlock,
    # because a circular dependency needs at least one backward locking.
    #
    # A process that fails to acquire a lock in a backward locking should
    # release all locks it holds and redo the entire transaction.
    #
    # E.g. X holds a and c, Y holds b (locks ordered left-to-right):
    #
    #     proc-X   a(locked)                  c(locked)
    #     proc-Y               b(locked)
    #
    # X tries b (backward) while Y tries c (forward): deadlock, X should
    # release all its locks.
    #
    #     proc-X   a(locked)   .------------ c(locked)
    #                          v             ^
    #     proc-Y               b(locked) ----'
    #
    # X tries b (backward) while Y tries a (backward): deadlock, both X and Y
    # should release their locks.
    #
    #     proc-X   a(locked)   .------------ c(locked)
    #              ^           v
    #     proc-Y   '---------- b(locked)
    held = sorted(locked_keys)
    assert key not in held, "must not re-lock a key"

    if not held:
        # Nothing is held yet, so any key is a forward locking.
        return False

    # Backward iff the new key sorts before the right-most held key.
    return key < held[-1]

kazoo_client_ext(zk, json=True)

Return the zkclient (built from a newly created client, or wrapping the original one), and a flag telling whether the underlying client was created by this call.

Source code in k3zkutil/zkconf.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def kazoo_client_ext(zk, json=True):
    """
    Build a started `KazooClientExt` from `zk`.

    `zk` may be a `str` (used as the `hosts` of a `ZKConf`), a `dict` (keyword
    arguments of a `ZKConf`) or a `ZKConf` instance; in these cases a new
    `KazooClient` is created from the conf's hosts.  Any other value is handed
    to `KazooClientExt` as it is, together with a default `ZKConf()`.

    :param zk: connection argument, see above.
    :param json: passed through to `KazooClientExt`.
    :return: a tuple `(zkclient, owning)`; `owning` is `True` only when the
    underlying client was created by this call.
    """
    cfg = None

    # The three checks are deliberately independent: a later match wins.
    if isinstance(zk, str):
        cfg = ZKConf(hosts=zk)

    if isinstance(zk, dict):
        cfg = ZKConf(**zk)

    if isinstance(zk, ZKConf):
        cfg = zk

    owning = cfg is not None
    if owning:
        # A conf was derived from the argument: create our own client from it.
        zk = KazooClient(cfg.hosts())
    else:
        # `zk` is used as given; fall back to a default conf.
        cfg = ZKConf()

    client = KazooClientExt(zk, json=json)

    if client._zkconf is None:
        client._zkconf = cfg

    client.start()

    credential = cfg.kazoo_auth()
    if credential is not None:
        client.add_auth(*credential)

    return client, owning

lock_id(node_id=None)

It builds a string used as the zk lock node value, to describe the lock holder's information. The result string is in the form: `<node_id>-<ip>-<process_id>-<uuid>`

e.g. web-192.168.0.2-1233-0123456789abcdef0123456789abcdef

ip is chosen from all local ipv4 addresses. If there is an intranet ip, it is used. Otherwise, a public ip that was found is used. :param node_id: is an arbitrary string representing a host. If it is None, config.zk_node_id is used.

:return: a string for lock identifier.

Source code in k3zkutil/zkutil.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def lock_id(node_id=None):
    """
    Build the string stored as the value of a zk lock node, embedding the lock
    holder's information into the node data.

    The result is the `-` joined sequence of:
    <node_id>-<ip>-<process_id>-<uuid>
    where `<uuid>` is a random uuid4 written as hex digits without dashes.

    `ip` is the first entry of `host_ip4`, or `"unknownip"` if there is none.

    :param node_id: is a user defined string representing a host.
    If it is `None`, `conf.zk_node_id` is used.

    :return: a string for lock identifier.
    """
    if node_id is None:
        node_id = conf.zk_node_id

    # The appended placeholder is only picked when no local ip is known.
    ip_candidates = host_ip4 + ["unknownip"]

    return "-".join(
        (
            node_id,
            ip_candidates[0],
            str(os.getpid()),
            uuid.uuid4().hex,
        )
    )

make_acl_entry(username, password, permissions)

It concatenates username, digest and permissions into a string used as an acl entry. :param username: the name of the zookeeper user. :param password: the password of the zookeeper user. :param permissions: a string, a list or a tuple. Each element in permissions is a char that must be one of cdrwa. If permissions contains any other element, PermTypeError will be raised. :return: a string in the form "digest:<username>:<digest>:<permissions>", where digest is a string built by zkutil.make_digest().

Source code in k3zkutil/zkutil.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def make_acl_entry(username, password, permissions):
    """
    Build an acl entry string out of `username`, its digest and `permissions`.

    :param username: the name of zookeeper user.
    :param password: the password of zookeeper user.
    :param permissions: a string, a list or a tuple of permission chars.
    Every element must be a key of `PERM_TO_LONG`.
    :return: a string in form:
    "digest:<username>:<digest>:<permissions>"
    where `digest` is built by `make_digest()` from "<username>:<password>".
    :raises PermTypeError: on the first element that is not a known permission.
    """
    accepted = []
    for perm in permissions:
        if perm not in PERM_TO_LONG:
            raise PermTypeError(perm)
        accepted.append(perm)

    digest = make_digest(username + ":" + password)

    return "digest:{username}:{digest}:{permissions}".format(
        username=username, digest=digest, permissions="".join(accepted)
    )

make_digest(acc)

It makes a digest for the string acc.

:param acc: the acc string, in the form "<username>:<password>". :return: a digest string.

Source code in k3zkutil/zkutil.py
152
153
154
155
156
157
158
159
160
161
162
def make_digest(acc):
    """
    Compute the digest of the account string `acc`.

    The digest is the base64 text of the SHA1 hash of the encoded `acc`.

    :param acc: the account string, in form "username:password".
    :return: a digest string.
    """
    sha1_raw = hashlib.sha1(acc.encode("utf-8")).digest()
    encoded = base64.b64encode(sha1_raw)
    return encoded.strip().decode("utf-8")

make_kazoo_digest_acl(acl)

:param acl: acl in tuple or list. :return: a list of kazoo.security.ACL.

Source code in k3zkutil/zkutil.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def make_kazoo_digest_acl(acl):
    """
    Convert `(name, password, permissions)` triples to kazoo digest acls.

    Example input:
        acl = (('xp', '123', 'cdrwa'),
               ('foo', 'passw', 'rw'),
        )

    Each triple becomes one `security.make_digest_acl(name, password, ...)`
    call, with every long permission name returned by `perm_to_long` passed as
    a `True` keyword argument.

    :param acl: acl in tuple or list, or `None`.
    :return: a `list` of `kazoo.security.ACL`, or `None` if `acl` is `None`.
    """
    if acl is None:
        return None

    return [
        security.make_digest_acl(name, passw, **dict.fromkeys(perm_to_long(perms), True))
        for name, passw, perms in acl
    ]

parse_kazoo_acl(acls)

Convert kazoo style acls in list/tuple to a list in form [(<scheme>, <user>, <perm>)]. :param acls: a list/tuple of kazoo.security.ACL. :return: a list of acl in form [(<scheme>, <user>, <perm>)]

Source code in k3zkutil/zkutil.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def parse_kazoo_acl(acls):
    """
    Flatten kazoo style acls into plain tuples `(<scheme>, <user>, <perm>)`.

    Example input:
        acls = [ACL(perms=31,
                    acl_list=['ALL'],
                    id=Id(scheme='digest', id=u'user:+Ir5sN1lGJEEs8xBZhZXK='))]

    `<user>` is the part of the acl id before the first `:`.
    `<perm>` is `"cdrwa"` when the acl list contains `"ALL"`, otherwise the
    result of `perm_to_short` on the acl list.

    :param acls: a list/tuple of `kazoo.security.ACL`.
    :return: a list of acl in form `[(<scheme>, <user>, <perm>)]`
    """
    return [
        (
            entry.id.scheme,
            entry.id.id.split(":")[0],
            "cdrwa" if "ALL" in entry.acl_list else perm_to_short(entry.acl_list),
        )
        for entry in acls
    ]

parse_lock_id(data_str)

It parses a string built by zkutil.lock_id() into a dictionary. :param data_str: string built by zkutil.lock_id().

Source code in k3zkutil/zkutil.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def parse_lock_id(data_str):
    """
    Parse a string built by `zkutil.lock_id()` into a dictionary.

    The string is split on `-` into at most 4 fields, missing trailing fields
    become `None`.

    :param data_str: string built by `zkutil.lock_id()`.
    :return:
    {
        "node_id": "<node_id>",
        "ip": "<ip>",
        "process_id": <integer or None>,
        "uuid": "<uuid>",
        "txid": "<txid>",
    }
    `process_id` is an `int`, or `None` if the field is missing or not made of
    digits.  The other fields are strings (or `None` when missing).

    If `node_id` is in form of `txid:...`, `txid` is filled with the text after
    the first `:`.  Otherwise it is `None`.
    This is added for zk-transaction implementation.
    """
    fields = data_str.split("-", 3)
    # Pad with None so that exactly 4 fields can be unpacked.
    fields.extend([None] * (4 - len(fields)))
    node_id, ip, pid_text, lock_uuid = fields

    is_numeric = isinstance(pid_text, str) and pid_text.isdigit()
    process_id = int(pid_text) if is_numeric else None

    txid = None
    if node_id.startswith("txid:"):
        txid = node_id.split(":", 1)[1]

    return {
        "node_id": node_id,
        "ip": ip,
        "process_id": process_id,
        "uuid": lock_uuid,
        "txid": txid,
    }

perm_to_long(short, lower=True)

Convert short style zookeeper permissions(cdrwa or CDRWA) to standard permission list(['create', 'delete', 'read', 'write', 'admin']). :param short: is iterable of short permissions that can be used in for-loop. Such as "cdrw" or ['c', 'd'] :param lower: specifies if convert result to lower or upper case. By default it is True, for lower case. :return: a list of standard permission.

Source code in k3zkutil/zkutil.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def perm_to_long(short, lower=True):
    """
    Expand short style zookeeper permissions (`cdrwa` or `CDRWA`) into the
    standard permission list (`['create', 'delete', 'read', 'write', 'admin']`).

    Every element is lower-cased before it is looked up in `PERM_TO_LONG`.

    :param short: is `iterable` of short permissions that can be used in `for-loop`.
    Such as `"cdrw"` or `['c', 'd']`
    :param lower: specifies if convert result to lower or upper case.
    By default it is `True`, for lower case.
    :return: a list of standard permission.
    :raises PermTypeError: on the first element that is not a known permission.
    """
    long_names = []

    for ch in short:
        key = ch.lower()
        if key not in PERM_TO_LONG:
            raise PermTypeError(key)

        long_names.append(PERM_TO_LONG[key])

    if lower:
        return long_names

    return [name.upper() for name in long_names]

perm_to_short(lst, lower=True)

The reverse of perm_to_long: it converts a list of standard permissions back to a short permission string, such as cdrw. :param lst: is a list of standard permissions, such as ['create', 'read']. :param lower: specifies whether to convert the result to lower or upper case. By default it is True, for lower case. :return: a string of short permissions.

Source code in k3zkutil/zkutil.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def perm_to_short(lst, lower=True):
    """
    The reverse of `perm_to_long`:
    collapse a list of standard permissions into a short permission string,
    such as `cdrw`.

    Every element is lower-cased before it is looked up in `PERM_TO_SHORT`.

    :param lst: is a list of standard permissions, such as `['create', 'read']`.
    :param lower: specifies if convert result to lower or upper case.
    By default it is `True`, for lower case.
    :return: a string of short permissions.
    :raises PermTypeError: on the first element that is not a known permission.
    """
    letters = []

    for name in lst:
        key = name.lower()
        if key not in PERM_TO_SHORT:
            raise PermTypeError(key)

        letters.append(PERM_TO_SHORT[key])

    short = "".join(letters)

    return short if lower else short.upper()

wait_absent(zkclient, path)

Wait at most timeout seconds for zk-node path to be absent.

If path does not exist, it returns at once. :param zkclient: :param path: :return:

Source code in k3zkutil/zkutil.py
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
@make_conditioned_getter
def wait_absent(zkclient, path):
    """
    Wait at most `timeout` seconds for zk-node `path` to be absent.

    If `path` does not exist, it returns at once.

    This function body is a generator; the waiting itself is not done here.
    NOTE(review): `timeout` is not a parameter of this function -- presumably
    it is added by `make_conditioned_getter`, which also drives the generator;
    confirm against the decorator.

    :param zkclient: the client whose `get(path, watch=...)` is used to probe the node.
    :param path: the zk-node path expected to become absent.
    :return: `None` once the node is found to be absent.
    """
    # The first value sent into the generator is a pair: a marker to yield when
    # the condition is not satisfied yet, and a callback to signal a change.
    NeedWait, set_available = yield

    try:
        # Probe the node and register a watch that calls `set_available()` when fired.
        zkclient.get(path, watch=lambda watchevent: set_available())
    except NoNodeError as e:
        # The node does not exist: the condition is satisfied, yield the result.
        logger.info(repr(e) + " found, return")
        yield None

    # The node still exists: tell the driver that waiting is needed.
    yield NeedWait

License

The MIT License (MIT) - Copyright (c) 2015 Zhang Yanpo (张炎泼)