Source code for dkredis.utils

import threading
import time
import uuid


def later(n=0.0):
    """Return the timestamp that lies ``n`` seconds after the current time.

    The current time is taken from ``time.time()`` and ``n`` is simply
    added to it.
    """
    current = time.time()
    return current + n
def now():
    """Return the current timestamp (``later`` with a zero offset)."""
    return later(n=0)
# Last nanosecond timestamp handed out by ``unique_id(fast=True)``.
# Only read/written while holding ``_last_ts_lock``.
_last_ts = 0
_last_ts_lock = threading.Lock()


def unique_id(fast=True):
    """Return a unique id.

    Args:
        fast: When true (the default) the id is assembled as
            ``machine-id:thread-id:time-in-ns`` from ``uuid.getnode()``,
            ``threading.get_ident()`` and a nanosecond timestamp.
            When false, the hex form of a random ``uuid.uuid4()`` is
            returned instead.

    Returns:
        str: The identifier.
    """
    global _last_ts

    if not fast:
        # this picks up randomness from /dev/urandom which can be very slow.
        return uuid.uuid4().hex

    # machine-id:thread-id:time-in-ns
    with _last_ts_lock:
        # The wall clock may stand still between two calls or even step
        # backwards, so never issue a timestamp that is not strictly
        # greater than the previous one.  The lock keeps the
        # read-modify-write of ``_last_ts`` atomic across threads.
        _last_ts = max(time.time_ns(), _last_ts + 1)
        _ts = _last_ts
    return f'{uuid.getnode()}:{threading.get_ident()}:{_ts}'
def is_valid_identifier(s: str) -> bool:
    """Return True if ``s`` is an all-lowercase python identifier.

    ``s`` must pass ``str.isidentifier()`` and ``str.islower()``; the
    latter also requires at least one cased character, so a name made
    up solely of underscores and digits is rejected.
    """
    if not s.isidentifier():
        return False
    return s.islower()
def convert_to_bytes(r):
    """Converts the input object to bytes.

    Parameters:
        r (object): The input object to convert.

    Returns:
        bytes: The converted object as bytes.

    If the input object is already of type 'bytes', it is returned as is.
    If the input object is a 'bytearray' or 'memoryview', its content is
    copied into a new 'bytes' object.
    If the input object is of type 'str', it is encoded to bytes using the
    'utf-8' encoding.
    For any other input object, it is converted to a string and then
    encoded to bytes using the 'utf-8' encoding.
    """
    if isinstance(r, bytes):
        return r
    if isinstance(r, (bytearray, memoryview)):
        # str() on these yields their repr ("bytearray(b'..')",
        # "<memory at 0x..>"), not the payload, so copy the raw bytes.
        return bytes(r)
    text = r if isinstance(r, str) else str(r)
    return text.encode('utf-8')