fruk19's picture
Upload 3 files
ee1d751 verified
import ee
import numpy as np
from PIL import Image
import io
import pandas as pd
from datetime import datetime, timedelta
import requests
import os
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
service_account = "deploy-1@firepatternrecognition.iam.gserviceaccount.com"
credentials = ee.ServiceAccountCredentials(service_account, '2AB9532A8B7759C26D6980A570EF6EEE8B49DE78_firepatternrecognition-c10928272b24.json')
ee.Initialize(credentials)
# Initialize the Earth Engine module.
width, height = 158,292
nw_corner_list = [98.00, 20.15]
ne_corner_list = [99.58, 20.15]
sw_corner_list = [98.00, 17.23]
se_corner_list = [99.58, 17.23]
# Define the coordinates of the new area.
nw_corner = ee.Geometry.Point(nw_corner_list)
ne_corner = ee.Geometry.Point(ne_corner_list)
sw_corner = ee.Geometry.Point(sw_corner_list)
se_corner = ee.Geometry.Point(se_corner_list)
# Create a rectangle from the corners.
area = ee.Geometry.Polygon([
[nw_corner.coordinates(), ne_corner.coordinates(), se_corner.coordinates(), sw_corner.coordinates(), nw_corner.coordinates()]
])
def calculate_ndvi(image):
"""Calculate NDVI for an image."""
ndvi = image.normalizedDifference(['B8', 'B4']).rename('NDVI')
return image.addBands(ndvi)
def create_image_ndvi(date1, date2, new_area=area):
logging.info("Starting create image ndvi")
date_obj = datetime.strptime(date2, '%Y-%m-%d')
new_date_obj = date_obj + timedelta(days=1)
new_date_str = new_date_obj.strftime('%Y-%m-%d')
# Create an ImageCollection and filter it by date and bounds
image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \
.filterDate(date1, new_date_str) \
.filterBounds(new_area)
# Select bands 4 and 8 and calculate NDVI for each image in the collection
image_collection = image_collection.select(['B4', 'B8']).map(calculate_ndvi)
# Get the median NDVI image and clip it to the area of interest
ndvi_image = image_collection.select('NDVI').max().clip(new_area)
ndvi_vis_params = {
'min': -1,
'max': 1,
'palette': ['white', 'black']
}
# Get the URL for the NDVI image
url = ndvi_image.getThumbURL({
'min': ndvi_vis_params['min'],
'max': ndvi_vis_params['max'],
'palette': ','.join(ndvi_vis_params['palette']),
'region': new_area,
'dimensions': [width, height],
'format': 'png'
})
response = requests.get(url)
image = Image.open(io.BytesIO(response.content)).convert("L")
ndvi_array = np.array(image)
ndvi_array = ndvi_array / 255.0
logging.info("End create image ndvi")
return ndvi_array,image
def calculate_ndwi(image):
ndwi = image.normalizedDifference(['B3', 'B8']).rename('NDWI')
return image.addBands(ndwi)
def create_image_ndwi(date1, date2, new_area=area):
logging.info("Starting create image ndwi")
date_obj = datetime.strptime(date2, '%Y-%m-%d')
new_date_obj = date_obj + timedelta(days=1)
new_date_str = new_date_obj.strftime('%Y-%m-%d')
image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \
.filterDate(date1, new_date_str) \
.filterBounds(new_area)
# Select bands 3 and 8 and calculate NDWI for each image in the collection
image_collection = image_collection.select(['B3', 'B8']).map(calculate_ndwi)
# Get the mean NDWI image and clip it to the area of interest
ndwi_image = image_collection.select('NDWI').max().clip(new_area)
# Define visualization parameters for NDWI
ndwi_vis_params = {
'min': -1,
'max': 1,
'palette': ['white', 'black']
}
# Get the URL for the NDWI image
url = ndwi_image.getThumbURL({
'min': ndwi_vis_params['min'],
'max': ndwi_vis_params['max'],
'palette': ','.join(ndwi_vis_params['palette']),
'region': new_area,
'dimensions': [width, height],
'format': 'png'
})
# Request the image from the URL
response = requests.get(url)
image = Image.open(io.BytesIO(response.content)).convert("L")
# Convert the image to a NumPy array
ndwi_array = np.array(image)
ndwi_array = ndwi_array / 255
logging.info("End create image ndwi")
return ndwi_array,image
def calculate_ndmi(image):
ndmi = image.normalizedDifference(['B8', 'B11']).rename('NDMI')
return image.addBands(ndmi)
def create_image_ndmi(date1, date2, new_area=area):
logging.info("Starting create image ndmi")
date_obj = datetime.strptime(date2, '%Y-%m-%d')
new_date_obj = date_obj + timedelta(days=1)
new_date_str = new_date_obj.strftime('%Y-%m-%d')
# Create an ImageCollection and filter it by date and bounds
image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \
.filterDate(date1, new_date_str) \
.filterBounds(new_area)
# Select bands 8 and 11 and calculate NDMI for each image in the collection
image_collection = image_collection.select(['B8', 'B11']).map(calculate_ndmi)
# Get the mean NDMI image and clip it to the area of interest
ndmi_image = image_collection.select('NDMI').max().clip(new_area)
# Define visualization parameters for NDMI
ndmi_vis_params = {
'min': -1,
'max': 1,
'palette': ['white', 'black']
}
# Get the URL for the NDMI image
url = ndmi_image.getThumbURL({
'min': ndmi_vis_params['min'],
'max': ndmi_vis_params['max'],
'palette': ','.join(ndmi_vis_params['palette']),
'region': new_area,
'dimensions': [width, height],
'format': 'png'
})
# Request the image from the URL
response = requests.get(url)
image = Image.open(io.BytesIO(response.content)).convert("L")
# Convert the image to a NumPy array
ndmi_array = np.array(image)
ndmi_array = ndmi_array/255
logging.info("End create image ndmi")
return ndmi_array,image
def calculate_nddi(image):
ndvi = image.select('NDVI')
ndwi = image.select('NDWI')
# nddi = ndvi.subtract(ndwi).divide(ndvi.add(ndwi)).rename('NDDI')
nddi = ndvi.subtract(ndwi).rename('NDDI')
return nddi
def create_image_nddi(date1, date2, new_area=area):
logging.info("Starting create image nddi")
# Parse the end date and add one day
date_obj = datetime.strptime(date2, '%Y-%m-%d')
new_date_obj = date_obj + timedelta(days=1)
new_date_str = new_date_obj.strftime('%Y-%m-%d')
# Create an ImageCollection and filter it by date and bounds
image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \
.filterDate(date1, new_date_str) \
.filterBounds(new_area)
# Select bands 8, 4, and 3 and calculate NDVI and NDWI for each image in the collection
image_collection = image_collection.select(['B8', 'B4', 'B3']).map(calculate_ndvi).map(calculate_ndwi)
# Calculate NDDI for each image in the collection
# image_collection = image_collection.map(lambda image: image.addBands(calculate_nddi(image)))
image_collection = image_collection.map(calculate_nddi)
# Get the mean NDDI image and clip it to the area of interest
nddi_image = image_collection.select('NDDI').max().clip(new_area)
# Define visualization parameters for NDDI
nddi_vis_params = {
'min': -1,
'max': 1,
'palette': ['white', 'black']
}
# Get the URL for the NDDI image
url = nddi_image.getThumbURL({
'min': nddi_vis_params['min'],
'max': nddi_vis_params['max'],
'palette': ','.join(nddi_vis_params['palette']),
'region': new_area,
'dimensions': [width, height],
'format': 'png'
})
# Request the image from the URL
response = requests.get(url)
image = Image.open(io.BytesIO(response.content)).convert("L")
# Convert the image to a NumPy array
nddi_array = np.array(image)
nddi_array = nddi_array /255
logging.info("End create image nddi")
return nddi_array,image
def calculate_savi(image):
red = image.select('B4')
nir = image.select('B8')
# Define the soil adjustment factor (L), typically between 0 to 1
L = 0.5
savi = nir.subtract(red).multiply(1 + L).divide(nir.add(red).add(L)).rename('SAVI')
return image.addBands(savi)
def create_image_savi(date1, date2, new_area=area):
logging.info("Starting create image savi")
# Parse the end date and add one day
date_obj = datetime.strptime(date2, '%Y-%m-%d')
new_date_obj = date_obj + timedelta(days=1)
new_date_str = new_date_obj.strftime('%Y-%m-%d')
# Create an ImageCollection and filter it by date and bounds
image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \
.filterDate(date1, new_date_str) \
.filterBounds(new_area)
# Select bands 4, 8, and 12 and calculate SAVI for each image in the collection
image_collection = image_collection.select(['B4', 'B8', 'B12']).map(calculate_savi)
# Get the mean SAVI image and clip it to the area of interest
savi_image = image_collection.select('SAVI').max().clip(new_area)
# Define visualization parameters for SAVI
savi_vis_params = {
'min': -1,
'max': 1,
'palette': ['white', 'black']
}
# Get the URL for the SAVI image
url = savi_image.getThumbURL({
'min': savi_vis_params['min'],
'max': savi_vis_params['max'],
'palette': ','.join(savi_vis_params['palette']),
'region': new_area,
'dimensions': [158, 292],
'format': 'png'
})
# Request the image from the URL
response = requests.get(url)
image = Image.open(io.BytesIO(response.content)).convert("L")
# Convert the image to a NumPy array
savi_array = np.array(image)
# Normalize the values to be between -1 and 1
savi_array = savi_array / 255
logging.info("End create image savi")
return savi_array,image
def date_create(today_date,new_area=area):
# today_date = datetime.today().strftime('%Y-%m-%d')
today_date = datetime.strptime(today_date, '%Y-%m-%d')
seven_days_ago = (today_date - timedelta(days=7)).strftime('%Y-%m-%d')
image_collection = ee.ImageCollection('COPERNICUS/S2_SR')
image_collection_filter = image_collection.filterDate(seven_days_ago, today_date).filterBounds(new_area)
image_info = image_collection_filter.aggregate_array('system:index').getInfo()
image_dates = image_collection_filter.aggregate_array('system:time_start').map(lambda d: ee.Date(d).format('YYYY-MM-dd')).getInfo()
df_day = pd.DataFrame({'Image ID': image_info, 'Date': image_dates})
df_du = pd.unique(df_day['Date'])
df_latest_2day = df_du[-2:]
return df_latest_2day
def load_and_process_file(file_path):
logging.info(f"Loading file: {file_path}")
# Check if the file exists
if not os.path.isfile(file_path):
raise FileNotFoundError(f"The file '{file_path}' does not exist.")
# Check the file extension
file_extension = os.path.splitext(file_path)[1].lower()
if file_extension == '.csv':
# Load CSV file
df = pd.read_csv(file_path)
elif file_extension in ['.xls', '.xlsx']:
# Load Excel file
df = pd.read_excel(file_path)
else:
raise ValueError("The file must be a CSV or Excel file with extensions '.csv', '.xls', or '.xlsx'.")
# Convert column names to lowercase
df.columns = df.columns.str.lower()
df = df[["latitude","longitude","acq_date"]]
return df
def filter_latest_date(df, date_column='date'):
lat_min = sw_corner_list[1]
lat_max = nw_corner_list[1]
lon_min = sw_corner_list[0]
lon_max = se_corner_list[0]
df = df[(df['latitude'] >= lat_min) & (df['latitude'] <= lat_max) &
(df['longitude'] >= lon_min) & (df['longitude'] <= lon_max)]
# Convert the date column to datetime format
df[date_column] = pd.to_datetime(df[date_column])
# Find the latest date
latest_date = df[date_column].max()
# Filter the DataFrame to only include rows with the latest date
filtered_df = df[df[date_column] == latest_date]
filtered_df = filtered_df.reset_index(drop=True)
logging.info(f"Loading file: {str(filtered_df[date_column][0].date())}")
return filtered_df
def create_image_from_coordinates(df):
logging.info("Start create_image_from_coordinates")
# Create a blank (black) image array
image_array = np.zeros((height, width), dtype=np.uint8)
# Function to convert lat/lon to pixel coordinates
def latlon_to_pixel(lat, lon):
lat_range = nw_corner_list[1] - sw_corner_list[1]
lon_range = ne_corner_list[0] - nw_corner_list[0]
y = int(height * (nw_corner_list[1] - lat) / lat_range)
x = int(width * (lon - nw_corner_list[0]) / lon_range)
return x, y
# Iterate through each coordinate in the dataframe
for index, row in df.iterrows():
lat, lon = row['latitude'], row['longitude']
x, y = latlon_to_pixel(lat, lon)
# Paint a 11x11 pixel square centered on the hotspot coordinate if within bounds
for dx in range(-2, 3):
for dy in range(-2, 3):
new_x = x + dx
new_y = y + dy
if 0 <= new_x < width and 0 <= new_y < height:
image_array[new_y, new_x] = 255
# Convert array to PIL Image
pil_image = Image.fromarray(image_array).convert("L")
logging.info("End create_image_from_coordinates")
# Return both the array and the PIL Image
return image_array