Source code for cassis.json

import base64
import json
import math
from collections import OrderedDict
from io import TextIOBase, TextIOWrapper
from math import isnan

from cassis.cas import NAME_DEFAULT_SOFA, Cas, IdGenerator, Sofa, View
from cassis.typesystem import *

# Single-character prefixes that mark special keys in the JSON representation.
RESERVED_FIELD_PREFIX = "%"  # keys that are not regular features
REF_FEATURE_PREFIX = "@"  # features whose value is the ID of another feature structure
NUMBER_FEATURE_PREFIX = "#"  # floating point features encoded as a string (NaN, Infinity, ...)
ANCHOR_FEATURE_PREFIX = "^"

# Reserved keys used at the top level of the document and inside views.
TYPE_FIELD = f"{RESERVED_FIELD_PREFIX}TYPE"
RANGE_FIELD = f"{RESERVED_FIELD_PREFIX}RANGE"
TYPES_FIELD = f"{RESERVED_FIELD_PREFIX}TYPES"
FEATURES_FIELD = f"{RESERVED_FIELD_PREFIX}FEATURES"
VIEWS_FIELD = f"{RESERVED_FIELD_PREFIX}VIEWS"
VIEW_SOFA_FIELD = f"{RESERVED_FIELD_PREFIX}SOFA"
VIEW_MEMBERS_FIELD = f"{RESERVED_FIELD_PREFIX}MEMBERS"
FEATURE_STRUCTURES_FIELD = f"{RESERVED_FIELD_PREFIX}FEATURE_STRUCTURES"

# Reserved keys used inside type and feature declarations.
NAME_FIELD = f"{RESERVED_FIELD_PREFIX}NAME"
SUPER_TYPE_FIELD = f"{RESERVED_FIELD_PREFIX}SUPER_TYPE"
DESCRIPTION_FIELD = f"{RESERVED_FIELD_PREFIX}DESCRIPTION"
ELEMENT_TYPE_FIELD = f"{RESERVED_FIELD_PREFIX}ELEMENT_TYPE"
MULTIPLE_REFERENCES_ALLOWED_FIELD = f"{RESERVED_FIELD_PREFIX}MULTIPLE_REFERENCES_ALLOWED"

# Reserved keys used inside feature structures.
ID_FIELD = f"{RESERVED_FIELD_PREFIX}ID"
FLAGS_FIELD = f"{RESERVED_FIELD_PREFIX}FLAGS"
FLAG_DOCUMENT_ANNOTATION = "DocumentAnnotation"
ARRAY_SUFFIX = "[]"
ELEMENTS_FIELD = f"{RESERVED_FIELD_PREFIX}ELEMENTS"

# String encodings of the floating point values that JSON cannot express as numbers.
NAN_VALUE = "NaN"
POSITIVE_INFINITE_VALUE = "Infinity"
POSITIVE_INFINITE_VALUE_ABBR = "Inf"
NEGATIVE_INFINITE_VALUE = "-Infinity"
NEGATIVE_INFINITE_VALUE_ABBR = "-Inf"


def load_cas_from_json(
    source: Union[IO, str], typesystem: TypeSystem = None, lenient: bool = False, merge_typesystem: bool = True
) -> Cas:
    """Deserialize a CAS from its JSON representation.

    Args:
        source: Where the JSON comes from. A string is interpreted as the JSON document itself; any other
            value is treated as a file-like object that the document is read from.
        typesystem: The type system that belongs to this CAS. When `None`, a new empty `TypeSystem` is used.
        lenient: If `True`, unknown Types will be ignored. If `False`, unknown Types will cause an exception.
            The default is `False`.
        merge_typesystem: Passed through to `CasJsonDeserializer.deserialize`. If `True` (the default), the
            type system embedded in the JSON document is merged with `typesystem`.

    Returns:
        The deserialized CAS
    """
    effective_typesystem = TypeSystem() if typesystem is None else typesystem

    return CasJsonDeserializer().deserialize(
        source, typesystem=effective_typesystem, lenient=lenient, merge_typesystem=merge_typesystem
    )


class CasJsonDeserializer:
    """Builds a `Cas` from its JSON representation.

    The instance keeps per-run bookkeeping (highest feature structure ID seen and deferred reference fix-ups)
    which is reset at the start of every `deserialize` call.
    """

    def __init__(self):
        self._max_xmi_id = 0
        self._max_sofa_num = 0
        self._post_processors = []

    def deserialize(
        self,
        source: Union[IO, str],
        typesystem: Optional[TypeSystem] = None,
        lenient: bool = False,
        merge_typesystem: bool = True,
    ) -> Cas:
        """Parse a JSON document into a `Cas`.

        Args:
            source: A JSON string, or a file-like object to read the JSON from.
            typesystem: The type system to use for the CAS.
            lenient: Passed on to the `Cas` constructor.
            merge_typesystem: If `True`, the types declared in the `%TYPES` section of the document are
                parsed and merged with `typesystem`. A document without a `%TYPES` section contributes
                no types.

        Returns:
            The deserialized CAS.
        """
        if isinstance(source, str):
            data = json.loads(source)
        else:
            data = json.load(source)

        self._max_xmi_id = 0
        self._max_sofa_num = 0
        self._post_processors = []

        if merge_typesystem:
            # The types section is optional (it is omitted when serializing without a type system),
            # so treat a missing section like an empty one instead of failing on `None`.
            json_typesystem = data.get(TYPES_FIELD) or {}
            # NOTE(review): the lookup key used here is FLAG_DOCUMENT_ANNOTATION - confirm this is the
            # intended key for detecting an embedded document annotation type.
            embedded_typesystem = TypeSystem(
                add_document_annotation_type=not (json_typesystem.get(FLAG_DOCUMENT_ANNOTATION))
            )

            # First, build a dependency graph to support cases where a child type is defined before its super type
            type_dependencies = defaultdict(set)
            for type_name, json_type in json_typesystem.items():
                type_dependencies[type_name].add(json_type[SUPER_TYPE_FIELD])

            # Second, load all the types but no features since features of a type X might be of a later loaded type Y
            for type_name in toposort_flatten(type_dependencies):
                if is_predefined(type_name) or embedded_typesystem.contains_type(type_name):
                    continue
                self._parse_type(embedded_typesystem, type_name, json_typesystem[type_name])

            # Now we are sure we know all the types, we can create the features
            for type_name, json_type in json_typesystem.items():
                self._parse_features(embedded_typesystem, type_name, json_type)

            typesystem = merge_typesystems(typesystem, embedded_typesystem)

        cas = Cas(typesystem=typesystem, lenient=lenient)

        feature_structures = {}
        json_feature_structures = data.get(FEATURE_STRUCTURES_FIELD)

        if isinstance(json_feature_structures, list):

            def parse_and_add(json_fs_):
                parsed = self._parse_feature_structure(typesystem, json_fs_.get(ID_FIELD), json_fs_, feature_structures)
                feature_structures[parsed.xmiID] = parsed

            # According to the JSON CAS 0.4.0 spec, we should be able to do this in a single loop as SofaFSes
            # should normally appear before any FSes referring to them. However, the Java implementation currently
            # does not do this, so we do two passes to be able to read its data.
            for json_fs in json_feature_structures:
                if json_fs.get(TYPE_FIELD) == TYPE_NAME_SOFA:
                    # In case the Sofa references a byte array that has not been parsed yet, we need to fetch it
                    sofa_byte_array_ref = json_fs.get(REF_FEATURE_PREFIX + FEATURE_BASE_NAME_SOFAARRAY)
                    if sofa_byte_array_ref and not feature_structures.get(sofa_byte_array_ref):
                        for json_fs_2 in json_feature_structures:
                            if json_fs_2.get(ID_FIELD) == sofa_byte_array_ref:
                                parse_and_add(json_fs_2)

                    fs_id = json_fs.get(ID_FIELD)
                    fs = self._parse_sofa(cas, fs_id, json_fs, feature_structures)
                    feature_structures[fs.xmiID] = fs

            for json_fs in json_feature_structures:
                if json_fs.get(TYPE_FIELD) != TYPE_NAME_SOFA:
                    parse_and_add(json_fs)

        if isinstance(json_feature_structures, dict):

            def parse_and_add(fs_id_, json_fs_):
                parsed = self._parse_feature_structure(typesystem, int(fs_id_), json_fs_, feature_structures)
                feature_structures[parsed.xmiID] = parsed

            # Same two-pass approach as for the list form above: Sofas first, everything else afterwards.
            for fs_id, json_fs in json_feature_structures.items():
                if json_fs.get(TYPE_FIELD) == TYPE_NAME_SOFA:
                    # In case the Sofa references a byte array that has not been parsed yet, we need to fetch it
                    sofa_byte_array_ref = json_fs.get(REF_FEATURE_PREFIX + FEATURE_BASE_NAME_SOFAARRAY)
                    if sofa_byte_array_ref and not feature_structures.get(sofa_byte_array_ref):
                        # Keys of a JSON object are always strings while the reference itself may be a
                        # number, so try the string form as well before giving up.
                        referenced = json_feature_structures.get(sofa_byte_array_ref)
                        if referenced is None:
                            referenced = json_feature_structures.get(str(sofa_byte_array_ref))
                        # A dangling reference is skipped, matching the behavior of the list form.
                        if referenced is not None:
                            parse_and_add(sofa_byte_array_ref, referenced)

                    fs_id = int(fs_id)
                    fs = self._parse_sofa(cas, fs_id, json_fs, feature_structures)
                    feature_structures[fs.xmiID] = fs

            for fs_id, json_fs in json_feature_structures.items():
                if json_fs.get(TYPE_FIELD) != TYPE_NAME_SOFA:
                    parse_and_add(fs_id, json_fs)

        # Resolve the references that pointed to feature structures which had not been parsed yet
        for post_processor in self._post_processors:
            post_processor()

        cas._xmi_id_generator = IdGenerator(self._max_xmi_id + 1)
        cas._sofa_num_generator = IdGenerator(self._max_sofa_num + 1)

        # At this point all views for which we have a sofa with a known ID and sofaNum have already been created
        # as part of parsing the feature structures. Thus, if there are any views remaining that are only declared
        # in the views section, we just create them with auto-assigned IDs
        json_views = data.get(VIEWS_FIELD) or {}
        for view_name, json_view in json_views.items():
            self._parse_view(cas, view_name, json_view, feature_structures)

        return cas

    def _parse_type(self, typesystem: TypeSystem, type_name: str, json_type: Dict[str, any]):
        """Create the type `type_name` (without features) in `typesystem` from its JSON declaration."""
        super_type_name = json_type[SUPER_TYPE_FIELD]
        description = json_type.get(DESCRIPTION_FIELD)
        typesystem.create_type(type_name, super_type_name, description=description)

    def _parse_features(self, typesystem: TypeSystem, type_name: str, json_type: Dict[str, any]):
        """Create the features declared for `type_name`; the type itself must already exist."""
        new_type = typesystem.get_type(type_name)
        for key, json_feature in json_type.items():
            # Reserved keys describe the type itself, everything else is a feature declaration
            if key.startswith(RESERVED_FIELD_PREFIX):
                continue

            range_type = json_feature[RANGE_FIELD]
            element_type = json_feature.get(ELEMENT_TYPE_FIELD)
            # A range of the form `X[]` denotes an array whose element type is `X`
            if range_type.endswith("[]"):
                element_type = range_type[:-2]
                range_type = array_type_name_for_type(element_type)

            typesystem.create_feature(
                new_type,
                name=key,
                rangeType=range_type,
                elementType=element_type,
                description=json_feature.get(DESCRIPTION_FIELD),
                multipleReferencesAllowed=json_feature.get(MULTIPLE_REFERENCES_ALLOWED_FIELD),
            )

    def _get_or_create_view(
        self, cas: Cas, view_name: str, fs_id: Optional[int] = None, sofa_num: Optional[int] = None
    ) -> Cas:
        """Return the view called `view_name`, creating it unless it is the default view."""
        if view_name == NAME_DEFAULT_SOFA:
            view = cas.get_view(NAME_DEFAULT_SOFA)

            # We need to make sure that the sofa gets the real xmi, see #155
            if fs_id is not None:
                view.get_sofa().xmiID = fs_id

            return view
        else:
            return cas.create_view(view_name, xmiID=fs_id, sofaNum=sofa_num)

    def _parse_view(self, cas: Cas, view_name: str, json_view: Dict[str, any], feature_structures: Dict[str, any]):
        """Add all members listed for `view_name` to the corresponding view, keeping their IDs."""
        view = self._get_or_create_view(cas, view_name)
        for member_id in json_view[VIEW_MEMBERS_FIELD]:
            fs = feature_structures[member_id]
            view.add(fs, keep_id=True)

    def _parse_sofa(self, cas: Cas, fs_id: int, json_fs: Dict[str, any], feature_structures: Dict[int, any]) -> Sofa:
        """Populate the view belonging to the given Sofa JSON object and return its Sofa."""
        view = self._get_or_create_view(
            cas, json_fs.get(FEATURE_BASE_NAME_SOFAID), fs_id, json_fs.get(FEATURE_BASE_NAME_SOFANUM)
        )
        view.sofa_string = json_fs.get(FEATURE_BASE_NAME_SOFASTRING)
        view.sofa_mime = json_fs.get(FEATURE_BASE_NAME_SOFAMIME)
        view.sofa_uri = json_fs.get(FEATURE_BASE_NAME_SOFAURI)
        view.sofa_array = feature_structures.get(json_fs.get(REF_FEATURE_PREFIX + FEATURE_BASE_NAME_SOFAARRAY))
        return view.get_sofa()

    def _parse_feature_structure(
        self, typesystem: TypeSystem, fs_id: int, json_fs: Dict[str, any], feature_structures: Dict[int, any]
    ):
        """Instantiate a single feature structure from its JSON object.

        References to other feature structures are resolved immediately when the target is already present
        in `feature_structures`; otherwise a fix-up is queued in `self._post_processors`.

        Args:
            typesystem: Type system used to look up the type of the feature structure.
            fs_id: ID of the feature structure; stored as its `xmiID`.
            json_fs: The JSON object describing the feature structure.
            feature_structures: The feature structures parsed so far, keyed by ID.

        Returns:
            The new feature structure.
        """
        type_name = json_fs.get(TYPE_FIELD)
        if type_name.endswith("[]"):
            type_name = array_type_name_for_type(type_name)

        AnnotationType = typesystem.get_type(type_name)

        attributes = dict(json_fs)

        # Map the JSON FS ID to xmiID
        attributes["xmiID"] = fs_id

        # Remap features that use a reserved Python name
        if "self" in attributes:
            attributes["self_"] = attributes.pop("self")

        if "type" in attributes:
            attributes["type_"] = attributes.pop("type")

        if typesystem.is_primitive_array(AnnotationType.name):
            attributes["elements"] = self._parse_primitive_array(AnnotationType.name, json_fs.get(ELEMENTS_FIELD))
        elif AnnotationType.name == TYPE_NAME_FS_ARRAY:
            # Resolve id-ref at the end of processing. `fs` is bound further down, before any
            # post-processor runs.
            def fix_up(elements):
                return lambda: setattr(fs, "elements", [feature_structures.get(e) for e in elements])

            # The elements key is absent for empty arrays, so fall back to an empty list
            self._post_processors.append(fix_up(json_fs.get(ELEMENTS_FIELD) or []))

        self._strip_reserved_json_keys(attributes)

        ref_features = {}
        # Iterate over a snapshot because the dictionary is modified inside the loop
        for key, value in list(attributes.items()):
            if key.startswith(REF_FEATURE_PREFIX):
                ref_features[key[1:]] = value
                attributes.pop(key)
            elif key.startswith(NUMBER_FEATURE_PREFIX):
                attributes[key[1:]] = self._parse_float_value(value)
                attributes.pop(key)

        self._max_xmi_id = max(attributes["xmiID"], self._max_xmi_id)
        fs = AnnotationType(**attributes)

        self._resolve_references(fs, ref_features, feature_structures)

        # Map from offsets in UIMA UTF-16 based offsets to Unicode codepoints
        if typesystem.is_instance_of(fs.type, TYPE_NAME_ANNOTATION):
            sofa = fs.sofa
            fs.begin = sofa._offset_converter.external_to_python(fs.begin)
            fs.end = sofa._offset_converter.external_to_python(fs.end)

        return fs

    def _parse_float_value(self, value: Union[str, int, float]) -> float:
        """Convert a JSON value into a float.

        Args:
            value: A JSON number, or one of the strings used to encode NaN and the infinities.

        Returns:
            The corresponding float.

        Raises:
            ValueError: If `value` is neither a number nor one of the known special strings.
        """
        if isinstance(value, float):
            return value
        # A JSON number without fraction (e.g. `1`) is parsed as `int`. `bool` is a subclass of `int`
        # but not a valid floating point literal, so it is excluded.
        if isinstance(value, int) and not isinstance(value, bool):
            return float(value)
        if value == NAN_VALUE:
            return float("nan")
        if value == POSITIVE_INFINITE_VALUE or value == POSITIVE_INFINITE_VALUE_ABBR:
            return float("inf")
        if value == NEGATIVE_INFINITE_VALUE or value == NEGATIVE_INFINITE_VALUE_ABBR:
            return float("-inf")

        raise ValueError(
            f"Illegal floating point value [{value}]. Must be a float literal or one of {NAN_VALUE}, "
            f"{POSITIVE_INFINITE_VALUE}, {POSITIVE_INFINITE_VALUE_ABBR}, {NEGATIVE_INFINITE_VALUE}, or "
            f"{NEGATIVE_INFINITE_VALUE_ABBR}"
        )

    def _parse_primitive_array(self, type_name: str, elements: Union[list, str]) -> List:
        """Decode the elements of a primitive array.

        Byte arrays are base64-decoded, float and double arrays are converted element by element and all
        other arrays (as well as empty or missing elements) are returned unchanged.
        """
        if elements and type_name == TYPE_NAME_BYTE_ARRAY:
            return base64.b64decode(elements)
        if elements and (type_name == TYPE_NAME_FLOAT_ARRAY or type_name == TYPE_NAME_DOUBLE_ARRAY):
            return [self._parse_float_value(v) for v in elements]
        else:
            return elements

    def _resolve_references(self, fs, ref_features: Dict[str, any], feature_structures: Dict[int, any]):
        """Set the reference features of `fs`, deferring those whose target is not available yet."""
        for key, value in ref_features.items():
            target_fs = feature_structures.get(value)
            if target_fs:
                # Resolve id-ref now
                setattr(fs, key, target_fs)
            else:
                # Resolve id-ref at the end of processing. The factory function binds the current
                # key and value so that every deferred fix-up keeps its own pair.
                def fix_up(k, v):
                    return lambda: setattr(fs, k, feature_structures.get(v))

                self._post_processors.append(fix_up(key, value))

    def _strip_reserved_json_keys(
        self,
        attributes: Dict[str, any],
    ):
        """Remove all keys starting with the reserved prefix from `attributes` in place."""
        for key in list(attributes):
            if key.startswith(RESERVED_FIELD_PREFIX):
                attributes.pop(key)


class CasJsonSerializer:
    """Writes a `Cas` in its JSON representation."""

    # Attributes that are emitted through reserved keys and therefore skipped when serializing features
    _COMMON_FIELD_NAMES = {"xmiID", "type"}

    def __init__(self):
        pass

    def serialize(
        self,
        sink: Union[IO, str, None],
        cas: Cas,
        pretty_print: bool = True,
        ensure_ascii: bool = False,
        type_system_mode: TypeSystemMode = TypeSystemMode.FULL,
    ) -> Union[str, None]:
        """Serialize `cas` to JSON.

        Args:
            sink: Where to write the JSON. If falsy, the JSON is returned as a string. A text stream is
                written to directly; any other sink is wrapped in a UTF-8 `TextIOWrapper` for writing.
            cas: The CAS to serialize.
            pretty_print: If `True`, the output is indented by two spaces.
            ensure_ascii: Passed on to the `json` module.
            type_system_mode: Selects which types are written to the types section (none, only the
                transitive closure of the used types, or all types of the type system).

        Returns:
            The JSON string if no sink was given, otherwise `None`.

        Raises:
            Exception: If `type_system_mode` is not a known mode.
        """
        feature_structures = []

        views = {}
        for view in cas.views:
            views[view.sofa.sofaID] = self._serialize_view(view)
            if view.sofa.sofaArray:
                json_sofa_array_fs = self._serialize_feature_structure(view.sofa.sofaArray)
                feature_structures.append(json_sofa_array_fs)
            json_sofa_fs = self._serialize_feature_structure(view.sofa)
            feature_structures.append(json_sofa_fs)

        # Find all fs, even the ones that are not directly added to a sofa
        used_types = set()
        for fs in sorted(cas._find_all_fs(include_inlinable_arrays_and_lists=True), key=lambda a: a.xmiID):
            used_types.add(fs.type)
            json_fs = self._serialize_feature_structure(fs)
            feature_structures.append(json_fs)

        types = None
        if type_system_mode is not TypeSystemMode.NONE:
            types = {}

            if type_system_mode is TypeSystemMode.MINIMAL:
                # Build transitive closure of used types by following parents, features, etc.
                types_to_include = cas.typesystem.transitive_closure(used_types)
            elif type_system_mode is TypeSystemMode.FULL:
                types_to_include = cas.typesystem.get_types()
            else:
                raise Exception(f"Invalid type system mode: [{type_system_mode}]")

            for type_ in sorted(types_to_include, key=lambda x: x.name):
                if type_.name == TYPE_NAME_DOCUMENT_ANNOTATION:
                    continue
                json_type = self._serialize_type(type_)
                types[json_type[NAME_FIELD]] = json_type

        data = {}
        if types is not None:
            data[TYPES_FIELD] = types
        if feature_structures is not None:
            data[FEATURE_STRUCTURES_FIELD] = feature_structures
        if views is not None:
            data[VIEWS_FIELD] = views

        dump_options = {
            "sort_keys": False,
            "indent": 2 if pretty_print else None,
            "ensure_ascii": ensure_ascii,
            "allow_nan": False,
        }

        if not sink:
            return json.dumps(data, **dump_options)

        if isinstance(sink, TextIOBase):
            # The caller owns this text stream, so it is written to and left untouched otherwise
            json.dump(data, sink, **dump_options)
            return None

        wrapper = TextIOWrapper(sink, encoding="utf-8", write_through=True)
        try:
            json.dump(data, wrapper, **dump_options)
        finally:
            # Prevent the TextIOWrapper created here from closing the underlying sink
            wrapper.detach()

        return None

    def _serialize_type(self, type_: Type):
        """Build the JSON declaration of `type_` including all features declared on it."""
        type_name = self._to_external_type_name(type_.name)
        supertype_name = self._to_external_type_name(type_.supertype.name)

        json_type = {
            NAME_FIELD: type_name,
            SUPER_TYPE_FIELD: supertype_name,
        }

        if type_.description:
            json_type[DESCRIPTION_FIELD] = type_.description

        for feature in list(type_.features):
            json_feature = self._serialize_feature(json_type, feature)
            json_type[json_feature[NAME_FIELD]] = json_feature

        return json_type

    def _serialize_feature(self, json_type, feature: Feature):
        """Build the JSON declaration of `feature`; `json_type` is accepted but not used."""
        # If the feature name is a reserved name like `self`, then we added an
        # underscore to it before so Python can handle it. We now need to remove it.
        feature_name = feature.name
        if feature._has_reserved_name:
            feature_name = feature_name[:-1]

        range_type_name = self._to_external_type_name(feature.rangeType.name)

        # For arrays the element type is encoded in the range as `X[]` and not written separately
        skip_element_type = False
        if is_array(feature.rangeType):
            skip_element_type = True
            if is_primitive_array(feature.rangeType):
                range_type_name = element_type_name_for_array_type(feature.rangeType) + "[]"
            elif feature.elementType:
                range_type_name = self._to_external_type_name(feature.elementType.name) + "[]"
            else:
                range_type_name = TYPE_NAME_TOP + "[]"

        json_feature = {
            NAME_FIELD: feature_name,
            RANGE_FIELD: range_type_name,
        }

        if feature.description:
            json_feature[DESCRIPTION_FIELD] = feature.description

        if feature.multipleReferencesAllowed is not None:
            json_feature[MULTIPLE_REFERENCES_ALLOWED_FIELD] = feature.multipleReferencesAllowed

        if not skip_element_type and feature.elementType is not None:
            json_feature[ELEMENT_TYPE_FIELD] = self._to_external_type_name(feature.elementType.name)

        return json_feature

    def _serialize_feature_structure(self, fs) -> dict:
        """Build the JSON object for a single feature structure.

        Arrays are written with their elements only. For all other types every feature that is not `None`
        is written; non-primitive values are written as references to the ID of the target.
        """
        type_name = fs.type.name

        json_fs = OrderedDict()
        json_fs[ID_FIELD] = fs.xmiID
        json_fs[TYPE_FIELD] = type_name

        if type_name == TYPE_NAME_BYTE_ARRAY:
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = base64.b64encode(bytes(fs.elements)).decode("ascii")
            return json_fs
        elif type_name in {TYPE_NAME_DOUBLE_ARRAY, TYPE_NAME_FLOAT_ARRAY}:
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = [self._serialize_float_value(e) for e in fs.elements]
            return json_fs
        elif is_primitive_array(fs.type):
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = fs.elements
            return json_fs
        elif TYPE_NAME_FS_ARRAY == type_name:
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = [self._serialize_ref(e) for e in fs.elements]
            return json_fs

        for feature in fs.type.all_features:
            if feature.name in CasJsonSerializer._COMMON_FIELD_NAMES:
                continue

            feature_name = feature.name

            # Strip the underscore we added for reserved names
            if feature._has_reserved_name:
                feature_name = feature.name[:-1]

            # Skip over 'None' features
            value = getattr(fs, feature.name)
            if value is None:
                continue

            # Map back from offsets in Unicode codepoints to UIMA UTF-16 based offsets. The parentheses
            # matter: without them `and` binds tighter than `or` and a feature called `end` on any
            # type would be converted as well.
            if feature.domainType.name == TYPE_NAME_ANNOTATION and (feature_name == "begin" or feature_name == "end"):
                sofa: Sofa = getattr(fs, "sofa")
                value = sofa._offset_converter.python_to_external(value)

            if feature.rangeType.name in {TYPE_NAME_DOUBLE, TYPE_NAME_FLOAT}:
                float_value = self._serialize_float_value(value)
                # Special values are encoded as strings and flagged through the number prefix
                if isinstance(float_value, str):
                    feature_name = NUMBER_FEATURE_PREFIX + feature_name
                json_fs[feature_name] = float_value
            elif is_primitive(feature.rangeType):
                json_fs[feature_name] = value
            else:
                # We need to encode non-primitive features as a reference
                json_fs[REF_FEATURE_PREFIX + feature_name] = self._serialize_ref(value)

        return json_fs

    def _serialize_float_value(self, value) -> Union[float, str]:
        """Return `value` unchanged, or its string encoding if it is NaN or infinite."""
        if isnan(value):
            return NAN_VALUE
        elif math.isinf(value):
            if value > 0:
                return POSITIVE_INFINITE_VALUE
            else:
                return NEGATIVE_INFINITE_VALUE
        return value

    def _serialize_ref(self, fs) -> Optional[int]:
        """Return the ID used to reference `fs`, or `None` if `fs` is falsy."""
        if not fs:
            return None
        return fs.xmiID

    def _serialize_view(self, view: View):
        """Build the JSON object for a view: the ID of its sofa and the sorted IDs of its annotations."""
        return {
            VIEW_SOFA_FIELD: view.sofa.xmiID,
            VIEW_MEMBERS_FIELD: sorted(x.xmiID for x in view.get_all_annotations()),
        }

    def _to_external_type_name(self, type_name: str):
        """Remove the `uima.noNamespace.` marker from type names that start with it."""
        if type_name.startswith("uima.noNamespace."):
            return type_name.replace("uima.noNamespace.", "")
        return type_name