Source code for dkredis.dkredislocks

import time
from contextlib import contextmanager

from .utils import (
    is_valid_identifier,
    unique_id,
    convert_to_bytes,
    later,
    now,
)
from .dkredis import connect, Timeout, remove_if


@contextmanager
def fetch_lock(apiname: str, timeout=5, cn=None):
    """Elect a single process to refresh expired cached data from an api.

    The context value is ``True`` for the one process that obtained the
    lock (it should perform the fetch) and ``False`` for everybody else
    (they should *not* wait for the lock, but rather wait a little and
    read the cached data instead).

    The lock key is ``dkredis:fetchlock:<apiname>`` and is created with an
    expiry of ``timeout`` seconds, so it disappears on its own even if the
    process holding it crashes.

    Usage::

        def get_weather_data():
            try:
                return cache.get('weatherdata')
            except cache.DoesNotExist:
                with fetch_lock('weatherapi') as should_fetch:
                    if should_fetch:
                        weatherdata = fetch_weather_data()
                        cache.put('weatherdata', weatherdata, 60)
                        return weatherdata
                    else:
                        # another process is fetching data, wait for it
                        time.sleep(1)
                        return cache.get_value('weatherdata', default=None)

    Args:
        apiname: name of the external api; must satisfy
            ``is_valid_identifier``.
        timeout: expiry (in seconds) given to the lock key.
        cn: an existing redis connection; ``connect()`` is used when this
            is falsy.

    Raises:
        ValueError: if ``apiname`` is rejected by ``is_valid_identifier``.
    """
    if not is_valid_identifier(apiname):
        message = (
            '`apiname` must be a valid lower-case python identifier, '
            f'got {apiname}'
        )
        raise ValueError(message)

    lock_key = f'dkredis:fetchlock:{apiname}'
    token = unique_id()
    conn = cn or connect()

    # SET ... NX only stores our token when the key does not exist yet, so
    # a truthy reply means that we are the lock holder.
    got_lock = conn.set(lock_key, value=token, ex=timeout, nx=True)

    if not got_lock:
        # Somebody else holds the lock: the client should not do the fetch.
        yield False
        return

    try:
        yield True  # the client should do the fetch
    finally:
        # The key may have expired (and been taken by another process)
        # while the context body was running, so it must only be removed
        # if it still contains our token. That check-and-delete is
        # delegated to remove_if (which needs to do it atomically, see
        # https://redis.io/commands/eval).
        remove_if(lock_key, token, cn=conn)
def rate_limiting_lock(resources, seconds=30, cn=None):
    """Lock all the keys and keep them locked for ``seconds`` seconds.

    Useful e.g. to prevent sending email to the same domain more often
    than every 15 seconds.

    Each resource is locked under the key ``b'rl-lock.' + resource``. All
    keys are taken together with a single ``MSETNX``, so either every
    resource is locked or none of them are.

    XXX: Currently doesn't recover from crashed clients: the keys only get
         their expiry in a second step after ``MSETNX``, so a client that
         dies between the two steps leaves the keys locked with no expiry
         (can be done as an else: clause to the if r.msetnx(), similarly
         to the mutex function).

    Args:
        resources: iterable of resource names (anything accepted by
            ``convert_to_bytes``). ``None`` or an empty iterable means
            there is nothing to lock.
        seconds: how long the resources should stay locked.
        cn: an existing redis connection; ``connect()`` is used when this
            is falsy.

    Returns:
        ``True`` if all resources were locked (or there was nothing to
        lock), ``False`` if at least one of them was already locked.
    """
    if not resources:
        return True

    names = [convert_to_bytes(resource) for resource in resources]
    if not names:
        # ``resources`` was a truthy-but-empty iterable (e.g. a generator
        # or iterator). MSETNX requires at least one key, so answer the
        # same way as for an empty list instead of sending an invalid
        # command.
        return True

    keys = {b'rl-lock.' + name: later(seconds) for name in names}

    r = cn or connect()
    if not r.msetnx(keys):
        # at least one of the resources is already locked.
        return False

    # MSETNX can't set an expiry, so add it to each key afterwards.
    with r.pipeline() as pipe:
        for key in keys:
            pipe.expire(key, seconds)
        pipe.execute()
    return True
# XXX: [bp-2023-12-17] No idea what this is supposed to be used for, but it is
#      definitely not a mutex implementation...
@contextmanager
def mutex(name, seconds: int = 30, timeout: int = 60, unlock: bool = True,
          waitsecs: int = 3):
    """Lock the ``name`` for ``seconds``, waiting ``waitsecs`` seconds
    between each attempt at locking the name.

    Locking means creating the key ``'dkredis:mutex:' + name`` holding the
    time at which the lock expires. A key whose stored time has passed is
    considered abandoned and may be taken over by a waiting client.

    Usage::

        from dkredis.dkredislocks import mutex

        with mutex('mymutex'):
            # mutual exclusion zone ;-)

    Args:
        name: name of the lock.
        seconds: how long a freshly acquired lock is valid.
        timeout: maximum number of seconds to wait for the lock. ``0`` is
            treated as one hour.
        unlock: delete the key when leaving the context (provided our lock
            hasn't expired in the meantime). With ``unlock=False`` the key
            is left in place.
        waitsecs: seconds to sleep between attempts while someone else
            holds a valid lock.

    Raises:
        Timeout: if more than ``timeout`` seconds has elapsed without
            getting the lock.
    """
    # the various time.time() calls can happen at different times.
    # NOTE(review): assumes now()/later() return numeric timestamps that
    # survive a round-trip through redis and float() -- confirm in .utils.
    if timeout == 0:
        timeout = 60 * 60  # 1 hour

    prefix = 'dkredis:mutex:'
    start = now()
    r = connect()
    k = f'{prefix}{name}'

    # -- acquire ---------------------------------------------------------
    # Any exception raised in this loop (Timeout, redis errors) propagates
    # without touching the key: we don't own it yet, so we must not
    # delete it.
    while True:
        if start + timeout < now():
            raise Timeout()

        expire = later(seconds)
        if r.setnx(k, expire):
            # we have the lock.
            break

        # we didn't get the lock, but it exists...
        current = r.get(k)
        if current is None:
            # ...or rather existed: it was released between our SETNX and
            # GET. Try again right away.
            continue

        if float(current) > now():
            # the lock is still valid (someone else has the lock).
            time.sleep(waitsecs)
            continue

        # lock has expired, try to grab it...
        # NOTE(review): this path locks for ``timeout`` rather than
        # ``seconds`` -- looks unintentional, kept as-is; confirm.
        expire = later(timeout)
        previous = r.getset(k, expire)
        if previous is None or float(previous) < now():
            # we won: the value we replaced was missing or expired, so
            # nobody else grabbed the lock before us.
            break
        # else someone beat us to it, start again from the top.

    # -- hold / release --------------------------------------------------
    try:
        yield
    finally:
        if unlock and expire > now():
            # we should unlock, and our lock hasn't expired. (If it *has*
            # expired, another client may own the key by now, so it is
            # left alone.)
            r.delete(k)