Source code for redisent.models

from __future__ import annotations

import logging
import pickle
import json

from dataclasses import dataclass, field, fields, asdict, Field, InitVar
from tabulate import tabulate
from typing import Mapping, Any, Optional, MutableMapping, cast, ClassVar

from redisent import RedisentHelper
from redisent.errors import RedisError

logger = logging.getLogger(__name__)


[docs]@dataclass() class RedisEntry: """ Base dataclass that should be inherited from with additional :py:func:`dataclasses.field` s for each attribute the entry will store. All ``RedisEntry`` instances must define a unique-to-Redis value for ``redis_id``. If the entry is going to be stored as a hash-map, the class must also define a value for ``redis_name``. If the ``redis_name`` attribute is set, at a high level, the storing and fetching of values looks like this: .. code-block:: ipython In [1]: import pickle ...: import redis ...: ...: # Setup some dummy values ...: redis_id = 'blah' ...: redis_name = 'entry_one' ...: ent_values = {'value_one': 1, 'value_two': 2} In [2]: # Pickle the dictionary data to bytes ...: ent_pickle = pickle.dumps(ent_values) In [3]: # Build Redis connection ...: conn = redis.StrictRedis('localhost') In [4]: # Store them ...: conn.hset(redis_id, redis_name, ent_pickle) Out[4]: 1 In [5]: # Fetch back the raw pickled values ...: res_raw = conn.hget(redis_id, redis_name) In [6]: # Assert they equal the values we set (obviously this should be true) ...: assert res_raw == ent_pickle In [7]: # Now, decode the response ...: res_values = pickle.loads(res_raw) In [8]: # And finally assert that "res_values" is the same as the original "ent_values" ...: assert res_values == ent_values In [9]: res_values Out[9]: {'value_one': 1, 'value_two': 2} """ # Redis ID -- class-level attribute redis_id: ClassVar[str] # Redis Hash Key == optional instance attribute redis_name: Optional[str] = field(default=False, init=False, metadata={'internal_field': True}) def __post_init__(self, *args, redis_name: str = None, **kwargs) -> None: if redis_name: self.redis_name = redis_name
[docs] def dump(self, include_internal_fields: bool = False) -> str: """ Helper for dumping a textual representation of a particular :py:class:`redisent.models.RedisEntry` instance """ dump_out = f'RedisEntry ({type(self).__name__}) for key "{self.redis_id}"' if self.redis_name: dump_out = f'{dump_out}, hash entry "{self.redis_name}":' entry_attrs = self.get_entry_fields(include_internal_fields=include_internal_fields) entry_data = [[attr, getattr(self, attr)] for attr in entry_attrs] tbl = tabulate(entry_data, headers=['Attribute', 'Value'], tablefmt='presto') dump_out += f'\n\n{tbl}' return dump_out
[docs] @classmethod def get_entry_fields(cls, include_internal_fields: bool = False) -> Mapping[str, Field]: """ Class method used for building a list of strings for each field name, based on the provided filering attributes :param include_internal_fields: if set, include internal fields which are used by ``redisent`` only (any marked with metadata attribute ``internal_field``) """ return {fld.name: fld for fld in fields(cls) if include_internal_fields or not fld.metadata.get('internal_field', False)}
@property def is_hashmap(self) -> bool: """ Property helper to determine if this entry is a hash-map or not This is simply determined by if ``redis_name`` is set or not. """ return True if self.redis_name else False
[docs] @classmethod def load_dict(cls, redis_id: str = None, redis_name: str = None, **ent_kwargs) -> RedisEntry: """ Class method for loading a RedisEntry from a provided dictionary of values :param redis_id: unique Redis ID for entry :param redis_name: unique Redis hashmap name (if entity is stored as a hashmap, this is required) :param ent_kwargs: keyword arguments used to build entry values """ redis_id = redis_id or cls.redis_id if not redis_name: if 'redis_name' in ent_kwargs: redis_name = ent_kwargs.pop('redis_name') ent_fields = cls.get_entry_fields(include_internal_fields=False) cls_kwargs: MutableMapping[str, Any] = {attr: ent_kwargs[attr] for attr in ent_fields if attr in ent_kwargs} if redis_name: cls_kwargs['redis_name'] = redis_name return cls(**cls_kwargs)
[docs] def as_dict(self, include_internal_fields: bool = False, use_json: bool = False, ignore_invalid: bool = True) -> Mapping[str, Any]: """ Return a mapping representing this entry by making use of :py:func:`dataclasses.asdict` along with optionally excluding any Redis-related (or internal) fields. By default no internal or redis fields (i.e. ``redis_id`` or ``redis_name``) are returned :param include_internal_fields: if set, include internal fields which are used by ``redisent`` only (any marked with metadata attribute ``internal_field``) """ ent_dict = asdict(self) if not use_json: if include_internal_fields: return ent_dict flds = self.get_entry_fields(include_internal_fields=include_internal_fields) return {attr: value for attr, value in ent_dict.items() if attr in flds} ent_json = {} for attr, value in ent_dict.items(): try: if isinstance(value, RedisEntry): json_val = value.as_dict(include_internal_fields=include_internal_fields, use_json=use_json, ignore_invalid=ignore_invalid) else: json_val = json.dumps(value) ent_json[attr] = json_val except Exception as ex: if not ignore_invalid: raise ex continue return ent_json
[docs] @classmethod def decode_entry(cls, entry_bytes, use_redis_id: str = None, use_redis_name: str = None): """ Class method for attempting to build a :py:class:`redisent.models.RedisEntry` instance from the provided ``bytes`` value ``entry_bytes`` Under the hood, this makes use of :py:func:`pickle.loads` and :py:class:`redisent.models.RedisEntry.load_dict` to actually attempt to build the entry while catching any related exceptions and propagating them as :py:exc:`redisent.errors.RedisError` exceptions. """ try: ent: MutableMapping[str, Any] = pickle.loads(entry_bytes) if isinstance(ent, Mapping): redis_id = ent.pop('redis_id', cls.redis_id) redis_id = use_redis_id or redis_id if not redis_id: raise RedisError('Unable to convert dictionary from Redis into RedisEntry (no value for "redis_id" found)') redis_name = ent.pop('redis_name', None) redis_name = use_redis_name or redis_name return cls.load_dict(redis_id, redis_name=redis_name, **ent) elif not isinstance(ent, RedisEntry): raise RedisError('Decoded entry is neither a dictionary nor a Mapping') return ent except RedisError as ex: raise ex except pickle.PickleError as ex: err_message = f'Error decoding entry using pickle: {ex}' logger.exception(err_message) raise RedisError(err_message, base_exception=ex) except Exception as ex: err_message = f'General error while attempting to decode possible RedisEntry: {ex}' logger.exception(err_message) raise RedisError(err_message, base_exception=ex)
[docs] @classmethod def encode_entry(cls, entry: RedisEntry, as_mapping: bool = None) -> bytes: """ Class method for encoding a given :py:class:`redisent.models.RedisEntry` instance as ``bytes`` using the :py:func:`pickle.dumps` method. :param entry: the :py:class:`redisent.models.RedisEntry` instance to be encoded :param as_mapping: if provided, ``entry`` will be treated as a Redis hashmap entry. otherwise, the default behavior is to check :py:attr:`RedisEntry.redis_name` """ if as_mapping is None: as_mapping = True if entry.redis_name else False try: return pickle.dumps(entry.as_dict(include_internal_fields=False) if as_mapping is True else entry) except Exception as ex: ent_str = f' (entry name: "{entry.redis_name}")' if entry.redis_name else '' raise Exception(f'Error encoding entry for "{entry.redis_id}"{ent_str} using pickle: {ex}')
[docs] def store(self, helper: RedisentHelper) -> bool: """ Blocking / synchronous method for storing this entry in Redis, using the provided :py:class:`redisent.helpers.RedisentHelper` instance. This method will do the actual encoding using the :py:func:`RedisEntry.encode_entry` method as well as make use of the provided helper :py:func:`redisent.helpers.RedisentHelper.wrapped_redis` (actually, this method makes use of the :py:func:`redisent.helpers.RedisentHelper.wrapped_redis`) context manager for storing the entry in Redis. :param helper: configured instance of :py:class:`redisent.helpers.RedisentHelper` to be used for storing entry """ entry_bytes = self.encode_entry(self) return helper.set(self.redis_id, entry_bytes, redis_name=self.redis_name)
[docs] @classmethod def fetch_all(cls, helper: RedisentHelper, redis_id: str, check_exists: bool = True) -> Mapping[str, RedisEntry]: """ Method for fetching **all** entries for a given hashmap from Redis, using the provided :py:class:`redisent.helpers.RedisentHelper` instance. Under the hood, this method will call the :py:func:`redisent.helpers.RedisentHelper.keys` method to enumerate all of the hashmap entry keys and iteratively fetch them and return a mapping of hash keys to ``RedisEntry`` instances. :param helper: configured instance of :py:class:`redisent.helpers.RedisentHelper` to be used for fetching entries :param redis_id: unique Redis ID for hash map entries :param check_exists: if set, check first that there is an existing Redis entry to fetch. If not, a :py:exc:`RedisEntry` is raised """ if check_exists and not helper.exists(redis_id): raise RedisError(f'Failed to find entry for "{redis_id}" in Redis while fetching all instances') ent_keys = helper.keys(redis_id=redis_id) return {e_key: cls.fetch(helper, redis_id=redis_id, redis_name=e_key) for e_key in ent_keys}
[docs] @classmethod def fetch(cls, helper: RedisentHelper, redis_id: str, redis_name: str = None, check_exists: bool = True) -> RedisEntry: """ Method for fetching entries from Redis, using the provided :py:class:`redisent.helpers.RedisentHelper` instance. This method will do the actual decoding using the :py:func:`RedisEntry.decode_entry` method after fetching the ``bytes`` value from Redis using the helper-provided :py:func:`redisent.helpers.RedisentHelper.wrapped_redis` (actually, this method makes use of the :py:func:`redisent.helpers.RedisentHelper.wrapped_redis`) context manager for actually fetching from Redis. :param helper: configured instance of :py:class:`redisent.helpers.RedisentHelper` to be used for storing entry :param redis_id: unique Redis ID for entry :param redis_name: unique Redis hashmap name (if entity is stored as a hashmap, this is required) :param check_exists: if set, check first that there is an existing Redis entry to fetch. If not, a :py:exc:`RedisEntry` is raised """ red_ent = f'hash entry for "{redis_name}" in "{redis_id}"' if redis_name else f'entry "{redis_id}"' if check_exists and not helper.exists(redis_id, redis_name=redis_name): raise RedisError(f'Failed to find {red_ent} was not found in Redis') entry_bytes = helper.get(redis_id, redis_name=redis_name) if not entry_bytes: raise RedisError(f'Failure during fetch of {red_ent}: No data returned') return cast(RedisEntry, cls.decode_entry(entry_bytes))
[docs] def delete(self, helper: RedisentHelper, check_exists: bool = True) -> Optional[bool]: """ Method responsible for actually deleting a RedisEntry from Redis :param helper: configured instance of :py:class:`redisent.helpers.RedisentHelper` to be used to delete the entry :param check_exists: if set, check first that there is an existing Redis entry for this instance """ return helper.delete(self.redis_id, redis_name=self.redis_name, check_exists=check_exists)