studies / utility.py
Roland Ding
8.8.21.61 added data structure transformation function list_dict_to_dict.
0b33c29
raw
history blame
9.69 kB
import json
import regex as re
import tiktoken
from application import *
from pdfminer.high_level import extract_text
from pdfminer.pdfparser import PDFParser
from pdfminer.pdfdocument import PDFDocument
encoding = tiktoken.get_encoding("cl100k_base")
'''
universal system functions
'''
def terminal_print(func):
from datetime import datetime
# import os
def wrapper(*args, **kwargs):
start = datetime.now()
print(f"{start.strftime('%y-%m-%d %H:%M:%S')} - executing function: {func.__name__}")
result = func(*args, **kwargs)
end = datetime.now()
print(f"{end.strftime('%y-%m-%d %H:%M:%S')} - completed function: {func.__name__}, runtime: {end-start} seconds")
return result
return wrapper
'''
following functions are for file manipulation
'''
@terminal_print
def read_pdf(file_path):
'''
this function read the pdf file and return the text
Parameters
----------
file_path : str
path to the pdf file
Returns
-------
text : str
text extracted from the pdf file
'''
# open the pdf file
if type(file_path) is str:
file_obj = open(file_path, 'rb')
# elif type(file_path) is tempfile._TemporaryFileWrapper:
else:
file_obj = open(file_path.name, 'rb')
text = extract_text(file_obj)
text = remove_symbols(text)
text = remove_citation(text)
parser = PDFParser(file_obj)
doc = PDFDocument(parser)
meta = doc.info
# close the pdf file object
file_obj.close()
return text, meta
'''
following functions are for format standard response
'''
def format_response(code,data):
'''
this function format the response to be returned to the client.
this is used for lambda serverless framework to return the response.
Parameters
----------
code : int
status code
data : dict
data to be returned to the client
Returns
-------
dict
formatted response
'''
return {
"statusCode":code,
"headers":{
"Access-Control-Allow-Origin": "*",
"Content-Type": "application/json"
},
"body":json.dumps(data),
"isBase64Encoded": False
}
'''
following functions are for string manipulation
'''
@terminal_print
def format_text(text,remove_char_ls = ["\\n--\\n","\\n\\n","\n"]):
'''
this function format the text output by removing excessive characters
Parameters
----------
text : str
text to be processed
Returns
-------
str
processed text
'''
for c in remove_char_ls:
text = text.replace(c,"")
return text
@terminal_print
def remove_symbols(text):
'''
this function remove symbols that are not in unicode
Parameters
----------
text : str
text to be processed
Returns
-------
str
processed text
'''
text = text.encode("ascii", "ignore").decode()
text = text.replace('-\n', '')
return text
@terminal_print
def remove_citation(text):
'''
this function remove citation pattern in the text
Parameters
----------
text : str
text to be processed
Returns
-------
str
processed text
'''
return re.sub(r'\(cid:\d+\)','',text)
@terminal_print
def str_to_tuple(s):
'''
this function convert string to tuple
Parameters
----------
s : str
string to be converted
Returns
-------
tuple
converted tuple
'''
return tuple(s.replace("(","").replace(")","").split(","))
@terminal_print
def replace_symbols(s):
'''
this function replace symbols in the string to comply with file names
Parameters
----------
s : str
string to be replaced
Returns
-------
str
replaced string
'''
s = s.replace(" ","_")
s = s.replace(",","")
s = s.replace(".","")
s = s.replace("-","_")
s = s.replace("(","")
s = s.replace(")","")
s = s.replace("/","_")
s = s.replace(":","")
s = s.replace(";","")
s = s.replace("'","")
s = s.replace('"',"")
return s
'''
following functions are for dynamodb data manipulation
'''
# @terminal_print
def db_map_to_py_dict(db_map):
'''
this function convert dynamodb map data structure to python dictionary
Parameters
----------
db_map : dict
dynamodb map
Returns
-------
dict
python dictionary
'''
py_dict = {}
for k,i in db_map.items():
for l,v in i.items():
if l == "M":
py_dict[k] = db_map_to_py_dict(v)
elif l == "S":
py_dict[k] = v
elif l == "N":
py_dict[k] = int(v) if float(v)%1 ==0 else float(v)
elif l == "L":
py_dict[k] = db_list_to_py_list(v)
elif l == "BS":
py_dict[k] = v
elif l == "BOOL":
py_dict[k] = v
elif l =="NULL":
py_dict[k] = None
else:
py_dict[k] = v
return py_dict
# @terminal_print
def py_dict_to_db_map(py_dict):
'''
this function convert python dictionary to dynamodb map data structure
Parameters
----------
py_dict : dict
python dictionary
Returns
-------
dict
dynamodb map
'''
db_map = {}
for key,value in py_dict.items():
key = str(key)
if type(value) is str:
db_map[key] = {"S":value}
elif type(value) is int or type(value) is float:
db_map[key] = {"N":str(value)}
elif type(value) is dict:
db_map[key] = {"M":py_dict_to_db_map(value)}
elif type(value) is list:
db_map[key] = {"L":py_list_to_db_list(value)}
elif type(value) is bytes:
db_map[key] = {"B":value}
elif type(value) is bool:
db_map[key] = {"BOOL":value}
elif value is None:
db_map[key] = {"NULL":True}
return db_map
# @terminal_print
def db_list_to_py_list(db_list):
'''
this function convert dynamodb list data structure to python list
Parameters
----------
db_list : list
dynamodb list
Returns
-------
list
python list
'''
py_list = []
for d in db_list:
for t,v in d.items():
if t == "M":
py_list.append(db_map_to_py_dict(v))
elif t == "L":
py_list.append(db_list_to_py_list(v))
elif t =="N":
if "." in v:
py_list.append(float(v))
else:
py_list.append(int(v))
elif t =="S" or t =="BOOL" or t =="SS" or t =="NS":
py_list.append(v)
elif t =="B" or t =="BS":
py_list.append(bytes(v,"utf-8"))
elif t =="NULL":
py_list.append(None)
elif t =="BOOL":
py_list.append(bool(v))
else:
py_list.append(db_map_to_py_dict(v))
return py_list
# @terminal_print
def py_list_to_db_list(py_list):
'''
this function convert python list to dynamodb list data structure
Parameters
----------
py_list : list
python list
Returns
-------
list
dynamodb list
'''
db_list = []
for value in py_list:
if type(value) is str:
item = {"S":value}
elif type(value) is int or type(value) is float:
item = {"N":str(value)}
elif type(value) is dict:
item = {"M":py_dict_to_db_map(value)}
# item = py_dict_to_db_map(value)
elif type(value) is list:
item = {"L":py_list_to_db_list(value)}
elif type(value) is tuple:
item = {"L":py_list_to_db_list(value)}
elif type(value) is bytes:
item = {"B":value}
elif type(value) is bool:
item = {"BOOL":value}
elif value is None:
item = {"NULL":True}
db_list.append(item)
return db_list
def list_dict_to_dict(ls,key):
result_dict = {}
for d in ls:
if key in d:
result_dict[d[key]] = d
return result_dict
'''
following functions are for markdown table creation
'''
@terminal_print
def create_md_table(array):
'''
create markdown tables for an array.
Parameters
----------
array: list
a table in the form of a list of lists
Returns
-------
md_table: str
'''
md_table = ""
for i,row in enumerate(array):
md_row = ""
for item in row:
md_item = f"| {item} "
md_row += md_item
md_row += "|\n"
md_table += md_row
if i == 0:
md_table += f"| {' | '.join(['---' for _ in range(len(row))])} |\n"
return md_table
'''
following functions are used for business logic. (to be moved to business logic layer)
'''
@terminal_print
def est_cost(text,rate):
'''
this function calculate the estimated cost of the translation
please note that the rate is per 1000 tokens.
the structure of the charging function is aligned with openai's api pricing structure.
Parameters
----------
text : str
number of tokens in the text
rate : float
rate per 1000 tokens
Returns
-------
float
estimated cost of the translation'''
n_tokens = len(encoding.encode(text))
return round(rate*n_tokens/1000,4)