Spaces:
Runtime error
Runtime error
import ee | |
import numpy as np | |
from PIL import Image | |
import io | |
import pandas as pd | |
from datetime import datetime, timedelta | |
import requests | |
import os | |
import logging | |
# Log at INFO level with a "timestamp - level - message" layout.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Authenticate with a service-account key file and initialise the Earth Engine
# client. This runs once, when the module is imported.
service_account = "deploy-1@firepatternrecognition.iam.gserviceaccount.com"
credentials = ee.ServiceAccountCredentials(
    service_account,
    '2AB9532A8B7759C26D6980A570EF6EEE8B49DE78_firepatternrecognition-c10928272b24.json',
)
ee.Initialize(credentials)

# Pixel size shared by the Earth Engine thumbnails and the hotspot mask.
width, height = 158, 292

# Corners of the study area as [longitude, latitude] pairs.
nw_corner_list = [98.00, 20.15]
ne_corner_list = [99.58, 20.15]
sw_corner_list = [98.00, 17.23]
se_corner_list = [99.58, 17.23]

# The same corners as Earth Engine point geometries.
nw_corner = ee.Geometry.Point(nw_corner_list)
ne_corner = ee.Geometry.Point(ne_corner_list)
sw_corner = ee.Geometry.Point(sw_corner_list)
se_corner = ee.Geometry.Point(se_corner_list)

# Rectangle over the study area: a single ring that starts and ends at the
# north-west corner.
area = ee.Geometry.Polygon([
    [
        corner.coordinates()
        for corner in (nw_corner, ne_corner, se_corner, sw_corner, nw_corner)
    ]
])
def calculate_ndvi(image):
    """Return *image* with an added 'NDVI' band.

    The band is the normalized difference of bands B8 and B4.
    """
    ndvi_band = image.normalizedDifference(['B8', 'B4'])
    return image.addBands(ndvi_band.rename('NDVI'))
def create_image_ndvi(date1, date2, new_area=area):
    """Render the per-pixel maximum NDVI over a date range as a grayscale image.

    Args:
        date1: Start date, passed to ``filterDate`` unchanged.
        date2: Last day to include, formatted 'YYYY-MM-DD'.
        new_area: ``ee.Geometry`` used to filter, clip and frame the image.
            Defaults to the module-level ``area``.

    Returns:
        Tuple ``(ndvi_array, image)``. ``image`` is the downloaded thumbnail
        as a PIL image in mode "L"; ``ndvi_array`` is that image as a NumPy
        array divided by 255, so values lie in [0, 1]. The palette maps -1 to
        white and 1 to black, so higher NDVI gives lower array values.

    Raises:
        ValueError: If ``date2`` is not formatted 'YYYY-MM-DD'.
        requests.HTTPError: If the thumbnail request returns an error status.
        requests.Timeout: If the thumbnail download stalls.
    """
    logging.info("Starting create image ndvi")

    # filterDate() treats its end date as exclusive, so step one day past
    # date2 to keep images acquired on date2 itself.
    last_day = datetime.strptime(date2, '%Y-%m-%d')
    end_exclusive = (last_day + timedelta(days=1)).strftime('%Y-%m-%d')

    collection = (
        ee.ImageCollection('COPERNICUS/S2_SR')
        .filterDate(date1, end_exclusive)
        .filterBounds(new_area)
        .select(['B4', 'B8'])
        .map(calculate_ndvi)
    )

    # Per-pixel maximum across the collection, clipped to the area.
    ndvi_image = collection.select('NDVI').max().clip(new_area)

    url = ndvi_image.getThumbURL({
        'min': -1,
        'max': 1,
        'palette': 'white,black',
        'region': new_area,
        'dimensions': [width, height],
        'format': 'png',
    })

    # Bound the wait so a stalled download cannot hang the caller, and fail
    # on HTTP errors here instead of handing an error body to PIL.
    response = requests.get(url, timeout=120)
    response.raise_for_status()

    image = Image.open(io.BytesIO(response.content)).convert("L")
    ndvi_array = np.array(image) / 255.0
    logging.info("End create image ndvi")
    return ndvi_array, image
def calculate_ndwi(image):
    """Return *image* with an added 'NDWI' band.

    The band is the normalized difference of bands B3 and B8.
    """
    ndwi_band = image.normalizedDifference(['B3', 'B8'])
    return image.addBands(ndwi_band.rename('NDWI'))
def create_image_ndwi(date1, date2, new_area=area):
    """Render the per-pixel maximum NDWI over a date range as a grayscale image.

    Args:
        date1: Start date, passed to ``filterDate`` unchanged.
        date2: Last day to include, formatted 'YYYY-MM-DD'.
        new_area: ``ee.Geometry`` used to filter, clip and frame the image.
            Defaults to the module-level ``area``.

    Returns:
        Tuple ``(ndwi_array, image)``. ``image`` is the downloaded thumbnail
        as a PIL image in mode "L"; ``ndwi_array`` is that image as a NumPy
        array divided by 255, so values lie in [0, 1]. The palette maps -1 to
        white and 1 to black, so higher NDWI gives lower array values.

    Raises:
        ValueError: If ``date2`` is not formatted 'YYYY-MM-DD'.
        requests.HTTPError: If the thumbnail request returns an error status.
        requests.Timeout: If the thumbnail download stalls.
    """
    logging.info("Starting create image ndwi")

    # filterDate() treats its end date as exclusive, so step one day past
    # date2 to keep images acquired on date2 itself.
    last_day = datetime.strptime(date2, '%Y-%m-%d')
    end_exclusive = (last_day + timedelta(days=1)).strftime('%Y-%m-%d')

    collection = (
        ee.ImageCollection('COPERNICUS/S2_SR')
        .filterDate(date1, end_exclusive)
        .filterBounds(new_area)
        .select(['B3', 'B8'])
        .map(calculate_ndwi)
    )

    # Per-pixel maximum across the collection, clipped to the area.
    ndwi_image = collection.select('NDWI').max().clip(new_area)

    url = ndwi_image.getThumbURL({
        'min': -1,
        'max': 1,
        'palette': 'white,black',
        'region': new_area,
        'dimensions': [width, height],
        'format': 'png',
    })

    # Bound the wait so a stalled download cannot hang the caller, and fail
    # on HTTP errors here instead of handing an error body to PIL.
    response = requests.get(url, timeout=120)
    response.raise_for_status()

    image = Image.open(io.BytesIO(response.content)).convert("L")
    ndwi_array = np.array(image) / 255
    logging.info("End create image ndwi")
    return ndwi_array, image
def calculate_ndmi(image):
    """Return *image* with an added 'NDMI' band.

    The band is the normalized difference of bands B8 and B11.
    """
    ndmi_band = image.normalizedDifference(['B8', 'B11'])
    return image.addBands(ndmi_band.rename('NDMI'))
def create_image_ndmi(date1, date2, new_area=area):
    """Render the per-pixel maximum NDMI over a date range as a grayscale image.

    Args:
        date1: Start date, passed to ``filterDate`` unchanged.
        date2: Last day to include, formatted 'YYYY-MM-DD'.
        new_area: ``ee.Geometry`` used to filter, clip and frame the image.
            Defaults to the module-level ``area``.

    Returns:
        Tuple ``(ndmi_array, image)``. ``image`` is the downloaded thumbnail
        as a PIL image in mode "L"; ``ndmi_array`` is that image as a NumPy
        array divided by 255, so values lie in [0, 1]. The palette maps -1 to
        white and 1 to black, so higher NDMI gives lower array values.

    Raises:
        ValueError: If ``date2`` is not formatted 'YYYY-MM-DD'.
        requests.HTTPError: If the thumbnail request returns an error status.
        requests.Timeout: If the thumbnail download stalls.
    """
    logging.info("Starting create image ndmi")

    # filterDate() treats its end date as exclusive, so step one day past
    # date2 to keep images acquired on date2 itself.
    last_day = datetime.strptime(date2, '%Y-%m-%d')
    end_exclusive = (last_day + timedelta(days=1)).strftime('%Y-%m-%d')

    collection = (
        ee.ImageCollection('COPERNICUS/S2_SR')
        .filterDate(date1, end_exclusive)
        .filterBounds(new_area)
        .select(['B8', 'B11'])
        .map(calculate_ndmi)
    )

    # Per-pixel maximum across the collection, clipped to the area.
    ndmi_image = collection.select('NDMI').max().clip(new_area)

    url = ndmi_image.getThumbURL({
        'min': -1,
        'max': 1,
        'palette': 'white,black',
        'region': new_area,
        'dimensions': [width, height],
        'format': 'png',
    })

    # Bound the wait so a stalled download cannot hang the caller, and fail
    # on HTTP errors here instead of handing an error body to PIL.
    response = requests.get(url, timeout=120)
    response.raise_for_status()

    image = Image.open(io.BytesIO(response.content)).convert("L")
    ndmi_array = np.array(image) / 255
    logging.info("End create image ndmi")
    return ndmi_array, image
def calculate_nddi(image):
    """Return a single-band image named 'NDDI' holding NDVI minus NDWI.

    *image* must already carry 'NDVI' and 'NDWI' bands. The result is the
    plain difference; the normalized form (NDVI - NDWI) / (NDVI + NDWI) is
    not applied. Only the new band is returned, not the input image with the
    band added.
    """
    difference = image.select('NDVI').subtract(image.select('NDWI'))
    return difference.rename('NDDI')
def create_image_nddi(date1, date2, new_area=area):
    """Render the per-pixel maximum NDDI over a date range as a grayscale image.

    NDDI here is whatever ``calculate_nddi`` produces from the NDVI and NDWI
    bands added by ``calculate_ndvi`` and ``calculate_ndwi``.

    Args:
        date1: Start date, passed to ``filterDate`` unchanged.
        date2: Last day to include, formatted 'YYYY-MM-DD'.
        new_area: ``ee.Geometry`` used to filter, clip and frame the image.
            Defaults to the module-level ``area``.

    Returns:
        Tuple ``(nddi_array, image)``. ``image`` is the downloaded thumbnail
        as a PIL image in mode "L"; ``nddi_array`` is that image as a NumPy
        array divided by 255, so values lie in [0, 1]. The palette maps -1 to
        white and 1 to black, so higher NDDI gives lower array values.

    Raises:
        ValueError: If ``date2`` is not formatted 'YYYY-MM-DD'.
        requests.HTTPError: If the thumbnail request returns an error status.
        requests.Timeout: If the thumbnail download stalls.
    """
    logging.info("Starting create image nddi")

    # filterDate() treats its end date as exclusive, so step one day past
    # date2 to keep images acquired on date2 itself.
    last_day = datetime.strptime(date2, '%Y-%m-%d')
    end_exclusive = (last_day + timedelta(days=1)).strftime('%Y-%m-%d')

    # NDVI and NDWI are added per image first; calculate_nddi then reduces
    # each image to its single 'NDDI' band.
    collection = (
        ee.ImageCollection('COPERNICUS/S2_SR')
        .filterDate(date1, end_exclusive)
        .filterBounds(new_area)
        .select(['B8', 'B4', 'B3'])
        .map(calculate_ndvi)
        .map(calculate_ndwi)
        .map(calculate_nddi)
    )

    # Per-pixel maximum across the collection, clipped to the area.
    nddi_image = collection.select('NDDI').max().clip(new_area)

    # NOTE(review): a difference of two indices can fall outside [-1, 1];
    # such values are presumably clamped by the min/max range below — confirm.
    url = nddi_image.getThumbURL({
        'min': -1,
        'max': 1,
        'palette': 'white,black',
        'region': new_area,
        'dimensions': [width, height],
        'format': 'png',
    })

    # Bound the wait so a stalled download cannot hang the caller, and fail
    # on HTTP errors here instead of handing an error body to PIL.
    response = requests.get(url, timeout=120)
    response.raise_for_status()

    image = Image.open(io.BytesIO(response.content)).convert("L")
    nddi_array = np.array(image) / 255
    logging.info("End create image nddi")
    return nddi_array, image
def calculate_savi(image):
    """Return *image* with an added 'SAVI' band.

    SAVI = (B8 - B4) * (1 + L) / (B8 + B4 + L), with the soil adjustment
    factor L fixed at 0.5.
    """
    # Soil adjustment factor (L), typically between 0 and 1.
    soil_factor = 0.5
    nir_band = image.select('B8')
    red_band = image.select('B4')
    # NOTE(review): the bands are used exactly as stored. If they hold scaled
    # integer reflectance, adding L = 0.5 barely changes the denominator —
    # confirm whether rescaling to 0-1 reflectance was intended.
    numerator = nir_band.subtract(red_band).multiply(1 + soil_factor)
    denominator = nir_band.add(red_band).add(soil_factor)
    return image.addBands(numerator.divide(denominator).rename('SAVI'))
def create_image_savi(date1, date2, new_area=area):
    """Render the per-pixel maximum SAVI over a date range as a grayscale image.

    Args:
        date1: Start date, passed to ``filterDate`` unchanged.
        date2: Last day to include, formatted 'YYYY-MM-DD'.
        new_area: ``ee.Geometry`` used to filter, clip and frame the image.
            Defaults to the module-level ``area``.

    Returns:
        Tuple ``(savi_array, image)``. ``image`` is the downloaded thumbnail
        as a PIL image in mode "L"; ``savi_array`` is that image as a NumPy
        array divided by 255, so values lie in [0, 1] (not [-1, 1]). The
        palette maps -1 to white and 1 to black, so higher SAVI gives lower
        array values.

    Raises:
        ValueError: If ``date2`` is not formatted 'YYYY-MM-DD'.
        requests.HTTPError: If the thumbnail request returns an error status.
        requests.Timeout: If the thumbnail download stalls.
    """
    logging.info("Starting create image savi")

    # filterDate() treats its end date as exclusive, so step one day past
    # date2 to keep images acquired on date2 itself.
    last_day = datetime.strptime(date2, '%Y-%m-%d')
    end_exclusive = (last_day + timedelta(days=1)).strftime('%Y-%m-%d')

    # calculate_savi reads only B4 and B8, so only those bands are selected.
    collection = (
        ee.ImageCollection('COPERNICUS/S2_SR')
        .filterDate(date1, end_exclusive)
        .filterBounds(new_area)
        .select(['B4', 'B8'])
        .map(calculate_savi)
    )

    # Per-pixel maximum across the collection, clipped to the area.
    savi_image = collection.select('SAVI').max().clip(new_area)

    url = savi_image.getThumbURL({
        'min': -1,
        'max': 1,
        'palette': 'white,black',
        'region': new_area,
        # Use the shared module-level size so this thumbnail always matches
        # the other index images.
        'dimensions': [width, height],
        'format': 'png',
    })

    # Bound the wait so a stalled download cannot hang the caller, and fail
    # on HTTP errors here instead of handing an error body to PIL.
    response = requests.get(url, timeout=120)
    response.raise_for_status()

    image = Image.open(io.BytesIO(response.content)).convert("L")
    savi_array = np.array(image) / 255
    logging.info("End create image savi")
    return savi_array, image
def date_create(today_date, new_area=area):
    """Return the last two distinct acquisition dates in the preceding week.

    Looks up COPERNICUS/S2_SR images that intersect *new_area* between seven
    days before *today_date* and *today_date*, then lists their acquisition
    dates.

    Args:
        today_date: Reference day, formatted 'YYYY-MM-DD'.
        new_area: ``ee.Geometry`` used to filter the collection. Defaults to
            the module-level ``area``.

    Returns:
        The last two entries (fewer if fewer exist) of the distinct
        'YYYY-MM-dd' date strings, in the order the collection yields them.
        NOTE(review): presumably chronological, so these are the two most
        recent dates — confirm.
    """
    reference_day = datetime.strptime(today_date, '%Y-%m-%d')
    week_start = (reference_day - timedelta(days=7)).strftime('%Y-%m-%d')

    # NOTE(review): the end bound is the reference day itself with no extra
    # day added, unlike the create_image_* functions — confirm that images
    # acquired on today_date are meant to be left out.
    recent_images = (
        ee.ImageCollection('COPERNICUS/S2_SR')
        .filterDate(week_start, reference_day)
        .filterBounds(new_area)
    )

    image_ids = recent_images.aggregate_array('system:index').getInfo()
    acquisition_days = (
        recent_images.aggregate_array('system:time_start')
        .map(lambda millis: ee.Date(millis).format('YYYY-MM-dd'))
        .getInfo()
    )

    df_day = pd.DataFrame({'Image ID': image_ids, 'Date': acquisition_days})
    distinct_days = pd.unique(df_day['Date'])
    return distinct_days[-2:]
def load_and_process_file(file_path):
    """Load a CSV or Excel table and keep its latitude, longitude and acq_date columns.

    Column headers are matched case-insensitively and with surrounding
    whitespace ignored; the returned frame uses the lowercase names.

    Args:
        file_path: Path to a '.csv', '.xls' or '.xlsx' file.

    Returns:
        DataFrame with exactly the columns 'latitude', 'longitude' and
        'acq_date', in that order.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing file.
        ValueError: If the extension is not '.csv', '.xls' or '.xlsx'.
        KeyError: If any of the three required columns is missing.
    """
    logging.info(f"Loading file: {file_path}")

    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"The file '{file_path}' does not exist.")

    file_extension = os.path.splitext(file_path)[1].lower()
    if file_extension == '.csv':
        df = pd.read_csv(file_path)
    elif file_extension in ('.xls', '.xlsx'):
        df = pd.read_excel(file_path)
    else:
        raise ValueError("The file must be a CSV or Excel file with extensions '.csv', '.xls', or '.xlsx'.")

    # Normalise headers so " Latitude " or non-string labels do not break the
    # column lookup below.
    df.columns = [str(column).strip().lower() for column in df.columns]

    required_columns = ["latitude", "longitude", "acq_date"]
    missing_columns = [name for name in required_columns if name not in df.columns]
    if missing_columns:
        raise KeyError(f"Missing required column(s): {missing_columns}")
    return df[required_columns]
def filter_latest_date(df, date_column='date'):
    """Keep the rows inside the study area that carry the most recent date.

    Rows are first restricted to the bounding box given by the module-level
    corner lists (bounds inclusive). ``date_column`` is then parsed with
    ``pd.to_datetime`` and only rows equal to its maximum are kept.

    Args:
        df: DataFrame with 'latitude', 'longitude' and ``date_column``.
        date_column: Name of the date column. The default is 'date'; frames
            returned by ``load_and_process_file`` name it 'acq_date', so pass
            that name explicitly for them.

    Returns:
        A new DataFrame with a fresh 0..n-1 index and ``date_column`` as
        datetimes. It is empty when no row lies inside the area or has a
        valid date. The input frame is not modified.
    """
    lat_min, lat_max = sw_corner_list[1], nw_corner_list[1]
    lon_min, lon_max = sw_corner_list[0], se_corner_list[0]

    inside_area = (
        df['latitude'].between(lat_min, lat_max)
        & df['longitude'].between(lon_min, lon_max)
    )
    # Work on an explicit copy so the assignment below does not write into a
    # slice of the caller's frame (SettingWithCopyWarning).
    in_area = df.loc[inside_area].copy()
    in_area[date_column] = pd.to_datetime(in_area[date_column])

    latest_date = in_area[date_column].max()
    filtered_df = in_area[in_area[date_column] == latest_date].reset_index(drop=True)

    # With no matching rows there is no first element to report; return the
    # empty frame instead of failing on the log line.
    if filtered_df.empty:
        logging.info("No rows inside the study area with a valid date")
        return filtered_df

    logging.info(f"Latest date in area: {str(filtered_df[date_column][0].date())}")
    return filtered_df
def create_image_from_coordinates(df):
    """Rasterise hotspot coordinates into a mask of the study area.

    Each row's latitude/longitude is mapped linearly onto a ``height`` x
    ``width`` grid whose top-left pixel is the north-west corner, and a 5x5
    block of pixels centred on that position is set to 255. Parts of a block
    that fall outside the grid are dropped.

    Args:
        df: DataFrame with 'latitude' and 'longitude' columns.

    Returns:
        ``np.uint8`` array of shape ``(height, width)``: 255 where a hotspot
        block was painted, 0 elsewhere.
    """
    logging.info("Start create_image_from_coordinates")
    image_array = np.zeros((height, width), dtype=np.uint8)

    north = nw_corner_list[1]
    west = nw_corner_list[0]
    lat_range = north - sw_corner_list[1]
    lon_range = ne_corner_list[0] - west
    half_size = 2  # block spans centre +/- 2 pixels, i.e. 5x5

    for lat, lon in zip(df['latitude'], df['longitude']):
        y = int(height * (north - lat) / lat_range)
        x = int(width * (lon - west) / lon_range)

        # Clamp the block to the grid. The emptiness check also keeps a
        # negative upper bound from being read as a from-the-end index.
        top, bottom = max(y - half_size, 0), min(y + half_size + 1, height)
        left, right = max(x - half_size, 0), min(x + half_size + 1, width)
        if top < bottom and left < right:
            image_array[top:bottom, left:right] = 255

    logging.info("End create_image_from_coordinates")
    return image_array