# Source code for cassis.json

import base64
import json
import math
from collections import OrderedDict, defaultdict
from io import TextIOBase, TextIOWrapper
from math import isnan
from typing import IO, Any, Dict, List, Optional, Union

from toposort import toposort_flatten

from cassis.cas import NAME_DEFAULT_SOFA, Cas, IdGenerator, Sofa, View
from cassis.typesystem import (
    TYPE_NAME_ANNOTATION,
    TypeSystem,
    is_predefined,
    merge_typesystems,
    TYPE_NAME_SOFA,
    FEATURE_BASE_NAME_SOFAARRAY,
    array_type_name_for_type,
    FEATURE_BASE_NAME_SOFASTRING,
    FEATURE_BASE_NAME_SOFAID,
    FEATURE_BASE_NAME_SOFAMIME,
    FEATURE_BASE_NAME_SOFANUM,
    FEATURE_BASE_NAME_SOFAURI,
    TYPE_NAME_FS_ARRAY,
    TYPE_NAME_BYTE_ARRAY,
    TYPE_NAME_FLOAT_ARRAY,
    TYPE_NAME_DOUBLE_ARRAY,
    TypeSystemMode,
    TYPE_NAME_DOCUMENT_ANNOTATION,
    Type,
    Feature,
    TYPE_NAME_TOP,
    is_primitive_array,
    TYPE_NAME_FLOAT,
    TYPE_NAME_DOUBLE,
    element_type_name_for_array_type,
    is_primitive,
    is_array,
)

# --- Key prefixes ------------------------------------------------------------
# Keys starting with this prefix are structural JSON fields, not features.
RESERVED_FIELD_PREFIX = "%"
# Marks a feature whose value is the ID of another feature structure.
REF_FEATURE_PREFIX = "@"
# Marks a floating point feature whose value is encoded as a string (NaN/Infinity).
NUMBER_FEATURE_PREFIX = "#"
# Not referenced by the code in this module.
ANCHOR_FEATURE_PREFIX = "^"

# --- Reserved field names ----------------------------------------------------
TYPE_FIELD = f"{RESERVED_FIELD_PREFIX}TYPE"
RANGE_FIELD = f"{RESERVED_FIELD_PREFIX}RANGE"
TYPES_FIELD = f"{RESERVED_FIELD_PREFIX}TYPES"
FEATURES_FIELD = f"{RESERVED_FIELD_PREFIX}FEATURES"
VIEWS_FIELD = f"{RESERVED_FIELD_PREFIX}VIEWS"
VIEW_SOFA_FIELD = f"{RESERVED_FIELD_PREFIX}SOFA"
VIEW_MEMBERS_FIELD = f"{RESERVED_FIELD_PREFIX}MEMBERS"
FEATURE_STRUCTURES_FIELD = f"{RESERVED_FIELD_PREFIX}FEATURE_STRUCTURES"
NAME_FIELD = f"{RESERVED_FIELD_PREFIX}NAME"
SUPER_TYPE_FIELD = f"{RESERVED_FIELD_PREFIX}SUPER_TYPE"
DESCRIPTION_FIELD = f"{RESERVED_FIELD_PREFIX}DESCRIPTION"
ELEMENT_TYPE_FIELD = f"{RESERVED_FIELD_PREFIX}ELEMENT_TYPE"
MULTIPLE_REFERENCES_ALLOWED_FIELD = f"{RESERVED_FIELD_PREFIX}MULTIPLE_REFERENCES_ALLOWED"
ID_FIELD = f"{RESERVED_FIELD_PREFIX}ID"
FLAGS_FIELD = f"{RESERVED_FIELD_PREFIX}FLAGS"
FLAG_DOCUMENT_ANNOTATION = "DocumentAnnotation"
ARRAY_SUFFIX = "[]"
ELEMENTS_FIELD = f"{RESERVED_FIELD_PREFIX}ELEMENTS"

# --- String encodings of non-finite floating point values --------------------
NAN_VALUE = "NaN"
POSITIVE_INFINITE_VALUE = "Infinity"
POSITIVE_INFINITE_VALUE_ABBR = "Inf"
NEGATIVE_INFINITE_VALUE = "-Infinity"
NEGATIVE_INFINITE_VALUE_ABBR = "-Inf"


def load_cas_from_json(
    source: Union[IO, str],
    typesystem: TypeSystem = None,
    lenient: bool = False,
    merge_typesystem: bool = True,
) -> Cas:
    """Deserialize a CAS from JSON.

    Args:
        source: Where to read the JSON from. A `str` is treated as the JSON text itself; anything else is
            treated as a file-like object and the JSON is read from it.
        typesystem: The type system that belongs to this CAS. When `None`, a fresh `TypeSystem()` is used.
        lenient: If `True`, unknown Types will be ignored. If `False`, unknown Types will cause an exception.
            The default is `False`.
        merge_typesystem: If `True` (the default), the type system embedded in the JSON is parsed and merged
            with `typesystem` before the feature structures are read. If `False`, only `typesystem` is used.

    Returns:
        The deserialized CAS.
    """
    effective_typesystem = TypeSystem() if typesystem is None else typesystem

    return CasJsonDeserializer().deserialize(
        source,
        typesystem=effective_typesystem,
        lenient=lenient,
        merge_typesystem=merge_typesystem,
    )
class CasJsonDeserializer:
    """Builds a `Cas` from its JSON representation.

    The parser state (highest IDs seen, deferred reference fix-ups) is reset at the start of every
    `deserialize` call, so an instance can be reused for several documents.
    """

    def __init__(self):
        # Highest feature structure ID / sofa number encountered; used to seed the ID generators of the CAS.
        self._max_xmi_id = 0
        self._max_sofa_num = 0
        # Zero-argument callables run once all feature structures exist, to resolve forward references.
        self._post_processors = []

    def deserialize(
        self,
        source: Union[IO, str],
        typesystem: Optional[TypeSystem] = None,
        lenient: bool = False,
        merge_typesystem: bool = True,
    ) -> Cas:
        """Parse `source` and return the resulting CAS.

        Args:
            source: A JSON string, or a file-like object to read JSON from.
            typesystem: The type system for the CAS. When `None`, an empty `TypeSystem()` is used.
            lenient: Passed through to the `Cas` constructor.
            merge_typesystem: If `True`, the type system embedded in the JSON (if any) is merged with
                `typesystem` before feature structures are parsed.

        Returns:
            The deserialized CAS.
        """
        if isinstance(source, str):
            data = json.loads(source)
        else:
            data = json.load(source)

        self._max_xmi_id = 0
        self._max_sofa_num = 0
        self._post_processors = []

        # The parameter is Optional; fall back to an empty type system exactly like `load_cas_from_json`.
        if typesystem is None:
            typesystem = TypeSystem()

        if merge_typesystem:
            # The types section is absent when the CAS was written without a type system, so default to {}.
            embedded_typesystem = self._parse_embedded_typesystem(data.get(TYPES_FIELD) or {})
            typesystem = merge_typesystems(typesystem, embedded_typesystem)

        cas = Cas(typesystem=typesystem, lenient=lenient)

        feature_structures = {}
        json_feature_structures = data.get(FEATURE_STRUCTURES_FIELD)
        if isinstance(json_feature_structures, list):
            self._parse_feature_structure_list(cas, typesystem, json_feature_structures, feature_structures)
        elif isinstance(json_feature_structures, dict):
            self._parse_feature_structure_dict(cas, typesystem, json_feature_structures, feature_structures)

        for post_processor in self._post_processors:
            post_processor()

        cas._xmi_id_generator = IdGenerator(self._max_xmi_id + 1)
        cas._sofa_num_generator = IdGenerator(self._max_sofa_num + 1)

        # At this point all views for which we have a sofa with a known ID and sofaNum have already been
        # created as part of parsing the feature structures. Thus, if there are any views remaining that are
        # only declared in the views section, we just create them with auto-assigned IDs.
        json_views = data.get(VIEWS_FIELD) or {}
        for view_name, json_view in json_views.items():
            self._parse_view(cas, view_name, json_view, feature_structures)

        return cas

    def _parse_embedded_typesystem(self, json_typesystem: Dict[str, Any]) -> TypeSystem:
        """Build a `TypeSystem` from the types section of the JSON document."""
        # NOTE(review): this looks up the key "DocumentAnnotation" among the type names; confirm that this is
        # the intended check and not e.g. TYPE_NAME_DOCUMENT_ANNOTATION.
        embedded_typesystem = TypeSystem(
            add_document_annotation_type=not json_typesystem.get(FLAG_DOCUMENT_ANNOTATION)
        )

        # First, build a dependency graph to support cases where a child type is defined before its super type
        type_dependencies = defaultdict(set)
        for type_name, json_type in json_typesystem.items():
            type_dependencies[type_name].add(json_type[SUPER_TYPE_FIELD])

        # Second, load all the types but no features since features of a type X might be of a later loaded type Y
        for type_name in toposort_flatten(type_dependencies):
            if is_predefined(type_name) or embedded_typesystem.contains_type(type_name):
                continue
            self._parse_type(embedded_typesystem, type_name, json_typesystem[type_name])

        # Now we are sure we know all the types, we can create the features
        for type_name, json_type in json_typesystem.items():
            self._parse_features(embedded_typesystem, type_name, json_type)

        return embedded_typesystem

    def _parse_feature_structure_list(
        self,
        cas: Cas,
        typesystem: TypeSystem,
        json_feature_structures: List[Dict[str, Any]],
        feature_structures: Dict[int, Any],
    ) -> None:
        """Parse feature structures given as a JSON list; each entry carries its own ID field."""

        def parse_and_add(json_fs_):
            parsed = self._parse_feature_structure(typesystem, json_fs_.get(ID_FIELD), json_fs_, feature_structures)
            feature_structures[parsed.xmiID] = parsed

        # According to the JSON CAS 0.4.0 spec, we should be able to do this in a single loop as SofaFSes
        # should normally appear before any FSes referring to them. However, the Java implementation currently
        # does not do this, so we do two passes to be able to read its data.
        for json_fs in json_feature_structures:
            if json_fs.get(TYPE_FIELD) != TYPE_NAME_SOFA:
                continue

            # In case the Sofa references a byte array that has not been parsed yet, we need to fetch it
            sofa_byte_array_ref = json_fs.get(REF_FEATURE_PREFIX + FEATURE_BASE_NAME_SOFAARRAY)
            if sofa_byte_array_ref and not feature_structures.get(sofa_byte_array_ref):
                for candidate in json_feature_structures:
                    if candidate.get(ID_FIELD) == sofa_byte_array_ref:
                        parse_and_add(candidate)
                        break

            fs = self._parse_sofa(cas, json_fs.get(ID_FIELD), json_fs, feature_structures)
            feature_structures[fs.xmiID] = fs

        for json_fs in json_feature_structures:
            if json_fs.get(TYPE_FIELD) != TYPE_NAME_SOFA:
                parse_and_add(json_fs)

    def _parse_feature_structure_dict(
        self,
        cas: Cas,
        typesystem: TypeSystem,
        json_feature_structures: Dict[str, Dict[str, Any]],
        feature_structures: Dict[int, Any],
    ) -> None:
        """Parse feature structures given as a JSON object that maps the ID to the feature structure."""

        def parse_and_add(fs_id_, json_fs_):
            parsed = self._parse_feature_structure(typesystem, int(fs_id_), json_fs_, feature_structures)
            feature_structures[parsed.xmiID] = parsed

        # Two passes (sofas first) for the same reason as in the list form: the Java implementation does not
        # always write SofaFSes before the FSes referring to them.
        for fs_id, json_fs in json_feature_structures.items():
            if json_fs.get(TYPE_FIELD) != TYPE_NAME_SOFA:
                continue

            # In case the Sofa references a byte array that has not been parsed yet, we need to fetch it
            sofa_byte_array_ref = json_fs.get(REF_FEATURE_PREFIX + FEATURE_BASE_NAME_SOFAARRAY)
            if sofa_byte_array_ref and not feature_structures.get(sofa_byte_array_ref):
                # JSON object keys are always strings, so the reference has to be converted for the lookup.
                json_byte_array = json_feature_structures.get(str(sofa_byte_array_ref))
                if json_byte_array is not None:
                    parse_and_add(sofa_byte_array_ref, json_byte_array)

            fs = self._parse_sofa(cas, int(fs_id), json_fs, feature_structures)
            feature_structures[fs.xmiID] = fs

        for fs_id, json_fs in json_feature_structures.items():
            if json_fs.get(TYPE_FIELD) != TYPE_NAME_SOFA:
                parse_and_add(fs_id, json_fs)

    def _parse_type(self, typesystem: TypeSystem, type_name: str, json_type: Dict[str, Any]):
        """Create the type `type_name` (without its features) in `typesystem`."""
        super_type_name = json_type[SUPER_TYPE_FIELD]
        description = json_type.get(DESCRIPTION_FIELD)
        typesystem.create_type(type_name, super_type_name, description=description)

    def _parse_features(self, typesystem: TypeSystem, type_name: str, json_type: Dict[str, Any]):
        """Create all features declared for `type_name`; every non-reserved key is a feature."""
        new_type = typesystem.get_type(type_name)
        for key, json_feature in json_type.items():
            if key.startswith(RESERVED_FIELD_PREFIX):
                continue

            range_type = json_feature[RANGE_FIELD]
            element_type = json_feature.get(ELEMENT_TYPE_FIELD)
            if range_type.endswith(ARRAY_SUFFIX):
                # "X[]" is shorthand for "array with element type X"
                element_type = range_type[: -len(ARRAY_SUFFIX)]
                range_type = array_type_name_for_type(element_type)

            typesystem.create_feature(
                new_type,
                name=key,
                rangeType=range_type,
                elementType=element_type,
                description=json_feature.get(DESCRIPTION_FIELD),
                multipleReferencesAllowed=json_feature.get(MULTIPLE_REFERENCES_ALLOWED_FIELD),
            )

    def _get_or_create_view(
        self, cas: Cas, view_name: str, fs_id: Optional[int] = None, sofa_num: Optional[int] = None
    ) -> Cas:
        """Return the default view, or create the view `view_name` with the given sofa ID and number."""
        if view_name == NAME_DEFAULT_SOFA:
            view = cas.get_view(NAME_DEFAULT_SOFA)

            # We need to make sure that the sofa gets the real xmi, see #155
            if fs_id is not None:
                view.get_sofa().xmiID = fs_id

            return view

        # NOTE(review): non-default views are always created here, never fetched. Confirm how
        # `Cas.create_view` behaves when a view of that name already exists.
        return cas.create_view(view_name, xmiID=fs_id, sofaNum=sofa_num)

    def _parse_view(self, cas: Cas, view_name: str, json_view: Dict[str, Any], feature_structures: Dict[int, Any]):
        """Add the members listed for `view_name` to that view, keeping their IDs."""
        view = self._get_or_create_view(cas, view_name)
        for member_id in json_view.get(VIEW_MEMBERS_FIELD) or []:
            fs = feature_structures[member_id]
            view.add(fs, keep_id=True)

    def _parse_sofa(self, cas: Cas, fs_id: int, json_fs: Dict[str, Any], feature_structures: Dict[int, Any]) -> Sofa:
        """Create (or fetch) the view described by a sofa feature structure and return its sofa."""
        sofa_num = json_fs.get(FEATURE_BASE_NAME_SOFANUM)
        view = self._get_or_create_view(cas, json_fs.get(FEATURE_BASE_NAME_SOFAID), fs_id, sofa_num)

        # Sofas do not pass through `_parse_feature_structure`, so their ID and number have to be accounted
        # for here; otherwise the generators seeded from these maxima could hand out values already in use.
        if isinstance(fs_id, int):
            self._max_xmi_id = max(fs_id, self._max_xmi_id)
        if isinstance(sofa_num, int):
            self._max_sofa_num = max(sofa_num, self._max_sofa_num)

        view.sofa_string = json_fs.get(FEATURE_BASE_NAME_SOFASTRING)
        view.sofa_mime = json_fs.get(FEATURE_BASE_NAME_SOFAMIME)
        view.sofa_uri = json_fs.get(FEATURE_BASE_NAME_SOFAURI)
        view.sofa_array = feature_structures.get(json_fs.get(REF_FEATURE_PREFIX + FEATURE_BASE_NAME_SOFAARRAY))

        return view.get_sofa()

    def _parse_feature_structure(
        self, typesystem: TypeSystem, fs_id: int, json_fs: Dict[str, Any], feature_structures: Dict[int, Any]
    ):
        """Instantiate one (non-sofa) feature structure from its JSON object.

        References to other feature structures are resolved immediately when the target is already in
        `feature_structures`, and deferred to the post-processors otherwise.
        """
        type_name = json_fs.get(TYPE_FIELD)
        if type_name.endswith(ARRAY_SUFFIX):
            # NOTE(review): the name is passed including the "[]" suffix, unlike in `_parse_features` where
            # the suffix is stripped first. Confirm that this is what `array_type_name_for_type` expects.
            type_name = array_type_name_for_type(type_name)

        AnnotationType = typesystem.get_type(type_name)

        attributes = dict(json_fs)

        # Map the JSON FS ID to xmiID
        attributes["xmiID"] = fs_id

        # Remap features that use a reserved Python name
        if "self" in attributes:
            attributes["self_"] = attributes.pop("self")

        if "type" in attributes:
            attributes["type_"] = attributes.pop("type")

        if typesystem.is_primitive_array(AnnotationType.name):
            attributes["elements"] = self._parse_primitive_array(AnnotationType.name, json_fs.get(ELEMENTS_FIELD))
        elif AnnotationType.name == TYPE_NAME_FS_ARRAY:
            # Resolve id-ref at the end of processing. `fs` is bound further down, before the lambda runs.
            def fix_up(elements):
                return lambda: setattr(fs, "elements", [feature_structures.get(e) for e in elements])

            # The elements field is omitted for empty arrays, so fall back to an empty list.
            self._post_processors.append(fix_up(json_fs.get(ELEMENTS_FIELD) or []))

        self._strip_reserved_json_keys(attributes)

        ref_features = {}
        for key, value in list(attributes.items()):
            if key.startswith(REF_FEATURE_PREFIX):
                ref_features[key[1:]] = value
                attributes.pop(key)
            if key.startswith(NUMBER_FEATURE_PREFIX):
                attributes[key[1:]] = self._parse_float_value(value)
                attributes.pop(key)

        self._max_xmi_id = max(attributes["xmiID"], self._max_xmi_id)
        fs = AnnotationType(**attributes)

        self._resolve_references(fs, ref_features, feature_structures)

        # Map from offsets in UIMA UTF-16 based offsets to Unicode codepoints
        if typesystem.is_instance_of(fs.type, TYPE_NAME_ANNOTATION):
            sofa = fs.sofa
            fs.begin = sofa._offset_converter.external_to_python(fs.begin)
            fs.end = sofa._offset_converter.external_to_python(fs.end)

        return fs

    def _parse_float_value(self, value: Union[str, int, float]) -> float:
        """Convert a JSON number or one of the special strings (NaN / Infinity variants) to a float.

        Raises:
            ValueError: If `value` is neither a number nor one of the recognized special strings.
        """
        # JSON numbers without a fractional part are decoded as `int`; accept them too. `bool` is a
        # subclass of `int` but is not a valid floating point value, so it is excluded explicitly.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return float(value)
        if value == NAN_VALUE:
            return float("nan")
        if value in (POSITIVE_INFINITE_VALUE, POSITIVE_INFINITE_VALUE_ABBR):
            return float("inf")
        if value in (NEGATIVE_INFINITE_VALUE, NEGATIVE_INFINITE_VALUE_ABBR):
            return float("-inf")

        raise ValueError(
            f"Illegal floating point value [{value}]. Must be a float literal or one of {NAN_VALUE}, "
            f"{POSITIVE_INFINITE_VALUE}, {POSITIVE_INFINITE_VALUE_ABBR}, {NEGATIVE_INFINITE_VALUE}, or "
            f"{NEGATIVE_INFINITE_VALUE_ABBR}"
        )

    def _parse_primitive_array(self, type_name: str, elements: Union[list, str, None]) -> Union[list, bytes, str, None]:
        """Decode the elements of a primitive array.

        Byte arrays are base64-decoded to `bytes`, float/double arrays are converted element-wise, and
        everything else (including empty or missing elements) is returned unchanged.
        """
        if elements and type_name == TYPE_NAME_BYTE_ARRAY:
            return base64.b64decode(elements)
        if elements and type_name in (TYPE_NAME_FLOAT_ARRAY, TYPE_NAME_DOUBLE_ARRAY):
            return [self._parse_float_value(v) for v in elements]
        return elements

    def _resolve_references(self, fs, ref_features: Dict[str, Any], feature_structures: Dict[int, Any]):
        """Set the reference features of `fs`, deferring those whose target has not been parsed yet."""
        for key, value in ref_features.items():
            target_fs = feature_structures.get(value)
            if target_fs:
                # Resolve id-ref now
                setattr(fs, key, target_fs)
            else:
                # Resolve id-ref at the end of processing
                def fix_up(k, v):
                    return lambda: setattr(fs, k, feature_structures.get(v))

                self._post_processors.append(fix_up(key, value))

    def _strip_reserved_json_keys(
        self,
        attributes: Dict[str, Any],
    ):
        """Remove, in place, all keys that start with the reserved field prefix."""
        for key in list(attributes):
            if key.startswith(RESERVED_FIELD_PREFIX):
                attributes.pop(key)


class CasJsonSerializer:
    """Writes a `Cas` in its JSON representation."""

    # Attributes present on every feature structure that are written via reserved fields, not as features.
    _COMMON_FIELD_NAMES = {"xmiID", "type"}

    def __init__(self):
        pass

    def serialize(
        self,
        sink: Union[IO, str, None],
        cas: Cas,
        pretty_print: bool = True,
        ensure_ascii: bool = False,
        type_system_mode: TypeSystemMode = TypeSystemMode.FULL,
    ) -> Union[str, None]:
        """Serialize `cas` to JSON.

        Args:
            sink: Where to write to. A falsy value (e.g. `None`) returns the JSON as a string. A `str` is
                treated as the path of a file to write (UTF-8). A text stream is written to directly; any
                other file-like object is treated as a binary stream and written to as UTF-8.
            cas: The CAS to serialize.
            pretty_print: If `True`, indent the output by two spaces per level.
            ensure_ascii: Passed through to the `json` module.
            type_system_mode: Which types to embed: all (`FULL`), only the transitive closure of the types
                in use (`MINIMAL`), or no types section at all (`NONE`).

        Returns:
            The JSON string if `sink` is falsy, otherwise `None`.

        Raises:
            ValueError: If `type_system_mode` is not a supported mode.
        """
        feature_structures = []
        views = {}
        for view in cas.views:
            views[view.sofa.sofaID] = self._serialize_view(view)
            if view.sofa.sofaArray:
                feature_structures.append(self._serialize_feature_structure(view.sofa.sofaArray))
            feature_structures.append(self._serialize_feature_structure(view.sofa))

        # Find all fs, even the ones that are not directly added to a sofa
        used_types = set()
        for fs in sorted(cas._find_all_fs(include_inlinable_arrays_and_lists=True), key=lambda a: a.xmiID):
            used_types.add(fs.type)
            feature_structures.append(self._serialize_feature_structure(fs))

        types = None
        if type_system_mode is not TypeSystemMode.NONE:
            types = {}

            if type_system_mode is TypeSystemMode.MINIMAL:
                # Build transitive closure of used types by following parents, features, etc.
                types_to_include = cas.typesystem.transitive_closure(used_types)
            elif type_system_mode is TypeSystemMode.FULL:
                types_to_include = cas.typesystem.get_types()
            else:
                raise ValueError(f"Invalid type system mode: [{type_system_mode}]")

            for type_ in sorted(types_to_include, key=lambda x: x.name):
                if type_.name == TYPE_NAME_DOCUMENT_ANNOTATION:
                    continue
                json_type = self._serialize_type(type_)
                types[json_type[NAME_FIELD]] = json_type

        data = {}
        if types is not None:
            data[TYPES_FIELD] = types
        data[FEATURE_STRUCTURES_FIELD] = feature_structures
        data[VIEWS_FIELD] = views

        dump_options = {
            "sort_keys": False,
            "indent": 2 if pretty_print else None,
            "ensure_ascii": ensure_ascii,
            "allow_nan": False,
        }

        if not sink:
            return json.dumps(data, **dump_options)

        if isinstance(sink, str):
            # The signature allows a string sink; treat it as the path of the file to write.
            with open(sink, "w", encoding="utf-8") as file_:
                json.dump(data, file_, **dump_options)
            return None

        if isinstance(sink, TextIOBase):
            # A text stream owned by the caller: write to it and leave it fully intact.
            json.dump(data, sink, **dump_options)
            return None

        wrapper = TextIOWrapper(sink, encoding="utf-8", write_through=True)
        try:
            json.dump(data, wrapper, **dump_options)
        finally:
            # Detach only the wrapper created here, so that it does not close the caller's binary stream.
            wrapper.detach()

        return None

    def _serialize_type(self, type_: Type):
        """Return the JSON object describing `type_` including its features."""
        type_name = self._to_external_type_name(type_.name)
        supertype_name = self._to_external_type_name(type_.supertype.name)

        json_type = {
            NAME_FIELD: type_name,
            SUPER_TYPE_FIELD: supertype_name,
        }

        if type_.description:
            json_type[DESCRIPTION_FIELD] = type_.description

        for feature in list(type_.features):
            json_feature = self._serialize_feature(json_type, feature)
            json_type[json_feature[NAME_FIELD]] = json_feature

        return json_type

    def _serialize_feature(self, json_type, feature: Feature):
        """Return the JSON object describing `feature`. `json_type` is accepted but not used."""
        # If the feature name is a reserved name like `self`, then we added an
        # underscore to it before so Python can handle it. We now need to remove it.
        feature_name = feature.name
        if feature._has_reserved_name:
            feature_name = feature_name[:-1]

        range_type_name = self._to_external_type_name(feature.rangeType.name)

        # Array ranges are written as "<element type>[]", which makes a separate element type redundant.
        skip_element_type = False
        if is_array(feature.rangeType):
            skip_element_type = True
            if is_primitive_array(feature.rangeType):
                range_type_name = element_type_name_for_array_type(feature.rangeType) + ARRAY_SUFFIX
            elif feature.elementType:
                range_type_name = self._to_external_type_name(feature.elementType.name) + ARRAY_SUFFIX
            else:
                range_type_name = TYPE_NAME_TOP + ARRAY_SUFFIX

        json_feature = {
            NAME_FIELD: feature_name,
            RANGE_FIELD: range_type_name,
        }

        if feature.description:
            json_feature[DESCRIPTION_FIELD] = feature.description

        if feature.multipleReferencesAllowed is not None:
            json_feature[MULTIPLE_REFERENCES_ALLOWED_FIELD] = feature.multipleReferencesAllowed

        if not skip_element_type and feature.elementType is not None:
            json_feature[ELEMENT_TYPE_FIELD] = self._to_external_type_name(feature.elementType.name)

        return json_feature

    def _serialize_feature_structure(self, fs) -> dict:
        """Return the JSON object for one feature structure.

        Arrays are written with an elements field (omitted when empty); for all other types, every feature
        that is not `None` is written, with references encoded as IDs.
        """
        type_name = fs.type.name

        json_fs = OrderedDict()
        json_fs[ID_FIELD] = fs.xmiID
        json_fs[TYPE_FIELD] = type_name

        if type_name == TYPE_NAME_BYTE_ARRAY:
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = base64.b64encode(bytes(fs.elements)).decode("ascii")
            return json_fs
        elif type_name in {TYPE_NAME_DOUBLE_ARRAY, TYPE_NAME_FLOAT_ARRAY}:
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = [self._serialize_float_value(e) for e in fs.elements]
            return json_fs
        elif is_primitive_array(fs.type):
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = fs.elements
            return json_fs
        elif TYPE_NAME_FS_ARRAY == type_name:
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = [self._serialize_ref(e) for e in fs.elements]
            return json_fs

        for feature in fs.type.all_features:
            if feature.name in CasJsonSerializer._COMMON_FIELD_NAMES:
                continue

            feature_name = feature.name

            # Strip the underscore we added for reserved names
            if feature._has_reserved_name:
                feature_name = feature.name[:-1]

            # Skip over 'None' features
            value = getattr(fs, feature.name)
            if value is None:
                continue

            # Map back from offsets in Unicode codepoints to UIMA UTF-16 based offsets. Only the begin/end
            # features declared by the annotation type are offsets; a feature that merely happens to be
            # called "end" on some other type must not be converted.
            if feature.domainType.name == TYPE_NAME_ANNOTATION and feature_name in ("begin", "end"):
                sofa: Sofa = getattr(fs, "sofa")
                value = sofa._offset_converter.python_to_external(value)

            if feature.rangeType.name in {TYPE_NAME_DOUBLE, TYPE_NAME_FLOAT}:
                float_value = self._serialize_float_value(value)
                if isinstance(float_value, str):
                    # Non-finite values are written as strings and flagged with the number prefix
                    feature_name = NUMBER_FEATURE_PREFIX + feature_name
                json_fs[feature_name] = float_value
            elif is_primitive(feature.rangeType):
                json_fs[feature_name] = value
            else:
                # We need to encode non-primitive features as a reference
                json_fs[REF_FEATURE_PREFIX + feature_name] = self._serialize_ref(value)

        return json_fs

    def _serialize_float_value(self, value) -> Union[float, str]:
        """Return `value` unchanged if it is finite, otherwise its string encoding (NaN / +-Infinity)."""
        if isnan(value):
            return NAN_VALUE
        if math.isinf(value):
            return POSITIVE_INFINITE_VALUE if value > 0 else NEGATIVE_INFINITE_VALUE
        return value

    def _serialize_ref(self, fs) -> Optional[int]:
        """Return the ID of `fs`, or `None` if `fs` is falsy."""
        if not fs:
            return None
        return fs.xmiID

    def _serialize_view(self, view: View):
        """Return the JSON object for a view: the ID of its sofa and the sorted IDs of its members."""
        return {
            VIEW_SOFA_FIELD: view.sofa.xmiID,
            VIEW_MEMBERS_FIELD: sorted(x.xmiID for x in view.get_all_annotations()),
        }

    def _to_external_type_name(self, type_name: str):
        """Strip a leading "uima.noNamespace." from `type_name`, if present."""
        no_namespace_prefix = "uima.noNamespace."
        if type_name.startswith(no_namespace_prefix):
            # Remove the prefix only; `str.replace` would also remove later occurrences.
            return type_name[len(no_namespace_prefix):]
        return type_name