Source code for cassis.xmi

import warnings
from collections import defaultdict
from io import BytesIO
from math import isinf, isnan
from pathlib import Path
from typing import IO, Dict, List, Union

import attr
from lxml import etree

from cassis.cas import Cas, IdGenerator, Sofa, View
from cassis.typesystem import (


@attr.s
class ProtoView:
    """A view element from XMI.

    Lightweight record of a ``cas:View`` element: it only stores the ids that
    were read from the XML attributes. The class must be an ``attrs`` class,
    because it is constructed with positional/keyword arguments and checked
    with ``attr.validate``.
    """

    # Integer taken from the ``sofa`` attribute of the view element; it is
    # matched against the xmi id of the parsed sofas.
    sofa: int = attr.ib(validator=attr.validators.instance_of(int))
    # Integers taken from the whitespace separated ``members`` attribute; they
    # are used to look up feature structures by their xmi id. Defaults to a
    # fresh empty list per instance.
    members: List[int] = attr.ib(factory=list)

def load_cas_from_xmi(
    source: Union[IO, Path, str], typesystem: TypeSystem = None, lenient: bool = False, trusted: bool = False
) -> Cas:
    """Loads a CAS from a XMI source.

    Args:
        source: The XML source. If `source` is a string, then it is assumed to be an XML string.
            If `source` is a file-like object, then the data is read from it.
            If `source` is a `Path`, then load the file at the given location.
        typesystem: The type system that belongs to this CAS. If `None`, an empty type system is provided.
        lenient: If `True`, unknown Types will be ignored. If `False`, unknown Types will cause an exception.
            The default is `False`.
        trusted: If `True`, disables checks like XML parser security restrictions.

    Returns:
        The deserialized CAS

    """
    if typesystem is None:
        typesystem = TypeSystem()

    deserializer = CasXmiDeserializer()

    if isinstance(source, str):
        # A string is treated as the XML document itself, not as a file name;
        # it is handed to the deserializer as a UTF-8 encoded byte stream.
        return deserializer.deserialize(
            BytesIO(source.encode("utf-8")), typesystem=typesystem, lenient=lenient, trusted=trusted
        )

    if isinstance(source, Path):
        # Open the file in binary mode and make sure it is closed again once
        # deserialization has finished (or failed).
        with source.open("rb") as src:
            return deserializer.deserialize(src, typesystem=typesystem, lenient=lenient, trusted=trusted)

    # Anything else is assumed to be an already opened file-like object.
    return deserializer.deserialize(source, typesystem=typesystem, lenient=lenient, trusted=trusted)
class CasXmiDeserializer: def __init__(self): self._max_xmi_id = 0 self._max_sofa_num = 0 def deserialize(self, source: Union[IO, str], typesystem: TypeSystem, lenient: bool, trusted: bool): # namespaces NS_XMI = "{}" NS_CAS = "{http:///uima/cas.ecore}" TAG_XMI = NS_XMI + "XMI" TAG_CAS_SOFA = NS_CAS + "Sofa" TAG_CAS_VIEW = NS_CAS + "View" OUTSIDE_FS = 1 INSIDE_FS = 2 INSIDE_ARRAY = 3 sofas = {} views = {} feature_structures = {} children = defaultdict(list) lenient_ids = set() context = etree.iterparse(source, events=("start", "end"), huge_tree=trusted) state = OUTSIDE_FS self._max_xmi_id = 0 self._max_sofa_num = 0 for event, elem in context: # Ignore the 'xmi:XMI' if elem.tag == TAG_XMI: pass elif elem.tag == TAG_CAS_SOFA: if event == "end": sofa = self._parse_sofa(typesystem, elem) sofas[sofa.xmiID] = sofa elif elem.tag == TAG_CAS_VIEW: if event == "end": proto_view = self._parse_view(elem) views[proto_view.sofa] = proto_view else: """ In XMI, array element features can be encoded as <cas:StringArray> <elements>LNC</elements> <elements>MTH</elements> <elements>SNOMEDCT_US</elements> </cas:StringArray> In order to parse this with an incremental XML parser, we need to employ a simple state machine. It is depicted in the following. 
"start" "start" +-----------+-------->+-----------+-------->+--------+ | Outside | | Inside | | Inside | +--->+ feature | | feature | | array | | structure | | structure | | element| +-----------+<--------+-----------+<--------+--------+ "end" "end" """ if event == "start": if state == OUTSIDE_FS: # We saw the opening tag of a new feature structure state = INSIDE_FS elif state == INSIDE_FS: # We saw the opening tag of an array element state = INSIDE_ARRAY else: raise RuntimeError(f"Invalid state transition: [{state}] 'start'") elif event == "end": if state == INSIDE_FS: # We saw the closing tag of a new feature state = OUTSIDE_FS # If a type was not found, ignore it if lenient, else raise an exception try: fs = self._parse_feature_structure(typesystem, elem, children) feature_structures[fs.xmiID] = fs except TypeNotFoundError as e: if not lenient: raise e warnings.warn(e.message) xmiID = elem.attrib.get("{}id", None) if xmiID: lenient_ids.add(int(xmiID)) children.clear() elif state == INSIDE_ARRAY: # We saw the closing tag of an array element children[elem.tag].append(elem.text) state = INSIDE_FS else: raise RuntimeError(f"Invalid state transition: [{state}] 'end'") else: raise RuntimeError(f"Invalid XML event: [{event}]") # Free already processed elements from memory if event == "end": self._clear_elem(elem) # See # The checking for each feature if it is a StringArray is rather slow, hence, we cache the results is_instance_of_string_array_map = {} # Post-process feature values for xmi_id, fs in feature_structures.items(): t = typesystem.get_type( for feature in t.all_features: feature_name = value = fs[feature_name] if feature_name == "sofa": fs[feature_name] = sofas[value] continue if not in is_instance_of_string_array_map: is_instance_of_string_array_map[] = typesystem.is_instance_of(, TYPE_NAME_STRING_ARRAY ) if is_instance_of_string_array_map[]: # We already parsed string arrays to a Python list of string # before, so we do not need to work more on this 
continue elif typesystem.is_primitive(feature.rangeType): fs[feature_name] = self._parse_primitive_value(feature.rangeType, value) continue elif typesystem.is_primitive_array(fs.type) and feature_name == "elements": # Separately rendered arrays (typically used with multipleReferencesAllowed = True) fs[feature_name] = self._parse_primitive_array(fs.type, value) elif typesystem.is_primitive_array(feature.rangeType) and not feature.multipleReferencesAllowed: # Array feature rendered inline (multipleReferencesAllowed = False|None) # We also end up here for array features that were rendered as child elements. No need to parse # them again, so we check if the value is still a string (i.e. attribute value) and only then # process it if isinstance(value, str): FSType = feature.rangeType fs[feature_name] = FSType(elements=self._parse_primitive_array(feature.rangeType, value)) elif typesystem.is_primitive_list(feature.rangeType) and not feature.multipleReferencesAllowed: # Array feature rendered inline (multipleReferencesAllowed = False|None) # We also end up here for array features that were rendered as child elements. No need to parse # them again, so we check if the value is still a string (i.e. attribute value) and only then # process it if isinstance(value, str): fs[feature_name] = self._parse_primitive_list(feature.rangeType, value) else: # Resolve references here if value is None: continue # Resolve references if == TYPE_NAME_FS_ARRAY or ( == TYPE_NAME_FS_ARRAY and not feature.multipleReferencesAllowed ): # An array of references is a list of integers separated # by single spaces, e.g. 
<foo:bar elements="1 2 3 42" /> targets = [] for ref in value.split(): target_id = int(ref) target = feature_structures[target_id] targets.append(target) if == TYPE_NAME_FS_ARRAY: # Wrap inline array into the appropriate array object ArrayType = typesystem.get_type(TYPE_NAME_FS_ARRAY) targets = ArrayType(elements=targets) fs[feature_name] = targets elif == TYPE_NAME_FS_LIST and not feature.multipleReferencesAllowed: # Array feature rendered inline (multipleReferencesAllowed = False|None) # We also end up here for array features that were rendered as child elements. No need to parse # them again, so we check if the value is still a string (i.e. attribute value) and only then # process it if isinstance(value, list) or isinstance(value, str): fs[feature_name] = self._parse_fs_list(feature_structures, feature.rangeType, value) else: target_id = int(value) fs[feature_name] = feature_structures[target_id] cas = Cas(typesystem=typesystem, lenient=lenient) for sofa in sofas.values(): if sofa.sofaID == "_InitialView": view = cas.get_view("_InitialView") # We need to make sure that the sofa gets the real xmi, see #155 view.get_sofa().xmiID = sofa.xmiID else: view = cas.create_view(sofa.sofaID, xmiID=sofa.xmiID, sofaNum=sofa.sofaNum) # Directly set the sofaString and offsetConverter for the sofa to avoid recomputing the offset convertion (slow!) when using the setter view.get_sofa()._sofaString = sofa.sofaString view.get_sofa()._offset_converter = sofa._offset_converter view.sofa_mime = sofa.mimeType # If a sofa has no members, then UIMA might omit the view. In that case, # we create an empty view for it. 
if sofa.xmiID in views: proto_view = views[sofa.xmiID] else: proto_view = ProtoView(sofa.xmiID) for member_id in proto_view.members: # We ignore ids of feature structures for which we do not have a type if member_id in lenient_ids: continue fs = feature_structures[member_id] # Map from offsets in UIMA UTF-16 based offsets to Unicode codepoints if typesystem.is_instance_of(, TYPE_NAME_ANNOTATION): fs.begin = sofa._offset_converter.external_to_python(fs.begin) fs.end = sofa._offset_converter.external_to_python(fs.end) view.add(fs, keep_id=True) cas._xmi_id_generator = IdGenerator(self._max_xmi_id + 1) cas._sofa_num_generator = IdGenerator(self._max_sofa_num + 1) return cas def _parse_sofa(self, typesystem: TypeSystem, elem) -> Sofa: attributes = dict(elem.attrib) attributes["xmiID"] = int(attributes.pop("{}id")) attributes["sofaNum"] = int(attributes["sofaNum"]) attributes["type"] = typesystem.get_type(TYPE_NAME_SOFA) self._max_xmi_id = max(attributes["xmiID"], self._max_xmi_id) self._max_sofa_num = max(attributes["sofaNum"], self._max_sofa_num) return Sofa(**attributes) def _parse_view(self, elem) -> ProtoView: attributes = elem.attrib sofa = int(attributes["sofa"]) members = [int(e) for e in attributes.get("members", "").strip().split()] result = ProtoView(sofa=sofa, members=members) attr.validate(result) return result def _parse_feature_structure(self, typesystem: TypeSystem, elem, children: Dict[str, List[str]]): # Strip the http prefix, replace / with ., remove the ecore part # TODO: Error checking type_name: str = elem.tag[9:].replace("/", ".").replace("ecore}", "").strip() if type_name.startswith("uima.noNamespace."): type_name = type_name[17:] AnnotationType = typesystem.get_type(type_name) attributes = dict(elem.attrib) attributes.update(children) # Map the xmi:id attribute to xmiID attributes["xmiID"] = int(attributes.pop("{}id")) if "begin" in attributes: attributes["begin"] = int(attributes["begin"]) if "end" in attributes: attributes["end"] = 
int(attributes["end"]) if "sofa" in attributes: attributes["sofa"] = int(attributes["sofa"]) # Remap features that use a reserved Python name if "self" in attributes: attributes["self_"] = attributes.pop("self") if "type" in attributes: attributes["type_"] = attributes.pop("type") # Arrays which were represented as nested elements in the XMI have so far have only been parsed into a Python # arrays. Now we convert them to proper UIMA arrays/lists if not typesystem.is_primitive_array(type_name): for feature_name, feature_value in children.items(): feature = AnnotationType.get_feature(feature_name) if typesystem.is_primitive_array(feature.rangeType): ArrayType = feature.rangeType attributes[feature_name] = ArrayType(elements=attributes[feature_name]) if typesystem.is_primitive_list(feature.rangeType): attributes[feature_name] = self._parse_primitive_list(feature.rangeType, attributes[feature_name]) self._max_xmi_id = max(attributes["xmiID"], self._max_xmi_id) return AnnotationType(**attributes) def _parse_primitive_list(self, type_: Type, value: Union[str, List[str]]): if value is None: return None # Convert the inline array into the linked NonEmptyList/EmptyList instances if == TYPE_NAME_INTEGER_LIST: EmptyList = type_.typesystem.get_type(TYPE_NAME_EMPTY_INTEGER_LIST) NonEmptyList = type_.typesystem.get_type(TYPE_NAME_NON_EMPTY_INTEGER_LIST) conv = int elif == TYPE_NAME_FLOAT_LIST: EmptyList = type_.typesystem.get_type(TYPE_NAME_EMPTY_FLOAT_LIST) NonEmptyList = type_.typesystem.get_type(TYPE_NAME_NON_EMPTY_FLOAT_LIST) conv = float elif == TYPE_NAME_STRING_LIST: EmptyList = type_.typesystem.get_type(TYPE_NAME_EMPTY_STRING_LIST) NonEmptyList = type_.typesystem.get_type(TYPE_NAME_NON_EMPTY_STRING_LIST) conv = str else: raise ValueError(f"Unexpected primitive list type: {}") elements = value.split() if isinstance(value, str) else value head = EmptyList() for e in reversed(elements): tail = head head = NonEmptyList() head.set(FEATURE_BASE_NAME_HEAD, conv(e)) 
head.set(FEATURE_BASE_NAME_TAIL, tail) return head def _parse_fs_list(self, feature_structures, type_: Type, value: str): # Convert the inline array into the linked NonEmptyFSList/EmptyFSList instances NonEmptyFSList = type_.typesystem.get_type(TYPE_NAME_NON_EMPTY_FS_LIST) EmptyFSList = type_.typesystem.get_type(TYPE_NAME_EMPTY_FS_LIST) elements = value.split() if isinstance(value, str) else value head = EmptyFSList() for e in reversed(elements): tail = head head = NonEmptyFSList() head.set(FEATURE_BASE_NAME_HEAD, feature_structures[int(e)]) head.set(FEATURE_BASE_NAME_TAIL, tail) return head def _parse_primitive_array(self, type_: Type, value: Union[str, List[str]]) -> List: """Primitive collections are serialized as white space separated primitive values""" if value is None: return None # TODO: Use type name global variable here instead of hardcoded string literal elements = value.split() if isinstance(value, str) else value type_name = if type_name in [TYPE_NAME_FLOAT_ARRAY, TYPE_NAME_DOUBLE_ARRAY]: return [float(e) for e in elements] if value else [] elif type_name in [TYPE_NAME_INTEGER_ARRAY, TYPE_NAME_SHORT_ARRAY, TYPE_NAME_LONG_ARRAY]: return [int(e) for e in elements] if value else [] elif type_name == TYPE_NAME_STRING_ARRAY: if elements: raise ValueError(f"String array values must be provided as nested elements: {elements}") return [] elif type_name == TYPE_NAME_BOOLEAN_ARRAY: return [self._parse_bool(e) for e in elements] if value else [] elif type_name == TYPE_NAME_BYTE_ARRAY: return list(bytearray.fromhex(value)) if value else [] else: raise ValueError(f"Not a primitive collection type: {type_name}") def _parse_primitive_value(self, type_: Type, value: str) -> Union[float, int, bool, str, None]: type_name = if value is None: return None elif type_name == TYPE_NAME_STRING: return value elif type_name in [TYPE_NAME_FLOAT, TYPE_NAME_DOUBLE]: return float(value) elif type_name in [TYPE_NAME_INTEGER, TYPE_NAME_SHORT, TYPE_NAME_LONG, TYPE_NAME_BYTE]: return 
int(value) elif type_name == TYPE_NAME_BOOLEAN: return self._parse_bool(value) else: raise ValueError(f"Not a primitive type: {type_name}") def _parse_bool(self, s: str) -> bool: if s == "true": return True if s == "false": return False raise ValueError(f"Not a boolean: {s}") def _clear_elem(self, elem): """Frees XML nodes that already have been processed to save memory""" elem.clear() while elem.getprevious() is not None: del elem.getparent()[0] class CasXmiSerializer: _COMMON_FIELD_NAMES = {"xmiID", "type"} def __init__(self): self._nsmap = {"xmi": "", "cas": "http:///uima/cas.ecore"} self._urls_to_prefixes = {} self._duplicate_namespaces = defaultdict(int) def serialize(self, sink: Union[IO, str, None], cas: Cas, pretty_print=True) -> Union[str, None]: xmi_attrs = {"{}version": "2.0"} root = etree.Element(etree.QName(self._nsmap["xmi"], "XMI"), nsmap=self._nsmap, **xmi_attrs) self._serialize_cas_null(root) # Find all fs, even the ones that are not directly added to a sofa for fs in sorted(cas._find_all_fs(), key=lambda a: a.xmiID): self._serialize_feature_structure(cas, root, fs) for sofa in cas.sofas: self._serialize_sofa(root, sofa) for view in cas.views: self._serialize_view(root, view) doc = etree.ElementTree(root) etree.cleanup_namespaces(doc, top_nsmap=self._nsmap) return_str = sink is None if return_str: sink = BytesIO() doc.write(sink, xml_declaration=True, pretty_print=pretty_print, encoding="UTF-8") if return_str: return sink.getvalue().decode("utf-8") return None def _serialize_cas_null(self, root: etree.Element): name = etree.QName(self._nsmap["cas"], "NULL") elem = etree.SubElement(root, name) elem.attrib["{}id"] = "0" def _serialize_feature_structure(self, cas: Cas, root: etree.Element, fs: FeatureStructure): ts = cas.typesystem type_name = if "." not in type_name: type_name = f"uima.noNamespace.{type_name}" # The type name is a Java package, e.g. `org.myproj.Foo`. 
parts = type_name.split(".") # The CAS type namespace is converted to an XML namespace URI by the following rule: # replace all dots with slashes, prepend http:///, and append .ecore. url = "http:///" + "/".join(parts[:-1]) + ".ecore" # The cas prefix is the last component of the CAS namespace, which is the second to last # element of the type (the last part is the type name without package name), e.g. `myproj` raw_prefix = parts[-2] typename = parts[-1] # If the url has not been seen yet, compute the namespace and add it if url not in self._urls_to_prefixes: # If the prefix already exists, but maps to a different url, then add it with # a number at the end, e.g. `type0` new_prefix = raw_prefix if raw_prefix in self._nsmap: suffix = self._duplicate_namespaces[raw_prefix] self._duplicate_namespaces[raw_prefix] += 1 new_prefix = raw_prefix + str(suffix) self._nsmap[new_prefix] = url self._urls_to_prefixes[url] = new_prefix prefix = self._urls_to_prefixes[url] name = etree.QName(self._nsmap[prefix], typename) elem = etree.SubElement(root, name) # Serialize common attributes elem.attrib["{}id"] = str(fs.xmiID) # Case where arrays are rendered as separate elements (not inline) for use with multipleReferencesAllowed = True if ts.is_primitive_array( or == "uima.cas.FSArray": if fs.elements is None: return elif ts.is_instance_of(, "uima.cas.StringArray"): # String arrays need to be serialized to a series of child elements, as strings can # contain whitespaces. Consider e.g. the array ['likes cats, 'likes dogs']. If we would # serialize it as an attribute, it would look like # # <my:fs elements="likes cats likes dogs" /> # # which looses the information about the whitespace. 
Instead, we serialize it to # # <my:fs> # <elements>likes cats</elements> # <elements>likes dogs</elements> # </my:fs> for e in fs.elements: child = etree.SubElement(elem, "elements") child.text = e elif == "uima.cas.FSArray": elements = " ".join(str(e.xmiID) for e in fs.elements) elem.attrib["elements"] = elements else: elem.attrib["elements"] = self._serialize_primitive_array(, fs.elements) return # Serialize feature attributes t = fs.type for feature in t.all_features: if in CasXmiSerializer._COMMON_FIELD_NAMES: continue feature_name = # Strip the underscore we added for reserved names if feature._has_reserved_name: feature_name =[:-1] # Skip over 'None' features value = fs[] if value is None: continue # Map back from offsets in Unicode codepoints to UIMA UTF-16 based offsets if ( ts.is_instance_of(, TYPE_NAME_ANNOTATION) and feature_name == FEATURE_BASE_NAME_BEGIN or feature_name == FEATURE_BASE_NAME_END ): sofa: Sofa = fs.sofa value = sofa._offset_converter.python_to_external(value) if ts.is_instance_of(feature.rangeType, TYPE_NAME_STRING_ARRAY) and not feature.multipleReferencesAllowed: if value.elements is not None: # Compare to none as not to skip if elements is empty! if not value.elements: elem.attrib[feature_name] = "" else: for e in value.elements: child = etree.SubElement(elem, feature_name) child.text = e elif ts.is_instance_of(feature.rangeType, TYPE_NAME_STRING_LIST) and not feature.multipleReferencesAllowed: if value is not None: # Compare to none to not skip if elements is empty! for e in self._collect_list_elements(, value): child = etree.SubElement(elem, feature_name) child.text = e elif ts.is_primitive_array(feature.rangeType) and not feature.multipleReferencesAllowed: if value.elements is not None: # Compare to none to not skip if elements is empty! 
elem.attrib[feature_name] = self._serialize_primitive_array(, value.elements) elif ts.is_primitive_list(feature.rangeType) and not feature.multipleReferencesAllowed: if value is not None: # Compare to none to not skip if elements is empty! elem.attrib[feature_name] = self._serialize_primitive_list(, value) elif == TYPE_NAME_FS_ARRAY and not feature.multipleReferencesAllowed: if value.elements is not None: # Compare to none to not skip if elements is empty! elem.attrib[feature_name] = " ".join(str(e.xmiID) for e in value.elements) elif == TYPE_NAME_FS_LIST and not feature.multipleReferencesAllowed: if value is not None: # Compare to none to not skip if elements is empty! elem.attrib[feature_name] = " ".join( str(e.xmiID) for e in self._collect_list_elements(, value) ) elif feature_name == FEATURE_BASE_NAME_SOFA: elem.attrib[feature_name] = str(value.xmiID) elif == TYPE_NAME_BOOLEAN: elem.attrib[feature_name] = "true" if value else "false" elif in {TYPE_NAME_DOUBLE, TYPE_NAME_FLOAT}: elem.attrib[feature_name] = self._serialize_float_value(value) elif ts.is_primitive(feature.rangeType): elem.attrib[feature_name] = str(value) else: # We need to encode non-primitive features as a reference elem.attrib[feature_name] = str(value.xmiID) def _serialize_sofa(self, root: etree.Element, sofa: Sofa): name = etree.QName(self._nsmap["cas"], "Sofa") elem = etree.SubElement(root, name) elem.attrib["{}id"] = str(sofa.xmiID) elem.attrib["sofaNum"] = str(sofa.sofaNum) elem.attrib["sofaID"] = str(sofa.sofaID) if sofa.mimeType is not None: elem.attrib["mimeType"] = str(sofa.mimeType) if sofa.sofaString is not None: elem.attrib["sofaString"] = str(sofa.sofaString) def _serialize_view(self, root: etree.Element, view: View): name = etree.QName(self._nsmap["cas"], "View") elem = etree.SubElement(root, name) elem.attrib["sofa"] = str(view.sofa.xmiID) elem.attrib["members"] = " ".join(sorted((str(x.xmiID) for x in view.get_all_annotations()), key=int)) def _collect_list_elements(self, 
type_name: str, value) -> List[str]: if type_name not in _LIST_TYPES: raise ValueError(f"Not a primitive list: {type_name}") elements = [] current = value while hasattr(current, "head"): elements.append(current.head) current = current.tail return elements def _serialize_primitive_list(self, type_name: str, value) -> str: elements = [] for e in self._collect_list_elements(type_name, value): if isinstance(e, float): elements.append(self._serialize_float_value(e)) else: elements.append(str(e)) return " ".join(elements) def _serialize_primitive_array(self, type_name: str, values: List) -> str: """Primitive collections are serialized as white space seperated primitive values""" # TODO: Use type name global variable here instead of hardcoded string literal if type_name not in _PRIMITIVE_ARRAY_TYPES: raise ValueError(f"Not a primitive array: {type_name}") if type_name == TYPE_NAME_BOOLEAN_ARRAY: return " ".join(str(e).lower() for e in values) elif type_name == TYPE_NAME_BYTE_ARRAY: return "".join(f"{x:02X}" for x in values) elif type_name in {TYPE_NAME_DOUBLE_ARRAY, TYPE_NAME_FLOAT_ARRAY}: return " ".join(self._serialize_float_value(x) for x in values) else: return " ".join(str(e) for e in values) def _serialize_float_value(self, value) -> Union[float, str]: if isnan(value): return NAN_VALUE elif isinf(value): if value > 0: return POSITIVE_INFINITE_VALUE else: return NEGATIVE_INFINITE_VALUE # Formatting in the same way that Java does it, with a capital 'E' and without a '+' if the exponent is positive return str(value).upper().replace("E+", "E")