dkredis - Python interface to Redis¶
A thin convenience wrapper around redis-py for storing Python values (via pickle), working with Redis hashes, performing atomic updates, and managing distributed locking primitives.
Installation¶
pip install dkredis
Usage¶
Connecting¶
from dkredis import dkredis
r = dkredis.connect()
connect() reads REDIS_HOST and REDIS_PASSWORD from environment
variables, defaulting to localhost:6379.
Storing Python values¶
Any pickleable value can be stored and retrieved:
from dkredis.dkredis import set_pyval, get_pyval, pop_pyval
set_pyval('mykey', {'answer': 42}, secs=300) # expires in 5 minutes
get_pyval('mykey') # {'answer': 42}
pop_pyval('mykey') # {'answer': 42}, then deletes the key
Dict / hash operations¶
Store and retrieve dicts as Redis hashes (string values only):
from dkredis.dkredis import set_dict, get_dict
set_dict('user:1', {'name': 'Alice', 'role': 'admin'}, secs=600)
get_dict('user:1') # {'name': 'Alice', 'role': 'admin'}
Atomic updates¶
update() uses WATCH/MULTI pipelines for optimistic locking:
from dkredis.dkredis import update, setmax, setmin
r.set('counter', 40)
update('counter', lambda val: val + 2) # atomically set to 42
setmax('highscore', b'100') # r[key] := max(r[key], val)
setmin('lowscore', b'5') # r[key] := min(r[key], val)
remove_if() atomically deletes a key only if it holds an expected value
(implemented via a Lua script):
from dkredis.dkredis import remove_if
remove_if('mykey', expected_value)
Locking primitives¶
fetch_lock – prevents thundering-herd on cache misses. Only one process
fetches fresh data; others receive False and should fall back to stale
cache:
from dkredis.dkredislocks import fetch_lock
with fetch_lock('weatherapi', timeout=10) as should_fetch:
if should_fetch:
data = call_external_api()
cache.set('weather', data, 60)
else:
time.sleep(1)
data = cache.get('weather')
rate_limiting_lock – sets multiple keys atomically with MSETNX to
enforce a per-resource cooldown period:
from dkredis.dkredislocks import rate_limiting_lock
if rate_limiting_lock(['smtp.example.com'], seconds=15):
send_email()
mutex – a polling mutex using SETNX with expiry-based recovery:
from dkredis.dkredislocks import mutex
with mutex('mylock', seconds=30, timeout=60):
# mutual exclusion zone
...
Multi-hash field lookup¶
mhkeyget() fetches one field from all hashes matching a key pattern:
from dkredis.dkredis import mhkeyget
mhkeyget('lock.*', 'x')
# {'lock.a': '1', 'lock.b': '2', 'lock.c': '3'}
Requirements¶
Python 3
redis==5.0.1A running Redis server (
localhost:6379by default)
Module contents¶
Submodules¶
dkredis.dkredis module¶
Interface to our redis instance.
Redis command reference: http://redis.io/commands
Usage:
from dkredis import dkredis
r = dkredis.connect()
r.set('foo', 'bar')
r.get('foo')
r.delete('foo')
Python interface peculiarities are explained here: https://github.com/andymccurdy/redis-py
The windows version of the redis server that we use is from https://github.com/rgl/redis/downloads —-
- dkredis.dkredis.connect(host=None, port=6379, db=0, password=None)[source]¶
Return a connection to the redis server.
- dkredis.dkredis.mhkeyget(keypattern, field, cn=None)[source]¶
Get a field from multiple hashes.
Usage:
>>> r = connect() >>> r.hset('lock.a', {'x': 1}) True >>> r.hset('lock.b', {'x': 2}) True >>> r.hset('lock.c', {'x': 3}) True >>> mhkeyget('lock.*', 'x') {'lock.a': '1', 'lock.c': '3', 'lock.b': '2'} # cleanup >>> #r.delete('lock.a', 'lock.b', 'lock.c') True
- dkredis.dkredis.remove_if(key, val, cn=None)[source]¶
Atomically remove key if it has the value val.
- dkredis.dkredis.set_dict(key, dictval, secs=None, cn=None)[source]¶
All values in dictval should be strings. They’ll be read back as strings – use py_setval to set dicts with any values.
- dkredis.dkredis.set_pyval(key, val, secs=None, cn=None)[source]¶
Store any (picleable) value in Redis.
- dkredis.dkredis.setmax(key, val, cn=None)[source]¶
Update key with the max of the current value and val:
r[key] := max(r[key], val)
returns the maximum value.
dkredis.dkredislocks module¶
- dkredis.dkredislocks.fetch_lock(apiname: str, timeout=5, cn=None)[source]¶
Use this lock to ensure that only one process is fetching expired cached data from an external api.
It is important to have a timeout on the lock, so it will be released even if the process crashes.
A process that doesn’t get the lock should not wait for the lock, but should wait and try using the cached data instead.
Usage:
def get_weather_data(): try: return cache.get('weatherdata') except cache.DoesNotExist: with fetch_lock('weatherapi') as should_fetch: if should_fetch: weatherdata = fetch_weather_data() cache.put('weatherdata', weatherdata, 60) return weatherdata else: # another process is fetching data, wait for it time.sleep(1) return cache.get_value('weatherdata', default=None)
- dkredis.dkredislocks.mutex(name, seconds: int = 30, timeout: int = 60, unlock: bool = True, waitsecs: int = 3)[source]¶
Lock the
nameforseconds, waitingwaitsecsseconds between each attempt at locking the name. Locking means creating a key ‘dkredis:mutex:’ + key.It will raise a Timeout exception if more than
timeoutseconds has elapsed.Usage:
from dkredis import dkredis with dkredis.lock('mymutex'): # mutual exclusion zone ;-)
- dkredis.dkredislocks.rate_limiting_lock(resources, seconds=30, cn=None)[source]¶
Lock all the keys and keep them locked for
secondsseconds. Useful e.g. to prevent sending email to the same domain more often than every 15 seconds.- XXX: Currently doesn’t recover from crashed clients (can be done as
an else: clause to the if r.msetnx(), similarly to the mutex function (below).
dkredis.rediscache module¶
Object cache implementation using redis as a backend.
- class dkredis.rediscache.Cached[source]¶
Bases:
objectMixin class to invalidate cache keys on model.save().
Usage:
class MyModel(Cached, models.Model): # models.Model must be last @property def cache_keys(self): return [...] ...
(that’s it. All cache keys will be removed whenever MyModel.save() is called).
- cache_keys = []¶
- class dkredis.rediscache.cache[source]¶
Bases:
objectPython value cache.
Usage:
from dkredis.rediscache import cache try: v = cache.get(key) except cache.DoesNotExist: v = mk_object_value(...) cache.put( key, v, duration=secs) # or datetime.duration()
you can use a datetime value for the valid_until parameter, or anything the timeperiod.when() function accepts.
- dkredis.rediscache.cached(cache_key=None, timeout=3600)[source]¶
Function result cache decorator.
Usage:
@cached() def can_view_user(user, username): ... return True # will be cached for 1hr (3600 secs) class MenuItem(models.Model): @classmethod @cached('menu_root', 3600*24) def get_root(self): return MenuItem.objects.get(pk=1) @cached(lambda u: 'user_privileges_%s' % u.username, 3600) def get_user_privileges(user): #...
dkredis.utils module¶
- dkredis.utils.convert_to_bytes(r)[source]¶
Converts the input object to bytes.
- Parameters:
r (object): The input object to convert.
- Returns:
- bytes: The converted object as bytes. If the input object is
already of type ‘bytes’, it is returned as is.
If the input object is of type ‘str’, it is encoded to bytes using the ‘utf-8’ encoding.
For any other input object, it is converted to a string and then encoded to bytes using the ‘utf-8’ encoding.