| import asyncio |
| from concurrent.futures import ThreadPoolExecutor |
| from typing import TypedDict |
| import duckdb |
| import pandas as pd |
|
|
| async def execute_sql_query(sql_query: str) -> pd.DataFrame: |
| """Executes a SQL query on the DRIAS database and returns the results. |
| |
| This function connects to the DuckDB database containing DRIAS climate data |
| and executes the provided SQL query. It handles the database connection and |
| returns the results as a pandas DataFrame. |
| |
| Args: |
| sql_query (str): The SQL query to execute |
| |
| Returns: |
| pd.DataFrame: A DataFrame containing the query results |
| |
| Raises: |
| duckdb.Error: If there is an error executing the SQL query |
| """ |
| def _execute_query(): |
| |
| con = duckdb.connect() |
| results = con.sql(sql_query).fetchdf() |
| |
| return results |
|
|
| |
| loop = asyncio.get_event_loop() |
| with ThreadPoolExecutor() as executor: |
| return await loop.run_in_executor(executor, _execute_query) |
|
|
|
|
| class IndicatorPerYearAtLocationQueryParams(TypedDict, total=False): |
| """Parameters for querying an indicator's values over time at a location. |
| |
| This class defines the parameters needed to query climate indicator data |
| for a specific location over multiple years. |
| |
| Attributes: |
| indicator_column (str): The column name for the climate indicator |
| latitude (str): The latitude coordinate of the location |
| longitude (str): The longitude coordinate of the location |
| model (str): The climate model to use (optional) |
| """ |
| indicator_column: str |
| latitude: str |
| longitude: str |
| model: str |
|
|
|
|
| def indicator_per_year_at_location_query( |
| table: str, params: IndicatorPerYearAtLocationQueryParams |
| ) -> str: |
| """SQL Query to get the evolution of an indicator per year at a certain location |
| |
| Args: |
| table (str): sql table of the indicator |
| params (IndicatorPerYearAtLocationQueryParams) : dictionary with the required params for the query |
| |
| Returns: |
| str: the sql query |
| """ |
| indicator_column = params.get("indicator_column") |
| latitude = params.get("latitude") |
| longitude = params.get("longitude") |
| |
| if indicator_column is None or latitude is None or longitude is None: |
| return "" |
| |
| table = f"'hf://datasets/timeki/drias_db/{table.lower()}.parquet'" |
|
|
| sql_query = f"SELECT year, {indicator_column}, model\nFROM {table}\nWHERE latitude = {latitude} \nAnd longitude = {longitude} \nOrder by Year" |
|
|
| return sql_query |
|
|
| class IndicatorForGivenYearQueryParams(TypedDict, total=False): |
| """Parameters for querying an indicator's values across locations for a year. |
| |
| This class defines the parameters needed to query climate indicator data |
| across different locations for a specific year. |
| |
| Attributes: |
| indicator_column (str): The column name for the climate indicator |
| year (str): The year to query |
| model (str): The climate model to use (optional) |
| """ |
| indicator_column: str |
| year: str |
| model: str |
|
|
| def indicator_for_given_year_query( |
| table:str, params: IndicatorForGivenYearQueryParams |
| ) -> str: |
| """SQL Query to get the values of an indicator with their latitudes, longitudes and models for a given year |
| |
| Args: |
| table (str): sql table of the indicator |
| params (IndicatorForGivenYearQueryParams): dictionarry with the required params for the query |
| |
| Returns: |
| str: the sql query |
| """ |
| indicator_column = params.get("indicator_column") |
| year = params.get('year') |
| if year is None or indicator_column is None: |
| return "" |
| |
| table = f"'hf://datasets/timeki/drias_db/{table.lower()}.parquet'" |
|
|
| sql_query = f"Select {indicator_column}, latitude, longitude, model\nFrom {table}\nWhere year = {year}" |
| return sql_query |