pad-us / preprocess.py
Carl Boettiger
grouping
308c0d8
import ibis
import ibis.selectors as s
from ibis import _
import fiona
import geopandas as gpd
import rioxarray
from shapely.geometry import box
con = ibis.duckdb.connect()
con.load_extension("spatial")
threads = -1
agency_name = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-agency-name.parquet").select(manager_name_id = "Code", manager_name = "Dom")
agency_type = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-agency-type.parquet").select(manager_type_id = "Code", manager_type = "Dom")
desig_type = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-desgination-type.parquet").select(designation_type_id = "Code", designation_type = "Dom")
public_access = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-public-access.parquet").select(public_access_id = "Code", public_access = "Dom")
state_name = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-state-name.parquet").select(state = "Code", state_name = "Dom")
iucn = con.read_parquet("https://huggingface.co/datasets/boettiger-lab/pad-us-3/resolve/main/parquet/pad-iucn.parquet").select(iucn_code = "CODE", iucn_category = "DOM")
# +
fgb = "https://data.source.coop/cboettig/pad-us-3/pad-us3-combined.fgb"
parquet = "https://data.source.coop/cboettig/pad-us-3/pad-us3-combined.parquet"
# gdb = "https://data.source.coop/cboettig/pad-us-3/PADUS3/PAD_US3_0.gdb" # original, all tables
# or read the fgb version, much slower
# pad = con.read_geo(fgb)
# pad = con.read_parquet(parquet)
# Currently ibis doesn't detect that this is GeoParquet. We need a SQL escape-hatch to cast the geometry
con.raw_sql(f"CREATE OR REPLACE VIEW pad AS SELECT *, st_geomfromwkb(geometry) as geom from read_parquet('{parquet}')")
pad = con.table("pad")
# -
# Get the CRS
# fiona is not built with parquet support, must read this from fgb. ideally duckdb's st_read_meta would do this from the parquet
meta = fiona.open(fgb)
crs = meta.crs
## optional getting bounds
# extract bounds. (in this case these are already in the same projection actually so r.rio.bounds() would work)
r = rioxarray.open_rasterio("https://data.source.coop/cboettig/mobi/species-richness-all/SpeciesRichness_All.tif")
bounds = box(*r.rio.transform_bounds(crs))
# +
# Now we can do all the usual SQL queries to subset the data. Note the `geom.within()` spatial filter!
focal_columns = ["row_n", "FeatClass", "Mang_Name",
"Mang_Type", "Des_Tp", "Pub_Access",
"GAP_Sts", "IUCN_Cat", "Unit_Nm",
"State_Nm", "EsmtHldr", "Date_Est",
"SHAPE_Area", "geom"]
pad_parquet = (
pad
.mutate(row_n=ibis.row_number())
.filter((_.FeatClass.isin(["Easement", "Fee"])) | (
(_.FeatClass == "Proclamation") & (_.Mang_Name == "TRIB"))
)
.filter(_.geom.within(bounds))
.select(focal_columns)
.rename(geometry="geom")
)
# Need to revist this to also process the external polygons
# .filter(~ _.geom.within(bounds))
pad_parquet.to_parquet("pad-processed.parquet")
# +
# Add our custom bucket categories:
# really could be done seperately.
categorical_columns = ["bucket", "FeatClass", "Mang_Name",
"Mang_Type", "Des_Tp", "Pub_Access",
"GAP_Sts", "IUCN_Cat", "Unit_Nm",
"State_Nm", "EsmtHldr", "Date_Est",
"row_n"]
public = ["DIST", "LOC", "FED", "STAT", "JNT"]
case = (
ibis.case()
.when( (_.Mang_Type.isin(public) & _.GAP_Sts.isin(["1","2"])), "public conservation")
.when( (_.Mang_Type.isin(public) & _.GAP_Sts.isin(["3"])), "mixed use")
.when( (_.Mang_Type.isin(public) & _.GAP_Sts.isin(["4"])), "public unprotected")
.when( (_.Mang_Type.isin(["PVT", "NGO"]) & (_.GAP_Sts.isin(["1","2", "3"]))), "private conservation")
.when( (_.Mang_Type.isin(["PVT", "NGO"]) & (_.GAP_Sts.isin(["4"]))), "private unprotected")
.when( (_.Mang_Type == "TRIB"), "tribal")
.end()
)
pad_grouping = (
pad
.mutate(row_n=ibis.row_number())
.filter((_.FeatClass.isin(["Easement", "Fee"])) | (
(_.FeatClass == "Proclamation") & (_.Mang_Name == "TRIB"))
)
.mutate(bucket = case)
.select(categorical_columns)
.rename(manager_name_id = "Mang_Name",
manager_type_id = "Mang_Type",
designation_type_id = "Des_Tp",
public_access_id = "Pub_Access",
category = "FeatClass",
iucn_code = "IUCN_Cat",
gap_code = "GAP_Sts",
state = "State_Nm",
easement_holder = "EsmtHldr",
date_established = "Date_Est",
area_name = "Unit_Nm")
.left_join(agency_name, "manager_name_id")
.left_join(agency_type, "manager_type_id")
.left_join(desig_type, "designation_type_id")
.left_join(public_access, "public_access_id")
.left_join(state_name, "state")
.left_join(iucn, "iucn_code")
.select(~s.contains("_right"))
)
pad_grouping.to_parquet("pad-groupings.parquet")
# -
(pad_parquet
.rename(manager_name_id = "Mang_Name",
manager_type_id = "Mang_Type",
designation_type_id = "Des_Tp",
public_access_id = "Pub_Access",
category = "FeatClass",
iucn_code = "IUCN_Cat",
gap_code = "GAP_Sts",
state = "State_Nm",
easement_holder = "EsmtHldr",
date_established = "Date_Est",
area_square_meters = "SHAPE_Area",
area_name = "Unit_Nm")
.left_join(agency_name, "manager_name_id")
.left_join(agency_type, "manager_type_id")
.left_join(desig_type, "designation_type_id")
.left_join(public_access, "public_access_id")
.left_join(state_name, "state")
.left_join(iucn, "iucn_code")
.select(~s.contains("_right"))
# .select(~s.contains("_id"))
# if we keep the original geoparquet WKB 'geometry' column, to_pandas() (or execute) gives us only a normal pandas data.frame, and geopandas doesn't see the metadata.
# if we replace the geometry with duckdb-native 'geometry' type, to_pandas() gives us a geopanadas! But requires reading into RAM.
.to_pandas()
.set_crs(crs)
.to_parquet("pad-processed.parquet")
)
# +
import rasterio
from rasterstats import zonal_stats
import geopandas as gpd
import pandas as pd
from joblib import Parallel, delayed
def big_zonal_stats(vec_file, tif_file, stats, col_name, n_jobs, verbose = 10, timeout=10000):
# read in vector as geopandas, match CRS to raster
with rasterio.open(tif_file) as src:
raster_profile = src.profile
gdf = gpd.read_parquet(vec_file).to_crs(raster_profile['crs'])
# row_n is a global id, may refer to excluded polygons
# gdf["row_id"] = gdf.index + 1
# lamba fn to zonal_stats a slice:
def get_stats(geom_slice, tif_file, stats):
stats = zonal_stats(geom_slice.geometry, tif_file, stats = stats)
stats[0]['row_n'] = geom_slice.row_n
return stats[0]
# iteratation (could be a list comprehension?)
jobs = []
for r in gdf.itertuples():
jobs.append(delayed(get_stats)(r, tif_file, stats))
# And here we go
output = Parallel(n_jobs=n_jobs, timeout=timeout, verbose=verbose)(jobs)
# reshape output
df = (
pd.DataFrame(output)
.rename(columns={'mean': col_name})
.merge(gdf, how='right', on = 'row_n')
)
gdf = gpd.GeoDataFrame(df, geometry="geometry")
return gdf
# -
import geopandas as gpd
test = gpd.read_parquet("pad-processed.parquet")
test.columns
# +
# %%time
tif_file = "/home/rstudio/boettiger-lab/us-pa-policy/hfp_2021_100m_v1-2_cog.tif"
vec_file = './pad-processed.parquet'
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'],
col_name = "human_impact", n_jobs=threads, verbose=0)
gpd.GeoDataFrame(df, geometry="geometry").to_parquet("pad-stats.parquet")
# +
# %%time
tif_file = '/home/rstudio/source.coop/cboettig/mobi/species-richness-all/SpeciesRichness_All.tif'
vec_file = './pad-stats.parquet'
big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "richness", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet")
# +
# %%time
tif_file = '/home/rstudio/source.coop/cboettig/mobi/range-size-rarity-all/RSR_All.tif'
vec_file = './pad-stats.parquet'
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'],
col_name = "rsr", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet")
# +
# %%time
tif_file = '/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/deforest_carbon_100m_cog.tif'
vec_file = './pad-stats.parquet'
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'],
col_name = "deforest_carbon", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet")
# +
# %%time
tif_file = '/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_bii_100m_cog.tif'
vec_file = './pad-stats.parquet'
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'],
col_name = "biodiversity_intactness_loss", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet")
# +
# %%time
tif_file = '/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_fii_100m_cog.tif'
vec_file = './pad-stats.parquet'
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'],
col_name = "forest_integrity_loss", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet")
# +
# %%time
tif_file = '/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_expansion_100m_cog.tif'
vec_file = './pad-stats.parquet'
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "crop_expansion", n_jobs=threads, verbose=0)
gpd.GeoDataFrame(df, geometry="geometry").to_parquet("pad-stats.parquet")
# +
# %%time
tif_file = '/home/rstudio/source.coop/vizzuality/lg-land-carbon-data/natcrop_reduction_100m_cog.tif'
vec_file = './pad-stats.parquet'
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "crop_reduction", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet")
# +
# %%time
tif_file = '/home/rstudio/source.coop/cboettig/carbon/cogs/irrecoverable_c_total_2018.tif'
vec_file = './pad-stats.parquet'
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "irrecoverable_carbon", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet")
# +
# %%time
tif_file = '/home/rstudio/source.coop/cboettig/carbon/cogs/manageable_c_total_2018.tif'
vec_file = './pad-stats.parquet'
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "manageable_carbon", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet")
# +
# %%time
tif_file = '/home/rstudio/minio/shared-biodiversity/redlist/cog/combined_rwr_2022.tif'
vec_file = './pad-stats.parquet'
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "all_species_rwr", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet")
# +
# %%time
tif_file = '/home/rstudio/minio/shared-biodiversity/redlist/cog/combined_sr_2022.tif'
vec_file = './pad-stats.parquet'
df = big_zonal_stats(vec_file, tif_file, stats = ['mean'], col_name = "all_species_richness", n_jobs=threads, verbose=0).to_parquet("pad-stats.parquet")
# +
columns = '''
area_name,
manager_name,
manager_name_id,
manager_type,
manager_type_id,
designation_type,
designation_type_id,
public_access,
category,
iucn_code,
iucn_category,
gap_code,
state,
state_name,
easement_holder,
date_established,
area_square_meters,
geometry,
all_species_richness,
all_species_rwr,
manageable_carbon,
irrecoverable_carbon,
crop_reduction,
crop_expansion,
deforest_carbon,
richness,
rsr,
forest_integrity_loss,
biodiversity_intactness_loss
'''
items = columns.split(',')
# Remove empty strings and whitespace
items = [item.strip() for item in items if item.strip()]
items
# -
import ibis
from ibis import _
df = ibis.read_parquet("pad-stats.parquet").select(items)
df.group_by(_.manager_type).aggregate(n = _.manager_type.count()).to_pandas()
# +
## create pad.duckdb
from sqlalchemy import create_engine
from sqlalchemy import text
db_uri = "duckdb:///pad.duckdb"
engine = create_engine(db_uri)
con = engine.connect()
con.execute(f"create or replace table pad as select {columns} from 'pad-stats.parquet'")
con.close()
# pad_stats = ibis.read_parquet("pad-stats.parquet")
# pad_stats.head(20).to_pandas()
# -
import pandas as pd
db_uri = "duckdb:///pad.duckdb"
engine = create_engine(db_uri)
con = engine.connect()
pd.DataFrame(con.execute("select * from pad limit 1").fetchall())