import base64
import json
import math
from collections import OrderedDict, defaultdict
from io import TextIOBase, TextIOWrapper
from math import isnan
from typing import Union, IO, Optional, Dict, List
from toposort import toposort_flatten
from cassis.cas import NAME_DEFAULT_SOFA, Cas, IdGenerator, Sofa, View
from cassis.typesystem import (
TYPE_NAME_ANNOTATION,
TypeSystem,
is_predefined,
merge_typesystems,
TYPE_NAME_SOFA,
FEATURE_BASE_NAME_SOFAARRAY,
array_type_name_for_type,
FEATURE_BASE_NAME_SOFASTRING,
FEATURE_BASE_NAME_SOFAID,
FEATURE_BASE_NAME_SOFAMIME,
FEATURE_BASE_NAME_SOFANUM,
FEATURE_BASE_NAME_SOFAURI,
TYPE_NAME_FS_ARRAY,
TYPE_NAME_BYTE_ARRAY,
TYPE_NAME_FLOAT_ARRAY,
TYPE_NAME_DOUBLE_ARRAY,
TypeSystemMode,
TYPE_NAME_DOCUMENT_ANNOTATION,
Type,
Feature,
TYPE_NAME_TOP,
is_primitive_array,
TYPE_NAME_FLOAT,
TYPE_NAME_DOUBLE,
element_type_name_for_array_type,
is_primitive,
is_array,
)
# Single-character prefixes used in JSON keys.
RESERVED_FIELD_PREFIX = "%"  # keys that belong to the format itself rather than to a feature
REF_FEATURE_PREFIX = "@"  # feature whose value is the id of another feature structure
NUMBER_FEATURE_PREFIX = "#"  # floating point feature whose value is encoded as a string (NaN, Infinity, ...)
ANCHOR_FEATURE_PREFIX = "^"  # not referenced in the visible part of this module

# Keys describing a single feature structure, type or feature.
TYPE_FIELD = f"{RESERVED_FIELD_PREFIX}TYPE"
RANGE_FIELD = f"{RESERVED_FIELD_PREFIX}RANGE"

# Top-level sections and view related keys.
TYPES_FIELD = f"{RESERVED_FIELD_PREFIX}TYPES"
FEATURES_FIELD = f"{RESERVED_FIELD_PREFIX}FEATURES"
VIEWS_FIELD = f"{RESERVED_FIELD_PREFIX}VIEWS"
VIEW_SOFA_FIELD = f"{RESERVED_FIELD_PREFIX}SOFA"
VIEW_MEMBERS_FIELD = f"{RESERVED_FIELD_PREFIX}MEMBERS"
FEATURE_STRUCTURES_FIELD = f"{RESERVED_FIELD_PREFIX}FEATURE_STRUCTURES"

# Keys used inside type and feature declarations.
NAME_FIELD = f"{RESERVED_FIELD_PREFIX}NAME"
SUPER_TYPE_FIELD = f"{RESERVED_FIELD_PREFIX}SUPER_TYPE"
DESCRIPTION_FIELD = f"{RESERVED_FIELD_PREFIX}DESCRIPTION"
ELEMENT_TYPE_FIELD = f"{RESERVED_FIELD_PREFIX}ELEMENT_TYPE"
MULTIPLE_REFERENCES_ALLOWED_FIELD = f"{RESERVED_FIELD_PREFIX}MULTIPLE_REFERENCES_ALLOWED"

# Keys used inside feature structures.
ID_FIELD = f"{RESERVED_FIELD_PREFIX}ID"
FLAGS_FIELD = f"{RESERVED_FIELD_PREFIX}FLAGS"
FLAG_DOCUMENT_ANNOTATION = "DocumentAnnotation"
ARRAY_SUFFIX = "[]"  # appended to an element type name to denote an array type
ELEMENTS_FIELD = f"{RESERVED_FIELD_PREFIX}ELEMENTS"

# String spellings of the floating point values that JSON numbers cannot express.
NAN_VALUE = "NaN"
POSITIVE_INFINITE_VALUE = "Infinity"
POSITIVE_INFINITE_VALUE_ABBR = "Inf"
NEGATIVE_INFINITE_VALUE = "-Infinity"
NEGATIVE_INFINITE_VALUE_ABBR = "-Inf"
# Entry point for reading a CAS from its JSON representation.
def load_cas_from_json(
    source: Union[IO, str], typesystem: TypeSystem = None, lenient: bool = False, merge_typesystem: bool = True
) -> Cas:
    """Load a CAS from a JSON source.

    Args:
        source: The JSON source. A `str` is taken to be the JSON document itself; anything else is
            treated as a file-like object from which the document is read.
        typesystem: The type system that belongs to this CAS. If `None`, a fresh, empty `TypeSystem`
            is used instead.
        lenient: If `True`, unknown Types will be ignored. If `False`, unknown Types will cause an exception.
            The default is `False`.
        merge_typesystem: If `True` (the default), the type system embedded in the JSON document is
            parsed and merged with `typesystem` before the feature structures are read.

    Returns:
        The deserialized CAS
    """
    effective_typesystem = TypeSystem() if typesystem is None else typesystem

    return CasJsonDeserializer().deserialize(
        source,
        typesystem=effective_typesystem,
        lenient=lenient,
        merge_typesystem=merge_typesystem,
    )
class CasJsonDeserializer:
    """Reads a CAS from its JSON representation.

    The bookkeeping attributes are reset at the beginning of every :meth:`deserialize` call, so the
    state of a previous run does not leak into the next one.
    """

    # Feature names that are stored with a trailing underscore on the Python side
    _RESERVED_PYTHON_NAMES = ("self", "type")

    def __init__(self):
        # Highest feature structure id seen while parsing; seeds the id generator of the loaded CAS
        self._max_xmi_id = 0
        # Highest sofaNum seen while parsing; seeds the sofa number generator of the loaded CAS
        self._max_sofa_num = 0
        # Callbacks resolving id references that could not be resolved when they were encountered
        self._post_processors = []

    def deserialize(
        self,
        source: Union[IO, str],
        typesystem: Optional[TypeSystem] = None,
        lenient: bool = False,
        merge_typesystem: bool = True,
    ) -> Cas:
        """Parse a JSON document into a CAS.

        Args:
            source: Either the JSON document as a `str` or a file-like object to read it from.
            typesystem: The type system to use for the CAS.
            lenient: Passed on unchanged to the `Cas` constructor.
            merge_typesystem: If `True` and the document has a types section, the embedded types are
                parsed and merged with `typesystem`. A document without a types section (for example
                one written with `TypeSystemMode.NONE`) is loaded using `typesystem` as is.

        Returns:
            The deserialized CAS.
        """
        data = json.loads(source) if isinstance(source, str) else json.load(source)

        self._max_xmi_id = 0
        self._max_sofa_num = 0
        self._post_processors = []

        json_typesystem = data.get(TYPES_FIELD)
        if merge_typesystem and json_typesystem is not None:
            embedded_typesystem = self._parse_embedded_typesystem(json_typesystem)
            typesystem = merge_typesystems(typesystem, embedded_typesystem)

        cas = Cas(typesystem=typesystem, lenient=lenient)

        feature_structures = {}
        self._parse_feature_structures(cas, typesystem, data.get(FEATURE_STRUCTURES_FIELD), feature_structures)

        # Resolve the references that pointed to feature structures which had not been parsed yet
        for post_processor in self._post_processors:
            post_processor()

        cas._xmi_id_generator = IdGenerator(self._max_xmi_id + 1)
        cas._sofa_num_generator = IdGenerator(self._max_sofa_num + 1)

        # At this point all views for which we have a sofa with a known ID and sofaNum have already been created
        # as part of parsing the feature structures. Thus, if there are any views remaining that are only declared
        # in the views section, we just create them with auto-assigned IDs
        json_views = data.get(VIEWS_FIELD) or {}
        for view_name, json_view in json_views.items():
            self._parse_view(cas, view_name, json_view, feature_structures)

        return cas

    def _parse_embedded_typesystem(self, json_typesystem: Dict[str, any]) -> TypeSystem:
        """Build a `TypeSystem` from the types section of the JSON document."""
        embedded_typesystem = TypeSystem(
            add_document_annotation_type=not (json_typesystem.get(FLAG_DOCUMENT_ANNOTATION))
        )

        # First, build a dependency graph to support cases where a child type is defined before its super type
        type_dependencies = defaultdict(set)
        for type_name, json_type in json_typesystem.items():
            type_dependencies[type_name].add(json_type[SUPER_TYPE_FIELD])

        # Second, load all the types but no features since features of a type X might be of a later loaded type Y
        for type_name in toposort_flatten(type_dependencies):
            if is_predefined(type_name) or embedded_typesystem.contains_type(type_name):
                continue
            self._parse_type(embedded_typesystem, type_name, json_typesystem[type_name])

        # Now we are sure we know all the types, we can create the features
        for type_name, json_type in json_typesystem.items():
            self._parse_features(embedded_typesystem, type_name, json_type)

        return embedded_typesystem

    def _parse_feature_structures(
        self,
        cas: Cas,
        typesystem: TypeSystem,
        json_feature_structures: Union[list, dict, None],
        feature_structures: Dict[int, any],
    ):
        """Parse the feature structures section and store the results in `feature_structures` by id.

        The section may either be a list of objects carrying their id in the id field or an object
        mapping ids to feature structures. Any other value (including a missing section) is ignored.
        """
        if isinstance(json_feature_structures, list):
            entries = [(json_fs.get(ID_FIELD), json_fs) for json_fs in json_feature_structures]
        elif isinstance(json_feature_structures, dict):
            # Keys of a JSON object are always strings, while references between feature structures
            # are numbers, so the ids are normalised to int once here.
            entries = [(int(fs_id), json_fs) for fs_id, json_fs in json_feature_structures.items()]
        else:
            return

        json_fs_by_id = dict(entries)
        parsed_ahead = set()

        def parse_and_add(fs_id_, json_fs_):
            parsed = self._parse_feature_structure(typesystem, fs_id_, json_fs_, feature_structures)
            feature_structures[parsed.xmiID] = parsed

        # According to the JSON CAS 0.4.0 spec, we should be able to do this in a single loop as SofaFSes
        # should normally appear before any FSes referring to them. However, the Java implementation currently
        # does not do this, so we do two passes to be able to read its data.
        for fs_id, json_fs in entries:
            if json_fs.get(TYPE_FIELD) != TYPE_NAME_SOFA:
                continue

            # In case the Sofa references a byte array that has not been parsed yet, we need to fetch it
            sofa_byte_array_ref = json_fs.get(REF_FEATURE_PREFIX + FEATURE_BASE_NAME_SOFAARRAY)
            if (
                sofa_byte_array_ref
                and sofa_byte_array_ref not in feature_structures
                and sofa_byte_array_ref in json_fs_by_id
            ):
                parse_and_add(sofa_byte_array_ref, json_fs_by_id[sofa_byte_array_ref])
                parsed_ahead.add(sofa_byte_array_ref)

            sofa = self._parse_sofa(cas, fs_id, json_fs, feature_structures)
            feature_structures[sofa.xmiID] = sofa

        for fs_id, json_fs in entries:
            if json_fs.get(TYPE_FIELD) == TYPE_NAME_SOFA:
                continue
            # Byte arrays fetched ahead for a Sofa must not be parsed a second time, otherwise the Sofa
            # and the id lookup table would end up pointing to two different objects.
            if fs_id in parsed_ahead:
                continue
            parse_and_add(fs_id, json_fs)

    def _parse_type(self, typesystem: TypeSystem, type_name: str, json_type: Dict[str, any]):
        """Create the type `type_name` (without its features) in `typesystem`."""
        super_type_name = json_type[SUPER_TYPE_FIELD]
        description = json_type.get(DESCRIPTION_FIELD)
        typesystem.create_type(type_name, super_type_name, description=description)

    def _parse_features(self, typesystem: TypeSystem, type_name: str, json_type: Dict[str, any]):
        """Create all features declared for `type_name` in `json_type`.

        Every key that does not start with the reserved prefix is treated as a feature declaration.
        """
        new_type = typesystem.get_type(type_name)
        for key, json_feature in json_type.items():
            if key.startswith(RESERVED_FIELD_PREFIX):
                continue

            range_type = json_feature[RANGE_FIELD]
            element_type = json_feature.get(ELEMENT_TYPE_FIELD)
            if range_type.endswith(ARRAY_SUFFIX):
                # A range of the form "X[]" denotes an array with element type X
                element_type = range_type[: -len(ARRAY_SUFFIX)]
                range_type = array_type_name_for_type(element_type)

            typesystem.create_feature(
                new_type,
                name=key,
                rangeType=range_type,
                elementType=element_type,
                description=json_feature.get(DESCRIPTION_FIELD),
                multipleReferencesAllowed=json_feature.get(MULTIPLE_REFERENCES_ALLOWED_FIELD),
            )

    def _get_or_create_view(
        self, cas: Cas, view_name: str, fs_id: Optional[int] = None, sofa_num: Optional[int] = None
    ) -> Cas:
        """Return the view called `view_name`.

        The default view is fetched from `cas`; every other view is created.

        NOTE(review): for non-default names this always calls `cas.create_view`. Whether that call
        tolerates a view that already exists (e.g. one created earlier from its Sofa) is not visible
        from this module - confirm against `Cas.create_view`.
        """
        if view_name != NAME_DEFAULT_SOFA:
            return cas.create_view(view_name, xmiID=fs_id, sofaNum=sofa_num)

        view = cas.get_view(NAME_DEFAULT_SOFA)
        # We need to make sure that the sofa gets the real xmi, see #155
        if fs_id is not None:
            view.get_sofa().xmiID = fs_id
        return view

    def _parse_view(self, cas: Cas, view_name: str, json_view: Dict[str, any], feature_structures: Dict[int, any]):
        """Add all members listed for `view_name` to the corresponding view, keeping their ids."""
        view = self._get_or_create_view(cas, view_name)
        for member_id in json_view[VIEW_MEMBERS_FIELD]:
            fs = feature_structures[member_id]
            view.add(fs, keep_id=True)

    def _parse_sofa(self, cas: Cas, fs_id: int, json_fs: Dict[str, any], feature_structures: Dict[int, any]) -> Sofa:
        """Set up the view described by a Sofa feature structure and return its Sofa."""
        sofa_num = json_fs.get(FEATURE_BASE_NAME_SOFANUM)

        # Sofas do not pass through _parse_feature_structure, so their ids have to be tracked here.
        # Otherwise the generators installed on the CAS could hand out ids that are already taken.
        if fs_id is not None:
            self._max_xmi_id = max(fs_id, self._max_xmi_id)
        if sofa_num is not None:
            self._max_sofa_num = max(sofa_num, self._max_sofa_num)

        view = self._get_or_create_view(cas, json_fs.get(FEATURE_BASE_NAME_SOFAID), fs_id, sofa_num)
        view.sofa_string = json_fs.get(FEATURE_BASE_NAME_SOFASTRING)
        view.sofa_mime = json_fs.get(FEATURE_BASE_NAME_SOFAMIME)
        view.sofa_uri = json_fs.get(FEATURE_BASE_NAME_SOFAURI)
        view.sofa_array = feature_structures.get(json_fs.get(REF_FEATURE_PREFIX + FEATURE_BASE_NAME_SOFAARRAY))
        return view.get_sofa()

    @staticmethod
    def _to_python_feature_name(feature_name: str) -> str:
        """Return the attribute name used on the Python side for the JSON feature `feature_name`."""
        if feature_name in CasJsonDeserializer._RESERVED_PYTHON_NAMES:
            return feature_name + "_"
        return feature_name

    def _parse_feature_structure(
        self, typesystem: TypeSystem, fs_id: int, json_fs: Dict[str, any], feature_structures: Dict[int, any]
    ):
        """Create a feature structure from its JSON object.

        References to feature structures that are already in `feature_structures` are resolved
        immediately; all others are resolved by a post-processor once parsing has finished.

        Args:
            typesystem: Type system used to look up the type of the feature structure.
            fs_id: Id of the feature structure; becomes its `xmiID`.
            json_fs: The JSON object describing the feature structure.
            feature_structures: Feature structures parsed so far, by id.

        Returns:
            The new feature structure.
        """
        type_name = json_fs.get(TYPE_FIELD)
        if type_name.endswith(ARRAY_SUFFIX):
            type_name = array_type_name_for_type(type_name)

        AnnotationType = typesystem.get_type(type_name)

        attributes = dict(json_fs)

        # Map the JSON FS ID to xmiID
        attributes["xmiID"] = fs_id

        # Remap features that use a reserved Python name
        if "self" in attributes:
            attributes["self_"] = attributes.pop("self")
        if "type" in attributes:
            attributes["type_"] = attributes.pop("type")

        if typesystem.is_primitive_array(AnnotationType.name):
            attributes["elements"] = self._parse_primitive_array(AnnotationType.name, json_fs.get(ELEMENTS_FIELD))
        elif AnnotationType.name == TYPE_NAME_FS_ARRAY:
            json_elements = json_fs.get(ELEMENTS_FIELD)
            # The elements field may be absent (the serializer omits it for empty arrays); there is
            # nothing to resolve in that case.
            if json_elements is not None:
                # Resolve id-ref at the end of processing. `fs` is only bound further down, which is
                # fine as the callback does not run before all feature structures have been parsed.
                def fix_up(elements):
                    return lambda: setattr(fs, "elements", [feature_structures.get(e) for e in elements])

                self._post_processors.append(fix_up(json_elements))

        self._strip_reserved_json_keys(attributes)

        ref_features = {}
        for key in list(attributes):
            if key.startswith(REF_FEATURE_PREFIX):
                # The prefixed spelling carries the external feature name, so the reserved name
                # mapping has to be applied here as well.
                ref_features[self._to_python_feature_name(key[1:])] = attributes.pop(key)
            elif key.startswith(NUMBER_FEATURE_PREFIX):
                attributes[self._to_python_feature_name(key[1:])] = self._parse_float_value(attributes.pop(key))

        self._max_xmi_id = max(attributes["xmiID"], self._max_xmi_id)
        fs = AnnotationType(**attributes)

        self._resolve_references(fs, ref_features, feature_structures)

        # Map from offsets in UIMA UTF-16 based offsets to Unicode codepoints
        if typesystem.is_instance_of(fs.type, TYPE_NAME_ANNOTATION):
            sofa = fs.sofa
            fs.begin = sofa._offset_converter.external_to_python(fs.begin)
            fs.end = sofa._offset_converter.external_to_python(fs.end)

        return fs

    def _parse_float_value(self, value: Union[str, int, float]) -> float:
        """Convert a JSON floating point value to a `float`.

        Args:
            value: A JSON number or one of the special strings for NaN and the infinities.

        Returns:
            The value as `float`.

        Raises:
            ValueError: If `value` is neither a number nor one of the special strings.
        """
        if isinstance(value, float):
            return value
        # A JSON number without fraction (e.g. `1`) is decoded as int; `bool` is excluded explicitly
        # because it is a subclass of int.
        if isinstance(value, int) and not isinstance(value, bool):
            return float(value)
        if value == NAN_VALUE:
            return float("nan")
        if value == POSITIVE_INFINITE_VALUE or value == POSITIVE_INFINITE_VALUE_ABBR:
            return float("inf")
        if value == NEGATIVE_INFINITE_VALUE or value == NEGATIVE_INFINITE_VALUE_ABBR:
            return float("-inf")

        raise ValueError(
            f"Illegal floating point value [{value}]. Must be a float literal or one of {NAN_VALUE}, "
            f"{POSITIVE_INFINITE_VALUE}, {POSITIVE_INFINITE_VALUE_ABBR}, {NEGATIVE_INFINITE_VALUE}, or "
            f"{NEGATIVE_INFINITE_VALUE_ABBR}"
        )

    def _parse_primitive_array(self, type_name: str, elements: Union[list, str, None]) -> Union[list, bytes, None]:
        """Decode the elements of a primitive array.

        Byte arrays are base64-decoded into `bytes`, the elements of float and double arrays are
        converted via :meth:`_parse_float_value`. Everything else, including empty or missing
        elements, is returned unchanged.
        """
        if not elements:
            return elements
        if type_name == TYPE_NAME_BYTE_ARRAY:
            return base64.b64decode(elements)
        if type_name == TYPE_NAME_FLOAT_ARRAY or type_name == TYPE_NAME_DOUBLE_ARRAY:
            return [self._parse_float_value(v) for v in elements]
        return elements

    def _resolve_references(self, fs, ref_features: Dict[str, any], feature_structures: Dict[int, any]):
        """Set the reference features of `fs`, deferring those whose target is not known yet."""
        for key, value in ref_features.items():
            target_fs = feature_structures.get(value)
            if target_fs:
                # Resolve id-ref now
                setattr(fs, key, target_fs)
            else:
                # Resolve id-ref at the end of processing
                def fix_up(k, v):
                    return lambda: setattr(fs, k, feature_structures.get(v))

                self._post_processors.append(fix_up(key, value))

    def _strip_reserved_json_keys(
        self,
        attributes: Dict[str, any],
    ):
        """Remove, in place, all keys from `attributes` that start with the reserved prefix."""
        for key in list(attributes):
            if key.startswith(RESERVED_FIELD_PREFIX):
                attributes.pop(key)
class CasJsonSerializer:
    """Writes a CAS in its JSON representation."""

    # Attributes of a feature structure that are written through dedicated JSON keys
    _COMMON_FIELD_NAMES = {"xmiID", "type"}

    # Prefix that is not part of the external name of a type
    _NO_NAMESPACE_PREFIX = "uima.noNamespace."

    def __init__(self):
        pass

    def serialize(
        self,
        sink: Union[IO, str, None],
        cas: Cas,
        pretty_print: bool = True,
        ensure_ascii: bool = False,
        type_system_mode: TypeSystemMode = TypeSystemMode.FULL,
    ) -> Union[str, None]:
        """Serialize `cas` as JSON.

        Args:
            sink: Where to write the JSON to. If `None` (or otherwise falsy), the JSON is returned as
                a string. A `str` is used as the path of a file that is written as UTF-8. A text stream
                is written to directly. Any other object is treated as a binary stream and receives
                UTF-8 encoded data.
            cas: The CAS to serialize.
            pretty_print: If `True`, the output is indented by two spaces per level.
            ensure_ascii: Passed on to the `json` module.
            type_system_mode: Controls which types are written to the types section: none, only the
                transitive closure of the types in use, or all types.

        Returns:
            The JSON string if no sink was given, else `None`.

        Raises:
            Exception: If `type_system_mode` is not a known mode.
        """
        feature_structures = []
        views = {}
        for view in cas.views:
            views[view.sofa.sofaID] = self._serialize_view(view)
            if view.sofa.sofaArray:
                feature_structures.append(self._serialize_feature_structure(view.sofa.sofaArray))
            feature_structures.append(self._serialize_feature_structure(view.sofa))

        # Find all fs, even the ones that are not directly added to a sofa
        used_types = set()
        for fs in sorted(cas._find_all_fs(include_inlinable_arrays_and_lists=True), key=lambda a: a.xmiID):
            used_types.add(fs.type)
            feature_structures.append(self._serialize_feature_structure(fs))

        data = {}
        types = self._serialize_types(cas, used_types, type_system_mode)
        if types is not None:
            data[TYPES_FIELD] = types
        data[FEATURE_STRUCTURES_FIELD] = feature_structures
        data[VIEWS_FIELD] = views

        dump_options = {
            "sort_keys": False,
            "indent": 2 if pretty_print else None,
            "ensure_ascii": ensure_ascii,
            "allow_nan": False,
        }

        if not sink:
            return json.dumps(data, **dump_options)

        if isinstance(sink, str):
            with open(sink, "w", encoding="utf-8") as file:
                json.dump(data, file, **dump_options)
            return None

        if isinstance(sink, TextIOBase):
            # A text stream handed in by the caller is used as is. In particular it is never detached,
            # since that would leave the caller with an unusable stream.
            json.dump(data, sink, **dump_options)
            return None

        wrapper = TextIOWrapper(sink, encoding="utf-8", write_through=True)
        try:
            json.dump(data, wrapper, **dump_options)
        finally:
            # Prevent the TextIOWrapper from closing the underlying binary stream
            wrapper.detach()
        return None

    def _serialize_types(self, cas: Cas, used_types: set, type_system_mode: TypeSystemMode) -> Optional[dict]:
        """Build the types section, or return `None` if no types are to be written."""
        if type_system_mode is TypeSystemMode.NONE:
            return None

        if type_system_mode is TypeSystemMode.MINIMAL:
            # Build transitive closure of used types by following parents, features, etc.
            types_to_include = cas.typesystem.transitive_closure(used_types)
        elif type_system_mode is TypeSystemMode.FULL:
            types_to_include = cas.typesystem.get_types()
        else:
            raise Exception(f"Invalid type system mode: [{type_system_mode}]")

        types = {}
        for type_ in sorted(types_to_include, key=lambda x: x.name):
            if type_.name == TYPE_NAME_DOCUMENT_ANNOTATION:
                continue
            json_type = self._serialize_type(type_)
            types[json_type[NAME_FIELD]] = json_type
        return types

    def _serialize_type(self, type_: Type):
        """Return the JSON object describing `type_` including the features declared on it."""
        json_type = {
            NAME_FIELD: self._to_external_type_name(type_.name),
            SUPER_TYPE_FIELD: self._to_external_type_name(type_.supertype.name),
        }

        if type_.description:
            json_type[DESCRIPTION_FIELD] = type_.description

        for feature in list(type_.features):
            json_feature = self._serialize_feature(json_type, feature)
            json_type[json_feature[NAME_FIELD]] = json_feature

        return json_type

    def _serialize_feature(self, json_type, feature: Feature):
        """Return the JSON object describing `feature`. `json_type` is not used."""
        # If the feature name is a reserved name like `self`, then we added an
        # underscore to it before so Python can handle it. We now need to remove it.
        feature_name = feature.name
        if feature._has_reserved_name:
            feature_name = feature_name[:-1]

        range_type_name = self._to_external_type_name(feature.rangeType.name)

        # Array ranges are written as "<element type>[]", which makes a separate element type redundant
        range_is_array = is_array(feature.rangeType)
        if range_is_array:
            if is_primitive_array(feature.rangeType):
                element_type_name = element_type_name_for_array_type(feature.rangeType)
            elif feature.elementType:
                element_type_name = self._to_external_type_name(feature.elementType.name)
            else:
                element_type_name = TYPE_NAME_TOP
            range_type_name = element_type_name + ARRAY_SUFFIX

        json_feature = {
            NAME_FIELD: feature_name,
            RANGE_FIELD: range_type_name,
        }

        if feature.description:
            json_feature[DESCRIPTION_FIELD] = feature.description

        if feature.multipleReferencesAllowed is not None:
            json_feature[MULTIPLE_REFERENCES_ALLOWED_FIELD] = feature.multipleReferencesAllowed

        if not range_is_array and feature.elementType is not None:
            json_feature[ELEMENT_TYPE_FIELD] = self._to_external_type_name(feature.elementType.name)

        return json_feature

    def _serialize_feature_structure(self, fs) -> dict:
        """Return the JSON object for the feature structure `fs`.

        Arrays only carry their id, type and (if not empty) elements. For all other feature
        structures every feature with a value other than `None` is written.
        """
        type_name = fs.type.name

        json_fs = OrderedDict()
        json_fs[ID_FIELD] = fs.xmiID
        json_fs[TYPE_FIELD] = type_name

        if type_name == TYPE_NAME_BYTE_ARRAY:
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = base64.b64encode(bytes(fs.elements)).decode("ascii")
            return json_fs
        if type_name in {TYPE_NAME_DOUBLE_ARRAY, TYPE_NAME_FLOAT_ARRAY}:
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = [self._serialize_float_value(e) for e in fs.elements]
            return json_fs
        if is_primitive_array(fs.type):
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = fs.elements
            return json_fs
        if TYPE_NAME_FS_ARRAY == type_name:
            if fs.elements:
                json_fs[ELEMENTS_FIELD] = [self._serialize_ref(e) for e in fs.elements]
            return json_fs

        for feature in fs.type.all_features:
            if feature.name in CasJsonSerializer._COMMON_FIELD_NAMES:
                continue

            feature_name = feature.name

            # Strip the underscore we added for reserved names
            if feature._has_reserved_name:
                feature_name = feature.name[:-1]

            # Skip over 'None' features
            value = getattr(fs, feature.name)
            if value is None:
                continue

            # Map back from offsets in Unicode codepoints to UIMA UTF-16 based offsets. This must only
            # happen for the begin/end features declared on the annotation type itself, not for a
            # feature of another type that merely happens to be called "end".
            if feature.domainType.name == TYPE_NAME_ANNOTATION and feature_name in ("begin", "end"):
                sofa: Sofa = getattr(fs, "sofa")
                value = sofa._offset_converter.python_to_external(value)

            if feature.rangeType.name in {TYPE_NAME_DOUBLE, TYPE_NAME_FLOAT}:
                float_value = self._serialize_float_value(value)
                # Values that had to be encoded as a string are marked by the number prefix
                if isinstance(float_value, str):
                    feature_name = NUMBER_FEATURE_PREFIX + feature_name
                json_fs[feature_name] = float_value
            elif is_primitive(feature.rangeType):
                json_fs[feature_name] = value
            else:
                # We need to encode non-primitive features as a reference
                json_fs[REF_FEATURE_PREFIX + feature_name] = self._serialize_ref(value)

        return json_fs

    def _serialize_float_value(self, value) -> Union[float, str]:
        """Return `value` unchanged, or its string spelling if it is NaN or infinite."""
        if isnan(value):
            return NAN_VALUE
        if math.isinf(value):
            return POSITIVE_INFINITE_VALUE if value > 0 else NEGATIVE_INFINITE_VALUE
        return value

    def _serialize_ref(self, fs) -> Optional[int]:
        """Return the id of `fs`, or `None` if there is no feature structure to refer to."""
        if not fs:
            return None
        return fs.xmiID

    def _serialize_view(self, view: View):
        """Return the JSON object for `view`: the id of its sofa and the sorted ids of its members."""
        return {
            VIEW_SOFA_FIELD: view.sofa.xmiID,
            VIEW_MEMBERS_FIELD: sorted(x.xmiID for x in view.get_all_annotations()),
        }

    def _to_external_type_name(self, type_name: str):
        """Return `type_name` without the leading no-namespace prefix, if it has one."""
        prefix = CasJsonSerializer._NO_NAMESPACE_PREFIX
        if type_name.startswith(prefix):
            # Only strip the leading prefix; later occurrences are part of the actual name
            return type_name[len(prefix):]
        return type_name