from fastapi import FastAPI, File, UploadFile, HTTPException
import numpy as np
from PIL import Image
import io
import tensorflow as tf
from docx import Document
import os
import google.generativeai as genai
from pydantic import BaseModel
from ultralytics import YOLO
import matplotlib.pyplot as plt
from io import BytesIO
import base64
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
from langchain_chroma import Chroma
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import pandas as pd
from sklearn.preprocessing import LabelEncoder  # needed so joblib can unpickle the encoder
from sklearn.svm import SVC  # needed so joblib can unpickle the SVM model
import joblib
# Leaf disease classifiers (compile=False loads weights/architecture only;
# no training configuration is needed for inference)
model = tf.keras.models.load_model('946_.keras', compile=False)
modelPalm = tf.keras.models.load_model('palm_model.h5', compile=False)

file_path = "Algeria Plant Disease Treatment Plan.docx"
def docx_to_knowledge_base(file_path):
    try:
        doc = Document(file_path)
    except Exception as e:
        print(f"Error reading file: {e}")
        return ""
    # 1. Extract paragraphs
    paragraphs = [p.text.strip() for p in doc.paragraphs]
    # 2. Extract tables as sentences
    table_sentences = []
    for table in doc.tables:
        rows = table.rows
        headers = [cell.text.strip() for cell in rows[0].cells]
        for row in rows[1:]:
            values = [cell.text.strip() for cell in row.cells]
            entry = ', '.join([f"{h}: {v}" for h, v in zip(headers, values)])
            table_sentences.append(entry)
    # 3. Combine everything into one text block
    knowledge_text = "\n".join(paragraphs + table_sentences)
    return knowledge_text
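# For example (hypothetical row, for illustration only): a table with headers
# ["Disease", "Treatment"] and a data row ["Early Blight", "Copper fungicide"]
# is flattened to the sentence:
#   "Disease: Early Blight, Treatment: Copper fungicide"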
# Usage
knowledge = docx_to_knowledge_base(file_path)

# Configure the Gemini API (the key is read from the environment, never hard-coded)
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
gemini_model_name = "gemini-2.0-flash"
embedding_model_name = "models/embedding-001"
# 2. Chunking
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
docs = text_splitter.create_documents([knowledge])

# 3. Embedding
embeddings = GoogleGenerativeAIEmbeddings(model=embedding_model_name)
# 4. Vector Database
chroma_path = "/code/Chroma/"

# Check if the directory exists
if not os.path.exists(chroma_path):
    try:
        os.makedirs(chroma_path)
        print(f"Directory created: {chroma_path}")
    except PermissionError:
        print(f"Permission denied: Unable to create directory {chroma_path}")
    except Exception as e:
        print(f"Error creating directory {chroma_path}: {e}")
else:
    print(f"Directory already exists: {chroma_path}")

# Create a new empty Chroma DB (or load the existing one if the path exists)
db = Chroma(
    collection_name="plant_treatments",
    embedding_function=embeddings,
    persist_directory=chroma_path,  # Local folder to persist data
)
db.add_documents(docs)
retriever = db.as_retriever()
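# A minimal retrieval sanity check (illustrative; the query string is an
# assumption — run manually, not at import time):
# for hit in retriever.invoke("treatment for tomato early blight"):
#     print(hit.page_content[:120])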
# 5. RAG Pipeline
llm = ChatGoogleGenerativeAI(model=gemini_model_name, temperature=0.7)
def create_prompt(disease_name, severity, language="english"):
    prompt = f"""
    For the next prompt you must answer in the following language; this is obligatory: {language}
    """ + """
    You are an expert in plant disease treatment for a platform called Growth.
    Use the following context to answer the user's question.
    If you don't know the answer, try to derive one from the context and keep it close to the context.
    If the question is not related to the context, generate an answer from your existing knowledge about the context.
    When responding, don't open with phrases like "Ok, I will" or "Yes, I will give you"; answer like an assistant that generates solutions directly.

    Context:
    {context}

    Question:
    {question}

    Given a plant disease and its severity, suggest:
    1. An organic treatment and how it will help
    2. A chemical product and how it will help
    3. An application schedule for the treatment (explain the schedule in simple, detailed, understandable bullet points)
    """ + f"""
    Now here is a new case:
    Disease: {disease_name}, Severity: {severity}
    - Organic:"""
    return prompt
def create_prompt_chat(question, language):
    prompt = f"""
    For the next prompt you must answer in the following language; this is obligatory: {language}
    """ + """
    You are a chatbot called Growth, and you assist users with plant leaf disease identification and solutions.
    Use the following context to answer the user's question.
    If you don't know the answer, try to derive one from the context and keep it close to the context.
    If the question is not related to the context, generate an answer from your existing knowledge about the context.

    Context:
    {context}

    Answer the following question:
    """ + f"""{question}"""
    return prompt
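# Note: the {context} (and, in create_prompt, {question}) placeholders are kept
# inside the non-f-string parts so they stay literal; LangChain's PromptTemplate
# fills them with the retrieved documents and the query at chain run time.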
# Load the saved model
loaded_svm_model = joblib.load('svm_model.joblib')
# Load the saved scaler
loaded_scaler = joblib.load('scaler.joblib')
# Load the saved label encoder
loaded_label_encoder = joblib.load('label_encoder.joblib')

Geomodel = tf.keras.models.load_model('keras_model.h5')
def predict_crop_disease(input_data):
    # The SVM model, scaler, and label encoder are loaded once at module level
    # above; the prediction itself is made with the Keras Geomodel.
    # Define the feature columns
    features_columns = ['region', 'crop_type', 'soil_moisture_%', 'soil_pH',
                        'temperature_C', 'rainfall_mm', 'humidity_%', 'sunlight_hours',
                        'irrigation_type', 'fertilizer_type', 'pesticide_usage_ml',
                        'total_days', 'yield_kg_per_hectare', 'latitude', 'longitude',
                        'NDVI_index']
    # Create a DataFrame from the input data
    input_df = pd.DataFrame([input_data], columns=features_columns)
    # Encode categorical features with the same integer mappings used at training
    # time (replace with the actual mapping from your training data)
    region_mapping = {'Central USA': 0, 'East Africa': 1, 'North India': 2, 'South India': 3, 'South USA': 4}
    crop_mapping = {'Cotton': 0, 'Maize': 1, 'Rice': 2, 'Soybean': 3, 'Wheat': 4}
    irrigation_mapping = {'Drip': 0, 'Manual': 1, 'Sprinkler': 2}
    fertilizer_mapping = {'Inorganic': 0, 'Mixed': 1, 'Organic': 2}
    mappings = {'region': region_mapping, 'crop_type': crop_mapping,
                'irrigation_type': irrigation_mapping, 'fertilizer_type': fertilizer_mapping}
    for col, mapping in mappings.items():
        input_df[col] = input_df[col].map(mapping).fillna(0)  # Unknown categories fall back to 0
    # Scale the input features (flattened column-wise, matching how the scaler was fit)
    input_scaled = loaded_scaler.transform(input_df.values.reshape(-1, 1)).reshape(1, -1)
    # Make predictions with the Keras model
    prediction = Geomodel.predict(input_scaled)
    # Inverse transform the prediction if the model outputs encoded labels:
    # predicted_label = loaded_label_encoder.inverse_transform(prediction)
    return prediction[0]
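# Illustrative call (feature values are assumptions, not taken from the training data):
# predict_crop_disease(['North India', 'Wheat', 32.5, 6.8, 24.0, 120.0, 55.0, 8.0,
#                       'Drip', 'Organic', 250.0, 120, 3200.0, 28.6, 77.2, 0.62])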
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    # Model that maps the input image to the last conv layer's activations
    # plus the final predictions
    grad_model = tf.keras.models.Model(
        [model.inputs], [model.get_layer(last_conv_layer_name).output, model.output]
    )
    # Gradient of the predicted class score w.r.t. the conv layer output
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]
    grads = tape.gradient(class_channel, last_conv_layer_output)
    # Channel importance weights: mean gradient over the spatial dimensions
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    last_conv_layer_output = last_conv_layer_output[0]
    # Weighted sum of the feature maps, ReLU'd and normalized to [0, 1]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
    return heatmap.numpy()
def display_gradcam(img, heatmap, alpha=0.4):
    # Rescale the heatmap to 0-255 and colorize it with the jet colormap
    heatmap = np.uint8(255 * heatmap)
    jet = plt.get_cmap("jet")  # plt.cm.get_cmap is deprecated in recent matplotlib
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]
    jet_heatmap = tf.keras.preprocessing.image.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = tf.keras.preprocessing.image.img_to_array(jet_heatmap)
    # Superimpose the heatmap onto the original image
    superimposed_img = jet_heatmap * alpha + img
    superimposed_img = tf.keras.preprocessing.image.array_to_img(superimposed_img)
    # Save the image to a BytesIO object instead of showing it with plt.imshow()
    img_byte_arr = BytesIO()
    superimposed_img.save(img_byte_arr, format='PNG')
    img_byte_arr = img_byte_arr.getvalue()
    # Return the image as base64
    return base64.b64encode(img_byte_arr).decode('utf-8')
def calculate_activation_ratio(heatmap, threshold=0.2):
    """Calculates the ratio of activated to non-activated pixels in a heatmap.

    Args:
        heatmap: A NumPy array representing the Grad-CAM heatmap.
        threshold: The threshold for classifying pixels as activated or not.

    Returns:
        The ratio of activated pixels to non-activated pixels.
    """
    activated_pixels = np.sum(heatmap > threshold)
    total_pixels = heatmap.size
    non_activated_pixels = total_pixels - activated_pixels
    if non_activated_pixels == 0:
        return 1.0  # Avoid division by zero if all pixels are activated
    return activated_pixels / non_activated_pixels
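# Worked example (assumed values): in a 2x2 heatmap where three pixels exceed
# the 0.2 threshold, activated / non-activated = 3 / 1, so:
# calculate_activation_ratio(np.array([[0.9, 0.5], [0.3, 0.1]]))  # -> 3.0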
# Last convolutional layer used for Grad-CAM on the main classifier
last_conv_layer_name = "block3_conv2"

app = FastAPI()
@app.post("/classify")  # Route decorator restored; the path is an assumption
async def classify(image: UploadFile = File(...)):
    if image is not None:
        img = Image.open(io.BytesIO(await image.read()))
        img = img.resize((64, 64))
        img_array = np.array(img) / 255.0
        img_array = np.expand_dims(img_array, axis=0)
        predictions = model.predict(img_array)
        predicted_class_idx = int(np.argmax(predictions))
        heatmap = make_gradcam_heatmap(img_array, model, last_conv_layer_name)
        base64_image = display_gradcam(np.array(img), heatmap)
        # Return the prediction, the base64-encoded Grad-CAM overlay,
        # and the activation ratio
        ratio = calculate_activation_ratio(heatmap)
        return {"prediction": predicted_class_idx, "gradcam": base64_image, "ratio": ratio}
    else:
        return {"error": "No image provided"}
@app.post("/palmclassify")  # Route decorator restored; the path is an assumption
async def palmclassify(image: UploadFile = File(...)):
    if image is not None:
        img = Image.open(io.BytesIO(await image.read()))
        img = img.resize((64, 64))
        img_array = np.array(img) / 255.0
        img_array = np.expand_dims(img_array, axis=0)
        predictions = modelPalm.predict(img_array)
        predicted_class_idx = int(np.argmax(predictions))
        last_mb = "Conv_1"  # Last conv layer of the palm model
        heatmap = make_gradcam_heatmap(img_array, modelPalm, last_mb)
        base64_image = display_gradcam(np.array(img), heatmap)
        # Return the prediction, the base64-encoded Grad-CAM overlay,
        # and the activation ratio
        ratio = calculate_activation_ratio(heatmap)
        return {"prediction": predicted_class_idx, "gradcam": base64_image, "ratio": ratio}
    else:
        return {"error": "No image provided"}
yolomodel = YOLO("yolo11m.pt")

@app.post("/detect")  # Route decorator restored; the path is an assumption.
# Renamed from `classify` so it no longer shadows the endpoint above.
async def detect(image: UploadFile = File(...)):
    if image is not None:
        img = Image.open(io.BytesIO(await image.read()))
        results = yolomodel(img)
        output = []
        for i, box in enumerate(results[0].boxes):
            # Extract box coordinates
            x1, y1, x2, y2 = box.xyxy[0].tolist()
            confidence = box.conf[0].item()
            class_id = box.cls[0].item()
            # Crop the image based on the bounding box
            cropped_image = results[0].orig_img[int(y1):int(y2), int(x1):int(x2)]
            cropped_image = Image.fromarray(cropped_image)
            cropped_image = cropped_image.resize((64, 64))
            img_array = np.array(cropped_image) / 255.0
            img_array = np.expand_dims(img_array, axis=0)
            predictions = model.predict(img_array)
            predicted_class_idx = int(np.argmax(predictions))
            output.append({
                "box": {"x1": x1, "y1": y1, "x2": x2, "y2": y2},
                "confidence": round(confidence, 2),
                "predicted_class": predicted_class_idx
            })
        return {"output": output}
    else:
        return {"error": "No image provided"}
class DiseaseQuery(BaseModel):
    disease: str
    severity: str  # "normal" or "severe"
    language: str

@app.post("/rag-classify")  # Route decorator restored; the path is an assumption
async def rag_classify(query: DiseaseQuery):
    try:
        prompt_template = create_prompt(query.disease, query.severity, query.language)
        prompt = PromptTemplate(
            input_variables=["context", "question"],
            template=prompt_template,
        )
        qa = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=retriever,
            chain_type_kwargs={"prompt": prompt},
            verbose=True,
        )
        final_query = f"What is the best treatment plan for {query.disease} in a {query.severity} case?"
        result = qa.run(final_query)
        return {"answer": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
class ChatQuery(BaseModel):
    question: str
    language: str

@app.post("/rag-chat")  # Route decorator restored; the path is an assumption
async def rag_chat(query: ChatQuery):
    try:
        prompt_template = create_prompt_chat(query.question, query.language)
        prompt = PromptTemplate(
            input_variables=["context", "question"],
            template=prompt_template,
        )
        qa = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=retriever,
            chain_type_kwargs={"prompt": prompt},
            verbose=True,
        )
        final_query = f"Answer the provided question like a human, and remember the chat history: {query.question}"
        result = qa.run(final_query)
        return {"answer": result}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
class GeoSenQuery(BaseModel):
    region: str
    crop_type: str
    soil_moisture: float
    soil_pH: float
    temperature_C: float
    rainfall_mm: float
    humidity: float
    sunlight_hours: float
    irrigation_type: str
    fertilizer_type: str
    pesticide_usage_ml: float
    total_days: int
    yield_kg_per_hectare: float
    latitude: float
    longitude: float
    NDVI_index: float

@app.post("/geo-sensor")  # Route decorator restored; the path is an assumption
async def geo_sensor_classify(query: GeoSenQuery):
    try:
        logits = predict_crop_disease([query.region, query.crop_type, query.soil_moisture, query.soil_pH,
                                       query.temperature_C, query.rainfall_mm, query.humidity, query.sunlight_hours,
                                       query.irrigation_type, query.fertilizer_type, query.pesticide_usage_ml,
                                       query.total_days, query.yield_kg_per_hectare, query.latitude, query.longitude,
                                       query.NDVI_index])
        return {
            "answer": int(np.argmax(logits)),
            "logits": logits.tolist() if isinstance(logits, np.ndarray) else float(logits)
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
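# Example client calls (illustrative; the endpoint paths above and the module
# name "app" are assumptions):
#   uvicorn app:app --host 0.0.0.0 --port 8000
#   curl -X POST -F "image=@leaf.jpg" http://localhost:8000/classify
#   curl -X POST -H "Content-Type: application/json" \
#        -d '{"disease": "Tomato Early Blight", "severity": "severe", "language": "english"}' \
#        http://localhost:8000/rag-classify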