|
import spacy |
|
|
|
from geopy.geocoders import Nominatim |
|
import geonamescache |
|
import pycountry |
|
|
|
from geotext import GeoText |
|
|
|
import re |
|
|
|
from transformers import BertTokenizer, BertModel |
|
import torch |
|
|
|
|
|
|
|
|
|
|
|
# --- One-time module-level setup -------------------------------------------
# NOTE(review): downloading the spaCy model at import time performs network
# I/O on every import — consider moving behind a guard or a setup step.
spacy.cli.download("en_core_web_lg")

# Large English spaCy pipeline used for NER in identify_loc_ner().
nlp = spacy.load("en_core_web_lg")




# Cased BERT tokenizer/model used for the embedding-similarity matching in
# identify_loc_embeddings(). Downloaded/cached by HuggingFace on first use.
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')

model = BertModel.from_pretrained('bert-base-cased')




# Local GeoNames cache; city_names is a set for O(1) membership tests in
# is_city().
gc = geonamescache.GeonamesCache()

city_names = set([city['name'] for city in gc.get_cities().values()])
|
|
|
|
|
def flatten(lst):
    """
    Recursively flatten an arbitrarily nested list, yielding leaf items
    in depth-first order.
    """
    for element in lst:
        if not isinstance(element, list):
            yield element
        else:
            yield from flatten(element)
|
|
|
|
|
def is_country(reference):
    """
    Check if a given reference is a valid country name.

    Uses pycountry's fuzzy search, so close variants of a country name can
    still match.  Returns True when a match is found, False otherwise.
    """
    try:
        # search_fuzzy raises LookupError when nothing matches; we only care
        # whether a match exists (the original bound the result to an unused
        # variable).
        pycountry.countries.search_fuzzy(reference)
        return True
    except LookupError:
        return False
|
|
|
|
|
def is_city(reference):
    """
    Check if a given reference is a valid city name.

    Fast path: exact match against the cached GeoNames city list.  Fallback:
    a Nominatim geocoding lookup, accepting results typed as city/town/village
    or 'administrative' entries with a multi-part display name
    (e.g. "Athens, Attica, Greece").  Returns True / False.
    """
    if reference in city_names:
        return True

    geolocator = Nominatim(user_agent="certh_serco_validate_city_app")
    location = geolocator.geocode(reference, language="en")

    # Bug fix: geocode() returns None for unknown references; the original
    # code crashed with AttributeError on `location.raw`.
    if location is None:
        return False

    loc_type = location.raw.get('type')
    if loc_type in ['city', 'town', 'village']:
        return True
    elif loc_type == 'administrative':
        # Administrative areas whose display name has several comma-separated
        # parts are treated as cities.
        if len(location.raw['display_name'].split(",")) > 1:
            return True

    return False
|
|
|
|
|
def validate_locations(locations):
    """
    Validate that the identified references are indeed a Country and a City.

    Each reference is tested first as a city, then as a country.  When a
    multi-word reference matches neither, its suffixes (longest first) are
    tried instead — as country first, then as city.  Returns a list of
    (name, 'city' | 'country') tuples for every validated reference.
    """
    validated_loc = []

    for location in locations:
        if is_city(location):
            validated_loc.append((location, 'city'))
            continue
        if is_country(location):
            validated_loc.append((location, 'country'))
            continue

        tokens = location.split()
        if len(tokens) <= 1:
            continue

        # Fall back to suffixes of the phrase, dropping one leading word
        # at a time.
        for start in range(len(tokens)):
            candidate = ' '.join(tokens[start:])
            if is_country(candidate):
                validated_loc.append((candidate, 'country'))
                break
            if is_city(candidate):
                validated_loc.append((candidate, 'city'))
                break

    return validated_loc
|
|
|
|
|
|
|
def identify_loc_ner(sentence):
    """
    Identify all the geopolitical and location entities with the spacy tool.

    Multi-word GPE/LOC entities are kept as-is; a single-word entity is kept
    only when one of its tokens is typed as GPE.
    """
    ner_locations = []

    for entity in nlp(sentence).ents:
        if entity.label_ not in ('GPE', 'LOC'):
            continue
        if len(entity.text.split()) > 1:
            ner_locations.append(entity.text)
        elif any(token.ent_type_ == 'GPE' for token in entity):
            ner_locations.append(entity.text)

    return ner_locations
|
|
|
|
|
|
|
def identify_loc_geoparselibs(sentence):
    """
    Identify cities and countries with 3 different geoparsing libraries:
    geonamescache, pycountry and GeoText.

    Returns a tuple (geoparse_locations, countries, cities) where
    geoparse_locations aggregates every match found and countries/cities are
    the GeoText results (reused later for the embedding matching).
    """
    geoparse_locations = []

    gc = geonamescache.GeonamesCache()

    # Performance fix: the nested word-sequence scan below performs O(n^2)
    # membership tests; the original used lists (O(n) per test), these sets
    # make each test O(1) without changing the output.
    city_names = {city['name'] for city in gc.get_cities().values()}
    country_names = {country['name'] for country in gc.get_countries().values()}

    # 1) geonamescache: test every contiguous word sequence of the sentence.
    words = sentence.split()
    for i in range(len(words)):
        for j in range(i + 1, len(words) + 1):
            word_seq = ' '.join(words[i:j])
            if word_seq in city_names or word_seq in country_names:
                geoparse_locations.append(word_seq)

    # 2) pycountry: substring match of every known country name.
    for country in pycountry.countries:
        if country.name in sentence:
            geoparse_locations.append(country.name)

    # 3) GeoText extraction.
    places = GeoText(sentence)
    cities = list(places.cities)
    countries = list(places.countries)

    geoparse_locations += cities
    geoparse_locations += countries

    return (geoparse_locations, countries, cities)
|
|
|
|
|
|
|
def identify_loc_regex(sentence):
    """
    Identify cities and countries with regular expression matching.

    Captures the run of word characters and spaces that follows the
    prepositions "in", "from" or "of"; candidates are validated downstream.
    """
    pattern = r"\b(in|from|of)\b\s([\w\s]+)"
    return [candidate for _preposition, candidate in re.findall(pattern, sentence)]
|
|
|
|
|
|
|
def identify_loc_embeddings(sentence, countries, cities):
    """
    Identify cities and countries with the BERT pre-trained embeddings matching.

    A token is accepted directly when it is a known country/city name;
    otherwise its 5 most cosine-similar tokens in the sentence are inspected
    and any known location with similarity > 0.5 is accepted.
    """
    embd_locations = []

    # Candidate vocabulary: raw names plus underscore-joined variants of the
    # multi-word names (so they can match a single token).
    countries_cities = countries + cities
    countries_cities += [c.replace(' ', '_') for c in countries if ' ' in c]
    countries_cities += [c.replace(' ', '_') for c in cities if ' ' in c]

    # NOTE(review): the sentence is fed without [CLS]/[SEP] special tokens —
    # confirm this is intended for this matching strategy.
    tokens = tokenizer.tokenize(sentence)
    input_ids = torch.tensor([tokenizer.convert_tokens_to_ids(tokens)])

    with torch.no_grad():
        embeddings = model(input_ids)[0][0]

    for i, token in enumerate(tokens):
        if token in countries_cities:
            embd_locations.append(token)
            continue

        similarity_scores = torch.nn.functional.cosine_similarity(
            embeddings[i].unsqueeze(0), embeddings)
        for j in similarity_scores.argsort(descending=True)[1:6]:
            word = tokens[j]
            # Bug fix: index the score by position j.  The original used
            # tokens.index(word), which returns the FIRST occurrence of the
            # token string and therefore read the wrong score whenever the
            # sentence contained duplicate tokens.
            if word in countries_cities and similarity_scores[j] > 0.5:
                embd_locations.append(word)

    # Undo the underscore joining for multi-word matches.
    return [loc.replace('_', ' ') for loc in embd_locations]
|
|
|
|
|
|
|
def _dedupe_overlapping_names(names):
    """
    Remove names that are (case-insensitive) substrings of a longer name
    in the same list; also drops exact case-insensitive duplicates.
    Returns the survivors ordered longest-first.
    """
    ordered = sorted(names, key=len, reverse=True)

    # Pass 1: drop case-insensitive duplicates.  Because the list is sorted
    # longest-first, a candidate can only "contain" an already-kept name when
    # the two are equal.
    kept = []
    for name in ordered:
        if not any(name.lower().find(prev.lower()) != -1 for prev in kept):
            kept.append(name)

    # Pass 2: drop any name that appears inside another kept name
    # (e.g. 'Guinea' inside 'Papua new guinea').
    return [name for i, name in enumerate(kept)
            if not any(other.lower().find(name.lower()) != -1
                       for j, other in enumerate(kept) if i != j)]


def multiple_country_city_identifications_solve(country_city_dict):
    """
    This is a function to solve the appearance of multiple identification of
    countries and cities.  It checks all the elements of the input dictionary
    and if any smaller length element exists as a substring inside a bigger
    length element of it, it deletes the smaller size one.  In that sense, a
    dictionary of the sort
    {'city': ['Port moresby', 'Port'], 'country': ['Guinea', 'Papua new guinea']}
    will be converted into
    {'city': ['Port moresby'], 'country': ['Papua new guinea']}.

    The reason for that function is that such inconsistencies were identified
    during country/city identification, probably caused by the geoparsing
    libraries in use.

    Returns the cleaned dict, None when the dict is empty (preserved from
    the original), or (0, "LOCATION", "unknown_error") on failure.
    """
    try:
        # The country and city lists receive identical treatment; the
        # original duplicated ~70 lines of logic for each key.
        for key in ('country', 'city'):
            if key in country_city_dict and len(country_city_dict[key]) > 1:
                country_city_dict[key] = _dedupe_overlapping_names(country_city_dict[key])

        if country_city_dict:
            return country_city_dict
        # Empty dict: fall through and return None, like the original.
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit are
        # no longer swallowed.
        return (0, "LOCATION", "unknown_error")
|
|
|
|
|
|
|
def identify_locations(sentence):
    """
    Identify all the possible Country and City references in the given
    sentence, using different approaches in a hybrid manner:

    1. spaCy NER, 2. geoparsing libraries, 3. regex matching, and
    4. BERT-embedding matching.  The merged candidates are validated and
    de-duplicated.

    Returns the cleaned {'country': [...], 'city': [...]} dict on success,
    or an error tuple (0, "LOCATION", <reason>) otherwise.
    """
    locations = []

    try:
        # Shield commas from the parsers; restored further below.
        sentence = sentence.replace(",", " x$x ")

        locations.append(identify_loc_ner(sentence))

        geoparse_list, countries, cities = identify_loc_geoparselibs(sentence)
        locations.append(geoparse_list)

        # Bug fix: the original appended the regex and embedding results to
        # already-flattened throwaway copies and then re-flattened the
        # untouched `locations` list, silently discarding both result sets.
        locations.append(identify_loc_regex(sentence))
        locations.append(identify_loc_embeddings(sentence, countries, cities))

        locations_flat = list(flatten(locations))

        # Normalise capitalisation and drop duplicates.
        loc_unique = set(loc.lower() for loc in locations_flat)
        loc_capitalization = list(set(
            loc.capitalize() if loc.lower() in loc_unique else loc.lower()
            for loc in locations_flat))

        validated_locations = validate_locations(loc_capitalization)

        # Group the validated references by type ('city' / 'country').
        loc_dict = {}
        for location, loc_type in validated_locations:
            loc_dict.setdefault(loc_type, []).append(location)

        sentence = sentence.replace(" x$x ", ",")

        locations_dict = multiple_country_city_identifications_solve(loc_dict)

        # Exactly one country (and at most one city) is considered a success;
        # the distinct error codes below are preserved from the original.
        if 'country' not in locations_dict:
            return (0, "LOCATION", "no_country")

        if len(locations_dict['country']) != 1:
            if 'city' in locations_dict:
                return (0, "LOCATION", "more_city_or_country")
            return (0, "LOCATION", "more_country")

        if 'city' in locations_dict and len(locations_dict['city']) > 1:
            return (0, "LOCATION", "more_city_or_country")

        locations_dict['country'][0] = locations_dict['country'][0].capitalize()
        return locations_dict

    except Exception:
        # Narrowed from a bare `except:`.  Also (as in the original) covers
        # a None result from multiple_country_city_identifications_solve.
        return (0, "LOCATION", "unknown_error")