dkredis - Python interface to Redis

A thin convenience wrapper around redis-py for storing Python values (via pickle), working with Redis hashes, performing atomic updates, and managing distributed locking primitives.

Installation

pip install dkredis

Usage

Connecting

from dkredis import dkredis

r = dkredis.connect()

connect() reads REDIS_HOST and REDIS_PASSWORD from environment variables, defaulting to localhost:6379.

Storing Python values

Any pickleable value can be stored and retrieved:

from dkredis.dkredis import set_pyval, get_pyval, pop_pyval

set_pyval('mykey', {'answer': 42}, secs=300)  # expires in 5 minutes
get_pyval('mykey')          # {'answer': 42}
pop_pyval('mykey')          # {'answer': 42}, then deletes the key
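
To illustrate how a pickle-backed store behaves, the following sketch round-trips a value through pickle using a plain dict as a stand-in for Redis. FakeStore and its methods are hypothetical names for illustration only, not part of dkredis, and expiry (secs) is left out.

```python
import pickle


class FakeStore:
    """In-memory stand-in for a Redis connection (hypothetical, illustration only)."""

    def __init__(self):
        self._data = {}

    def put(self, key, val):
        # Serialize the value to bytes, as a pickle-based store would.
        self._data[key] = pickle.dumps(val)

    def get(self, key, missing_value=None):
        raw = self._data.get(key)
        return missing_value if raw is None else pickle.loads(raw)

    def pop(self, key):
        # Read the value first, then delete the key.
        val = self.get(key)
        self._data.pop(key, None)
        return val


store = FakeStore()
store.put('mykey', {'answer': 42})
first = store.get('mykey')
popped = store.pop('mykey')
after = store.get('mykey', missing_value='gone')
```

Because the value is stored as pickled bytes, only trusted data should ever be read back this way.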

Dict / hash operations

Store and retrieve dicts as Redis hashes (string values only):

from dkredis.dkredis import set_dict, get_dict

set_dict('user:1', {'name': 'Alice', 'role': 'admin'}, secs=600)
get_dict('user:1')          # {'name': 'Alice', 'role': 'admin'}

Atomic updates

update() uses WATCH/MULTI pipelines for optimistic locking:

from dkredis.dkredis import update, setmax, setmin

r.set('counter', 40)
update('counter', lambda val: val + 2)  # atomically set to 42

setmax('highscore', b'100')  # r[key] := max(r[key], val)
setmin('lowscore', b'5')     # r[key] := min(r[key], val)
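
The optimistic-locking idea behind WATCH/MULTI can be sketched without a Redis server: read a value together with a version, compute the new value, and commit only if nothing changed in between, retrying otherwise. VersionedStore and optimistic_update are hypothetical names, and this is a model of the technique rather than dkredis's implementation.

```python
class VersionedStore:
    """Toy key-value store that tracks a version per key (hypothetical)."""

    def __init__(self):
        self._data = {}
        self._version = {}

    def read(self, key):
        # Return the value and the version seen (plays the role of WATCH + GET).
        return self._data.get(key), self._version.get(key, 0)

    def write_if_unchanged(self, key, value, seen_version):
        # Commit only if nobody wrote since the read (plays the role of MULTI/EXEC).
        if self._version.get(key, 0) != seen_version:
            return False
        self._data[key] = value
        self._version[key] = seen_version + 1
        return True


def optimistic_update(store, key, fn, max_retries=10):
    """Apply fn to the current value, retrying when a concurrent write intervenes."""
    for _ in range(max_retries):
        value, version = store.read(key)
        new_value = fn(value)
        if store.write_if_unchanged(key, new_value, version):
            return new_value
    raise RuntimeError('too many conflicting writes')


store = VersionedStore()
store.write_if_unchanged('counter', 40, 0)
result = optimistic_update(store, 'counter', lambda val: val + 2)
```

The update function may run more than once when there is contention, so it should be free of side effects.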

remove_if() atomically deletes a key only if it holds an expected value (implemented via a Lua script):

from dkredis.dkredis import remove_if

remove_if('mykey', expected_value)
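
Compare-and-delete is commonly written as a short Lua script, because Redis executes a script atomically. The sketch below pairs such a script (a widely used idiom, assumed here and not necessarily the script dkredis ships) with a pure-Python model of its semantics. compare_and_delete is a hypothetical helper.

```python
# A commonly used compare-and-delete script (assumption: dkredis's own script may differ).
COMPARE_AND_DELETE_LUA = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
"""


def compare_and_delete(data, key, expected):
    """Pure-Python model of the script: delete key only if it holds expected."""
    if key in data and data[key] == expected:
        del data[key]
        return 1
    return 0


data = {'mykey': b'token-1'}
miss = compare_and_delete(data, 'mykey', b'other')    # value differs, key is kept
hit = compare_and_delete(data, 'mykey', b'token-1')   # value matches, key is removed
```

This is the usual way to release a lock safely: a client deletes the key only if it still holds the token that client wrote.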

Locking primitives

fetch_lock – prevents thundering-herd on cache misses. Only one process fetches fresh data; others receive False and should fall back to stale cache:

import time

from dkredis.dkredislocks import fetch_lock

with fetch_lock('weatherapi', timeout=10) as should_fetch:
    if should_fetch:
        data = call_external_api()
        cache.set('weather', data, 60)
    else:
        time.sleep(1)
        data = cache.get('weather')

rate_limiting_lock – sets multiple keys atomically with MSETNX to enforce a per-resource cooldown period:

from dkredis.dkredislocks import rate_limiting_lock

if rate_limiting_lock(['smtp.example.com'], seconds=15):
    send_email()
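
The all-or-nothing behaviour of MSETNX (either every key is set, or none is) combined with an expiry can be modelled in memory like this. CooldownTable is a hypothetical class used only to show the semantics. It does not talk to Redis.

```python
import time


class CooldownTable:
    """In-memory model of all-or-nothing locking with expiry (hypothetical)."""

    def __init__(self, clock=time.monotonic):
        self._expires = {}
        self._clock = clock

    def acquire(self, resources, seconds):
        now = self._clock()
        # Like MSETNX: if any resource is still locked, lock none of them.
        if any(self._expires.get(r, float('-inf')) > now for r in resources):
            return False
        for r in resources:
            self._expires[r] = now + seconds
        return True


table = CooldownTable()
first = table.acquire(['smtp.example.com'], seconds=15)
second = table.acquire(['smtp.example.com'], seconds=15)  # still cooling down
```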

mutex – a polling mutex using SETNX with expiry-based recovery:

from dkredis.dkredislocks import mutex

with mutex('mylock', seconds=30, timeout=60):
    # mutual exclusion zone
    ...
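
The polling part of such a mutex can be sketched as a loop that retries a non-blocking acquire until a deadline passes. acquire_with_polling is a hypothetical helper, the local Timeout class only stands in for dkredis.dkredis.Timeout, and expiry-based recovery is not modelled.

```python
import time


class Timeout(Exception):
    """Raised when the lock could not be acquired in time (local stand-in)."""


def acquire_with_polling(try_lock, timeout=60, waitsecs=3,
                         clock=time.monotonic, sleep=time.sleep):
    """Call try_lock() until it returns True or timeout seconds would be exceeded."""
    deadline = clock() + timeout
    attempts = 0
    while True:
        attempts += 1
        if try_lock():
            return attempts
        # Give up if the next wait would take us past the deadline.
        if clock() + waitsecs > deadline:
            raise Timeout('gave up after %d attempts' % attempts)
        sleep(waitsecs)


attempts = acquire_with_polling(lambda: True)  # lock is free: succeeds at once
```

Passing clock and sleep as parameters keeps the loop testable without real waiting.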

Multi-hash field lookup

mhkeyget() fetches one field from all hashes matching a key pattern:

from dkredis.dkredis import mhkeyget

mhkeyget('lock.*', 'x')
# {'lock.a': '1', 'lock.b': '2', 'lock.c': '3'}
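
The lookup can be pictured as glob-matching key names and then reading one field from each matching hash. The sketch below does this over a plain dict of dicts. multi_hash_field is a hypothetical name, and skipping hashes that lack the field is an assumption.

```python
from fnmatch import fnmatchcase


def multi_hash_field(hashes, keypattern, field):
    """Return {key: hash[field]} for every key matching the glob pattern."""
    return {
        key: fields[field]
        for key, fields in hashes.items()
        # Assumption: hashes without the field are skipped.
        if fnmatchcase(key, keypattern) and field in fields
    }


hashes = {
    'lock.a': {'x': '1'},
    'lock.b': {'x': '2'},
    'other': {'x': '9'},
}
result = multi_hash_field(hashes, 'lock.*', 'x')
```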

Requirements

  • Python 3

  • redis==5.0.1

  • A running Redis server (localhost:6379 by default)

Module contents

Submodules

dkredis.dkredis module

Interface to our redis instance.

Redis command reference: http://redis.io/commands

Usage:

from dkredis import dkredis
r = dkredis.connect()
r.set('foo', 'bar')
r.get('foo')
r.delete('foo')

Python interface peculiarities are explained here: https://github.com/andymccurdy/redis-py

The Windows version of the redis server that we use is from https://github.com/rgl/redis/downloads

exception dkredis.dkredis.Timeout(retry=0)[source]

Bases: Exception

A timeout limit was exceeded.

dkredis.dkredis.connect(host=None, port=6379, db=0, password=None)[source]

Return a connection to the redis server.

dkredis.dkredis.get_dict(key, cn=None)[source]

Return a redis hash as a python dict.

dkredis.dkredis.get_pyval(key, cn=None, missing_value=None)[source]

Get a Python value from Redis.

dkredis.dkredis.mhkeyget(keypattern, field, cn=None)[source]

Get a field from multiple hashes.

Usage:

>>> r = connect()
>>> r.hset('lock.a', mapping={'x': 1})
1
>>> r.hset('lock.b', mapping={'x': 2})
1
>>> r.hset('lock.c', mapping={'x': 3})
1
>>> mhkeyget('lock.*', 'x')
{'lock.a': '1', 'lock.c': '3', 'lock.b': '2'}

# cleanup
>>> r.delete('lock.a', 'lock.b', 'lock.c')
3

dkredis.dkredis.pop_pyval(key, cn=None)[source]

Get value and remove key.

dkredis.dkredis.remove(key, cn=None)[source]

Remove a key from redis.

dkredis.dkredis.remove_if(key, val, cn=None)[source]

Atomically remove key if it has the value val.

dkredis.dkredis.set_dict(key, dictval, secs=None, cn=None)[source]

All values in dictval should be strings. They’ll be read back as strings – use set_pyval to store dicts with arbitrary values.

dkredis.dkredis.set_pyval(key, val, secs=None, cn=None)[source]

Store any (pickleable) value in Redis.

dkredis.dkredis.setmax(key, val, cn=None)[source]

Update key with the max of the current value and val:

r[key] := max(r[key], val)

Returns the maximum value.

dkredis.dkredis.setmin(key, val, cn=None)[source]

Update key with the min of the current value and val:

r[key] := min(r[key], val)

Returns the minimum value.

dkredis.dkredis.update(key, fn, cn=None)[source]

Usage

update(KEY, lambda val: val + 42)

dkredis.dkredislocks module

dkredis.dkredislocks.fetch_lock(apiname: str, timeout=5, cn=None)[source]

Use this lock to ensure that only one process is fetching expired cached data from an external api.

It is important to have a timeout on the lock, so it will be released even if the process crashes.

A process that doesn’t get the lock should not block on the lock itself; it should pause briefly and then try the cached data instead.

Usage:

def get_weather_data():
    try:
        return cache.get('weatherdata')
    except cache.DoesNotExist:
        with fetch_lock('weatherapi') as should_fetch:
            if should_fetch:
                weatherdata = fetch_weather_data()
                cache.put('weatherdata', weatherdata, 60)
                return weatherdata
            else:
                # another process is fetching data, wait for it
                time.sleep(1)
                return cache.get_value('weatherdata', default=None)

dkredis.dkredislocks.mutex(name, seconds: int = 30, timeout: int = 60, unlock: bool = True, waitsecs: int = 3)[source]

Lock the name for seconds, waiting waitsecs seconds between each attempt at locking the name. Locking means creating a key ‘dkredis:mutex:’ + name.

It will raise a Timeout exception if more than timeout seconds have elapsed.

Usage:

from dkredis.dkredislocks import mutex

with mutex('mymutex'):
    # mutual exclusion zone ;-)
    ...

dkredis.dkredislocks.rate_limiting_lock(resources, seconds=30, cn=None)[source]

Lock all the keys and keep them locked for seconds seconds. Useful e.g. to prevent sending email to the same domain more often than every 15 seconds.

XXX: Currently doesn’t recover from crashed clients (this can be done as an else: clause to the if r.msetnx(), similarly to the mutex function).

dkredis.rediscache module

Object cache implementation using redis as a backend.

class dkredis.rediscache.Cached[source]

Bases: object

Mixin class to invalidate cache keys on model.save().

Usage:

class MyModel(Cached, models.Model):   # models.Model must be last
    @property
    def cache_keys(self):
        return [...]
    ...

That’s it: all cache keys will be removed whenever MyModel.save() is called.

cache_keys = []

get_cache_values()[source]

Get all cached values for this object.

Note

Returns None both for keys whose value is None and for non-existent keys!

save(*args, **kwargs)[source]

class dkredis.rediscache.cache[source]

Bases: object

Python value cache.

Usage:

from dkredis.rediscache import cache

try:
    v = cache.get(key)
except cache.DoesNotExist:
    v = mk_object_value(...)
    cache.put(
        key, v,
        duration=secs)  # or datetime.duration()

You can use a datetime value for the duration parameter, or anything the timeperiod.when() function accepts.

exception DoesNotExist[source]

Bases: Exception

Value not in cache (possibly due to expiration).

classmethod get(key)[source]

Fetch value for key from redis.

classmethod get_value(key, default=None)[source]

classmethod ping()[source]

classmethod put(key, value, duration=None)[source]

Put value in cache, under key, for duration seconds.

static rediskey(key)[source]

The redis key is ‘obj-cache.’ + the md5 hexdigest of the key’s serialization.
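
A minimal sketch of deriving such a key, assuming the cache key is serialized with pickle before hashing (the serialization dkredis actually uses is not stated here). rediskey_sketch is a hypothetical name.

```python
import hashlib
import pickle


def rediskey_sketch(key):
    # Assumption: the key is serialized with pickle before hashing.
    digest = hashlib.md5(pickle.dumps(key)).hexdigest()
    return 'obj-cache.' + digest


k = rediskey_sketch(('user', 42))
```

Hashing the serialized key gives a fixed-length Redis key no matter how large or structured the original cache key is.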

classmethod remove(key)[source]

Remove key from cache.

dkredis.rediscache.cached(cache_key=None, timeout=3600)[source]

Function result cache decorator.
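
How a decorator of this shape can work is sketched here with an in-memory dict in place of Redis. cached_sketch and _STORE are hypothetical names, and the fallback key built from the function name and arguments is an assumption, not the library's documented behaviour.

```python
import functools
import time

_STORE = {}  # key -> (expires_at, value); stands in for Redis


def cached_sketch(cache_key=None, timeout=3600):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            if callable(cache_key):
                # A callable computes the key from the call arguments.
                key = cache_key(*args, **kwargs)
            elif cache_key is not None:
                key = cache_key
            else:
                # Assumption: fall back to the function name plus its arguments.
                key = (fn.__qualname__, args, tuple(sorted(kwargs.items())))
            hit = _STORE.get(key)
            if hit is not None and hit[0] > time.monotonic():
                return hit[1]
            value = fn(*args, **kwargs)
            _STORE[key] = (time.monotonic() + timeout, value)
            return value
        return wrapper
    return decorator


calls = []


@cached_sketch(lambda n: 'square_%s' % n, timeout=60)
def square(n):
    calls.append(n)
    return n * n


first = square(4)
second = square(4)  # served from the in-memory store, square() is not re-run
```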

Usage:

@cached()
def can_view_user(user, username):
    ...
    return True   # will be cached for 1hr (3600 secs)

class MenuItem(models.Model):
    @classmethod
    @cached('menu_root', 3600*24)
    def get_root(cls):
        return MenuItem.objects.get(pk=1)

@cached(lambda u: 'user_privileges_%s' % u.username, 3600)
def get_user_privileges(user):
    ...

class dkredis.rediscache.djangocache[source]

Bases: object

Django facade to the rediscache.

classmethod get(key, default=None)[source]

classmethod set(key, value, duration)[source]

dkredis.rediscache.writeln(*args, **kw)[source]

dkredis.utils module

dkredis.utils.convert_to_bytes(r)[source]

Converts the input object to bytes.

Parameters:

r (object): The input object to convert.

Returns:

bytes: The converted object as bytes. If the input object is already of type ‘bytes’, it is returned as is. If the input object is of type ‘str’, it is encoded to bytes using the ‘utf-8’ encoding. For any other input object, it is converted to a string and then encoded to bytes using the ‘utf-8’ encoding.
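
The three cases described above translate directly into a few lines of Python. This is a sketch written from the description, not a copy of the library source, and convert_to_bytes_sketch is a hypothetical name.

```python
def convert_to_bytes_sketch(r):
    if isinstance(r, bytes):
        # Already bytes: return unchanged.
        return r
    if isinstance(r, str):
        return r.encode('utf-8')
    # Anything else: convert to str first, then encode.
    return str(r).encode('utf-8')


from_bytes = convert_to_bytes_sketch(b'abc')
from_str = convert_to_bytes_sketch('blåbær')
from_int = convert_to_bytes_sketch(42)
```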

dkredis.utils.is_valid_identifier(s: str) -> bool[source]

Return True if s is a valid python identifier.

dkredis.utils.later(n=0.0)[source]

Return timestamp n seconds from now.

dkredis.utils.now()[source]

Return timestamp.

dkredis.utils.unique_id(fast=True)[source]

Return a unique id.
