from dataclasses import dataclass, fields, MISSING from enum import Enum from functools import reduce from itertools import chain from typing import ClassVar, get_origin, Union, get_args, Mapping, Any, Sequence from types import UnionType from warnings import warn JsonableDataclassArgs = {"init": True, "repr": True, "eq": True, "order": False, "frozen": True, "match_args": True, "kw_only": True, "slots": True, "weakref_slot": True} JsonableParentArgs = {"init": False, "repr": True, "eq": True, "order": False, "frozen": True, "match_args": True, "kw_only": True, "slots": True, "weakref_slot": False} @dataclass(**JsonableParentArgs) class Jsonable(object): subclass_unique_attr_dict: ClassVar[dict[str, type["Jsonable"]]] = {} subclass_attr_set: ClassVar[set[str]] = set() def __init__(self, **kwargs): raise TypeError(f"{friendly_type(type(self))} is not instantiable") @classmethod def instantiable(cls): return "__init__" in cls.__dict__ and cls.__init__ is not Jsonable.__init__ def __init_subclass__(cls): cls.subclass_attr_set = set(field.name for field in fields(cls)) cls.subclass_unique_attr_dict = dict.fromkeys(cls.subclass_attr_set, cls) if cls.instantiable() else {} bases = list(cls.__bases__) while len(bases) > 0: base = bases.pop(0) if (hasattr(base, "subclass_attr_set") and isinstance(base.subclass_attr_set, set) and hasattr(base, "subclass_unique_attr_dict") and isinstance(base.subclass_unique_attr_dict, dict)): if hasattr(base, "instantiable") and callable(base.instantiable) and base.instantiable(): warn(f"Created subclass {cls} of instantiable class {base}") bases.extend(base.__bases__) deleted_classes = {cls} if cls.instantiable() else set() for key in cls.subclass_attr_set.intersection(base.subclass_unique_attr_dict.keys()): deleted_classes.update((base.subclass_unique_attr_dict.pop(key),)) if cls.instantiable(): unique_attrs = cls.subclass_attr_set.difference(base.subclass_attr_set) base.subclass_unique_attr_dict.update(((key, cls) for key in unique_attrs)) lost_classes = deleted_classes.difference(base.subclass_unique_attr_dict.values()) if len(lost_classes) > 0: lost_class_string = "/".join(friendly_type(subclass) for subclass in lost_classes) warn(f'No unique attributes for {lost_class_string} in {friendly_type(base)}', category=RuntimeWarning, stacklevel=5) base.subclass_attr_set.update(cls.subclass_attr_set) @classmethod def from_json_object_self_only(cls, value: Mapping[str, Any]): result = {} raised = [] missing = set() unused = set(value.keys()) for field in fields(cls): if field.name not in value: if field.default is MISSING and field.default_factory is MISSING: missing.add(field.name) else: unused.remove(field.name) try: result[field.name] = convert_json_to_object(value[field.name], field.type) except Exception as ex: ex.add_note( f"when deserializing value {repr(value[field.name])} ({friendly_type(type(field.name))}) " f"for field {field.name} ({friendly_type(field.type)}) " f"of {friendly_type(cls)}") raised.append(ex) if len(unused) > 0: raised.append(ValueError(f'Unused fields {", ".join(sorted(repr(n) for n in unused))}')) if len(missing) > 0: raised.append(ValueError(f'Missing fields {", ".join(sorted(repr(n) for n in missing))}')) raise_exceptions(raised, for_type=cls, value=value) return cls(**result) @classmethod def from_json_object(cls, value: Mapping[str, Any]): if cls.instantiable(): return cls.from_json_object_self_only(value) else: for key, subclass in cls.subclass_unique_attr_dict.items(): if key in value: return subclass.from_json_object_self_only(value) raise TypeError( f"No obvious subclass of {friendly_type(cls)} " f"(didn't find any of {', '.join(sorted(repr(key) for key in cls.subclass_unique_attr_dict.keys()))}) " f"for {repr(value)}") def to_json_object(self) -> Mapping[str, Any]: return dict(chain(((field.name, convert_object_to_json(getattr(self, field.name))) for field in fields(self)), ((key, convert_object_to_json(value)) for key, value in self.__dict__.items()) if hasattr(self, "__dict__") else tuple())) def raise_exceptions(raised: Sequence[Exception], for_type, value) -> None: if len(raised) == 0: return elif len(raised) == 1: raised = raised[0] raised.add_note(f'for {friendly_type(for_type)}') raise raised else: raise ExceptionGroup( f'When deserializing {repr(value)} ({friendly_type(type(value))}) ' f'to {friendly_type(for_type)}', raised) class WrongTypeError(TypeError): def __init__(self, actual, *, for_type, expected_type): super().__init__(f'Wrong type {friendly_type(type(actual))} for value {repr(actual)} ' f'when deserializing {friendly_type(for_type)} ' f'- expected {friendly_type(expected_type)}') def expected_json_type(value_type): if get_origin(value_type) in [Union, UnionType]: return reduce(lambda x, y: x | y, (expected_json_type(t) for t in get_args(value_type))) elif value_type in (str, int, float, bool, type(None)): return value_type elif (is_heterogeneous_tuple_type(value_type) or is_homogeneous_tuple_type(value_type) or get_origin(value_type) in (list, set, frozenset)): return list elif issubclass(value_type, Jsonable) or value_type is dict: return dict elif issubclass(value_type, Enum): return get_enum_type(value_type) else: raise TypeError(f'Cannot deserialize objects of type {friendly_type(value_type)}') def convert_json_to_object(value, value_type): expect_type = expected_json_type(value_type) if not isinstance(value, expect_type): raise WrongTypeError(value, for_type=value_type, expected_type=expect_type) if get_origin(value_type) in [Union, UnionType]: raised = [] for subtype in get_args(value_type): if not isinstance(value, expected_json_type(subtype)): continue try: return convert_json_to_object(value, subtype) except Exception as ex: raised.append(ex) raise_exceptions(raised, for_type=value_type, value=value) raise AssertionError( f'Unexpectedly failed to throw or return when deserializing ' f'value {repr(value)} to {friendly_type(value_type)} - expected {friendly_type(expect_type)} ' f'and was {friendly_type(type(value))} but still no type matched') elif value_type in (str, int, float, bool, type(None)): return value elif is_heterogeneous_tuple_type(value_type): if not isinstance(value, list): raise WrongTypeError(value, for_type=value_type, expected_type=list) item_types = get_args(value_type) if len(value) != len(item_types): raise ValueError( f'Wrong number of elements {len(value)} (should be {len(item_types)}) ' f'for {repr(value)} ({friendly_type(type(value))}) ' f'to deserialize it to {friendly_type(value_type)}') result = [] raised = [] for index, pair in enumerate(zip(value, item_types)): item, item_type = pair try: result.append(convert_json_to_object(item, item_type)) except Exception as ex: ex.add_note(f'when deserializing element #{index}' f' - {repr(item)} ({friendly_type(type(item))}) - ' f'to {friendly_type(item_type)}') raised.append(ex) raise_exceptions(raised, for_type=value_type, value=value) return value_type(result) elif is_homogeneous_tuple_type(value_type) or get_origin(value_type) in [set, frozenset, list]: if not isinstance(value, list): raise WrongTypeError(value, for_type=value_type, expected_type=list) item_type = get_args(value_type)[0] result = [] raised = [] for index, item in enumerate(value): try: result.append(convert_json_to_object(item, item_type)) except Exception as ex: ex.add_note(f'when deserializing element #{index}' f' - {repr(item)} ({friendly_type(type(item))}) - ' f'to {friendly_type(item_type)}') raised.append(ex) raise_exceptions(raised, for_type=value_type, value=value) return value_type(result) elif get_origin(value_type) is dict: if not isinstance(value, dict): raise WrongTypeError(value, for_type=value_type, expected_type=dict) k_type, v_type = get_args(value_type) result = {} raised = [] for k, v in value.items(): success = True converted_key, converted_value = None, None try: converted_key = convert_json_to_object(k, k_type) except Exception as ex: success = False ex.add_note( f'while deserializing key {repr(k)} ({friendly_type(type(k))}) ' f'to {friendly_type(k_type)}') raised.append(ex) try: converted_value = convert_json_to_object(v, v_type) except Exception as ex: success = False ex.add_note(f'while deserializing value {repr(v)} ({friendly_type(type(v))}) ' f'corresponding to key {repr(k)} ' f'to {friendly_type(v_type)}') raised.append(ex) if success: result[converted_key] = converted_value raise_exceptions(raised, for_type=value_type, value=value) return dict(result) elif issubclass(value_type, Jsonable): if not isinstance(value, dict): raise WrongTypeError(value, for_type=value_type, expected_type=dict) return value_type.from_json_object(value) elif issubclass(value_type, Enum): return value_type(value) raise TypeError(f'Cannot deserialize objects of type {friendly_type(value_type)}') def get_enum_type(enum_type): if all(isinstance(v.value, str) for v in enum_type): return str elif all(isinstance(v.value, int) for v in enum_type): return int else: raise TypeError(f"Enum type {friendly_type(enum_type)} is not all str or all int") def convert_object_to_json(source): if (isinstance(source, int) or isinstance(source, float) or isinstance(source, str) or isinstance(source, bool) or isinstance(source, type(None))): return source if isinstance(source, list) or isinstance(source, tuple) or isinstance(source, set): return [convert_object_to_json(item) for item in source] if isinstance(source, dict): return dict((str(key), convert_object_to_json(value)) for key, value in source.items()) if isinstance(source, Jsonable): return source.to_json_object() if isinstance(source, Enum): return convert_object_to_json(source.value) raise TypeError(f"No adapter for {friendly_type(type(source))} to deserialize it to JSON") def is_homogeneous_tuple_type(t) -> bool: if get_origin(t) is not tuple: return False a = get_args(t) if len(a) == 2 and a[1] is ...: return True return False def is_heterogeneous_tuple_type(t) -> bool: if get_origin(t) is not tuple: return False a = get_args(t) if len(a) != 2 or a[1] is not ...: return True return False def friendly_type(t) -> str: if isinstance(t, type): return t.__name__ else: return str(t)