Source code for ewokscore.hashing

import hashlib
import random
from collections.abc import Iterable
from collections.abc import Mapping
from collections.abc import Set
from typing import Any
from typing import Optional
from typing import Type
from typing import Union

import numpy
from ewoksutils.import_utils import qualname

from . import missing_data


[docs] def classhashdata(cls: Type) -> bytes: return qualname(cls).encode()
[docs] def multitype_sorted(sequence: Iterable, key=None) -> list: try: return sorted(sequence, key=key) except TypeError: pass if key is None: def key(item): return item adict = dict() for item in sequence: typename = type(key(item)).__name__ adict.setdefault(typename, list()).append(item) return [ item for _, items in sorted(adict.items(), key=lambda tpl: tpl[0]) for item in sorted(items, key=key) ]
[docs] class UniversalHash: def __init__(self, hexdigest: Union[str, bytes]): if isinstance(hexdigest, bytes): hexdigest = hexdigest.decode() if not isinstance(hexdigest, str): raise TypeError(hexdigest, type(hexdigest)) self._hexdigest = hexdigest def __hash__(self): # make it python hashable (to use in sets and dict keys) return hash(self._hexdigest) def __repr__(self): return "UniversalHash('{}')".format(self) def __str__(self): return self._hexdigest def __eq__(self, other): return str(self) == str(other) def __lt__(self, other): return str(self) < str(other)
[docs] def uhash(value, _hash=None) -> UniversalHash: """Universial hash (as opposed to python's `hash`).""" # Avoid using python's hash! bdigest = _hash is None if bdigest: _hash = hashlib.sha256() _hash.update(classhashdata(type(value))) if value is None: pass elif isinstance(value, HasUhash): _hash.update(repr(value.uhash).encode()) elif isinstance(value, UniversalHash): _hash.update(repr(value).encode()) elif isinstance(value, bytes): _hash.update(value) elif isinstance(value, str): _hash.update(value.encode()) elif isinstance(value, int): _hash.update(hex(value).encode()) elif isinstance(value, float): _hash.update(value.hex().encode()) elif isinstance(value, (numpy.ndarray, numpy.number)): _hash.update(value.tobytes()) elif isinstance(value, Mapping): lst = multitype_sorted(value.items(), key=lambda item: item[0]) if lst: keys, values = zip(*lst) else: keys = values = list() uhash(keys, _hash=_hash) uhash(values, _hash=_hash) elif isinstance(value, Set): values = multitype_sorted(value) uhash(values, _hash=_hash) elif isinstance(value, Iterable): # Ordered for v in value: uhash(v, _hash=_hash) else: # TODO: register custom types raise TypeError(f"cannot uhash {value} (type: {type(value)})") if bdigest: return UniversalHash(_hash.hexdigest())
[docs] class HasUhash: @property def uhash(self) -> Optional[UniversalHash]: raise NotImplementedError def __hash__(self): # make it python hashable (to use in sets and dict keys) uhash = self.uhash if uhash is None: return hash(id(self)) else: return hash(uhash) def __eq__(self, other): if isinstance(other, HasUhash): uhash = other.uhash elif isinstance(other, UniversalHash): uhash = other else: raise TypeError(other, type(other)) return self.uhash == uhash def _get_repr_data(self) -> dict: data = dict() uhash = self.uhash if uhash is None: data["uhash"] = None else: data["uhash"] = repr(str(uhash)) return data def __repr__(self): data = self._get_repr_data() if data: sdata = ", ".join([f"{k}={v}" for k, v in data.items()]) return f"{super().__repr__()}({sdata})" else: return super().__repr__() def __str__(self): data = self._get_repr_data() if data: sdata = ", ".join([f"{k}={v}" for k, v in data.items()]) return f"{qualname(type(self))}({sdata})" else: return qualname(type(self))
PreUhashTypes = Union[str, bytes, UniversalHash, HasUhash]
[docs] class UniversalHashable(HasUhash): """The universal hash of an instance of this class is based on: * pre-uhash * instance nonce (if any) The universal hash is equal to the pre-hash when an instance nonce is not provided. The pre-uhash is either provided or based on: * data * class nonce (class qualifier name, class version, superclass nonce) """ __CLASS_NONCE = None __VERSION = None MISSING_DATA = missing_data.MISSING_DATA def __init__( self, pre_uhash: Optional[PreUhashTypes] = None, instance_nonce: Optional[Any] = None, ): self.set_uhash_init(pre_uhash=pre_uhash, instance_nonce=instance_nonce) def __init_subclass__(subcls, version=None, **kwargs): super().__init_subclass__(**kwargs) supercls_data = subcls.class_nonce() subcls.__VERSION = version subcls_data = subcls.class_nonce_data() subcls.__CLASS_NONCE = str(uhash((subcls_data, supercls_data)))
[docs] def set_uhash_init( self, pre_uhash: Optional[PreUhashTypes] = None, instance_nonce: Optional[Any] = None, ): self.__set_pre_uhash(pre_uhash) self.__original_pre_uhash = self.__pre_uhash self.__instance_nonce = instance_nonce self.__original__instance_nonce = instance_nonce
[docs] def get_uhash_init(self, serialize=False): pre_uhash = self.__original_pre_uhash if serialize: if isinstance(pre_uhash, HasUhash): pre_uhash = str(pre_uhash.uhash) elif isinstance(pre_uhash, UniversalHash): pre_uhash = str(pre_uhash) return { "pre_uhash": pre_uhash, "instance_nonce": self.__original__instance_nonce, }
def __set_pre_uhash(self, pre_uhash): if pre_uhash is None: self.__pre_uhash = None elif isinstance(pre_uhash, (str, bytes)): self.__pre_uhash = UniversalHash(pre_uhash) elif isinstance(pre_uhash, (UniversalHash, HasUhash)): self.__pre_uhash = pre_uhash else: self.__pre_uhash = uhash(pre_uhash)
[docs] @classmethod def class_nonce(cls): return cls.__CLASS_NONCE
[docs] @classmethod def class_nonce_data(cls): return qualname(cls), cls.__VERSION
[docs] def instance_nonce(self): return self.__instance_nonce
[docs] def fix_uhash(self): """Fix the uhash when it is derived from the uhash data.""" if self.__pre_uhash is not None: return keep, self.__instance_nonce = self.__instance_nonce, None try: pre_uhash = self.uhash finally: self.__instance_nonce = keep self.__set_pre_uhash(pre_uhash)
[docs] def undo_fix_uhash(self): self.__pre_uhash = self.__original_pre_uhash
[docs] def cleanup_references(self): """Remove all references to other hashables. Side effect: fixes the uhash when it depends on another hashable. """ if isinstance(self.__pre_uhash, HasUhash): pre_uhash = self.__pre_uhash.uhash self.__pre_uhash = pre_uhash self.__original_pre_uhash = pre_uhash
@property def uhash(self) -> Optional[UniversalHash]: _uhash = self.__pre_uhash if _uhash is None: data = self._uhash_data() if missing_data.is_missing_data(data): return None cnonce = self.class_nonce() inonce = self.instance_nonce() if inonce is None: return uhash((data, cnonce)) else: return uhash((data, cnonce, inonce)) else: if isinstance(_uhash, HasUhash): _uhash = _uhash.uhash if _uhash is None: return None inonce = self.instance_nonce() if inonce is None: return _uhash else: return uhash((_uhash, inonce)) def _uhash_data(self): return self.MISSING_DATA
[docs] def uhash_randomize(self): self.__instance_nonce = random.randint(-1e100, 1e100)
[docs] def undo_randomize(self): self.__instance_nonce = self.__original__instance_nonce