Source code for redisent.models

from __future__ import annotations

import logging
import pickle

from dataclasses import dataclass, field, fields, asdict, Field
from tabulate import tabulate
from typing import Mapping, Any, List, Optional, MutableMapping, cast

from redisent import RedisentHelper
from redisent.errors import RedisError

logger = logging.getLogger(__name__)


[docs]@dataclass() class RedisEntry: """ Base dataclass that should be inherited from with additional :py:func:`dataclasses.field` s for each attribute the entry will store. All ``RedisEntry`` instances must define a unique-to-Redis value for ``redis_id``. If the entry is going to be stored as a hash-map, the class must also define a value for ``redis_name``. If the ``redis_name`` attribute is set, at a high level, the storing and fetching of values looks like this: .. code-block:: ipython In [1]: import pickle ...: import redis ...: ...: # Setup some dummy values ...: redis_id = 'blah' ...: redis_name = 'entry_one' ...: ent_values = {'value_one': 1, 'value_two': 2} In [2]: # Pickle the dictionary data to bytes ...: ent_pickle = pickle.dumps(ent_values) In [3]: # Build Redis connection ...: conn = redis.StrictRedis('localhost') In [4]: # Store them ...: conn.hset(redis_id, redis_name, ent_pickle) Out[4]: 1 In [5]: # Fetch back the raw pickled values ...: res_raw = conn.hget(redis_id, redis_name) In [6]: # Assert they equal the values we set (obviously this should be true) ...: assert res_raw == ent_pickle In [7]: # Now, decode the response ...: res_values = pickle.loads(res_raw) In [8]: # And finally assert that "res_values" is the same as the original "ent_values" ...: assert res_values == ent_values In [9]: res_values Out[9]: {'value_one': 1, 'value_two': 2} """ redis_id: str = field(metadata={'redis_field': True}) #: Redis ID for this entry redis_name: Optional[str] = field(default_factory=str, metadata={'redis_field': True}) #: Optional Redis hashmap name
[docs] def dump(self, include_redis_fields: bool = True) -> str: """ Helper for dumping a textual representation of a particular :py:class:`redisent.models.RedisEntry` instance """ dump_out = f'RedisEntry ({type(self).__name__}) for key "{self.redis_id}"' if self.redis_name: dump_out = f'{dump_out}, hash entry "{self.redis_name}":' entry_attrs = self.get_entry_fields(include_redis_fields=include_redis_fields, include_internal_fields=False) entry_data = [[attr, getattr(self, attr)] for attr in entry_attrs] tbl = tabulate(entry_data, headers=['Attribute', 'Value'], tablefmt='presto') dump_out += f'\n\n{tbl}' return dump_out
[docs] @classmethod def get_entry_fields(cls, include_redis_fields: bool = False, include_internal_fields: bool = False) -> Mapping[str, Field]: """ Class method used for building a list of strings for each field name, based on the provided filering attributes :param include_redis_fields: if set, include fields with metadata indicating they are Redis-related fields (i.e. ``redis_id`` or ``redis_name``) :param include_internal_fields: if set, include internal fields which are used by ``redisent`` only (any marked with metadata attribute ``internal_field``) """ flds = {} for fld in fields(cls): is_redis_fld = fld.metadata.get('redis_field', False) is_int_fld = fld.metadata.get('internal_field', False) if is_redis_fld and not include_redis_fields: continue if is_int_fld and not include_internal_fields: continue if not fld.init: continue flds[fld.name] = fld return flds
@property def entry_fields(self) -> List[str]: """ Property for returning a list of entry-only related fields This method will not include any fields marked with metadata attributes ``redis_field`` or ``internal_field`` and thus will only return fields related to this specific entry's dataclass definition """ return list(self.get_entry_fields(include_redis_fields=False, include_internal_fields=False).keys()) @property def is_hashmap(self) -> bool: """ Property helper to determine if this entry is a hash-map or not This is simply determined by if ``redis_name`` is set or not. """ return True if self.redis_name else False
[docs] @classmethod def load_dict(cls, redis_id: str, redis_name: str = None, **ent_kwargs) -> RedisEntry: """ Class method for loading a RedisEntry from a provided dictionary of values :param redis_id: unique Redis ID for entry :param redis_name: unique Redis hashmap name (if entity is stored as a hashmap, this is required) :param ent_kwargs: keyword arguments used to build entry values """ if not redis_name: if 'redis_name' in ent_kwargs: redis_name = ent_kwargs.pop('redis_name') ent_fields = cls.get_entry_fields(include_redis_fields=False, include_internal_fields=False) cls_kwargs: MutableMapping[str, Any] = {attr: ent_kwargs[attr] for attr in ent_fields if attr in ent_kwargs} cls_kwargs['redis_id'] = redis_id if redis_name: cls_kwargs['redis_name'] = redis_name return cls(**cls_kwargs)
[docs] def as_dict(self, include_redis_fields: bool = True, include_internal_fields: bool = False) -> Mapping[str, Any]: """ Return a mapping representing this entry by making use of :py:func:`dataclasses.asdict` along with optionally excluding any Redis-related (or internal) fields. By default no internal or redis fields (i.e. ``redis_id`` or ``redis_name``) are returned :param include_redis_fields: if set, include fields with metadata indicating they are Redis-related fields (i.e. ``redis_id`` or ``redis_name``) :param include_internal_fields: if set, include internal fields which are used by ``redisent`` only (any marked with metadata attribute ``internal_field``) """ ent_dict = asdict(self) if include_redis_fields and include_internal_fields: return ent_dict flds = self.get_entry_fields(include_redis_fields=include_redis_fields, include_internal_fields=include_internal_fields) return {attr: value for attr, value in ent_dict.items() if attr in flds}
[docs] @classmethod def decode_entry(cls, entry_bytes, use_redis_id: str = None, use_redis_name: str = None): """ Class method for attempting to build a :py:class:`redisent.models.RedisEntry` instance from the provided ``bytes`` value ``entry_bytes`` Under the hood, this makes use of :py:func:`pickle.loads` and :py:class:`redisent.models.RedisEntry.load_dict` to actually attempt to build the entry while catching any related exceptions and propagating them as :py:exc:`redisent.errors.RedisError` exceptions. """ try: ent: MutableMapping[str, Any] = pickle.loads(entry_bytes) if isinstance(ent, Mapping): redis_id = ent.pop('redis_id', None) redis_id = use_redis_id or redis_id if not redis_id: raise RedisError('Unable to convert dictionary from Redis into RedisEntry (no value for "redis_id" found)') redis_name = ent.pop('redis_name', None) redis_name = use_redis_name or redis_name return cls.load_dict(redis_id, redis_name=redis_name, **ent) elif not isinstance(ent, RedisEntry): raise RedisError('Decoded entry is neither a dictionary nor a Mapping') return ent except RedisError as ex: raise ex except pickle.PickleError as ex: err_message = f'Error decoding entry using pickle: {ex}' logger.exception(err_message) raise RedisError(err_message, base_exception=ex) except Exception as ex: err_message = 'General error while attempting to decode possible RedisEntry' logger.exception(f'{err_message}: {ex}') raise RedisError(err_message, base_exception=ex)
[docs] @classmethod def encode_entry(cls, entry: RedisEntry, as_mapping: bool = None) -> bytes: """ Class method for encoding a given :py:class:`redisent.models.RedisEntry` instance as ``bytes`` using the :py:func:`pickle.dumps` method. :param entry: the :py:class:`redisent.models.RedisEntry` instance to be encoded :param as_mapping: if provided, ``entry`` will be treated as a Redis hashmap entry. otherwise, the default behavior is to check :py:attr:`RedisEntry.redis_name` """ if as_mapping is None: as_mapping = True if entry.redis_name else False try: return pickle.dumps(entry.as_dict(include_redis_fields=True, include_internal_fields=False) if as_mapping is True else entry) except Exception as ex: ent_str = f' (entry name: "{entry.redis_name}")' if entry.redis_name else '' raise Exception(f'Error encoding entry for "{entry.redis_id}"{ent_str} using pickle: {ex}')
[docs] def store(self, helper: RedisentHelper) -> bool: """ Blocking / synchronous method for storing this entry in Redis, using the provided :py:class:`redisent.helpers.RedisentHelper` instance. This method will do the actual encoding using the :py:func:`RedisEntry.encode_entry` method as well as make use of the provided helper :py:func:`redisent.helpers.RedisentHelper.wrapped_redis` (actually, this method makes use of the :py:func:`redisent.helpers.RedisentHelper.wrapped_redis`) context manager for storing the entry in Redis. :param helper: configured instance of :py:class:`redisent.helpers.RedisentHelper` to be used for storing entry """ entry_bytes = self.encode_entry(self) return helper.set(self.redis_id, entry_bytes, redis_name=self.redis_name)
[docs] @classmethod def fetch_all(cls, helper: RedisentHelper, redis_id: str, check_exists: bool = True) -> Mapping[str, RedisEntry]: """ Method for fetching **all** entries for a given hashmap from Redis, using the provided :py:class:`redisent.helpers.RedisentHelper` instance. Under the hood, this method will call the :py:func:`redisent.helpers.RedisentHelper.keys` method to enumerate all of the hashmap entry keys and iteratively fetch them and return a mapping of hash keys to ``RedisEntry`` instances. :param helper: configured instance of :py:class:`redisent.helpers.RedisentHelper` to be used for fetching entries :param redis_id: unique Redis ID for hash map entries :param check_exists: if set, check first that there is an existing Redis entry to fetch. If not, a :py:exc:`RedisEntry` is raised """ if check_exists and not helper.exists(redis_id): raise RedisError(f'Failed to find entry for "{redis_id}" in Redis while fetching all instances') ent_keys = helper.keys(redis_id=redis_id) return {e_key: cls.fetch(helper, redis_id=redis_id, redis_name=e_key) for e_key in ent_keys}
[docs] @classmethod def fetch(cls, helper: RedisentHelper, redis_id: str, redis_name: str = None, check_exists: bool = True) -> RedisEntry: """ Method for fetching entries from Redis, using the provided :py:class:`redisent.helpers.RedisentHelper` instance. This method will do the actual decoding using the :py:func:`RedisEntry.decode_entry` method after fetching the ``bytes`` value from Redis using the helper-provided :py:func:`redisent.helpers.RedisentHelper.wrapped_redis` (actually, this method makes use of the :py:func:`redisent.helpers.RedisentHelper.wrapped_redis`) context manager for actually fetching from Redis. :param helper: configured instance of :py:class:`redisent.helpers.RedisentHelper` to be used for storing entry :param redis_id: unique Redis ID for entry :param redis_name: unique Redis hashmap name (if entity is stored as a hashmap, this is required) :param check_exists: if set, check first that there is an existing Redis entry to fetch. If not, a :py:exc:`RedisEntry` is raised """ red_ent = f'hash entry for "{redis_name}" in "{redis_id}"' if redis_name else f'entry "{redis_id}"' if check_exists and not helper.exists(redis_id, redis_name=redis_name): raise RedisError(f'Failed to find {red_ent} was not found in Redis') entry_bytes = helper.get(redis_id, redis_name=redis_name) if not entry_bytes: raise RedisError(f'Failure during fetch of {red_ent}: No data returned') return cast(RedisEntry, cls.decode_entry(entry_bytes))
[docs] def delete(self, helper: RedisentHelper, check_exists: bool = True) -> Optional[bool]: """ Method responsible for actually deleting a RedisEntry from Redis :param helper: configured instance of :py:class:`redisent.helpers.RedisentHelper` to be used to delete the entry :param check_exists: if set, check first that there is an existing Redis entry for this instance """ return helper.delete(self.redis_id, redis_name=self.redis_name, check_exists=check_exists)