Source code for cassis.util

import csv
from collections import defaultdict
from functools import cmp_to_key
from io import IOBase, StringIO
from typing import Dict, Iterable, Set

from cassis import Cas
from cassis.typesystem import FEATURE_BASE_NAME_SOFA, TYPE_NAME_ANNOTATION, FeatureStructure, Type, is_array

_EXCLUDED_FEATURES = {FEATURE_BASE_NAME_SOFA}
_NULL_VALUE = "<NULL>"


def cas_to_comparable_text(
    cas: Cas,
    out: [IOBase, None] = None,
    seeds: Iterable[FeatureStructure] = None,
    mark_indexed: bool = True,
    covered_text: bool = True,
    exclude_types: Set[str] = None,
) -> [str, None]:
    """Render the feature structures of a CAS as CSV text that can be compared with other renderings.

    For every type (sorted by name) the output contains one row with the type name, one header row
    (see ``_render_header``) and one row per feature structure of that type. Feature structures are
    identified by generated anchors (see ``_generate_anchors``) instead of their IDs.

    Args:
        cas: The CAS to render.
        out: Stream the CSV rows are written to. If ``None``, an in-memory ``StringIO`` is used.
        seeds: Passed on to ``cas._find_all_fs`` to select the feature structures to render.
        mark_indexed: Whether anchors of feature structures returned by ``select_all()`` of any view
            get a ``*`` marker.
        covered_text: Whether a ``<COVERED_TEXT>`` column is rendered for types that are subsumed by
            the annotation type.
        exclude_types: Names of types that are skipped entirely.

    Returns:
        The content of ``out`` if it offers ``getvalue()`` (e.g. ``StringIO``) and that content is
        non-empty, otherwise ``None``.
    """
    indexed_feature_structures = _get_indexed_feature_structures(cas)
    all_feature_structures_by_type = _group_feature_structures_by_type(cas._find_all_fs(seeds=seeds))
    types_sorted = sorted(all_feature_structures_by_type.keys())
    # Besides building the anchors, this sorts the per-type lists in place. The rendering loop below
    # relies on that order.
    fs_id_to_anchor = _generate_anchors(
        cas, types_sorted, all_feature_structures_by_type, indexed_feature_structures, mark_indexed=mark_indexed
    )

    if out is None:
        out = StringIO()

    csv_writer = csv.writer(out, dialect=csv.unix_dialect)
    for t in types_sorted:
        if exclude_types and t in exclude_types:
            continue

        type_ = cas.typesystem.get_type(t)

        csv_writer.writerow([type_.name])

        is_annotation_type = covered_text and cas.typesystem.subsumes(parent=TYPE_NAME_ANNOTATION, child=type_)
        csv_writer.writerow(_render_header(type_, covered_text=is_annotation_type))

        feature_structures_of_type = all_feature_structures_by_type.get(type_.name)
        if not feature_structures_of_type:
            continue

        for fs in feature_structures_of_type:
            row_data = _render_feature_structure(
                type_, fs, fs_id_to_anchor, max_covered_text=30 if is_annotation_type else 0
            )
            csv_writer.writerow(row_data)

    # Only in-memory buffers offer getvalue(); a caller-supplied file-like object already received
    # all rows, so there is nothing to return for it (previously this raised AttributeError).
    getvalue = getattr(out, "getvalue", None)
    if getvalue is None:
        return None
    return getvalue() or None
def _render_header(type_: Type, covered_text: bool = True) -> list:
    """Build the CSV header row for a type.

    The row starts with ``<ANCHOR>``, optionally followed by ``<COVERED_TEXT>``, and then lists the
    names of all features of the type sorted by name, skipping the excluded features.
    """
    header = ["<ANCHOR>"]
    if covered_text:
        header.append("<COVERED_TEXT>")
    header.extend(
        feature.name
        for feature in sorted(type_.all_features, key=lambda v: v.name)
        if feature.name not in _EXCLUDED_FEATURES
    )
    return header


def _render_feature_structure(
    type_: Type, fs: FeatureStructure, fs_id_to_anchor: Dict[int, str], max_covered_text: int = 30
) -> list:
    """Build the CSV data row for a single feature structure.

    Args:
        type_: The type whose features define the columns.
        fs: The feature structure to render.
        fs_id_to_anchor: Mapping from ``xmiID`` to the generated anchor.
        max_covered_text: Covered texts of at least this length are abbreviated to their first and
            last ``max_covered_text // 2`` characters joined by ``...``. A value of ``0`` (or less)
            omits the covered text column.

    Returns:
        The row cells in the column order produced by ``_render_header``.
    """
    row_data = [fs_id_to_anchor.get(fs.xmiID)]

    if max_covered_text > 0 and _is_annotation_fs(fs):
        covered_text = fs.get_covered_text()
        if covered_text and len(covered_text) >= max_covered_text:
            half = max_covered_text // 2
            prefix = covered_text[:half]
            # Guard against half == 0: covered_text[-0:] would be the complete text.
            suffix = covered_text[-half:] if half else ""
            covered_text = f"{prefix}...{suffix}"
        row_data.append(covered_text if covered_text is not None else _NULL_VALUE)

    if _is_array_fs(fs):
        # Array feature structures are rendered via their elements only.
        row_data.append(_render_feature_value(fs.elements, fs_id_to_anchor))
        return row_data

    for feature in sorted(type_.all_features, key=lambda v: v.name):
        if feature.name in _EXCLUDED_FEATURES:
            continue
        row_data.append(_render_feature_value(fs[feature.name], fs_id_to_anchor))

    return row_data


def _render_feature_value(feature_value: object, fs_id_to_anchor: Dict[int, str]) -> object:
    """Render a single feature value as a CSV cell value.

    ``None`` becomes ``<NULL>``, lists and array feature structures are rendered element-wise,
    primitive values are returned unchanged and any other value is treated as a feature structure
    reference and replaced by its anchor.
    """
    if feature_value is None:
        return _NULL_VALUE

    if isinstance(feature_value, list):
        return [_render_feature_value(e, fs_id_to_anchor) for e in feature_value]

    if _is_array_fs(feature_value):
        if feature_value.elements is None:
            # Same result as the former implicit fall-through: an array without elements yields
            # None (an empty cell), not the <NULL> marker.
            return None
        return [_render_feature_value(e, fs_id_to_anchor) for e in feature_value.elements]

    if _is_primitive_value(feature_value):
        return feature_value

    return fs_id_to_anchor.get(feature_value.xmiID)


def _get_indexed_feature_structures(cas: Cas) -> Iterable[FeatureStructure]:
    """Collect the results of ``select_all()`` from the views of all sofas of the CAS."""
    feature_structures = []
    for sofa in cas.sofas:
        feature_structures.extend(cas.get_view(sofa.sofaID).select_all())
    return feature_structures


def _group_feature_structures_by_type(
    feature_structures: Iterable[FeatureStructure],
) -> Dict[str, Iterable[FeatureStructure]]:
    """Group feature structures into lists keyed by their type name, keeping the input order."""
    fs_by_type = {}
    for fs in feature_structures:
        fs_by_type.setdefault(fs.type.name, []).append(fs)
    return fs_by_type


def _generate_anchors(
    cas: Cas,
    types_sorted: Iterable[str],
    all_feature_structures_by_type: Dict[str, Iterable[FeatureStructure]],
    indexed_feature_structures: Iterable[FeatureStructure],
    unique_anchors: bool = True,
    mark_indexed: bool = True,
) -> Dict[int, str]:
    """Generate an anchor for every feature structure.

    Note that the lists in ``all_feature_structures_by_type`` are sorted in place (using
    ``_compare_fs``) as a side effect.

    Args:
        cas: The CAS providing the type system.
        types_sorted: The type names in the order in which they are processed.
        all_feature_structures_by_type: The feature structures grouped by type name.
        indexed_feature_structures: The feature structures that receive the index marker.
        unique_anchors: Whether repeated anchors get a ``(n)`` suffix (the first occurrence gets none).
        mark_indexed: Whether indexed feature structures are marked at all.

    Returns:
        Mapping from the ``xmiID`` of each feature structure to its anchor.
    """
    fs_id_to_anchor = {}
    disambiguation_by_prefix = defaultdict(int)
    for t in types_sorted:
        type_ = cas.typesystem.get_type(t)
        feature_structures = all_feature_structures_by_type[type_.name]
        # In-place sort: callers iterate these lists afterwards and expect them to be ordered.
        feature_structures.sort(key=cmp_to_key(lambda a, b: _compare_fs(type_, a, b)))

        for fs in feature_structures:
            add_index_mark = mark_indexed and fs in indexed_feature_structures
            anchor = _generate_anchor(fs, add_index_mark)

            # .get() does not create the entry, so this is None for the first occurrence.
            disambiguation_id = disambiguation_by_prefix.get(anchor)
            disambiguation_by_prefix[anchor] += 1
            if unique_anchors and disambiguation_id:
                anchor += f"({disambiguation_id})"

            fs_id_to_anchor[fs.xmiID] = anchor
    return fs_id_to_anchor


def _generate_anchor(fs: FeatureStructure, add_index_mark: bool) -> str:
    """Build the (not yet disambiguated) anchor of a feature structure.

    The anchor consists of the short type name, ``[begin-end]`` for annotations, ``*`` if
    ``add_index_mark`` is set and ``@<sofaID>`` if the feature structure has a sofa attribute.
    """
    anchor = fs.type.name.rsplit(".", 2)[-1]  # Get the short type name (no package)

    if _is_annotation_fs(fs):
        anchor += f"[{fs.begin}-{fs.end}]"

    if add_index_mark:
        anchor += "*"

    if hasattr(fs, FEATURE_BASE_NAME_SOFA):
        anchor += f"@{getattr(fs, FEATURE_BASE_NAME_SOFA).sofaID}"

    return anchor


def _is_primitive_value(value: object) -> bool:
    """Return whether the value is exactly an ``int``, ``float``, ``bool`` or ``str`` (no subclasses)."""
    return type(value) in (int, float, bool, str)


def _is_array_fs(fs: FeatureStructure) -> bool:
    """Return whether the value is a feature structure whose type is an array type."""
    if not isinstance(fs, FeatureStructure):
        return False

    return is_array(fs.type)


def _is_annotation_fs(fs: FeatureStructure) -> bool:
    """Duck-typing check: anything with integer ``begin`` and ``end`` attributes counts as annotation."""
    return hasattr(fs, "begin") and isinstance(fs.begin, int) and hasattr(fs, "end") and isinstance(fs.end, int)


def _compare_fs(type_: Type, a: FeatureStructure, b: FeatureStructure) -> int:
    """Comparison function (for ``cmp_to_key``) used to order the feature structures of a type.

    Annotations are ordered by ascending begin and then descending end. Remaining ties, and
    non-annotations, are ordered by ``_feature_structure_hash``.

    Returns:
        A negative number if ``a`` sorts before ``b``, a positive number if it sorts after ``b`` and
        ``0`` if both are considered equal.
    """
    if a is b:
        return 0

    # duck-typing check if something is an annotation - if yes, try sorting by offsets
    fs_a_is_annotation = _is_annotation_fs(a)
    fs_b_is_annotation = _is_annotation_fs(b)
    if fs_a_is_annotation != fs_b_is_annotation:
        # Annotations sort before non-annotations. The result has to depend on which side is the
        # annotation, otherwise cmp(a, b) and cmp(b, a) would both be -1.
        return -1 if fs_a_is_annotation else 1

    if fs_a_is_annotation:  # at this point both are annotations
        begin_cmp = a.begin - b.begin
        if begin_cmp != 0:
            return begin_cmp

        # Longer annotations first if they start at the same offset
        end_cmp = b.end - a.end
        if end_cmp != 0:
            return end_cmp

    # Doing arithmetics on the hash value as we have done with the offsets does not work because the hashes do not
    # provide a global order. Hence, we map all results to 0, -1 and 1 here.
    fs_hash_a = _feature_structure_hash(type_, a)
    fs_hash_b = _feature_structure_hash(type_, b)
    if fs_hash_a == fs_hash_b:
        return 0
    return -1 if fs_hash_a < fs_hash_b else 1


def _feature_structure_hash(type_: Type, fs: FeatureStructure):
    """Compute a number from the feature values of a feature structure for tie-breaking when sorting.

    For array feature structures this is the number of elements (``0`` if there are none). Otherwise
    the values of all features of ``type_`` except the sofa are folded in via ``_feature_value_hash``;
    array-valued features contribute each of their elements.
    """
    hash_ = 0
    if _is_array_fs(fs):
        return len(fs.elements) if fs.elements else 0

    # Should be possible to get away with not sorting here assuming that all_features returns the features always in
    # the same order
    for feature in type_.all_features:
        if feature.name == FEATURE_BASE_NAME_SOFA:
            continue

        feature_value = getattr(fs, feature.name)

        if _is_array_fs(feature_value):
            if feature_value.elements is not None:
                for element in feature_value.elements:
                    # Hash the element, not the array itself - otherwise the array content would
                    # never influence the result.
                    hash_ = _feature_value_hash(element, hash_)
        else:
            hash_ = _feature_value_hash(feature_value, hash_)
    return hash_


def _feature_value_hash(feature_value: object, hash_: int):
    """Fold a single feature value into the running hash and return the new hash.

    Primitive values add their ``hash()``. Any other value leaves the hash unchanged, except that
    ``None`` flips its sign.
    """
    # Note we do not recurse further into arrays here because that could lead to endless loops!
    if _is_primitive_value(feature_value):
        # NOTE(review): hash() of str is salted per process unless PYTHONHASHSEED is fixed, so the
        # tie-breaking order for string-valued features may differ between interpreter runs.
        return hash_ + hash(feature_value)

    # If we get here, it is a feature structure reference... we cannot really recursively
    # go into it to calculate a recursive hash... so we just check if the value is non-null
    return hash_ * (-1 if feature_value is None else 1)