Spaces:
Sleeping
Sleeping
import os | |
import ee | |
import geemap | |
import json | |
import geopandas as gpd | |
import streamlit as st | |
import pandas as pd | |
import geojson | |
from shapely.geometry import Polygon, MultiPolygon, shape, Point | |
from io import BytesIO | |
# Enable fiona driver | |
gpd.io.file.fiona.drvsupport.supported_drivers['KML'] = 'rw' | |
#Intialize EE library | |
# Error in EE Authentication | |
ee_credentials = os.environ.get("EE") | |
os.makedirs(os.path.expanduser("~/.config/earthengine/"), exist_ok=True) | |
with open(os.path.expanduser("~/.config/earthengine/credentials"), "w") as f: | |
f.write(ee_credentials) | |
ee.Initialize() | |
# Functions | |
def convert_to_2d_geometry(geom): #Handles Polygon Only | |
if geom is None: | |
return None | |
elif geom.has_z: | |
# Extract exterior coordinates and convert to 2D | |
exterior_coords = geom.exterior.coords[:] # Get all coordinates of the exterior ring | |
exterior_coords_2d = [(x, y) for x, y, *_ in exterior_coords] # Keep only the x and y coordinates, ignoring z | |
# Handle interior rings (holes) if any | |
interior_coords_2d = [] | |
for interior in geom.interiors: | |
interior_coords = interior.coords[:] | |
interior_coords_2d.append([(x, y) for x, y, *_ in interior_coords]) | |
# Create a new Polygon with 2D coordinates | |
return type(geom)(exterior_coords_2d, interior_coords_2d) | |
else: | |
return geom | |
def validate_KML_file(gdf): | |
# try: | |
# gdf = gpd.read_file(BytesIO(uploaded_file.read()), driver='KML') | |
# except Exception as e: | |
# ValueError("Input must be a valid KML file.") | |
if gdf.empty: | |
return { | |
'corner_points': None, | |
'area': None, | |
'perimeter': None, | |
'is_single_polygon': False} | |
polygon_info = {} | |
# Check if it's a single polygon or multipolygon | |
if isinstance(gdf.iloc[0].geometry, Polygon): | |
polygon_info['is_single_polygon'] = True | |
polygon = convert_to_2d_geometry(gdf.geometry.iloc[0]) | |
# Calculate corner points in GCS projection | |
polygon_info['corner_points'] = [ | |
(polygon.bounds[0], polygon.bounds[1]), | |
(polygon.bounds[2], polygon.bounds[1]), | |
(polygon.bounds[2], polygon.bounds[3]), | |
(polygon.bounds[0], polygon.bounds[3]) | |
] | |
# Calculate Centroids in GCS projection | |
polygon_info['centroid'] = polygon.centroid.coords[0] | |
# Calculate area and perimeter in EPSG:7761 projection | |
# It is a local projection defined for Gujarat as per NNRMS | |
polygon = gdf.to_crs(epsg=7761).geometry.iloc[0] | |
polygon_info['area'] = polygon.area | |
polygon_info['perimeter'] = polygon.length | |
else: | |
polygon_info['is_single_polygon'] = False | |
polygon_info['corner_points'] = None | |
polygon_info['area'] = None | |
polygon_info['perimeter'] = None | |
polygon_info['centroid'] = None | |
ValueError("Input must be a single Polygon.") | |
return polygon_info | |
# Calculate NDVI as Normalized Index | |
def reduce_zonal_ndvi(image, ee_object): | |
ndvi = image.normalizedDifference(['B8', 'B4']).rename('NDVI') | |
image = image.addBands(ndvi) | |
image = image.select('NDVI') | |
reduced = image.reduceRegion( | |
reducer=ee.Reducer.mean(), | |
geometry=ee_object.geometry(), | |
scale=10, | |
maxPixels=1e12 | |
) | |
return image.set(reduced) | |
# Get Zonal NDVI | |
def get_zonal_ndvi(collection, geom_ee_object): | |
reduced_collection = collection.map(lambda image: reduce_zonal_ndvi(image, ee_object=geom_ee_object)) | |
stats_list = reduced_collection.aggregate_array('NDVI').getInfo() | |
filenames = reduced_collection.aggregate_array('system:index').getInfo() | |
dates = [f.split("_")[0].split('T')[0] for f in reduced_collection.aggregate_array('system:index').getInfo()] | |
df = pd.DataFrame({'NDVI': stats_list, 'Date': dates, 'Imagery': filenames}) | |
return df | |
# put title in center | |
st.markdown(""" | |
<style> | |
h1 { | |
text-align: center; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
st.title("Mean NDVI Calculator") | |
# get the start and end date from the user | |
col = st.columns(2) | |
start_date = col[0].date_input("Start Date", value=pd.to_datetime('2021-01-01')) | |
end_date = col[1].date_input("End Date", value=pd.to_datetime('2021-01-30')) | |
start_date = start_date.strftime("%Y-%m-%d") | |
end_date = end_date.strftime("%Y-%m-%d") | |
max_cloud_cover = st.number_input("Max Cloud Cover", value=20) | |
# Get the geojson file from the user | |
uploaded_file = st.file_uploader("Upload KML/GeoJSON file", type=["geojson", "kml"]) | |
if uploaded_file is not None: | |
try: | |
if uploaded_file.name.endswith("kml"): | |
gdf = gpd.read_file(BytesIO(uploaded_file.read()), driver='KML') | |
elif uploaded_file.name.endswith("geojson"): | |
gdf = gpd.read_file(uploaded_file) | |
except Exception as e: | |
st.write('ValueError: "Input must be a valid KML file."') | |
st.stop() | |
# Validate KML File | |
polygon_info = validate_KML_file(gdf) | |
if polygon_info["is_single_polygon"]==True: | |
st.write("Uploaded KML file has single geometry.") | |
st.write("It has bounds as {0:.6f}, {1:.6f}, {2:.6f}, and {3:.6f}.".format( | |
polygon_info['corner_points'][0][0], | |
polygon_info['corner_points'][0][1], | |
polygon_info['corner_points'][2][0], | |
polygon_info['corner_points'][2][1] | |
)) | |
st.write("It has centroid at ({0:.6f}, {1:.6f}).".format(polygon_info['centroid'][0], polygon_info['centroid'][1])) | |
st.write("It has area of {:.2f} meter squared.".format(polygon_info['area'])) | |
st.write("It has perimeter of {:.2f} meters.".format(polygon_info['perimeter'])) | |
# # Read KML file | |
# geom_ee_object = ee.FeatureCollection(json.loads(gdf.to_json())) | |
# # Add buffer of 100m to ee_object | |
# buffered_ee_object = geom_ee_object.map(lambda feature: feature.buffer(100)) | |
# # Filter data based on the date, bounds, cloud coverage and select NIR and Red Band | |
# collection = ee.ImageCollection("COPERNICUS/S2_SR_HARMONIZED").filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', max_cloud_cover)).filter(ee.Filter.date(start_date, end_date)).select(['B4', 'B8']) | |
# # Get Zonal NDVI based on collection and geometries (Original KML and Buffered KML) | |
# df_geom = get_zonal_ndvi(collection, geom_ee_object) | |
# df_buffered_geom = get_zonal_ndvi(collection, buffered_ee_object) | |
# # Merge both Zonalstats and create resultant dataframe | |
# resultant_df = pd.merge(df_geom, df_buffered_geom, on='Date', how='inner') | |
# resultant_df = resultant_df.rename(columns={'NDVI_x': 'AvgNDVI_Inside', 'NDVI_y': 'Avg_NDVI_Buffer', 'Imagery_x': 'Imagery'}) | |
# resultant_df['Ratio'] = resultant_df['AvgNDVI_Inside'] / resultant_df['Avg_NDVI_Buffer'] | |
# resultant_df.drop(columns=['Imagery_y'], inplace=True) | |
# # Re-order the columns of the resultant dataframe | |
# resultant_df = resultant_df[['Date', 'Imagery', 'AvgNDVI_Inside', 'Avg_NDVI_Buffer', 'Ratio']] | |
# st.write(resultant_df) | |
else: | |
st.write('ValueError: "Input must have single polygon geometry"') | |
st.write(gdf) | |
st.stop() |