Spaces:
Build error
Build error
import json | |
import os | |
import duckdb | |
# Configure the DuckDB connection.
# Refuse to connect unless the `motherduck_token` environment variable is set
# to a non-empty value.
if not os.environ.get("motherduck_token"):
    raise Exception(
        "No motherduck token found. Please set the `motherduck_token` environment variable."
    )

# Module-level connection shared by every helper in this file.
con = duckdb.connect("md:climatebase")
con.sql("USE climatebase;")

# Load extensions (the spatial extension provides the ST_* functions).
con.sql("INSTALL spatial; LOAD spatial;")
# to-do: pass con through decorator | |
def list_projects_by_author(author_id):
    """Return the distinct project names available to ``author_id`` as a DataFrame.

    A project row matches when its ``authorId`` equals ``author_id`` or the
    literal ``'default'``, and its ``geometry`` column is not NULL.
    """
    query = "SELECT DISTINCT name FROM project WHERE (authorId = ? OR authorId = 'default') AND (geometry IS NOT NULL)"
    params = [author_id]
    result = con.execute(query, params)
    return result.df()
def get_project_geometry(project_name):
    """Fetch the ``geometry`` column of the project called ``project_name``.

    Returns the raw ``fetchall()`` result: a list holding at most one
    single-column row (the query is limited to one row), or an empty list when
    no project has that name.
    """
    query = "SELECT geometry FROM project WHERE name = ? LIMIT 1"
    cursor = con.execute(query, [project_name])
    rows = cursor.fetchall()
    return rows
def get_project_centroid(project_name):
    """Return the ``(longitude, latitude)`` centroid of a project's geometry.

    The stored geometry is parsed as JSON and the ``geometry`` member of its
    first entry under ``features`` is handed to the DuckDB spatial extension,
    which computes the centroid.

    Args:
        project_name: Name of the project whose centroid is requested.

    Returns:
        A one-row result tuple ``(longitude, latitude)``.

    Raises:
        IndexError: If no geometry row exists for ``project_name`` or the
            stored document has an empty ``features`` list.
        KeyError: If the stored document lacks ``features`` or ``geometry``.
    """
    # Workaround to get centroid of project
    # To-do: refactor to only use DuckDB spatial extension
    _geom = get_project_geometry(project_name)
    _polygon = json.dumps(json.loads(_geom[0][0])["features"][0]["geometry"])

    # Bind the GeoJSON as a parameter instead of splicing it into the SQL text:
    # a single quote inside the stored document would otherwise terminate the
    # string literal and break (or inject into) the statement. The explicit
    # cast keeps the VARCHAR overload of ST_GeomFromGeoJSON unambiguous.
    # The centroid is computed once in the subquery and reused for both axes.
    return con.execute(
        "SELECT ST_X(centroid) AS longitude, ST_Y(centroid) AS latitude "
        "FROM (SELECT ST_Centroid(ST_GeomFromGeoJSON(CAST(? AS VARCHAR))) AS centroid);",
        [_polygon],
    ).fetchall()[0]
def get_project_scores(project_name, start_year, end_year):
    """Return the ``bioindicator`` rows of a project for a year range as a DataFrame.

    Both ``start_year`` and ``end_year`` are inclusive bounds on the ``year``
    column.
    """
    query = "SELECT * FROM bioindicator WHERE (year >= ? AND year <= ? AND project_name = ?)"
    # Parameter order follows the placeholders: lower bound, upper bound, name.
    params = [start_year, end_year, project_name]
    result = con.execute(query, params)
    return result.df()
def check_if_table_exists(table_name):
    """Return ``True`` when ``SHOW TABLES`` lists a table named ``table_name``.

    Args:
        table_name: Exact table name to look for.

    Returns:
        ``True`` if the name appears in the first column of the
        ``SHOW TABLES`` output, ``False`` otherwise.
    """
    rows = con.execute("SHOW TABLES;").fetchall()
    # Each row is a tuple whose first element is the table name; compare
    # directly instead of rewriting the result list in place by index.
    return any(row[0] == table_name for row in rows)
def check_if_project_exists_for_year(project_name, year):
    """Return how many ``bioindicator`` rows exist for a project in a given year.

    The value is the ``COUNT(1)`` result itself, so it is 0 (falsy) when no
    row matches and positive (truthy) otherwise.
    """
    query = "SELECT COUNT(1) FROM bioindicator WHERE (year = ? AND project_name = ?)"
    rows = con.execute(query, [year, project_name]).fetchall()
    first_row = rows[0]
    return first_row[0]
def write_score_to_temptable(df):
    """Aggregate ``df`` into the ``_temptable`` table, replacing any existing one.

    Per (year, project_name, metric, area) group the query averages
    ``value * coefficient`` into ``value`` and adds ``score = value * area``.
    Always returns ``True``.
    """
    # NOTE: the SQL refers to `df` by name; the parameter is not used anywhere
    # else in Python, so it must stay a local called `df` in this frame
    # (presumably resolved through DuckDB's replacement scan -- confirm).
    statement = (
        "CREATE OR REPLACE TABLE _temptable AS "
        "SELECT *, (value * area) AS score FROM "
        "(SELECT year, project_name, metric, AVG(value * coefficient) AS value, area "
        "FROM df GROUP BY year, project_name, metric, area "
        "ORDER BY project_name, metric)"
    )
    con.sql(statement)
    return True
def get_or_create_bioindicator_table():
    """Create the ``bioindicator`` table in ``climatebase`` if it is missing.

    The table carries a uniqueness constraint on (year, project_name, metric).
    Always returns ``True``.
    """
    ddl = """
        USE climatebase;
        CREATE TABLE IF NOT EXISTS bioindicator (year BIGINT, project_name VARCHAR(255), metric VARCHAR(255), value DOUBLE, area DOUBLE, score DOUBLE, CONSTRAINT unique_year_project_name_metric UNIQUE (year, project_name, metric));
        """
    con.sql(ddl)
    return True
def upsert_project_record():
    """Merge the rows of ``_temptable`` into ``bioindicator``.

    Rows are inserted as-is. When a row with the same
    (year, project_name, metric) already exists, its ``value``, ``area`` and
    ``score`` are overwritten with the incoming ones.

    Returns:
        Always ``True``.
    """
    # Refresh every non-key column on conflict. Updating only `value` would
    # leave `area` and `score` (stored as value * area) stale and inconsistent
    # with the new value.
    con.sql(
        """
        INSERT INTO bioindicator FROM _temptable
        ON CONFLICT (year, project_name, metric) DO UPDATE SET
            value = excluded.value,
            area = excluded.area,
            score = excluded.score;
        """
    )
    return True