insight / data_access.py
dwb2023's picture
undo experiment
aafdc15
"""
Data access module for GDELT data retrieval and filtering
"""
import duckdb
import pandas as pd
def get_gdelt_data(
limit=10,
tone_threshold=-7.0,
start_date=None,
end_date=None,
source_filter=None,
themes_filter=None,
persons_filter=None,
organizations_filter=None,
locations_filter=None
):
"""Get filtered GDELT data from DuckDB with dynamic query parameters."""
con = duckdb.connect(database=':memory:')
# Create view of the dataset
con.execute("""
CREATE VIEW negative_tone AS (
SELECT *
FROM read_parquet('hf://datasets/dwb2023/gdelt-gkg-march2020-v2@~parquet/default/negative_tone/*.parquet')
);
""")
# Base query components
base_conditions = [
"SourceCollectionIdentifier IS NOT NULL",
"DATE IS NOT NULL",
"SourceCommonName IS NOT NULL",
"DocumentIdentifier IS NOT NULL",
"V1Counts IS NOT NULL",
"V1Themes IS NOT NULL",
"V1Locations IS NOT NULL",
"V1Persons IS NOT NULL",
"V1Organizations IS NOT NULL",
"V2GCAM IS NOT NULL",
"\"V2.1Quotations\" IS NOT NULL",
"tone <= ?"
]
params = [tone_threshold]
extra_conditions = []
# Add optional filters
if start_date:
extra_conditions.append("DATE >= ?")
params.append(start_date)
if end_date:
extra_conditions.append("DATE <= ?")
params.append(end_date)
if source_filter:
extra_conditions.append("SourceCommonName ILIKE ?")
params.append(f"%{source_filter}%")
if themes_filter:
extra_conditions.append("(V1Themes ILIKE ? OR V2EnhancedThemes ILIKE ?)")
params.extend([f"%{themes_filter}%", f"%{themes_filter}%"])
if persons_filter:
extra_conditions.append("(V1Persons ILIKE ? OR V2EnhancedPersons ILIKE ?)")
params.extend([f"%{persons_filter}%", f"%{persons_filter}%"])
if organizations_filter:
extra_conditions.append("(V1Organizations ILIKE ? OR V2EnhancedOrganizations ILIKE ?)")
params.extend([f"%{organizations_filter}%", f"%{organizations_filter}%"])
if locations_filter:
extra_conditions.append("(V1Locations ILIKE ? OR V2EnhancedLocations ILIKE ?)")
params.extend([f"%{locations_filter}%", f"%{locations_filter}%"])
# Combine all conditions
all_conditions = base_conditions + extra_conditions
where_clause = " AND ".join(all_conditions) if all_conditions else "1=1"
# Build final query
query = f"""
SELECT *
FROM negative_tone
WHERE {where_clause}
LIMIT ?;
"""
params.append(limit)
# Execute query with parameters
results_df = con.execute(query, params).fetchdf()
con.close()
return results_df
def filter_dataframe(df, source_filter=None, date_filter=None, tone_min=None, tone_max=None):
"""Filter dataframe based on provided criteria"""
display_df = df[['GKGRECORDID', 'DATE', 'SourceCommonName', 'tone']].copy()
display_df.columns = ['ID', 'Date', 'Source', 'Tone']
if source_filter:
display_df = display_df[display_df['Source'].str.contains(source_filter, case=False, na=False)]
if date_filter:
display_df = display_df[display_df['Date'].str.contains(date_filter, na=False)]
if tone_min is not None and tone_max is not None:
display_df = display_df[
(display_df['Tone'] >= tone_min) &
(display_df['Tone'] <= tone_max)
]
return display_df
# Constants for raw data categories
GDELT_CATEGORIES = {
"Metadata": ["GKGRECORDID", "DATE", "SourceCommonName", "DocumentIdentifier", "V2.1Quotations", "tone"],
"Persons": ["V2EnhancedPersons", "V1Persons"],
"Organizations": ["V2EnhancedOrganizations", "V1Organizations"],
"Locations": ["V2EnhancedLocations", "V1Locations"],
"Themes": ["V2EnhancedThemes", "V1Themes"],
"Names": ["V2.1AllNames"],
"Counts": ["V2.1Counts", "V1Counts"],
"Amounts": ["V2.1Amounts"],
"V2GCAM": ["V2GCAM"],
"V2.1EnhancedDates": ["V2.1EnhancedDates"],
}