import ee
import numpy as np
from PIL import Image
import io
import pandas as pd
from datetime import datetime, timedelta
import requests
import os
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Authenticate and initialize the Earth Engine module with a service account.
service_account = "deploy-1@firepatternrecognition.iam.gserviceaccount.com"
credentials = ee.ServiceAccountCredentials(
    service_account,
    '2AB9532A8B7759C26D6980A570EF6EEE8B49DE78_firepatternrecognition-c10928272b24.json'
)
ee.Initialize(credentials)

# Output image dimensions in pixels.
width, height = 158, 292

# Corner coordinates [longitude, latitude] of the area of interest.
nw_corner_list = [98.00, 20.15]
ne_corner_list = [99.58, 20.15]
sw_corner_list = [98.00, 17.23]
se_corner_list = [99.58, 17.23]

# Define the corner points of the area.
nw_corner = ee.Geometry.Point(nw_corner_list)
ne_corner = ee.Geometry.Point(ne_corner_list)
sw_corner = ee.Geometry.Point(sw_corner_list)
se_corner = ee.Geometry.Point(se_corner_list)

# Create a rectangle from the corners.
area = ee.Geometry.Polygon([
    [nw_corner.coordinates(), ne_corner.coordinates(),
     se_corner.coordinates(), sw_corner.coordinates(),
     nw_corner.coordinates()]
])


def calculate_ndvi(image):
    """Calculate NDVI for an image."""
    ndvi = image.normalizedDifference(['B8', 'B4']).rename('NDVI')
    return image.addBands(ndvi)


def create_image_ndvi(date1, date2, new_area=area):
    """Build a normalized NDVI array and grayscale image for the given date range."""
    logging.info("Starting create image ndvi")

    # Parse the end date and add one day so that date2 is included in the filter.
    date_obj = datetime.strptime(date2, '%Y-%m-%d')
    new_date_obj = date_obj + timedelta(days=1)
    new_date_str = new_date_obj.strftime('%Y-%m-%d')

    # Create an ImageCollection and filter it by date and bounds.
    image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \
        .filterDate(date1, new_date_str) \
        .filterBounds(new_area)

    # Select bands 4 and 8 and calculate NDVI for each image in the collection.
    image_collection = image_collection.select(['B4', 'B8']).map(calculate_ndvi)

    # Take the per-pixel maximum NDVI composite and clip it to the area of interest.
    ndvi_image = image_collection.select('NDVI').max().clip(new_area)

    # Define visualization parameters for NDVI.
    ndvi_vis_params = {
        'min': -1,
        'max': 1,
        'palette': ['white', 'black']
    }

    # Get the URL for the NDVI thumbnail.
    url = ndvi_image.getThumbURL({
        'min': ndvi_vis_params['min'],
        'max': ndvi_vis_params['max'],
        'palette': ','.join(ndvi_vis_params['palette']),
        'region': new_area,
        'dimensions': [width, height],
        'format': 'png'
    })

    # Request the image from the URL and convert it to a grayscale array in [0, 1].
    response = requests.get(url)
    image = Image.open(io.BytesIO(response.content)).convert("L")
    ndvi_array = np.array(image) / 255.0

    logging.info("End create image ndvi")
    return ndvi_array, image


def calculate_ndwi(image):
    """Calculate NDWI for an image."""
    ndwi = image.normalizedDifference(['B3', 'B8']).rename('NDWI')
    return image.addBands(ndwi)


def create_image_ndwi(date1, date2, new_area=area):
    """Build a normalized NDWI array and grayscale image for the given date range."""
    logging.info("Starting create image ndwi")

    # Parse the end date and add one day so that date2 is included in the filter.
    date_obj = datetime.strptime(date2, '%Y-%m-%d')
    new_date_obj = date_obj + timedelta(days=1)
    new_date_str = new_date_obj.strftime('%Y-%m-%d')

    # Create an ImageCollection and filter it by date and bounds.
    image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \
        .filterDate(date1, new_date_str) \
        .filterBounds(new_area)

    # Select bands 3 and 8 and calculate NDWI for each image in the collection.
    image_collection = image_collection.select(['B3', 'B8']).map(calculate_ndwi)

    # Take the per-pixel maximum NDWI composite and clip it to the area of interest.
    ndwi_image = image_collection.select('NDWI').max().clip(new_area)

    # Define visualization parameters for NDWI.
    ndwi_vis_params = {
        'min': -1,
        'max': 1,
        'palette': ['white', 'black']
    }

    # Get the URL for the NDWI thumbnail.
    url = ndwi_image.getThumbURL({
        'min': ndwi_vis_params['min'],
        'max': ndwi_vis_params['max'],
        'palette': ','.join(ndwi_vis_params['palette']),
        'region': new_area,
        'dimensions': [width, height],
        'format': 'png'
    })

    # Request the image from the URL and convert it to a grayscale array in [0, 1].
    response = requests.get(url)
    image = Image.open(io.BytesIO(response.content)).convert("L")
    ndwi_array = np.array(image) / 255.0

    logging.info("End create image ndwi")
    return ndwi_array, image


def calculate_ndmi(image):
    """Calculate NDMI for an image."""
    ndmi = image.normalizedDifference(['B8', 'B11']).rename('NDMI')
    return image.addBands(ndmi)


def create_image_ndmi(date1, date2, new_area=area):
    """Build a normalized NDMI array and grayscale image for the given date range."""
    logging.info("Starting create image ndmi")

    # Parse the end date and add one day so that date2 is included in the filter.
    date_obj = datetime.strptime(date2, '%Y-%m-%d')
    new_date_obj = date_obj + timedelta(days=1)
    new_date_str = new_date_obj.strftime('%Y-%m-%d')

    # Create an ImageCollection and filter it by date and bounds.
    image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \
        .filterDate(date1, new_date_str) \
        .filterBounds(new_area)

    # Select bands 8 and 11 and calculate NDMI for each image in the collection.
    image_collection = image_collection.select(['B8', 'B11']).map(calculate_ndmi)

    # Take the per-pixel maximum NDMI composite and clip it to the area of interest.
    ndmi_image = image_collection.select('NDMI').max().clip(new_area)

    # Define visualization parameters for NDMI.
    ndmi_vis_params = {
        'min': -1,
        'max': 1,
        'palette': ['white', 'black']
    }

    # Get the URL for the NDMI thumbnail.
    url = ndmi_image.getThumbURL({
        'min': ndmi_vis_params['min'],
        'max': ndmi_vis_params['max'],
        'palette': ','.join(ndmi_vis_params['palette']),
        'region': new_area,
        'dimensions': [width, height],
        'format': 'png'
    })

    # Request the image from the URL and convert it to a grayscale array in [0, 1].
    response = requests.get(url)
    image = Image.open(io.BytesIO(response.content)).convert("L")
    ndmi_array = np.array(image) / 255.0

    logging.info("End create image ndmi")
    return ndmi_array, image


def calculate_nddi(image):
    """Calculate NDDI from the NDVI and NDWI bands of an image.

    The simple difference NDVI - NDWI is used here instead of the
    normalized ratio (NDVI - NDWI) / (NDVI + NDWI).
    """
    ndvi = image.select('NDVI')
    ndwi = image.select('NDWI')
    # nddi = ndvi.subtract(ndwi).divide(ndvi.add(ndwi)).rename('NDDI')
    nddi = ndvi.subtract(ndwi).rename('NDDI')
    return nddi


def create_image_nddi(date1, date2, new_area=area):
    """Build a normalized NDDI array and grayscale image for the given date range."""
    logging.info("Starting create image nddi")

    # Parse the end date and add one day so that date2 is included in the filter.
    date_obj = datetime.strptime(date2, '%Y-%m-%d')
    new_date_obj = date_obj + timedelta(days=1)
    new_date_str = new_date_obj.strftime('%Y-%m-%d')

    # Create an ImageCollection and filter it by date and bounds.
    image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \
        .filterDate(date1, new_date_str) \
        .filterBounds(new_area)

    # Select bands 8, 4, and 3 and calculate NDVI and NDWI for each image in the collection.
    image_collection = image_collection.select(['B8', 'B4', 'B3']).map(calculate_ndvi).map(calculate_ndwi)

    # Calculate NDDI for each image in the collection.
    # image_collection = image_collection.map(lambda image: image.addBands(calculate_nddi(image)))
    image_collection = image_collection.map(calculate_nddi)

    # Take the per-pixel maximum NDDI composite and clip it to the area of interest.
    nddi_image = image_collection.select('NDDI').max().clip(new_area)

    # Define visualization parameters for NDDI.
    nddi_vis_params = {
        'min': -1,
        'max': 1,
        'palette': ['white', 'black']
    }

    # Get the URL for the NDDI thumbnail.
    url = nddi_image.getThumbURL({
        'min': nddi_vis_params['min'],
        'max': nddi_vis_params['max'],
        'palette': ','.join(nddi_vis_params['palette']),
        'region': new_area,
        'dimensions': [width, height],
        'format': 'png'
    })

    # Request the image from the URL and convert it to a grayscale array in [0, 1].
    response = requests.get(url)
    image = Image.open(io.BytesIO(response.content)).convert("L")
    nddi_array = np.array(image) / 255.0

    logging.info("End create image nddi")
    return nddi_array, image


def calculate_savi(image):
    """Calculate SAVI for an image."""
    red = image.select('B4')
    nir = image.select('B8')
    # Soil adjustment factor (L), typically between 0 and 1.
    L = 0.5
    savi = nir.subtract(red).multiply(1 + L).divide(nir.add(red).add(L)).rename('SAVI')
    return image.addBands(savi)


def create_image_savi(date1, date2, new_area=area):
    """Build a normalized SAVI array and grayscale image for the given date range."""
    logging.info("Starting create image savi")

    # Parse the end date and add one day so that date2 is included in the filter.
    date_obj = datetime.strptime(date2, '%Y-%m-%d')
    new_date_obj = date_obj + timedelta(days=1)
    new_date_str = new_date_obj.strftime('%Y-%m-%d')

    # Create an ImageCollection and filter it by date and bounds.
    image_collection = ee.ImageCollection('COPERNICUS/S2_SR') \
        .filterDate(date1, new_date_str) \
        .filterBounds(new_area)

    # Select bands 4, 8, and 12 and calculate SAVI for each image in the collection.
    image_collection = image_collection.select(['B4', 'B8', 'B12']).map(calculate_savi)

    # Take the per-pixel maximum SAVI composite and clip it to the area of interest.
    savi_image = image_collection.select('SAVI').max().clip(new_area)

    # Define visualization parameters for SAVI.
    savi_vis_params = {
        'min': -1,
        'max': 1,
        'palette': ['white', 'black']
    }

    # Get the URL for the SAVI thumbnail.
    url = savi_image.getThumbURL({
        'min': savi_vis_params['min'],
        'max': savi_vis_params['max'],
        'palette': ','.join(savi_vis_params['palette']),
        'region': new_area,
        'dimensions': [width, height],
        'format': 'png'
    })

    # Request the image from the URL and convert it to a grayscale array in [0, 1].
    response = requests.get(url)
    image = Image.open(io.BytesIO(response.content)).convert("L")
    savi_array = np.array(image) / 255.0

    logging.info("End create image savi")
    return savi_array, image


def date_create(today_date, new_area=area):
    """Return the last two distinct Sentinel-2 acquisition dates within the past seven days."""
    # today_date = datetime.today().strftime('%Y-%m-%d')
    today_date = datetime.strptime(today_date, '%Y-%m-%d')
    seven_days_ago = (today_date - timedelta(days=7)).strftime('%Y-%m-%d')

    image_collection = ee.ImageCollection('COPERNICUS/S2_SR')
    image_collection_filter = image_collection.filterDate(seven_days_ago, today_date).filterBounds(new_area)

    image_info = image_collection_filter.aggregate_array('system:index').getInfo()
    image_dates = image_collection_filter.aggregate_array('system:time_start') \
        .map(lambda d: ee.Date(d).format('YYYY-MM-dd')).getInfo()

    df_day = pd.DataFrame({'Image ID': image_info, 'Date': image_dates})
    df_du = pd.unique(df_day['Date'])
    df_latest_2day = df_du[-2:]
    return df_latest_2day


def load_and_process_file(file_path):
    """Load a hotspot CSV or Excel file and keep the latitude, longitude, and acquisition date columns."""
    logging.info(f"Loading file: {file_path}")

    # Check if the file exists.
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"The file '{file_path}' does not exist.")

    # Check the file extension and load accordingly.
    file_extension = os.path.splitext(file_path)[1].lower()
    if file_extension == '.csv':
        df = pd.read_csv(file_path)
    elif file_extension in ['.xls', '.xlsx']:
        df = pd.read_excel(file_path)
    else:
        raise ValueError("The file must be a CSV or Excel file with extensions '.csv', '.xls', or '.xlsx'.")

    # Convert column names to lowercase and keep only the columns of interest.
    df.columns = df.columns.str.lower()
    df = df[["latitude", "longitude", "acq_date"]]
    return df


def filter_latest_date(df, date_column='date'):
    """Keep only the rows inside the area of interest that carry the most recent date."""
    lat_min = sw_corner_list[1]
    lat_max = nw_corner_list[1]
    lon_min = sw_corner_list[0]
    lon_max = se_corner_list[0]

    # Keep only points inside the bounding box; copy to avoid SettingWithCopyWarning.
    df = df[(df['latitude'] >= lat_min) & (df['latitude'] <= lat_max) &
            (df['longitude'] >= lon_min) & (df['longitude'] <= lon_max)].copy()

    # Convert the date column to datetime format.
    df[date_column] = pd.to_datetime(df[date_column])

    # Find the latest date.
    latest_date = df[date_column].max()

    # Filter the DataFrame to only include rows with the latest date.
    filtered_df = df[df[date_column] == latest_date]
    filtered_df = filtered_df.reset_index(drop=True)

    logging.info(f"Latest hotspot date: {filtered_df[date_column].iloc[0].date()}")
    return filtered_df


def create_image_from_coordinates(df):
    """Rasterize hotspot coordinates into a binary image of the area of interest."""
    logging.info("Start create_image_from_coordinates")

    # Create a blank (black) image array.
    image_array = np.zeros((height, width), dtype=np.uint8)

    # Convert lat/lon to pixel coordinates.
    def latlon_to_pixel(lat, lon):
        lat_range = nw_corner_list[1] - sw_corner_list[1]
        lon_range = ne_corner_list[0] - nw_corner_list[0]
        y = int(height * (nw_corner_list[1] - lat) / lat_range)
        x = int(width * (lon - nw_corner_list[0]) / lon_range)
        return x, y

    # Iterate through each coordinate in the dataframe.
    for index, row in df.iterrows():
        lat, lon = row['latitude'], row['longitude']
        x, y = latlon_to_pixel(lat, lon)
        # Paint a 5x5 pixel square centered on the hotspot coordinate if within bounds.
        for dx in range(-2, 3):
            for dy in range(-2, 3):
                new_x = x + dx
                new_y = y + dy
                if 0 <= new_x < width and 0 <= new_y < height:
                    image_array[new_y, new_x] = 255

    # Convert the array to a PIL Image.
    pil_image = Image.fromarray(image_array).convert("L")
    logging.info("End create_image_from_coordinates")

    # Return both the array and the PIL Image.
    return image_array, pil_image
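

# --- Example usage (illustrative sketch, not part of the original module) ---
# This block only shows how the helpers above might be chained together:
# pick the two most recent Sentinel-2 acquisition dates, build the index
# composites, and rasterize the latest hotspots. The file name 'hotspots.csv'
# and the reference date '2024-03-01' are hypothetical placeholders, not
# values taken from the original code.
if __name__ == "__main__":
    # Last two distinct acquisition dates within the week before the reference date.
    latest_dates = date_create('2024-03-01')
    start_date, end_date = latest_dates[0], latest_dates[-1]

    # Index composites for the date range (each returns a [0, 1] array and a PIL image).
    ndvi_array, ndvi_image = create_image_ndvi(start_date, end_date)
    ndwi_array, ndwi_image = create_image_ndwi(start_date, end_date)
    ndmi_array, ndmi_image = create_image_ndmi(start_date, end_date)

    # Hotspot mask from the most recent acquisitions in the hotspot file.
    hotspot_df = load_and_process_file('hotspots.csv')
    latest_hotspots = filter_latest_date(hotspot_df, date_column='acq_date')
    hotspot_array, hotspot_image = create_image_from_coordinates(latest_hotspots)

    logging.info("NDVI shape: %s, hotspot shape: %s", ndvi_array.shape, hotspot_array.shape)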