You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
fabula/jsonable.py

279 lines
12 KiB

from dataclasses import dataclass, fields, MISSING
from enum import Enum
from functools import reduce
from itertools import chain
from typing import ClassVar, get_origin, Union, get_args, Mapping, Any, Sequence
from types import UnionType
from warnings import warn
# Keyword arguments for ``@dataclass``.  Only ``JsonableParentArgs`` is used in
# this part of the file (on ``Jsonable``).
# NOTE(review): ``JsonableDataclassArgs`` is presumably meant for concrete
# subclasses (``@dataclass(**JsonableDataclassArgs)``) - confirm against callers.
JsonableDataclassArgs = dict(
    init=True,
    repr=True,
    eq=True,
    order=False,
    frozen=True,
    match_args=True,
    kw_only=True,
    slots=True,
    weakref_slot=True,
)
# Same settings, except no generated ``__init__`` and no weakref slot.
JsonableParentArgs = {**JsonableDataclassArgs, "init": False, "weakref_slot": False}
@dataclass(**JsonableParentArgs)
class Jsonable(object):
    """Base class for dataclasses that convert to and from JSON objects.

    ``Jsonable`` itself cannot be instantiated.  Every subclass registers its
    field names with each ancestor that carries the two class-level
    registries below, so that ``from_json_object`` called on a
    non-instantiable ancestor can pick a concrete subclass by looking for a
    field name that only that subclass declares.
    """

    # Field name -> the instantiable subclass registered as the only owner of
    # that field name (used by ``from_json_object`` for dispatch).
    subclass_unique_attr_dict: ClassVar[dict[str, type["Jsonable"]]] = {}
    # Every field name registered by this class's subclasses so far.
    subclass_attr_set: ClassVar[set[str]] = set()

    def __init__(self, **kwargs):
        """Always raise ``TypeError``: classes using this ``__init__`` are abstract."""
        raise TypeError(f"{friendly_type(type(self))} is not instantiable")

    @classmethod
    def instantiable(cls):
        """Return True if *cls* defines its own ``__init__`` (not ``Jsonable``'s)."""
        return "__init__" in cls.__dict__ and cls.__init__ is not Jsonable.__init__

    def __init_subclass__(cls):
        """Register *cls*'s field names with every ancestor that keeps registries.

        For each such ancestor: field names that *cls* shares with an already
        registered class stop being unique, field names new to the ancestor
        become unique keys for *cls* (when *cls* is instantiable), and a
        ``RuntimeWarning`` is issued for any affected class left without a
        unique field name in that ancestor.

        NOTE(review): ``fields(cls)`` only sees the subclass's own fields once
        ``@dataclass`` has processed it; this presumably relies on
        ``slots=True`` re-creating the class (which runs this hook again) -
        confirm.  Zero-argument ``super()`` is deliberately not used here.
        """
        cls.subclass_attr_set = {field.name for field in fields(cls)}
        cls.subclass_unique_attr_dict = dict.fromkeys(cls.subclass_attr_set, cls) if cls.instantiable() else {}
        # Breadth-first walk over all ancestors.
        bases = list(cls.__bases__)
        while bases:
            base = bases.pop(0)
            has_registries = (
                hasattr(base, "subclass_attr_set") and isinstance(base.subclass_attr_set, set)
                and hasattr(base, "subclass_unique_attr_dict")
                and isinstance(base.subclass_unique_attr_dict, dict))
            if not has_registries:
                # e.g. ``object``: nothing to update and its bases are not followed.
                continue
            if hasattr(base, "instantiable") and callable(base.instantiable) and base.instantiable():
                warn(f"Created subclass {cls} of instantiable class {base}")
            bases.extend(base.__bases__)
            # Classes that may end up with no unique field name in ``base``.
            deleted_classes = {cls} if cls.instantiable() else set()
            # A name shared with ``cls`` is no longer unique to its previous owner.
            for key in cls.subclass_attr_set.intersection(base.subclass_unique_attr_dict.keys()):
                deleted_classes.add(base.subclass_unique_attr_dict.pop(key))
            if cls.instantiable():
                # Only names ``base`` has never seen can identify ``cls``.
                unique_attrs = cls.subclass_attr_set.difference(base.subclass_attr_set)
                base.subclass_unique_attr_dict.update((key, cls) for key in unique_attrs)
            lost_classes = deleted_classes.difference(base.subclass_unique_attr_dict.values())
            if lost_classes:
                lost_class_string = "/".join(friendly_type(subclass) for subclass in lost_classes)
                warn(f'No unique attributes for {lost_class_string} in {friendly_type(base)}',
                     category=RuntimeWarning, stacklevel=5)
            base.subclass_attr_set.update(cls.subclass_attr_set)

    @classmethod
    def from_json_object_self_only(cls, value: Mapping[str, Any]):
        """Build an instance of exactly *cls* from the JSON object *value*.

        Every key of *value* must name a field of *cls*, and every field
        without a default must be present.  All problems (per-field conversion
        failures, unused keys, missing fields) are collected and raised
        together through ``raise_exceptions``.
        """
        result = {}
        raised = []
        missing = set()
        unused = set(value.keys())
        for field in fields(cls):
            if field.name not in value:
                # Absent fields are fine only if the dataclass supplies a default.
                if field.default is MISSING and field.default_factory is MISSING:
                    missing.add(field.name)
                continue
            unused.remove(field.name)
            raw = value[field.name]
            try:
                result[field.name] = convert_json_to_object(raw, field.type)
            except Exception as ex:
                # Report the type of the offending value (not of the field
                # name, which is always ``str``).
                ex.add_note(
                    f"when deserializing value {repr(raw)} ({friendly_type(type(raw))}) "
                    f"for field {field.name} ({friendly_type(field.type)}) "
                    f"of {friendly_type(cls)}")
                raised.append(ex)
        if unused:
            raised.append(ValueError(f'Unused fields {", ".join(sorted(repr(n) for n in unused))}'))
        if missing:
            raised.append(ValueError(f'Missing fields {", ".join(sorted(repr(n) for n in missing))}'))
        raise_exceptions(raised, for_type=cls, value=value)
        return cls(**result)

    @classmethod
    def from_json_object(cls, value: Mapping[str, Any]):
        """Build an instance of *cls*, or of a registered subclass, from *value*.

        An instantiable *cls* is deserialized directly.  Otherwise the first
        registered unique field name found in *value* selects the subclass.

        Raises:
            TypeError: if *cls* is not instantiable and no unique field name
                of any registered subclass appears in *value*.
        """
        if cls.instantiable():
            return cls.from_json_object_self_only(value)
        for key, subclass in cls.subclass_unique_attr_dict.items():
            if key in value:
                return subclass.from_json_object_self_only(value)
        raise TypeError(
            f"No obvious subclass of {friendly_type(cls)} "
            f"(didn't find any of {', '.join(sorted(repr(key) for key in cls.subclass_unique_attr_dict.keys()))}) "
            f"for {repr(value)}")

    def to_json_object(self) -> Mapping[str, Any]:
        """Return a dict mapping each field name to its JSON-converted value.

        If the instance has a ``__dict__``, its entries are converted and
        added as well (overriding a field of the same name).
        """
        json_object = {field.name: convert_object_to_json(getattr(self, field.name))
                       for field in fields(self)}
        if hasattr(self, "__dict__"):
            json_object.update((key, convert_object_to_json(value))
                               for key, value in self.__dict__.items())
        return json_object
def raise_exceptions(raised: Sequence[Exception], for_type, value) -> None:
    """Raise whatever was collected in *raised*; do nothing if it is empty.

    A single exception is re-raised on its own with a note naming *for_type*;
    two or more are wrapped in one ``ExceptionGroup`` whose message names
    *value* and *for_type*.
    """
    count = len(raised)
    if count == 0:
        return
    if count > 1:
        raise ExceptionGroup(
            f'When deserializing {repr(value)} ({friendly_type(type(value))}) '
            f'to {friendly_type(for_type)}', raised)
    only = raised[0]
    only.add_note(f'for {friendly_type(for_type)}')
    raise only
class WrongTypeError(TypeError):
    """A JSON value whose Python type does not match what the target type needs."""

    def __init__(self, actual, *, for_type, expected_type):
        """Build the message from the offending value and the two types.

        Args:
            actual: the value that had the wrong type.
            for_type: the type being deserialized to.
            expected_type: the type *actual* was expected to be an instance of.
        """
        actual_type_name = friendly_type(type(actual))
        actual_repr = repr(actual)
        target_name = friendly_type(for_type)
        expected_name = friendly_type(expected_type)
        message = (f'Wrong type {actual_type_name} for value {actual_repr} '
                   f'when deserializing {target_name} '
                   f'- expected {expected_name}')
        super().__init__(message)
def expected_json_type(value_type):
if get_origin(value_type) in [Union, UnionType]:
return reduce(lambda x, y: x | y, (expected_json_type(t) for t in get_args(value_type)))
elif value_type in (str, int, float, bool, type(None)):
return value_type
elif (is_heterogeneous_tuple_type(value_type) or is_homogeneous_tuple_type(value_type)
or get_origin(value_type) in (list, set, frozenset)):
return list
elif issubclass(value_type, Jsonable) or value_type is dict:
return dict
elif issubclass(value_type, Enum):
return get_enum_type(value_type)
else:
raise TypeError(f'Cannot deserialize objects of type {friendly_type(value_type)}')
def convert_json_to_object(value, value_type):
    """Convert the decoded JSON *value* into an instance of *value_type*.

    Args:
        value: a decoded JSON value (``str``, ``int``, ``float``, ``bool``,
            ``None``, ``list`` or ``dict``).
        value_type: the target annotation; see ``expected_json_type`` for the
            supported forms.

    Returns:
        *value* itself for primitive targets; otherwise a tuple/list/set/
        frozenset/dict built from converted members, a ``Jsonable`` built by
        ``from_json_object``, or an ``Enum`` member looked up by value.

    Raises:
        WrongTypeError: if *value* is not an instance of
            ``expected_json_type(value_type)``.
        ValueError: if a fixed-length tuple target gets a list of a different
            length.
        TypeError: if *value_type* is not supported.
        Exception / ExceptionGroup: member conversion failures, annotated with
            notes and raised via ``raise_exceptions``.
    """
    expect_type = expected_json_type(value_type)
    if not isinstance(value, expect_type):
        raise WrongTypeError(value, for_type=value_type, expected_type=expect_type)
    # From here on *value* is known to have the JSON shape the target needs,
    # so the branches below do not re-check list/dict-ness.
    origin = get_origin(value_type)
    if origin in (Union, UnionType):
        return _convert_json_union(value, value_type, expect_type)
    if value_type in (str, int, float, bool, type(None)):
        return value
    if is_heterogeneous_tuple_type(value_type):
        item_types = get_args(value_type)
        if len(value) != len(item_types):
            raise ValueError(
                f'Wrong number of elements {len(value)} (should be {len(item_types)}) '
                f'for {repr(value)} ({friendly_type(type(value))}) '
                f'to deserialize it to {friendly_type(value_type)}')
        return value_type(_convert_json_items(value, item_types, value_type))
    if is_homogeneous_tuple_type(value_type) or origin in (set, frozenset, list):
        item_type = get_args(value_type)[0]
        # Every element shares the single declared item type.
        return value_type(_convert_json_items(value, [item_type] * len(value), value_type))
    if origin is dict:
        return _convert_json_mapping(value, value_type)
    # ``issubclass`` raises on non-class annotations, so guard it.
    if isinstance(value_type, type):
        if issubclass(value_type, Jsonable):
            return value_type.from_json_object(value)
        if issubclass(value_type, Enum):
            return value_type(value)
    raise TypeError(f'Cannot deserialize objects of type {friendly_type(value_type)}')


def _convert_json_union(value, value_type, expect_type):
    """Return *value* converted to the first union member that accepts it."""
    raised = []
    for subtype in get_args(value_type):
        # Skip members whose JSON shape cannot match this value at all.
        if not isinstance(value, expected_json_type(subtype)):
            continue
        try:
            return convert_json_to_object(value, subtype)
        except Exception as ex:
            raised.append(ex)
    raise_exceptions(raised, for_type=value_type, value=value)
    # Only reachable if no member was even attempted, which the caller's
    # up-front type check should make impossible.
    raise AssertionError(
        f'Unexpectedly failed to throw or return when deserializing '
        f'value {repr(value)} to {friendly_type(value_type)} - expected {friendly_type(expect_type)} '
        f'and was {friendly_type(type(value))} but still no type matched')


def _convert_json_items(value, item_types, value_type):
    """Convert the elements of *value* pairwise against *item_types* into a list.

    All element failures are collected (each annotated with its index) and
    raised together for *value_type*.
    """
    converted = []
    raised = []
    for index, (item, item_type) in enumerate(zip(value, item_types)):
        try:
            converted.append(convert_json_to_object(item, item_type))
        except Exception as ex:
            ex.add_note(f'when deserializing element #{index}'
                        f' - {repr(item)} ({friendly_type(type(item))}) - '
                        f'to {friendly_type(item_type)}')
            raised.append(ex)
    raise_exceptions(raised, for_type=value_type, value=value)
    return converted


def _convert_json_mapping(value, value_type):
    """Convert the keys and values of the JSON object *value* for ``dict[K, V]``.

    Key and value failures are collected (a failing key does not prevent its
    value from being checked) and raised together for *value_type*.
    """
    k_type, v_type = get_args(value_type)
    result = {}
    raised = []
    for k, v in value.items():
        success = True
        converted_key, converted_value = None, None
        try:
            converted_key = convert_json_to_object(k, k_type)
        except Exception as ex:
            success = False
            ex.add_note(
                f'while deserializing key {repr(k)} ({friendly_type(type(k))}) '
                f'to {friendly_type(k_type)}')
            raised.append(ex)
        try:
            converted_value = convert_json_to_object(v, v_type)
        except Exception as ex:
            success = False
            ex.add_note(f'while deserializing value {repr(v)} ({friendly_type(type(v))}) '
                        f'corresponding to key {repr(k)} '
                        f'to {friendly_type(v_type)}')
            raised.append(ex)
        if success:
            result[converted_key] = converted_value
    raise_exceptions(raised, for_type=value_type, value=value)
    return result
def get_enum_type(enum_type):
    """Return ``str`` or ``int``, whichever type all member values of *enum_type* share.

    ``str`` is tried first, then ``int``.

    Raises:
        TypeError: if the member values are neither all ``str`` nor all ``int``.
    """
    for candidate in (str, int):
        if all(isinstance(member.value, candidate) for member in enum_type):
            return candidate
    raise TypeError(f"Enum type {friendly_type(enum_type)} is not all str or all int")
def convert_object_to_json(source):
    """Convert *source* into a structure made only of JSON-compatible values.

    Args:
        source: a primitive (``int``, ``float``, ``str``, ``bool``, ``None``),
            an ``Enum`` member, a list/tuple/set/frozenset, a dict, or a
            ``Jsonable``.

    Returns:
        Primitives unchanged; an ``Enum`` member's converted ``value``;
        a list of converted items for any supported collection; a dict with
        ``str(key)`` keys and converted values; ``to_json_object()`` for a
        ``Jsonable``.

    Raises:
        TypeError: if *source* is of an unsupported type.
    """
    # Unwrap enums first so that members of mixin enums (e.g. ``IntEnum``)
    # are emitted as their plain value rather than as the member object.
    if isinstance(source, Enum):
        return convert_object_to_json(source.value)
    if isinstance(source, (int, float, str, bool, type(None))):
        return source
    # ``frozenset`` is included because it is a supported deserialization
    # target and must therefore be serializable too.
    if isinstance(source, (list, tuple, set, frozenset)):
        return [convert_object_to_json(item) for item in source]
    if isinstance(source, dict):
        return {str(key): convert_object_to_json(value) for key, value in source.items()}
    if isinstance(source, Jsonable):
        return source.to_json_object()
    raise TypeError(f"No adapter for {friendly_type(type(source))} to serialize it to JSON")
def is_homogeneous_tuple_type(t) -> bool:
    """Return True if *t* is a variable-length tuple type of the form ``tuple[X, ...]``."""
    if get_origin(t) is tuple:
        type_args = get_args(t)
        return len(type_args) == 2 and type_args[1] is ...
    return False
def is_heterogeneous_tuple_type(t) -> bool:
    """Return True if *t* is a parameterized tuple type that is not ``tuple[X, ...]``."""
    if get_origin(t) is not tuple:
        return False
    type_args = get_args(t)
    is_variadic = len(type_args) == 2 and type_args[1] is ...
    return not is_variadic
def friendly_type(t) -> str:
if isinstance(t, type):
return t.__name__
else:
return str(t)