import ibis
import ibis.selectors as s
from ibis import _
import fiona
import geopandas as gpd
import rioxarray
from shapely.geometry import box

con = ibis.duckdb.connect()
con.load_extension("spatial")
threads = -1  # passed as n_jobs to the zonal-statistics workers further down

# PAD-US "code -> description" lookup tables, published as small parquet files.
lookup_url = "https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet"


def read_lookup(name, code_as, label_as, code="Code", label="Dom"):
    """Read one lookup parquet and expose its two columns under our own names.

    name     -- file stem under ``lookup_url`` (without ``.parquet``)
    code_as  -- output name for the code column (used as the join key later)
    label_as -- output name for the description column
    code, label -- column names as stored in the file
    """
    table = con.read_parquet(f"{lookup_url}/{name}.parquet")
    return table.select(**{code_as: code, label_as: label})


agency_name = read_lookup("pad-agency-name", "manager_name_id", "manager_name")
agency_type = read_lookup("pad-agency-type", "manager_type_id", "manager_type")
# NOTE: "desgination" is the spelling used in the published file name.
desig_type = read_lookup("pad-desgination-type", "designation_type_id", "designation_type")
public_access = read_lookup("pad-public-access", "public_access_id", "public_access")
state_name = read_lookup("pad-state-name", "state", "state_name")
# The IUCN table stores its column names in upper case.
iucn = read_lookup("pad-iucn", "iucn_code", "iucn_category", code="CODE", label="DOM")

# +
fgb = "https://data.source.coop/cboettig/pad-us-3/pad-us3-combined.fgb"
parquet = "https://data.source.coop/cboettig/pad-us-3/pad-us3-combined.parquet"
# gdb = "https://data.source.coop/cboettig/pad-us-3/PADUS3/PAD_US3_0.gdb" # original, all tables

# or read the fgb version, much slower
# pad = con.read_geo(fgb)
# pad = con.read_parquet(parquet)

# Currently ibis doesn't detect that this is GeoParquet.
# We need a SQL escape-hatch to cast the WKB `geometry` column to a native `geom`.
con.raw_sql(
    "CREATE OR REPLACE VIEW pad AS "
    "SELECT *, st_geomfromwkb(geometry) as geom "
    f"from read_parquet('{parquet}')"
)
pad = con.table("pad")
# -

# Get the CRS
# fiona is not built with parquet support, must read this from fgb.
# Ideally duckdb's st_read_meta would do this from the parquet.
# Only the metadata is needed, so close the dataset as soon as the CRS is read.
with fiona.open(fgb) as meta:
    crs = meta.crs

## optional getting bounds
# extract bounds. (in this case these are already in the same projection
# actually, so r.rio.bounds() would work)
r = rioxarray.open_rasterio("https://data.source.coop/cboettig/mobi/species-richness-all/SpeciesRichness_All.tif")
bounds = box(*r.rio.transform_bounds(crs))

# +
# Now we can do all the usual SQL queries to subset the data.
# Note the `geom.within()` spatial filter!
focal_columns = [
    "row_n", "FeatClass", "Mang_Name", "Mang_Type", "Des_Tp", "Pub_Access",
    "GAP_Sts", "IUCN_Cat", "Unit_Nm", "State_Nm", "EsmtHldr", "Date_Est",
    "SHAPE_Area", "geom",
]
pad_parquet = (
    pad
    # row_n is assigned before any filtering, so it identifies rows of the full table
    .mutate(row_n=ibis.row_number())
    # keep fee/easement parcels, plus proclamation areas whose manager name code is TRIB
    .filter(
        (_.FeatClass.isin(["Easement", "Fee"]))
        | ((_.FeatClass == "Proclamation") & (_.Mang_Name == "TRIB"))
    )
    .filter(_.geom.within(bounds))
    .select(focal_columns)
    # the selected columns no longer include the WKB column, so `geom` can take its name
    .rename(geometry="geom")
)
# Need to revisit this to also process the external polygons
# .filter(~ _.geom.within(bounds))

pad_parquet.to_parquet("pad-processed.parquet")

# +
# Add our custom bucket categories:
# really could be done separately.
categorical_columns = [
    "bucket", "FeatClass", "Mang_Name", "Mang_Type", "Des_Tp", "Pub_Access",
    "GAP_Sts", "IUCN_Cat", "Unit_Nm", "State_Nm", "EsmtHldr", "Date_Est",
    "row_n",
]

public = ["DIST", "LOC", "FED", "STAT", "JNT"]
private = ["PVT", "NGO"]

# Bucket = manager type x GAP status; rows matching no branch get a NULL bucket.
# NOTE(review): recent ibis releases appear to deprecate ibis.case() in favour
# of ibis.cases() -- confirm against the pinned ibis version before switching.
case = (
    ibis.case()
    .when((_.Mang_Type.isin(public) & _.GAP_Sts.isin(["1", "2"])), "public conservation")
    .when((_.Mang_Type.isin(public) & _.GAP_Sts.isin(["3"])), "mixed use")
    .when((_.Mang_Type.isin(public) & _.GAP_Sts.isin(["4"])), "public unprotected")
    .when((_.Mang_Type.isin(private) & (_.GAP_Sts.isin(["1", "2", "3"]))), "private conservation")
    .when((_.Mang_Type.isin(private) & (_.GAP_Sts.isin(["4"]))), "private unprotected")
    .when((_.Mang_Type == "TRIB"), "tribal")
    .end()
)

# Readable column names, written as new_name="PAD-US name" (the form Table.rename takes).
readable_names = dict(
    manager_name_id="Mang_Name",
    manager_type_id="Mang_Type",
    designation_type_id="Des_Tp",
    public_access_id="Pub_Access",
    category="FeatClass",
    iucn_code="IUCN_Cat",
    gap_code="GAP_Sts",
    state="State_Nm",
    easement_holder="EsmtHldr",
    date_established="Date_Est",
    area_name="Unit_Nm",
)


def add_lookup_labels(table):
    """Left-join every code lookup table onto *table*.

    *table* must already use the renamed key columns (manager_name_id, ...).
    The duplicated join keys that ibis suffixes with "_right" are dropped.
    """
    return (
        table
        .left_join(agency_name, "manager_name_id")
        .left_join(agency_type, "manager_type_id")
        .left_join(desig_type, "designation_type_id")
        .left_join(public_access, "public_access_id")
        .left_join(state_name, "state")
        .left_join(iucn, "iucn_code")
        .select(~s.contains("_right"))
    )


pad_grouping = add_lookup_labels(
    pad
    .mutate(row_n=ibis.row_number())
    .filter(
        (_.FeatClass.isin(["Easement", "Fee"]))
        | ((_.FeatClass == "Proclamation") & (_.Mang_Name == "TRIB"))
    )
    .mutate(bucket=case)
    .select(categorical_columns)
    .rename(**readable_names)
)
pad_grouping.to_parquet("pad-groupings.parquet")
# -

(
    add_lookup_labels(
        pad_parquet.rename(area_square_meters="SHAPE_Area", **readable_names)
    )
    # .select(~s.contains("_id"))
    # If we keep the original geoparquet WKB 'geometry' column, to_pandas() (or
    # execute) gives us only a normal pandas data.frame, and geopandas doesn't
    # see the metadata.
    # If we replace the geometry with the duckdb-native 'geometry' type,
    # to_pandas() gives us a geopandas! But requires reading into RAM.
    .to_pandas()
    .set_crs(crs)
    .to_parquet("pad-processed.parquet")
)

# +
import rasterio
from rasterstats import zonal_stats
import geopandas as gpd
import pandas as pd
from joblib import Parallel, delayed


def big_zonal_stats(vec_file, tif_file, stats, col_name, n_jobs, verbose=10, timeout=10000):
    """Compute zonal statistics of a raster for every polygon, in parallel.

    Parameters
    ----------
    vec_file : path to a GeoParquet file with a ``row_n`` id column and a
        ``geometry`` column.
    tif_file : path of the raster to summarise.
    stats : statistics forwarded to ``rasterstats.zonal_stats``.
    col_name : name given to the ``mean`` statistic in the result.
    n_jobs, verbose, timeout : forwarded to ``joblib.Parallel``.

    Returns
    -------
    GeoDataFrame holding every row of *vec_file* (reprojected to the raster's
    CRS) plus the statistic columns, matched on ``row_n``.
    """
    # read in vector as geopandas, match CRS to raster
    with rasterio.open(tif_file) as src:
        raster_crs = src.profile["crs"]
    gdf = gpd.read_parquet(vec_file).to_crs(raster_crs)

    # A re-run would otherwise make the merge below emit `<col>_x` / `<col>_y`
    # duplicates, so discard any stale copy of the column we are about to add.
    gdf = gdf.drop(columns=[col_name], errors="ignore")

    # row_n is a global id, may refer to excluded polygons
    # gdf["row_id"] = gdf.index + 1

    def get_stats(row_n, geometry, tif_file, stats):
        """Zonal stats for one polygon, tagged with its row_n for the merge."""
        result = zonal_stats(geometry, tif_file, stats=stats)[0]
        result["row_n"] = row_n
        return result

    # One job per polygon; only the id and the geometry are shipped to workers.
    jobs = [
        delayed(get_stats)(row_n, geometry, tif_file, stats)
        for row_n, geometry in zip(gdf["row_n"], gdf["geometry"])
    ]

    # And here we go
    output = Parallel(n_jobs=n_jobs, timeout=timeout, verbose=verbose)(jobs)

    # reshape output: a right merge keeps every polygon of gdf
    merged = (
        pd.DataFrame(output)
        .rename(columns={"mean": col_name})
        .merge(gdf, how="right", on="row_n")
    )
    return gpd.GeoDataFrame(merged, geometry="geometry")


def add_raster_mean(tif_file, col_name, vec_file="./pad-stats.parquet"):
    """Add the per-polygon mean of *tif_file* as *col_name*; write pad-stats.parquet.

    By default the layers accumulate: the file written by the previous call is
    read, extended by one column, and written back.
    """
    layer = big_zonal_stats(
        vec_file, tif_file, stats=["mean"], col_name=col_name,
        n_jobs=threads, verbose=0,
    )
    layer.to_parquet("pad-stats.parquet")
# -

import geopandas as gpd
test = gpd.read_parquet("pad-processed.parquet")
test.columns

# +
# %%time
# First layer: starts from the processed polygons rather than pad-stats.parquet.
add_raster_mean(
    "/home/rstudio/boettiger-lab/us-pa-policy/hfp_2021_100m_v1-2_cog.tif",
    "human_impact",
    vec_file="./pad-processed.parquet",
)

# +
# %%time
add_raster_mean(
    "/home/rstudio/source.coop/cboettig/mobi/species-richness-all/SpeciesRichness_All.tif",
    "richness",
)

# +
# %%time
add_raster_mean(
    "/home/rstudio/source.coop/cboettig/mobi/range-size-rarity-all/RSR_All.tif",
    "rsr",
)

# +
# %%time
add_raster_mean(
    "/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/deforest_carbon_100m_cog.tif",
    "deforest_carbon",
)

# +
# %%time
add_raster_mean(
    "/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_bii_100m_cog.tif",
    "biodiversity_intactness_loss",
)

# +
# %%time
add_raster_mean(
    "/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_fii_100m_cog.tif",
    "forest_integrity_loss",
)

# +
# %%time
add_raster_mean(
    "/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_expansion_100m_cog.tif",
    "crop_expansion",
)

# +
# %%time
add_raster_mean(
    "/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_reduction_100m_cog.tif",
    "crop_reduction",
)

# +
# %%time
add_raster_mean(
    "/home/rstudio/source.coop/cboettig/carbon/cogs/irrecoverable_c_total_2018.tif",
    "irrecoverable_carbon",
)

# +
# %%time
add_raster_mean(
    "/home/rstudio/source.coop/cboettig/carbon/cogs/manageable_c_total_2018.tif",
    "manageable_carbon",
)

# +
# %%time
add_raster_mean(
    "/home/rstudio/minio/shared-biodiversity/redlist/cog/combined_rwr_2022.tif",
    "all_species_rwr",
)

# +
# %%time
add_raster_mean(
    "/home/rstudio/minio/shared-biodiversity/redlist/cog/combined_sr_2022.tif",
    "all_species_richness",
)

# +
# Columns exported to the final table; also spliced verbatim into the SQL below.
columns = '''
area_name,
manager_name,
manager_name_id,
manager_type,
manager_type_id,
designation_type,
designation_type_id,
public_access,
category,
iucn_code,
iucn_category,
gap_code,
state,
state_name,
easement_holder,
date_established,
area_square_meters,
geometry,
all_species_richness,
all_species_rwr,
manageable_carbon,
irrecoverable_carbon,
crop_reduction,
crop_expansion,
deforest_carbon,
richness,
rsr,
forest_integrity_loss,
biodiversity_intactness_loss
'''

# Split on commas, dropping surrounding whitespace and empty entries
items = [name.strip() for name in columns.split(",") if name.strip()]
items
# -

import ibis
from ibis import _
df = ibis.read_parquet("pad-stats.parquet").select(items)
df.group_by(_.manager_type).aggregate(n=_.manager_type.count()).to_pandas()

# +
## create pad.duckdb
from sqlalchemy import create_engine
from sqlalchemy import text

db_uri = "duckdb:///pad.duckdb"
engine = create_engine(db_uri)
# NOTE: `con` is rebound here; the ibis connection from the top of the
# notebook is no longer reachable under this name.
# engine.begin() commits on success and always closes the connection;
# text() is required because SQLAlchemy 2.x no longer executes plain strings.
with engine.begin() as con:
    con.execute(text(f"create or replace table pad as select {columns} from 'pad-stats.parquet'"))

# pad_stats = ibis.read_parquet("pad-stats.parquet")
# pad_stats.head(20).to_pandas()
# -

import pandas as pd
db_uri = "duckdb:///pad.duckdb"
engine = create_engine(db_uri)
with engine.connect() as con:
    result = con.execute(text("select * from pad limit 1"))
    # carry the column names over so the preview is labelled
    preview = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
preview