#!/usr/local/bin/python3
# avenir-python: Machine Learning
# Author: Pranab Ghosh
# 
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
# 
# http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.

import os
import sys
from random import randint
import random
import time
import math
from datetime import datetime
import re, string, unicodedata
import spacy
import torch
from collections import defaultdict
import pickle
import numpy as np
from sentence_transformers import CrossEncoder
sys.path.append(os.path.abspath("../lib"))
from util import *
from mlutil import *
""" | |
neural language model | |
""" | |
class NeuralLangModel(object):
	def __init__(self):
		"""
		initialize
		"""
		self.dtexts = None

	def loadDocs(self, fpaths):
		"""
		loads documents from a single file, all files under a directory or a
		comma separated list of files
		"""
		fPaths = fpaths.split(",")
		if len(fPaths) == 1:
			if os.path.isfile(fPaths[0]):
				#one file
				print("got one file from path")
				dnames = fPaths
				docStr = getOneFileContent(fPaths[0])
				dtexts = [docStr]
			else:
				#all files under directory
				print("got all files under directory from path")
				dtexts, dnames = getFileContent(fPaths[0])
				print("found following files")
				for dt, dn in zip(dtexts, dnames):
					print(dn + "\t" + dt[:40])
		else:
			#list of files
			print("got list of files from path")
			dnames = fPaths
			dtexts = list(map(getOneFileContent, fPaths))
		ndocs = (dtexts, dnames)
		return ndocs
#document along with its cached encoding
class EncodedDoc:
	def __init__(self, dtext, dname, drank=None):
		"""
		initialize
		"""
		self.dtext = dtext
		self.dname = dname
		self.drank = drank
		self.denc = None
		self.score = None

	def encode(self, nlp):
		"""
		encode document text with the given spaCy pipeline
		"""
		self.denc = nlp(self.dtext)
#similarity at token and sentence level for BERT encoding
class SemanticSearch:
	def __init__(self, docs=None):
		"""
		initialize
		"""
		print("loading BERT transformer model")
		self.nlp = spacy.load("en_trf_bertbaseuncased_lg")
		self.docs = docs if docs is not None else list()

	def docAv(self, qu, doc):
		"""
		whole doc similarity (ds)
		"""
		return qu.similarity(doc)
	def tokSimAv(self, qu, doc):
		"""
		token pair wise average (tsa)
		"""
		qte = self.__getTensor(qu)
		dte = self.__getTensor(doc)
		qts = self.simAll(qte, dte)
		return np.mean(qts)

	def tokSimMed(self, qu, doc):
		"""
		token pair wise median (tsme)
		"""
		qte = self.__getTensor(qu)
		dte = self.__getTensor(doc)
		qts = self.simAll(qte, dte)
		return np.median(qts)
	def tokSimMax(self, qu, doc):
		"""
		token pair wise max (tsma)
		"""
		qte = self.__getTensor(qu)
		dte = self.__getTensor(doc)
		return self.simMax(qte, dte)

	def tokSimAvMax(self, qu, doc):
		"""
		token max then average (tsavm)
		"""
		qte = self.__getTensor(qu)
		dte = self.__getTensor(doc)
		return self.simAvMax(qte, dte)

	def tokSimMaxAv(self, qu, doc):
		"""
		token average then max (tsmav)
		"""
		qte = self.__getTensor(qu)
		dte = self.__getTensor(doc)
		return self.simMaxAv(qte, dte)
	def sentSimAv(self, qu, doc):
		"""
		sentence wise average (ssa)
		"""
		qse, dse = self.__sentEnc(qu, doc)
		sims = self.simAll(qse, dse)
		return np.mean(sims)

	def sentSimMed(self, qu, doc):
		"""
		sentence wise median (ssme)
		"""
		qse, dse = self.__sentEnc(qu, doc)
		sims = self.simAll(qse, dse)
		return np.median(sims)

	def sentSimMax(self, qu, doc):
		"""
		sentence wise max (ssma)
		"""
		qse, dse = self.__sentEnc(qu, doc)
		sims = self.simAll(qse, dse)
		return np.max(sims)

	def sentSimAvMax(self, qu, doc):
		"""
		sentence max then average (ssavm)
		"""
		qse, dse = self.__sentEnc(qu, doc)
		return self.simAvMax(qse, dse)

	def sentSimMaxAv(self, qu, doc):
		"""
		sentence average then max (ssmav)
		"""
		qse, dse = self.__sentEnc(qu, doc)
		return self.simMaxAv(qse, dse)
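
	#the helpers below reduce the set of pairwise cosine similarities between
	#query and document embeddings: simMax takes the global max, simAvMax takes
	#the max per query element and then averages, simMaxAv averages per query
	#element and then takes the max, and simAll returns all pairwise values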
	def simMax(self, qte, dte):
		"""
		max pairwise similarity
		"""
		msi = 0
		for qt in qte:
			for dt in dte:
				si = cosineSimilarity(qt, dt)
				if not math.isnan(si) and si > msi:
					msi = si
		return msi

	def simAvMax(self, qte, dte):
		"""
		max per query element, then average
		"""
		qts = list()
		for qt in qte:
			msi = 0
			for dt in dte:
				si = cosineSimilarity(qt, dt)
				if not math.isnan(si) and si > msi:
					msi = si
			qts.append(msi)
		amsi = np.mean(np.array(qts))
		return amsi

	def simMaxAv(self, lqe, lde):
		"""
		average per query element, then max
		"""
		masi = 0
		for qe in lqe:
			qes = list()
			for de in lde:
				si = cosineSimilarity(qe, de)
				if not math.isnan(si):
					qes.append(si)
			av = np.mean(np.array(qes))
			if av > masi:
				masi = av
		return masi

	def simAll(self, lqe, lde):
		"""
		all pairwise similarities
		"""
		qes = list()
		for qe in lqe:
			for de in lde:
				si = cosineSimilarity(qe, de)
				if not math.isnan(si):
					qes.append(si)
		return np.array(qes)
	def __sentEnc(self, qu, doc):
		"""
		sentence encoding for query and doc, using the [CLS] word piece vector
		of each sentence as the sentence embedding
		"""
		qstr = qu._.trf_word_pieces_
		qte = zip(qstr, qu._.trf_last_hidden_state)
		qse = list()
		for t, v in qte:
			if t == "[CLS]":
				qse.append(v)
		dstr = doc._.trf_word_pieces_
		dte = zip(dstr, doc._.trf_last_hidden_state)
		dse = list()
		for t, v in dte:
			if t == "[CLS]":
				dse.append(v)
		enp = (np.array(qse), np.array(dse))
		return enp

	def __getTensor(self, toks):
		"""
		token level tensors from tokens
		"""
		return list(map(lambda t: t.tensor, toks))
	def addDocs(self, docs):
		"""
		add named doc content
		"""
		self.docs.extend(docs)

	def loadDocs(self, fpaths):
		"""
		loads documents from a single file, all files under a directory or a
		comma separated list of files
		"""
		fPaths = fpaths.split(",")
		if len(fPaths) == 1:
			if os.path.isfile(fPaths[0]):
				#one file
				print("one file")
				dnames = fPaths
				docStr = getOneFileContent(fPaths[0])
				dtexts = [docStr]
			else:
				#all files under directory
				print("all files under directory")
				dtexts, dnames = getFileContent(fPaths[0])
				print("found following files")
				for dt, dn in zip(dtexts, dnames):
					print(dn + "\t" + dt[:40])
		else:
			#list of files
			print("list of files")
			dnames = fPaths
			dtexts = list(map(getOneFileContent, fPaths))
		docs = list(map(lambda dt: EncodedDoc(dt[0], dt[1]), zip(dtexts, dnames)))
		self.docs.extend(docs)
	def search(self, qstr, algo, gdranks=None):
		"""
		searches the query against all loaded documents with the specified
		similarity algorithm and prints documents sorted by score
		"""
		qv = self.nlp(qstr)
		res = list()
		for d in self.docs:
			dn = d.dname
			if d.denc is None:
				d.encode(self.nlp)
			dv = d.denc
			if algo == "ds":
				si = self.docAv(qv, dv)
			elif algo == "tsa":
				si = self.tokSimAv(qv, dv)
			elif algo == "tsme":
				si = self.tokSimMed(qv, dv)
			elif algo == "tsma":
				si = self.tokSimMax(qv, dv)
			elif algo == "tsavm":
				si = self.tokSimAvMax(qv, dv)
			elif algo == "tsmav":
				si = self.tokSimMaxAv(qv, dv)
			elif algo == "ssa":
				si = self.sentSimAv(qv, dv)
			elif algo == "ssme":
				si = self.sentSimMed(qv, dv)
			elif algo == "ssma":
				si = self.sentSimMax(qv, dv)
			elif algo == "ssavm":
				si = self.sentSimAvMax(qv, dv)
			elif algo == "ssmav":
				si = self.sentSimMaxAv(qv, dv)
			else:
				si = -1.0
				print("invalid similarity algo")
			#print("{} score {:.6f}".format(dn, si))
			d.score = si
			r = (dn, si)
			res.append(r)

		#search score for each document
		res.sort(key=lambda r : r[1], reverse=True)
		print("\nsorted search result")
		print("query: {} matching algo: {}".format(qstr, algo))
		for r in res:
			print("{} score {:.3f}".format(r[0], r[1]))

		#rank order if gold truth rank provided
		if gdranks is not None:
			i = 0
			count = 0
			for d in gdranks:
				while i < len(res):
					if d == res[i][0]:
						count += 1
						i += 1
						break
					i += 1
			ro = count / len(gdranks)
			print("rank order {:.3f}".format(ro))
#similarity at passage or paragraph level using sbert cross encoder
class SemanticSimilaityCrossEnc(NeuralLangModel):
	def __init__(self, docs=None):
		"""
		initialize
		"""
		self.dparas = None
		self.scores = None
		print("loading cross encoder")
		self.model = CrossEncoder("cross-encoder/ms-marco-TinyBERT-L-2")
		print("done loading cross encoder")
		super().__init__()
	def paraSimilarity(self, dtext, fpaths, minParNl=1):
		"""
		returns paragraph pair similarity across 2 documents
		"""
		dtexts, dnames = self.loadDocs(fpaths)
		if dtext is None:
			assertEqual(len(dtexts), 2, "exactly 2 files needed")
			self.dtexts = dtexts
		else:
			assertEqual(len(dtexts), 1, "exactly 1 file needed")
			self.dtexts = list()
			self.dtexts.append(dtext)
			self.dtexts.append(dtexts[0])

		#split each document into paragraphs
		self.dparas = list()
		for text in self.dtexts:
			regx = r"\n+" if minParNl == 1 else r"\n{2,}"
			paras = re.split(regx, text.replace("\r\n", "\n"))
			print("no of paras {}".format(len(paras)))
			self.dparas.append(paras)

		#score all paragraph pairs with the cross encoder
		tinp = list()
		for para1 in self.dparas[0]:
			inp = list(map(lambda para2: [para1, para2], self.dparas[1]))
			tinp.extend(inp)
		print("input shape " + str(np.array(tinp).shape))
		scores = self.model.predict(tinp)
		print("score shape " + str(np.array(scores).shape))
		#assertEqual(len(scores), len(self.dparas[0]) * len(self.dparas[1]), "no of scores don't match no of paragraph pairs")
		print(scores)

		i = 0
		print("text paragraph pair wise similarity")
		for para1 in self.dparas[0]:
			for para2 in self.dparas[1]:
				print("first: {}\t second: {}\t score: {:.6f}".format(para1[:20], para2[:20], scores[i]))
				i += 1
		self.scores = scores
	def avMaxScore(self):
		"""
		placeholder, not implemented yet
		"""
		pass
#named entity recognition with a spaCy pipeline
def ner(text, nlp):
	"""
	prints named entities in the text with span offsets and labels
	"""
	#nlp = spacy.load("en_core_web_md")
	doc = nlp(text)
	for ent in doc.ents:
		print(ent.text, ent.start_char, ent.end_char, ent.label_)
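
if __name__ == "__main__":
	#minimal driver sketch; the file paths, input sentence and spaCy model name
	#below are assumptions for illustration, not part of the original module
	sse = SemanticSimilaityCrossEnc()
	sse.paraSimilarity(None, "./data/doc1.txt,./data/doc2.txt")

	#NER example with a hypothetical input sentence
	nlp = spacy.load("en_core_web_md")
	ner("Apple is looking at buying a U.K. startup for $1 billion", nlp)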