import os import time import yaml ''' This client is a generic client for any Grobid application and sub-modules. At the moment, it supports only single document processing. Source: https://github.com/kermitt2/grobid-client-python ''' """ Generic API Client """ from copy import deepcopy import json import requests try: from urlparse import urljoin except ImportError: from urllib.parse import urljoin class ApiClient(object): """ Client to interact with a generic Rest API. Subclasses should implement functionality accordingly with the provided service methods, i.e. ``get``, ``post``, ``put`` and ``delete``. """ accept_type = 'application/xml' api_base = None def __init__( self, base_url, username=None, api_key=None, status_endpoint=None, timeout=60 ): """ Initialise client. Args: base_url (str): The base URL to the service being used. username (str): The username to authenticate with. api_key (str): The API key to authenticate with. timeout (int): Maximum time before timing out. """ self.base_url = base_url self.username = username self.api_key = api_key self.status_endpoint = urljoin(self.base_url, status_endpoint) self.timeout = timeout @staticmethod def encode(request, data): """ Add request content data to request body, set Content-type header. Should be overridden by subclasses if not using JSON encoding. Args: request (HTTPRequest): The request object. data (dict, None): Data to be encoded. Returns: HTTPRequest: The request object. """ if data is None: return request request.add_header('Content-Type', 'application/json') request.extracted_data = json.dumps(data) return request @staticmethod def decode(response): """ Decode the returned data in the response. Should be overridden by subclasses if something else than JSON is expected. Args: response (HTTPResponse): The response object. Returns: dict or None. """ try: return response.json() except ValueError as e: return e.message def get_credentials(self): """ Returns parameters to be added to authenticate the request. This lives on its own to make it easier to re-implement it if needed. Returns: dict: A dictionary containing the credentials. """ return {"username": self.username, "api_key": self.api_key} def call_api( self, method, url, headers=None, params=None, data=None, files=None, timeout=None, ): """ Call API. This returns object containing data, with error details if applicable. Args: method (str): The HTTP method to use. url (str): Resource location relative to the base URL. headers (dict or None): Extra request headers to set. params (dict or None): Query-string parameters. data (dict or None): Request body contents for POST or PUT requests. files (dict or None: Files to be passed to the request. timeout (int): Maximum time before timing out. Returns: ResultParser or ErrorParser. """ headers = deepcopy(headers) or {} headers['Accept'] = self.accept_type if 'Accept' not in headers else headers['Accept'] params = deepcopy(params) or {} data = data or {} files = files or {} # if self.username is not None and self.api_key is not None: # params.update(self.get_credentials()) r = requests.request( method, url, headers=headers, params=params, files=files, data=data, timeout=timeout, ) return r, r.status_code def get(self, url, params=None, **kwargs): """ Call the API with a GET request. Args: url (str): Resource location relative to the base URL. params (dict or None): Query-string parameters. Returns: ResultParser or ErrorParser. """ return self.call_api( "GET", url, params=params, **kwargs ) def delete(self, url, params=None, **kwargs): """ Call the API with a DELETE request. Args: url (str): Resource location relative to the base URL. params (dict or None): Query-string parameters. Returns: ResultParser or ErrorParser. """ return self.call_api( "DELETE", url, params=params, **kwargs ) def put(self, url, params=None, data=None, files=None, **kwargs): """ Call the API with a PUT request. Args: url (str): Resource location relative to the base URL. params (dict or None): Query-string parameters. data (dict or None): Request body contents. files (dict or None: Files to be passed to the request. Returns: An instance of ResultParser or ErrorParser. """ return self.call_api( "PUT", url, params=params, data=data, files=files, **kwargs ) def post(self, url, params=None, data=None, files=None, **kwargs): """ Call the API with a POST request. Args: url (str): Resource location relative to the base URL. params (dict or None): Query-string parameters. data (dict or None): Request body contents. files (dict or None: Files to be passed to the request. Returns: An instance of ResultParser or ErrorParser. """ return self.call_api( method="POST", url=url, params=params, data=data, files=files, **kwargs ) def service_status(self, **kwargs): """ Call the API to get the status of the service. Returns: An instance of ResultParser or ErrorParser. """ return self.call_api( 'GET', self.status_endpoint, params={'format': 'json'}, **kwargs ) class NERClientGeneric(ApiClient): def __init__(self, config_path=None, ping=False): self.config = None if config_path is not None: self.config = self._load_yaml_config_from_file(path=config_path) super().__init__(self.config['grobid']['server']) if ping: result = self.ping_service() if not result: raise Exception("Grobid is down.") os.environ['NO_PROXY'] = "nims.go.jp" @staticmethod def _load_json_config_from_file(path='./config.json'): """ Load the json configuration """ config = {} with open(path, 'r') as fp: config = json.load(fp) return config @staticmethod def _load_yaml_config_from_file(path='./config.yaml'): """ Load the YAML configuration """ config = {} try: with open(path, 'r') as the_file: raw_configuration = the_file.read() config = yaml.safe_load(raw_configuration) except Exception as e: print("Configuration could not be loaded: ", str(e)) exit(1) return config def set_config(self, config, ping=False): self.config = config if ping: try: result = self.ping_service() if not result: raise Exception("Grobid is down.") except Exception as e: raise Exception("Grobid is down or other problems were encountered. ", e) def ping_service(self): # test if the server is up and running... ping_url = self.get_url("ping") r = requests.get(ping_url) status = r.status_code if status != 200: print('GROBID server does not appear up and running ' + str(status)) return False else: print("GROBID server is up and running") return True def get_url(self, action): grobid_config = self.config['grobid'] base_url = grobid_config['server'] action_url = base_url + grobid_config['url_mapping'][action] return action_url def process_texts(self, input, method_name='superconductors', params={}, headers={"Accept": "application/json"}): files = { 'texts': input } the_url = self.get_url(method_name) params, the_url = self.get_params_from_url(the_url) res, status = self.post( url=the_url, files=files, data=params, headers=headers ) if status == 503: time.sleep(self.config['sleep_time']) return self.process_texts(input, method_name, params, headers) elif status != 200: print('Processing failed with error ' + str(status)) return status, None else: return status, json.loads(res.text) def process_text(self, input, method_name='superconductors', params={}, headers={"Accept": "application/json"}): files = { 'text': input } the_url = self.get_url(method_name) params, the_url = self.get_params_from_url(the_url) res, status = self.post( url=the_url, files=files, data=params, headers=headers ) if status == 503: time.sleep(self.config['sleep_time']) return self.process_text(input, method_name, params, headers) elif status != 200: print('Processing failed with error ' + str(status)) return status, None else: return status, json.loads(res.text) def process_pdf(self, form_data: dict, method_name='superconductors', params={}, headers={"Accept": "application/json"} ): the_url = self.get_url(method_name) params, the_url = self.get_params_from_url(the_url) res, status = self.post( url=the_url, files=form_data, data=params, headers=headers ) if status == 503: time.sleep(self.config['sleep_time']) return self.process_text(input, method_name, params, headers) elif status != 200: print('Processing failed with error ' + str(status)) else: return res.text def process_pdfs(self, pdf_files, params={}): pass def process_pdf( self, pdf_file, method_name, params={}, headers={"Accept": "application/json"}, verbose=False, retry=None ): files = { 'input': ( pdf_file, open(pdf_file, 'rb'), 'application/pdf', {'Expires': '0'} ) } the_url = self.get_url(method_name) params, the_url = self.get_params_from_url(the_url) res, status = self.post( url=the_url, files=files, data=params, headers=headers ) if status == 503 or status == 429: if retry is None: retry = self.config['max_retry'] - 1 else: if retry - 1 == 0: if verbose: print("re-try exhausted. Aborting request") return None, status else: retry -= 1 sleep_time = self.config['sleep_time'] if verbose: print("Server is saturated, waiting", sleep_time, "seconds and trying again. ") time.sleep(sleep_time) return self.process_pdf(pdf_file, method_name, params, headers, verbose=verbose, retry=retry) elif status != 200: desc = None if res.content: c = json.loads(res.text) desc = c['description'] if 'description' in c else None return desc, status elif status == 204: # print('No content returned. Moving on. ') return None, status else: return res.text, status def get_params_from_url(self, the_url): """ This method is used to pass to the URL predefined parameters, which are added in the URL format """ params = {} if "?" in the_url: split = the_url.split("?") the_url = split[0] params = split[1] params = {param.split("=")[0]: param.split("=")[1] for param in params.split("&")} return params, the_url