Spaces:
Sleeping
Sleeping
File size: 4,821 Bytes
519b419 74e2066 519b419 74e2066 519b419 74e2066 519b419 8c99444 519b419 8c99444 519b419 74e2066 8c99444 74e2066 519b419 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# -*- coding:utf-8 -*-
from io import BytesIO
import re
from zipfile import ZipFile
import os
from pathlib import Path
import streamlit as st
from cassis import load_typesystem, load_cas_from_xmi
def st_pb(method):
    """Streamlit decorator that displays a progress bar for a generator method.

    ``method`` must be a generator function taking a single argument. Each
    value it yields is indexed as ``(fraction, label, ok)``:

    * ``fraction`` is forwarded to the Streamlit progress bar,
    * ``label`` is appended to the status message,
    * ``ok`` selects the success or the error message.

    The wrapper returns the generator's own return value (the ``value``
    carried by the final ``StopIteration``).
    """
    # Function-scope import keeps this block self-contained.
    from functools import wraps

    @wraps(method)  # preserve the wrapped method's name and docstring
    def progress_bar(ref):
        container = st.empty()
        bar = st.progress(0)
        pg_gen = method(ref)
        while True:
            # Only next() is guarded, so a StopIteration escaping from the
            # Streamlit calls below cannot be mistaken for normal exhaustion.
            try:
                progress = next(pg_gen)
            except StopIteration as result:
                return result.value
            bar.progress(progress[0])
            if progress[2]:
                container.write("✅ Processing... " + progress[1])
            else:
                container.write("❌️ Errror with..." + progress[1])

    return progress_bar
class Project:
    """Annotation project read from a zip archive.

    On construction the archive is scanned for XMI documents and for the
    ``TypeSystem.xml`` file, then the named entities of every document are
    extracted into ``self.annotations``.
    """

    def __init__(self, zip_project, type, remote):
        """Load the documents of the project and extract their named entities.

        Args:
            zip_project: either an already opened ``ZipFile`` (used directly
                when ``remote`` is truthy and ``type`` is ``"global"``) or any
                value accepted by ``ZipFile(...)``.
            type: ``"global"`` or ``"iaa"``. The name shadows the builtin but
                is kept so existing callers keep working.
            remote: truthy when ``zip_project`` is an already opened
                ``ZipFile`` whose ``.xmi`` members are read directly.

        Raises:
            ValueError: if the archive has to be opened here and ``type`` is
                neither ``"global"`` nor ``"iaa"``.
        """
        # zip container that contains XMI and typesystem
        self.zip_project = zip_project
        self.remote = remote
        # 'iaa' or 'global'
        self.type = type
        # store source filename
        self.documents = []
        # store XMI representation
        self.xmi_documents = []
        # store typesystem file (loaded lazily from 'TypeSystem.xml')
        self.typesystem = None
        # set annotators
        self.annotators = []
        # set annotations, filled by extract_ne():
        # {
        #     "Filename.xmi": {
        #         "mentions": [],
        #         "labels": []
        #     }, ...
        # }
        self.annotations = {}

        if isinstance(self.zip_project, ZipFile) and self.remote and self.type == "global":
            # Already opened archive: XMI files are direct members.
            for fp in self.zip_project.namelist():
                if self.typesystem is None:
                    # ZipFile.read() closes the member handle for us.
                    self.typesystem = load_typesystem(
                        BytesIO(self.zip_project.read('TypeSystem.xml'))
                    )
                if fp.endswith('.xmi'):
                    self.documents.append(fp)
                    self.xmi_documents.append(self.zip_project.read(fp).decode("utf-8"))
        else:
            # Raw strings: '\.' is an invalid escape in a normal literal.
            patterns = {
                "global": r'.*curation/.*/(?!\._).*zip$',
                "iaa": r'.*xm[il]$',
            }
            if self.type not in patterns:
                raise ValueError(
                    "Unsupported project type %r: expected 'global' or 'iaa'" % (self.type,)
                )
            regex = re.compile(patterns[self.type])

            with ZipFile(self.zip_project) as project_zip:
                annotation_fps = (fp for fp in project_zip.namelist() if regex.match(fp))
                for fp in annotation_fps:
                    if self.type == "global":
                        # Each matching member is itself a zip holding the
                        # XMI file(s) and the type system.
                        with ZipFile(BytesIO(project_zip.read(fp))) as annotation_zip:
                            if self.typesystem is None:
                                self.typesystem = load_typesystem(
                                    BytesIO(annotation_zip.read('TypeSystem.xml'))
                                )
                            for f in annotation_zip.namelist():
                                if f.endswith('.xmi'):
                                    # store source filename (name of the
                                    # directory that contains the inner zip)
                                    self.documents.append(Path(fp).parent.name)
                                    # store XMI representation
                                    self.xmi_documents.append(
                                        annotation_zip.read(f).decode("utf-8")
                                    )
                    elif self.type == "iaa":
                        if self.typesystem is None and fp.endswith('.xml'):
                            self.typesystem = load_typesystem(
                                BytesIO(project_zip.read('TypeSystem.xml'))
                            )
                        elif fp.endswith('.xmi'):
                            # store source filename
                            self.documents.append(fp)
                            # set annotators
                            self.annotators.append(os.path.splitext(fp)[0])
                            # store XMI representation
                            self.xmi_documents.append(project_zip.read(fp).decode("utf-8"))

        self.extract_ne()

    @st_pb
    def extract_ne(self):
        """Extract named-entity mentions and labels from every XMI document.

        This is a generator driven by the ``st_pb`` decorator. After each
        document it yields ``(fraction_done, source_name, success)``, where
        ``success`` is ``False`` when loading or reading the document raised.
        Results are stored in ``self.annotations[source_name]`` under the
        ``"mentions"`` and ``"labels"`` keys.
        """
        total = len(self.documents)
        for count, (xmi, src) in enumerate(zip(self.xmi_documents, self.documents), start=1):
            doc_flag = True
            try:
                cas = load_cas_from_xmi(xmi, typesystem=self.typesystem)
                self.annotations[src] = {
                    "mentions": [],
                    "labels": []
                }
                for ne in cas.select('de.tudarmstadt.ukp.dkpro.core.api.ner.type.NamedEntity'):
                    self.annotations[src]["mentions"].append(ne.get_covered_text())
                    self.annotations[src]["labels"].append(ne.value)
            except Exception:
                # Best effort: a broken document is reported as failed and the
                # remaining ones are still processed. KeyboardInterrupt and
                # SystemExit are deliberately not swallowed.
                doc_flag = False
            yield count / total, src, doc_flag
|