pbs_biologics_helper / pbs_data.py
cmcmaster's picture
Update pbs_data.py
53cfe0d verified
import datetime
import requests
import csv
from io import StringIO
import time
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from datasets import Dataset
class PBSPublicDataAPIClient:
def __init__(self, subscription_key, base_url='https://data-api.health.gov.au/pbs/api/v3', rate_limit=0.2):
self.subscription_key = subscription_key
self.base_url = base_url
self.rate_limit = rate_limit # Requests per second
self.last_request_time = 0
# Set up a session with retry strategy
self.session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
self.session.mount('https://', HTTPAdapter(max_retries=retries))
def get_sample_data(self, endpoint, limit=5):
params = {"limit": limit}
response = self.make_request(endpoint, params=params, accept="text/csv")
csv_content = StringIO(response.text)
return list(csv.DictReader(csv_content))
def fetch_sample_data(self):
schedules = self.get_schedules()
latest_schedule = schedules[0]['schedule_code']
endpoints = [
"amt-items",
"atc-codes",
"indications",
"prescribing-texts",
"item-prescribing-text-relationships",
"restrictions",
"item-restriction-relationships"
]
sample_data = {}
for endpoint in endpoints:
print(f"Fetching sample data from /{endpoint}...")
data = self.get_sample_data(endpoint)
if data:
sample_data[endpoint] = data
print(f"Sample keys for {endpoint}: {data[0].keys()}")
else:
print(f"No data found for {endpoint}")
time.sleep(2) # Wait 2 seconds between requests to avoid rate limiting
return sample_data
def get_raw_data(self, endpoint, params=None, accept="application/json"):
response = self.make_request(endpoint, params=params, accept=accept)
return response.text
def make_request(self, endpoint, params=None, accept="application/json"):
url = f"{self.base_url}/{endpoint}"
headers = {
"subscription-key": self.subscription_key,
"Accept": accept
}
while True:
current_time = time.time()
time_since_last_request = current_time - self.last_request_time
if time_since_last_request < 1 / self.rate_limit:
sleep_time = (1 / self.rate_limit) - time_since_last_request
time.sleep(sleep_time)
try:
response = self.session.get(url, headers=headers, params=params)
self.last_request_time = time.time()
if response.status_code == 429:
retry_after = int(response.headers.get('Retry-After', 60))
print(f"Rate limit exceeded. Waiting for {retry_after} seconds.")
time.sleep(retry_after)
continue
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
print(f"Request failed: {str(e)}. Retrying in 5 seconds...")
time.sleep(5)
def get_schedules(self, limit=100):
endpoint = "schedules"
params = {"limit": limit}
response = self.make_request(endpoint, params=params)
json_data = response.json()
return json_data['data']
def get_amt_items(self, schedule_code, limit=100000):
endpoint = "amt-items"
params = {
"schedule_code": schedule_code,
"limit": limit
}
response = self.make_request(endpoint, params=params, accept="text/csv")
csv_content = StringIO(response.text)
return list(csv.DictReader(csv_content))
def get_atc_codes(self, schedule_code, limit=100000):
endpoint = "atc-codes"
params = {
"schedule_code": schedule_code,
"limit": limit
}
response = self.make_request(endpoint, params=params, accept="text/csv")
csv_content = StringIO(response.text)
return list(csv.DictReader(csv_content))
def get_indications(self, schedule_code, limit=100000):
endpoint = "indications"
params = {
"schedule_code": schedule_code,
"limit": limit
}
response = self.make_request(endpoint, params=params, accept="text/csv")
csv_content = StringIO(response.text)
return list(csv.DictReader(csv_content))
def get_prescribing_texts(self, schedule_code, limit=100000):
endpoint = "prescribing-texts"
params = {
"schedule_code": schedule_code,
"limit": limit
}
response = self.make_request(endpoint, params=params, accept="text/csv")
csv_content = StringIO(response.text)
return list(csv.DictReader(csv_content))
def get_item_prescribing_text_relationships(self, schedule_code, limit=100000):
endpoint = "item-prescribing-text-relationships"
params = {
"schedule_code": schedule_code,
"limit": limit
}
response = self.make_request(endpoint, params=params, accept="text/csv")
csv_content = StringIO(response.text)
return list(csv.DictReader(csv_content))
def get_restrictions(self, schedule_code, limit=100000):
endpoint = "restrictions"
params = {
"schedule_code": schedule_code,
"limit": limit
}
response = self.make_request(endpoint, params=params, accept="text/csv")
csv_content = StringIO(response.text)
return list(csv.DictReader(csv_content))
def get_item_restriction_relationships(self, schedule_code, limit=100000):
endpoint = "item-restriction-relationships"
params = {
"schedule_code": schedule_code,
"limit": limit
}
response = self.make_request(endpoint, params=params, accept="text/csv")
csv_content = StringIO(response.text)
return list(csv.DictReader(csv_content))
def get_restriction_prescribing_text_relationships(self, schedule_code, limit=100000):
endpoint = "restriction-prescribing-text-relationships"
params = {
"schedule_code": schedule_code,
"limit": limit
}
response = self.make_request(endpoint, params=params, accept="text/csv")
csv_content = StringIO(response.text)
return list(csv.DictReader(csv_content))
def get_items(self, schedule_code, limit=100000):
endpoint = "items"
params = {
"schedule_code": schedule_code,
"limit": limit
}
response = self.make_request(endpoint, params=params, accept="text/csv")
csv_content = StringIO(response.text)
return list(csv.DictReader(csv_content))
def fetch_rheumatology_biologics_data(self):
biologics = [
"adalimumab", "etanercept", "infliximab", "certolizumab", "golimumab",
"rituximab", "abatacept", "tocilizumab", "secukinumab", "ixekizumab",
"ustekinumab", "guselkumab", "tofacitinib", "baricitinib", "secukinumab",
"upadacitinib", "anifrolumab", "bimekizumab", "avacopan"
]
rheumatic_diseases = [
"rheumatoid arthritis", "psoriatic arthritis", "ankylosing spondylitis",
"non-radiographic axial spondyloarthritis", "giant cell arteritis",
"juvenile idiopathic arthritis", "systemic lupus erythematosus", "Anti-neutrophil cytoplasmic autoantibody (ANCA) associated vasculitis"
]
data = {}
schedules = self.get_schedules()
# Select schedule based on current month
current_date = datetime.datetime.now()
current_schedule = next(
(s for s in schedules if s['effective_year'] == current_date.year and s['effective_month'] == current_date.strftime('%B').upper()),
schedules[0] # fallback to the most recent schedule if no match
)
latest_schedule = current_schedule['schedule_code']
schedule_year = current_schedule['effective_year']
schedule_month = current_schedule['effective_month']
print(f"Selected schedule: {latest_schedule} (Effective: {current_schedule['effective_date']})")
print("Fetching items...")
items = self.get_items(latest_schedule)
time.sleep(5)
print("Fetching indications...")
indications = self.get_indications(latest_schedule)
print(f"Number of indications fetched: {len(indications)}")
print("Sample of raw indications data:")
for indication in indications[:5]:
print(indication)
time.sleep(5)
print("Fetching prescribing texts...")
prescribing_texts = self.get_prescribing_texts(latest_schedule)
time.sleep(5)
print("Fetching item-prescribing-text relationships...")
item_prescribing_text_relationships = self.get_item_prescribing_text_relationships(latest_schedule)
time.sleep(5)
print("Fetching restrictions...")
restrictions = self.get_restrictions(latest_schedule)
time.sleep(5)
print("Fetching item-restriction relationships...")
item_restriction_relationships = self.get_item_restriction_relationships(latest_schedule)
print("Fetching restriction-prescribing-text relationships...")
restriction_prescribing_text_relationships = self.get_restriction_prescribing_text_relationships(latest_schedule)
print(f"Number of restriction-prescribing-text relationships fetched: {len(restriction_prescribing_text_relationships)}")
time.sleep(5)
# Create lookup dictionaries
prescribing_text_lookup = {text['prescribing_txt_id']: text for text in prescribing_texts if 'prescribing_txt_id' in text}
restriction_lookup = {res['res_code']: res for res in restrictions if 'res_code' in res}
# Create indication lookup
indication_lookup = {}
for ind in indications:
# Print all keys in the first indication to see available fields
if not indication_lookup:
print("Keys in indication data:", ind.keys())
# Try different possible keys for the prescribing text ID
prescribing_text_id = ind.get('prescribing_text_id') or ind.get('indication_prescribing_txt_id') or ind.get('prescribing_txt_id')
if prescribing_text_id:
indication_lookup[prescribing_text_id] = ind
print(f"Number of items in indication_lookup: {len(indication_lookup)}")
print("Sample of indication_lookup:")
for key, value in list(indication_lookup.items())[:5]:
print(f" {key}: {value}")
# Create a lookup for item-prescribing-text relationships
item_prescribing_text_lookup = {}
for relationship in item_prescribing_text_relationships:
pbs_code = relationship.get('pbs_code')
prescribing_txt_id = relationship.get('prescribing_txt_id')
if pbs_code and prescribing_txt_id:
if pbs_code not in item_prescribing_text_lookup:
item_prescribing_text_lookup[pbs_code] = []
item_prescribing_text_lookup[pbs_code].append(prescribing_txt_id)
# Create a lookup for restriction-prescribing-text relationships
restriction_prescribing_text_lookup = {}
print("\nDebugging restriction-prescribing-text relationships:")
print("Full structure of first 5 relationships:")
for relationship in restriction_prescribing_text_relationships[:5]:
print(relationship)
for relationship in restriction_prescribing_text_relationships:
res_code = relationship.get('res_code')
prescribing_text_id = relationship.get('prescribing_text_id')
if res_code and prescribing_text_id:
if res_code not in restriction_prescribing_text_lookup:
restriction_prescribing_text_lookup[res_code] = []
restriction_prescribing_text_lookup[res_code].append(prescribing_text_id)
print(f"Number of items in restriction_prescribing_text_lookup: {len(restriction_prescribing_text_lookup)}")
print("Sample of restriction_prescribing_text_lookup:")
for key, value in list(restriction_prescribing_text_lookup.items())[:5]:
print(f" {key}: {value}")
print("Debugging: Inspecting lookups")
print(f"Number of items in prescribing_text_lookup: {len(prescribing_text_lookup)}")
print(f"Number of items in restriction_lookup: {len(restriction_lookup)}")
print(f"Number of items in indication_lookup: {len(indication_lookup)}")
print(f"Number of items in item_prescribing_text_lookup: {len(item_prescribing_text_lookup)}")
print(f"Number of items in restriction_prescribing_text_lookup: {len(restriction_prescribing_text_lookup)}")
def classify_formulation(description):
# Define keywords for each formulation type
tablet_keywords = ['Tablet']
pen_keywords = ['pen', 'auto-injector', 'autoinjector']
syringe_keywords = ['syringe']
infusion_keywords = ['I.V. infusion', 'Concentrate for injection']
# Normalize the description to lowercase for case-insensitive matching
desc_lower = description.lower()
# Check for keywords and return the corresponding formulation type
if any(keyword.lower() in desc_lower for keyword in tablet_keywords):
return 'tablet'
elif any(keyword.lower() in desc_lower for keyword in pen_keywords):
return 'subcut pen'
elif any(keyword.lower() in desc_lower for keyword in syringe_keywords):
return 'subcut syringe'
elif any(keyword.lower() in desc_lower for keyword in infusion_keywords):
return 'infusion'
else:
return 'unknown' # For cases that don't match any category
def classify_hospital_type(program_code):
if program_code == 'HS':
return 'Private'
elif program_code == 'HB':
return 'Public'
else:
return 'Any'
for item in items:
if any(biologic.lower() in item['drug_name'].lower() for biologic in biologics):
pbs_code = item['pbs_code']
if pbs_code not in data:
data[pbs_code] = {
"schedule_code": latest_schedule,
"schedule_year": schedule_year,
"schedule_month": schedule_month,
"name": item['drug_name'],
"brands": [], # Change this to a list
"formulation": classify_formulation(item['li_form']),
"li_form": item['li_form'],
"schedule_form": item['schedule_form'],
"manner_of_administration": item['manner_of_administration'],
"maximum_quantity": item['maximum_quantity_units'],
"number_of_repeats": item['number_of_repeats'],
"hospital_type": classify_hospital_type(item['program_code']),
"restrictions": []
}
# Append the brand name if it's not already in the list
if item['brand_name'] not in data[pbs_code]['brands']:
data[pbs_code]['brands'].append(item['brand_name'])
for pbs_code in list(data.keys()):
for relationship in item_restriction_relationships:
if relationship.get('pbs_code') == pbs_code:
res_code = relationship.get('res_code')
restriction = restriction_lookup.get(res_code)
if restriction:
prescribing_text_ids = restriction_prescribing_text_lookup.get(res_code, [])
for prescribing_text_id in prescribing_text_ids:
indication = indication_lookup.get(prescribing_text_id)
if indication:
condition = indication.get('condition', '').lower()
found_indication = next((disease for disease in rheumatic_diseases if disease.lower() in condition), None)
if found_indication:
restriction_data = {
'res_code': res_code,
'indications': found_indication,
'treatment_phase': restriction.get('treatment_phase', ''),
'restriction_text': restriction.get('li_html_text', ''),
'authority_method': restriction.get('authority_method', ''),
'streamlined_code': restriction.get('treatment_of_code') if restriction.get('authority_method') == "STREAMLINED" else None,
'online_application': "HOBART TAS 7001" not in restriction.get('schedule_html_text', '')
}
data[pbs_code]['restrictions'].append(restriction_data)
break # Stop after finding the first matching indication
# Drop entries if restrictions are empty
data = {k: v for k, v in data.items() if v['restrictions']}
return data
def preprocess_data(self, data):
processed = {
'combinations': []
}
for pbs_code, item in data.items():
for restriction in item['restrictions']:
for brand in item['brands']:
processed['combinations'].append({
'pbs_code': pbs_code,
'drug': item['name'],
'brand': brand,
'formulation': item['li_form'],
'indication': restriction['indications'],
'treatment_phase': restriction['treatment_phase'],
'streamlined_code': restriction['streamlined_code'],
'online_application': restriction['online_application'],
'authority_method': restriction['authority_method'],
'hospital_type': item['hospital_type'],
'schedule_code': item['schedule_code'],
'schedule_year': item['schedule_year'],
'schedule_month': item['schedule_month']
})
return processed
def save_data_to_hf(self, data, hf_token, dataset_name="cmcmaster/rheumatology-biologics-dataset"):
processed_data = self.preprocess_data(data)
# Create a Dataset from the combinations
dataset = Dataset.from_list(processed_data['combinations'])
# Push the dataset to the Hugging Face Hub
dataset.push_to_hub(dataset_name, token=hf_token)
print(f"Data saved to Hugging Face Hub: {dataset_name}")
def main():
client = PBSPublicDataAPIClient("2384af7c667342ceb5a736fe29f1dc6b", rate_limit=0.2)
try:
print("Fetching data on biologics used for rheumatological diseases...")
data = client.fetch_rheumatology_biologics_data()
print(f"Data fetched for {len(data)} items.")
client.save_data_to_hf(data)
print("Data saved to Hugging Face Hub")
except Exception as e:
print(f"An error occurred: {str(e)}")
if __name__ == "__main__":
main()