document-qa / document_qa /ner_client_generic.py
lfoppiano's picture
refactoring grobid client generic
eedfb73
raw
history blame
13.4 kB
import os
import time
import yaml
'''
This client is a generic client for any Grobid application and sub-modules.
At the moment, it supports only single document processing.
Source: https://github.com/kermitt2/grobid-client-python
'''
""" Generic API Client """
from copy import deepcopy
import json
import requests
try:
from urlparse import urljoin
except ImportError:
from urllib.parse import urljoin
class ApiClient(object):
""" Client to interact with a generic Rest API.
Subclasses should implement functionality accordingly with the provided
service methods, i.e. ``get``, ``post``, ``put`` and ``delete``.
"""
accept_type = 'application/xml'
api_base = None
def __init__(
self,
base_url,
username=None,
api_key=None,
status_endpoint=None,
timeout=60
):
""" Initialise client.
Args:
base_url (str): The base URL to the service being used.
username (str): The username to authenticate with.
api_key (str): The API key to authenticate with.
timeout (int): Maximum time before timing out.
"""
self.base_url = base_url
self.username = username
self.api_key = api_key
self.status_endpoint = urljoin(self.base_url, status_endpoint)
self.timeout = timeout
@staticmethod
def encode(request, data):
""" Add request content data to request body, set Content-type header.
Should be overridden by subclasses if not using JSON encoding.
Args:
request (HTTPRequest): The request object.
data (dict, None): Data to be encoded.
Returns:
HTTPRequest: The request object.
"""
if data is None:
return request
request.add_header('Content-Type', 'application/json')
request.extracted_data = json.dumps(data)
return request
@staticmethod
def decode(response):
""" Decode the returned data in the response.
Should be overridden by subclasses if something else than JSON is
expected.
Args:
response (HTTPResponse): The response object.
Returns:
dict or None.
"""
try:
return response.json()
except ValueError as e:
return e.message
def get_credentials(self):
""" Returns parameters to be added to authenticate the request.
This lives on its own to make it easier to re-implement it if needed.
Returns:
dict: A dictionary containing the credentials.
"""
return {"username": self.username, "api_key": self.api_key}
def call_api(
self,
method,
url,
headers=None,
params=None,
data=None,
files=None,
timeout=None,
):
""" Call API.
This returns object containing data, with error details if applicable.
Args:
method (str): The HTTP method to use.
url (str): Resource location relative to the base URL.
headers (dict or None): Extra request headers to set.
params (dict or None): Query-string parameters.
data (dict or None): Request body contents for POST or PUT requests.
files (dict or None: Files to be passed to the request.
timeout (int): Maximum time before timing out.
Returns:
ResultParser or ErrorParser.
"""
headers = deepcopy(headers) or {}
headers['Accept'] = self.accept_type if 'Accept' not in headers else headers['Accept']
params = deepcopy(params) or {}
data = data or {}
files = files or {}
# if self.username is not None and self.api_key is not None:
# params.update(self.get_credentials())
r = requests.request(
method,
url,
headers=headers,
params=params,
files=files,
data=data,
timeout=timeout,
)
return r, r.status_code
def get(self, url, params=None, **kwargs):
""" Call the API with a GET request.
Args:
url (str): Resource location relative to the base URL.
params (dict or None): Query-string parameters.
Returns:
ResultParser or ErrorParser.
"""
return self.call_api(
"GET",
url,
params=params,
**kwargs
)
def delete(self, url, params=None, **kwargs):
""" Call the API with a DELETE request.
Args:
url (str): Resource location relative to the base URL.
params (dict or None): Query-string parameters.
Returns:
ResultParser or ErrorParser.
"""
return self.call_api(
"DELETE",
url,
params=params,
**kwargs
)
def put(self, url, params=None, data=None, files=None, **kwargs):
""" Call the API with a PUT request.
Args:
url (str): Resource location relative to the base URL.
params (dict or None): Query-string parameters.
data (dict or None): Request body contents.
files (dict or None: Files to be passed to the request.
Returns:
An instance of ResultParser or ErrorParser.
"""
return self.call_api(
"PUT",
url,
params=params,
data=data,
files=files,
**kwargs
)
def post(self, url, params=None, data=None, files=None, **kwargs):
""" Call the API with a POST request.
Args:
url (str): Resource location relative to the base URL.
params (dict or None): Query-string parameters.
data (dict or None): Request body contents.
files (dict or None: Files to be passed to the request.
Returns:
An instance of ResultParser or ErrorParser.
"""
return self.call_api(
method="POST",
url=url,
params=params,
data=data,
files=files,
**kwargs
)
def service_status(self, **kwargs):
""" Call the API to get the status of the service.
Returns:
An instance of ResultParser or ErrorParser.
"""
return self.call_api(
'GET',
self.status_endpoint,
params={'format': 'json'},
**kwargs
)
class NERClientGeneric(ApiClient):
def __init__(self, config_path=None, ping=False):
self.config = None
if config_path is not None:
self.config = self._load_yaml_config_from_file(path=config_path)
super().__init__(self.config['grobid']['server'])
if ping:
result = self.ping_service()
if not result:
raise Exception("Grobid is down.")
os.environ['NO_PROXY'] = "nims.go.jp"
@staticmethod
def _load_json_config_from_file(path='./config.json'):
"""
Load the json configuration
"""
config = {}
with open(path, 'r') as fp:
config = json.load(fp)
return config
@staticmethod
def _load_yaml_config_from_file(path='./config.yaml'):
"""
Load the YAML configuration
"""
config = {}
try:
with open(path, 'r') as the_file:
raw_configuration = the_file.read()
config = yaml.safe_load(raw_configuration)
except Exception as e:
print("Configuration could not be loaded: ", str(e))
exit(1)
return config
def set_config(self, config, ping=False):
self.config = config
if ping:
try:
result = self.ping_service()
if not result:
raise Exception("Grobid is down.")
except Exception as e:
raise Exception("Grobid is down or other problems were encountered. ", e)
def ping_service(self):
# test if the server is up and running...
ping_url = self.get_url("ping")
r = requests.get(ping_url)
status = r.status_code
if status != 200:
print('GROBID server does not appear up and running ' + str(status))
return False
else:
print("GROBID server is up and running")
return True
def get_url(self, action):
grobid_config = self.config['grobid']
base_url = grobid_config['server']
action_url = base_url + grobid_config['url_mapping'][action]
return action_url
def process_texts(self, input, method_name='superconductors', params={}, headers={"Accept": "application/json"}):
files = {
'texts': input
}
the_url = self.get_url(method_name)
params, the_url = self.get_params_from_url(the_url)
res, status = self.post(
url=the_url,
files=files,
data=params,
headers=headers
)
if status == 503:
time.sleep(self.config['sleep_time'])
return self.process_texts(input, method_name, params, headers)
elif status != 200:
print('Processing failed with error ' + str(status))
return status, None
else:
return status, json.loads(res.text)
def process_text(self, input, method_name='superconductors', params={}, headers={"Accept": "application/json"}):
files = {
'text': input
}
the_url = self.get_url(method_name)
params, the_url = self.get_params_from_url(the_url)
res, status = self.post(
url=the_url,
files=files,
data=params,
headers=headers
)
if status == 503:
time.sleep(self.config['sleep_time'])
return self.process_text(input, method_name, params, headers)
elif status != 200:
print('Processing failed with error ' + str(status))
return status, None
else:
return status, json.loads(res.text)
def process_pdf(self,
form_data: dict,
method_name='superconductors',
params={},
headers={"Accept": "application/json"}
):
the_url = self.get_url(method_name)
params, the_url = self.get_params_from_url(the_url)
res, status = self.post(
url=the_url,
files=form_data,
data=params,
headers=headers
)
if status == 503:
time.sleep(self.config['sleep_time'])
return self.process_text(input, method_name, params, headers)
elif status != 200:
print('Processing failed with error ' + str(status))
else:
return res.text
def process_pdfs(self, pdf_files, params={}):
pass
def process_pdf(
self,
pdf_file,
method_name,
params={},
headers={"Accept": "application/json"},
verbose=False,
retry=None
):
files = {
'input': (
pdf_file,
open(pdf_file, 'rb'),
'application/pdf',
{'Expires': '0'}
)
}
the_url = self.get_url(method_name)
params, the_url = self.get_params_from_url(the_url)
res, status = self.post(
url=the_url,
files=files,
data=params,
headers=headers
)
if status == 503 or status == 429:
if retry is None:
retry = self.config['max_retry'] - 1
else:
if retry - 1 == 0:
if verbose:
print("re-try exhausted. Aborting request")
return None, status
else:
retry -= 1
sleep_time = self.config['sleep_time']
if verbose:
print("Server is saturated, waiting", sleep_time, "seconds and trying again. ")
time.sleep(sleep_time)
return self.process_pdf(pdf_file, method_name, params, headers, verbose=verbose, retry=retry)
elif status != 200:
desc = None
if res.content:
c = json.loads(res.text)
desc = c['description'] if 'description' in c else None
return desc, status
elif status == 204:
# print('No content returned. Moving on. ')
return None, status
else:
return res.text, status
def get_params_from_url(self, the_url):
"""
This method is used to pass to the URL predefined parameters, which are added in the URL format
"""
params = {}
if "?" in the_url:
split = the_url.split("?")
the_url = split[0]
params = split[1]
params = {param.split("=")[0]: param.split("=")[1] for param in params.split("&")}
return params, the_url