# document-qa / grobid_client_generic.py
# lfoppiano's picture
# correct import
# 2a77d35
# raw
# history blame
# No virus
# 8.02 kB
import json
import os
import time
import requests
import yaml
from client import ApiClient
'''
This client is a generic client for any Grobid application and sub-modules.
At the moment, it supports only single document processing.
Source: https://github.com/kermitt2/grobid-client-python
'''
class GrobidClientGeneric(ApiClient):
    """Generic HTTP client for a Grobid application and its sub-modules.

    The client is driven by a configuration mapping. Judging from the keys
    this class reads, the mapping is expected to look like::

        grobid:
          server: <base URL>
          url_mapping:
            <action name>: <path appended to the base URL>
        sleep_time: <seconds to wait before retrying a saturated server>
        max_retry: <attempt budget used by process_pdf>

    Requests are sent through ``self.post``, which is not defined in this
    class (presumably provided by ``ApiClient``) and is unpacked here as a
    ``(response, status_code)`` pair.

    Only single-document processing is supported.
    """

    def __init__(self, config_path=None, ping=False):
        """Create the client, optionally loading a YAML configuration.

        :param config_path: path of a YAML configuration file. When ``None``
            no configuration is loaded and the parent initialiser is not
            called; use ``set_config`` afterwards.
        :param ping: when true (and a configuration was loaded), check that
            the server answers and raise ``Exception`` if it does not.
        """
        self.config = None
        if config_path is not None:
            self.config = self.load_yaml_config_from_file(path=config_path)
            super().__init__(self.config['grobid']['server'])

            if ping:
                result = self.ping_grobid()
                if not result:
                    raise Exception("Grobid is down.")

        # NOTE(review): this is a process-wide side effect that overwrites any
        # NO_PROXY value already present in the environment - confirm intended.
        os.environ['NO_PROXY'] = "nims.go.jp"

    @staticmethod
    def load_json_config_from_file(self, path='./config.json', ping=False):
        """Load a JSON configuration file and return its parsed content.

        NOTE(review): the method is declared ``@staticmethod`` yet takes
        ``self`` as first parameter, so nothing is bound automatically and
        callers have to pass the client explicitly, e.g.
        ``client.load_json_config_from_file(client, path)``. The signature is
        kept as is so that existing callers keep working.

        The loaded configuration is returned, not stored on the client; the
        optional ping therefore uses whatever ``self.config`` already holds.

        :param self: client instance, only used when ``ping`` is true.
        :param path: path of the JSON file.
        :param ping: when true, raise ``Exception`` if the server is down.
        :return: the parsed JSON document.
        """
        with open(path, 'r') as fp:
            config = json.load(fp)

        if ping:
            result = self.ping_grobid()
            if not result:
                raise Exception("Grobid is down.")

        return config

    def load_yaml_config_from_file(self, path='./config.yaml'):
        """Load a YAML configuration file and return its parsed content.

        Any failure (unreadable file, invalid YAML, ...) is reported on
        stdout and terminates the process with exit status 1.

        :param path: path of the YAML file.
        :return: the object produced by ``yaml.safe_load``.
        :raises SystemExit: when the configuration cannot be loaded.
        """
        try:
            with open(path, 'r') as the_file:
                raw_configuration = the_file.read()

            config = yaml.safe_load(raw_configuration)
        except Exception as e:
            print("Configuration could not be loaded: ", str(e))
            # Same effect as the interactive ``exit(1)`` helper, without
            # depending on the ``site`` module having installed it.
            raise SystemExit(1)

        return config

    def set_config(self, config, ping=False):
        """Replace the configuration, optionally checking the server.

        :param config: configuration mapping (see the class docstring).
        :param ping: when true, ping the server with the new configuration.
        :raises Exception: when ``ping`` is true and the server is down or
            the ping itself failed; the original error is chained.
        """
        self.config = config
        if ping:
            try:
                result = self.ping_grobid()
                if not result:
                    raise Exception("Grobid is down.")
            except Exception as e:
                raise Exception("Grobid is down or other problems were encountered. ", e) from e

    def ping_grobid(self):
        """Return ``True`` when the configured ``ping`` URL answers HTTP 200.

        The outcome is also printed. Network errors raised by
        ``requests.get`` are not caught here.
        """
        ping_url = self.get_grobid_url("ping")
        r = requests.get(ping_url)
        status = r.status_code

        if status != 200:
            print('GROBID server does not appear up and running ' + str(status))
            return False

        print("GROBID server is up and running")
        return True

    def get_grobid_url(self, action):
        """Return the server base URL concatenated with the path of *action*.

        :param action: key looked up in ``config['grobid']['url_mapping']``.
        :raises KeyError: when the action (or a configuration key) is missing.
        """
        grobid_config = self.config['grobid']
        base_url = grobid_config['server']
        return base_url + grobid_config['url_mapping'][action]

    def _post_form(self, method_name, files, params, headers):
        """Resolve *method_name* to a URL and POST *files* to it.

        Query-string parameters embedded in the configured URL are moved into
        the form data; entries of *params* (if any) are merged on top of
        them. When *headers* is ``None`` a JSON ``Accept`` header is sent.

        :return: whatever ``self.post`` returns, i.e. ``(response, status)``.
        """
        the_url = self.get_grobid_url(method_name)
        url_params, the_url = self.get_params_from_url(the_url)

        data = dict(url_params)
        if params:
            data.update(params)

        if headers is None:
            headers = {"Accept": "application/json"}

        return self.post(url=the_url, files=files, data=data, headers=headers)

    def _post_until_available(self, method_name, files, params, headers):
        """POST like ``_post_form``, waiting and retrying while the status is 503.

        There is no upper bound on the number of attempts; between two
        attempts the client sleeps ``config['sleep_time']`` seconds.
        """
        while True:
            res, status = self._post_form(method_name, files, params, headers)
            if status != 503:
                return res, status
            time.sleep(self.config['sleep_time'])

    def process_texts(self, input, method_name='superconductors', params=None, headers=None):
        """Send *input* as the ``texts`` form field and return the parsed JSON.

        :param input: value posted under the ``texts`` key.
        :param method_name: action name resolved through ``get_grobid_url``.
        :param params: optional extra form fields, merged over the ones found
            in the configured URL.
        :param headers: optional request headers; defaults to
            ``{"Accept": "application/json"}``.
        :return: ``(status, parsed_json)`` on HTTP 200, ``(status, None)``
            otherwise. HTTP 503 is retried until another status is received.
        """
        files = {
            'texts': input
        }
        res, status = self._post_until_available(method_name, files, params, headers)

        if status != 200:
            print('Processing failed with error ' + str(status))
            return status, None

        return status, json.loads(res.text)

    def process_text(self, input, method_name='superconductors', params=None, headers=None):
        """Send *input* as the ``text`` form field and return the parsed JSON.

        :param input: value posted under the ``text`` key.
        :param method_name: action name resolved through ``get_grobid_url``.
        :param params: optional extra form fields, merged over the ones found
            in the configured URL.
        :param headers: optional request headers; defaults to
            ``{"Accept": "application/json"}``.
        :return: ``(status, parsed_json)`` on HTTP 200, ``(status, None)``
            otherwise. HTTP 503 is retried until another status is received.
        """
        files = {
            'text': input
        }
        res, status = self._post_until_available(method_name, files, params, headers)

        if status != 200:
            print('Processing failed with error ' + str(status))
            return status, None

        return status, json.loads(res.text)

    def process(self, form_data: dict, method_name='superconductors', params=None, headers=None):
        """POST an arbitrary multipart form and return the raw response text.

        :param form_data: mapping passed as ``files`` to the POST request.
        :param method_name: action name resolved through ``get_grobid_url``.
        :param params: optional extra form fields, merged over the ones found
            in the configured URL.
        :param headers: optional request headers; defaults to
            ``{"Accept": "application/json"}``.
        :return: the response text on HTTP 200, ``None`` otherwise (the
            failure is printed). HTTP 503 is retried with the same form data
            until another status is received.
        """
        res, status = self._post_until_available(method_name, form_data, params, headers)

        if status != 200:
            print('Processing failed with error ' + str(status))
            return None

        return res.text

    def process_pdf_batch(self, pdf_files, params=None):
        """Placeholder for batch processing; currently does nothing."""
        return None

    def process_pdf(self, pdf_file, method_name, params=None, headers=None, verbose=False,
                    retry=None):
        """Upload one PDF file and return the service answer.

        :param pdf_file: path of the PDF to upload (also sent as file name).
        :param method_name: action name resolved through ``get_grobid_url``.
        :param params: optional extra form fields, merged over the ones found
            in the configured URL.
        :param headers: optional request headers; defaults to
            ``{"Accept": "application/json"}``.
        :param verbose: print progress information about retries.
        :param retry: remaining retry budget; leave ``None`` on the first
            call, it is initialised from ``config['max_retry']``.
        :return: ``(text, status)`` on HTTP 200; ``(None, status)`` on 204 or
            when the retry budget is exhausted; otherwise
            ``(description, status)`` where ``description`` is the
            ``description`` field of a JSON error body, or ``None``.
        """
        # The handle is closed as soon as the request has been sent; a retry
        # re-opens the file so the upload always starts from the beginning.
        with open(pdf_file, 'rb') as pdf_stream:
            files = {
                'input': (
                    pdf_file,
                    pdf_stream,
                    'application/pdf',
                    {'Expires': '0'}
                )
            }
            res, status = self._post_form(method_name, files, params, headers)

        if status == 503 or status == 429:
            if retry is None:
                retry = self.config['max_retry'] - 1
            elif retry - 1 <= 0:
                # "<=" rather than "==": with max_retry <= 1 the counter
                # would otherwise skip zero and the retries would never stop.
                if verbose:
                    print("re-try exhausted. Aborting request")
                return None, status
            else:
                retry -= 1

            sleep_time = self.config['sleep_time']
            if verbose:
                print("Server is saturated, waiting", sleep_time, "seconds and trying again. ")
            time.sleep(sleep_time)
            return self.process_pdf(pdf_file, method_name, params, headers, verbose=verbose, retry=retry)

        if status == 204:
            # No content returned: nothing to parse.
            return None, status

        if status != 200:
            desc = None
            if res.content:
                try:
                    error_body = json.loads(res.text)
                except ValueError:
                    # Error pages are not necessarily JSON; keep the status.
                    error_body = None
                if isinstance(error_body, dict):
                    desc = error_body.get('description')
            return desc, status

        return res.text, status

    def get_params_from_url(self, the_url):
        """Split a URL into its query parameters and the URL without them.

        Parameters are taken verbatim (no percent-decoding). A parameter
        without ``=`` maps to an empty string and empty segments are ignored.

        :param the_url: URL, possibly followed by ``?key=value&...``.
        :return: ``(params, url_without_query)`` where ``params`` is a dict.
        """
        the_url, _, query = the_url.partition("?")

        params = {}
        for pair in query.split("&"):
            if not pair:
                continue
            key, _, value = pair.partition("=")
            params[key] = value

        return params, the_url

    def process_json(self, text, method_name="processJson", params=None, headers=None,
                     verbose=False):
        """Send a JSON document as the ``input`` form part.

        :param text: JSON payload, sent with content type ``application/json``.
        :param method_name: action name resolved through ``get_grobid_url``.
        :param params: optional extra form fields, merged over the ones found
            in the configured URL.
        :param headers: optional request headers; defaults to
            ``{"Accept": "application/json"}``.
        :param verbose: print a message when nothing usable is returned.
        :return: ``(text, status)`` on HTTP 200, ``(None, status)`` otherwise.
            HTTP 503 is retried until another status is received.
        """
        files = {
            'input': (
                None,
                text,
                'application/json',
                {'Expires': '0'}
            )
        }
        res, status = self._post_until_available(method_name, files, params, headers)

        if status == 204:
            if verbose:
                print('No content returned. Moving on. ')
            return None, status

        if status != 200:
            if verbose:
                print('Processing failed with error ', status)
            return None, status

        return res.text, status