Growth / api.py
MissingBreath's picture
Update api.py
7cd35ce verified
from fastapi import FastAPI, File, UploadFile
import numpy as np
from PIL import Image
import io
import tensorflow as tf
from docx import Document
import os
import google.generativeai as genai
from pydantic import BaseModel
from ultralytics import YOLO
import matplotlib.pyplot as plt
from io import BytesIO
import base64
from langchain.document_loaders import TextLoader # Or a custom loader for .docs
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain.vectorstores import FAISS
from langchain_chroma import Chroma
from langchain.chains import RetrievalQA
from langchain_google_genai import ChatGoogleGenerativeAI # Import the GoogleGenerativeAI class
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from langchain.chains import LLMChain
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.svm import SVC
import joblib
model = tf.keras.models.load_model('946_.keras', compile=False)
modelPalm = tf.keras.models.load_model('palm_model.h5', compile=False)
file_path = "Algeria Plant Disease Treatment Plan.docx"
def docx_to_knowledge_base(file_path):
try:
doc = Document(file_path)
except Exception as e:
print(f"Error reading file: {e}")
# 1. Extract paragraphs
paragraphs = [p.text.strip() for p in doc.paragraphs]
# 2. Extract tables as sentences
table_sentences = []
for table in doc.tables:
rows = table.rows
headers = [cell.text.strip() for cell in rows[0].cells]
for row in rows[1:]:
values = [cell.text.strip() for cell in row.cells]
entry = ', '.join([f"{h}: {v}" for h, v in zip(headers, values)])
table_sentences.append(entry)
# 3. Combine everything into one text block
knowledge_text = "\n".join(paragraphs + table_sentences)
return knowledge_text
# Usage
knowledge = docx_to_knowledge_base(file_path)
# Configure Gemini API (replace with your actual API key)
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
gemini_model_name = "gemini-2.0-flash"
embedding_model_name = "models/embedding-001"
# 2. Chunking
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
docs = text_splitter.create_documents([knowledge])
# 3. Embedding
embeddings = GoogleGenerativeAIEmbeddings(model=embedding_model_name)
# 4. Vector Database
Chorma_path = "/code/Chroma/"
# Check if the directory exists
if not os.path.exists(Chorma_path):
try:
os.makedirs(Chorma_path)
print(f"Directory created: {Chorma_path}")
except PermissionError:
print(f"Permission denied: Unable to create directory {Chorma_path}")
except Exception as e:
print(f"Error creating directory {Chorma_path}: {e}")
else:
print(f"Directory already exists: {Chorma_path}")
# Create a new empty Chroma DB (or load existing if path exists)
db = Chroma(
collection_name="plant_treatments",
embedding_function=embeddings,
persist_directory= Chorma_path # Local folder to persist data
)
db.add_documents(docs)
retriever = db.as_retriever()
# 5. RAG Pipeline
llm = ChatGoogleGenerativeAI(model=gemini_model_name, temperature=0.7)
def create_prompt(disease_name, severity,language="english"):
prompt = f"""
for the next prompt you should answer with the following language and it is obligatory : {language}
""" +"""
You are an expert in plant disease treatment for a platform called Growth.
Use the following context to answer the user's question.
If you don't know the answer, try to find the answer from the context try to make similair answer to the context.
If the question is not related to the context, Generate from your exsisting knowlodge about the context.
When responding dont start your answer by saying Ok i will, or Yes i will give you, Make it like you are an Assistant and you generate solutions directly.
Context:
{context}
Question:
{question}
Given a plant disease and its severity, suggest:
1. what is Organic treatment and how it will help Organic treatment
2. what is Chemical product and how it will help Chemical product
3. Application schedule plan of the treatment (explain in bullet points the schedule in simple/detailed/understandable way)
"""+f"""
Now here is a new case:
Disease: {disease_name}, Severity: {severity}
- Organic:"""
return prompt
def create_prompt_chat(question,language):
prompt = f"""
for the next prompt you should answer with the following language and it is obligatory : {language}
""" +"""
You are a chatbot called Growth, and you assist users with plant leaf disease identification and solutions.
Use the following context to answer the user's question.
If you don't know the answer, try to find the answer from the context try to make similair answer to the context.
If the question is not related to the context, Generate from your exsisting knowlodge about the context.
Context:
{context}
Answer the following question:
"""+f"""{question}"""
return prompt
# Load the saved model
loaded_svm_model = joblib.load('svm_model.joblib')
# Load the saved scaler
loaded_scaler = joblib.load('scaler.joblib')
# Load the saved label encoder
loaded_label_encoder = joblib.load('label_encoder.joblib')
Geomodel = tf.keras.models.load_model('keras_model.h5')
def predict_crop_disease(input_data):
# Load the saved model, scaler, and label encoder
loaded_svm_model = joblib.load('svm_model.joblib')
loaded_scaler = joblib.load('scaler.joblib')
loaded_label_encoder = joblib.load('label_encoder.joblib')
# Define the features columns
features_columns = ['region', 'crop_type', 'soil_moisture_%', 'soil_pH',
'temperature_C', 'rainfall_mm', 'humidity_%', 'sunlight_hours',
'irrigation_type', 'fertilizer_type', 'pesticide_usage_ml',
'total_days', 'yield_kg_per_hectare', 'latitude', 'longitude',
'NDVI_index']
# Create a DataFrame from the input data
input_df = pd.DataFrame([input_data],columns=features_columns)
# Ensure the input DataFrame has the correct columns and order
input_df = input_df[features_columns]
# Encoding categorical features (using the loaded LabelEncoder)
# Assuming these are the categorical columns and their corresponding mapping based on your training data
categorical_cols = ['region', 'crop_type', 'irrigation_type', 'fertilizer_type']
# Define the mapping (replace with actual mapping from your training data)
region_mapping = {'Central USA': 0, 'East Africa': 1, 'North India': 2, 'South India': 3, 'South USA': 4}
crop_mapping = {'Cotton': 0, 'Maize': 1, 'Rice': 2, 'Soybean': 3, 'Wheat': 4}
irrigation_mapping = {'Drip': 0, 'Manual': 1, 'Sprinkler': 2}
fertilizer_mapping = {'Inorganic': 0, 'Mixed': 1, 'Organic': 2}
for col in categorical_cols:
if col == 'region':
input_df[col] = input_df[col].map(region_mapping).fillna(0) # Handle missing values
elif col == 'crop_type':
input_df[col] = input_df[col].map(crop_mapping).fillna(0) # Handle missing values
elif col == 'irrigation_type':
input_df[col] = input_df[col].map(irrigation_mapping).fillna(0) # Handle missing values
elif col == 'fertilizer_type':
input_df[col] = input_df[col].map(fertilizer_mapping).fillna(0) # Handle missing values
# Scale the input features
input_scaled = loaded_scaler.transform(input_df.values.reshape(-1, 1)).reshape(1,-1)
print(input_scaled.reshape(1,-1).shape)
# Make predictions
prediction = Geomodel.predict(input_scaled)
# Inverse transform the prediction (if needed, depending on your model)
#predicted_label = loaded_label_encoder.inverse_transform(prediction)
return prediction[0] #predicted_label[0]
def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
grad_model = tf.keras.models.Model(
[model.inputs], [model.get_layer(last_conv_layer_name).output, model.output]
)
with tf.GradientTape() as tape:
last_conv_layer_output, preds = grad_model(img_array)
if pred_index is None:
pred_index = tf.argmax(preds[0])
class_channel = preds[:, pred_index]
grads = tape.gradient(class_channel, last_conv_layer_output)
pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
last_conv_layer_output = last_conv_layer_output[0]
heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
heatmap = tf.squeeze(heatmap)
heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
return heatmap.numpy()
def display_gradcam(img, heatmap, alpha=0.4):
# Create heatmap from the given heatmap values
heatmap = np.uint8(255 * heatmap)
jet = plt.cm.get_cmap("jet")
jet_colors = jet(np.arange(256))[:, :3]
jet_heatmap = jet_colors[heatmap]
jet_heatmap = tf.keras.preprocessing.image.array_to_img(jet_heatmap)
jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
jet_heatmap = tf.keras.preprocessing.image.img_to_array(jet_heatmap)
# Superimpose the heatmap onto the original image
superimposed_img = jet_heatmap * alpha + img
superimposed_img = tf.keras.preprocessing.image.array_to_img(superimposed_img)
# Save the image to a BytesIO object instead of showing it with plt.imshow()
img_byte_arr = BytesIO()
superimposed_img.save(img_byte_arr, format='PNG')
img_byte_arr = img_byte_arr.getvalue()
# Return the image as base64
return base64.b64encode(img_byte_arr).decode('utf-8')
def calculate_activation_ratio(heatmap, threshold=0.2):
"""Calculates the ratio of activated to non-activated pixels in a heatmap.
Args:
heatmap: A NumPy array representing the Grad-CAM heatmap.
threshold: The threshold for classifying pixels as activated or not.
Returns:
The ratio of activated pixels to non-activated pixels.
"""
activated_pixels = np.sum(heatmap > threshold)
total_pixels = heatmap.size
non_activated_pixels = total_pixels - activated_pixels
if non_activated_pixels == 0:
return 1.0 # Avoid division by zero if all pixels are activated
return activated_pixels / non_activated_pixels
# Example usage within the existing code (assuming heatmap is calculated as before):
last_conv_layer_name = "block3_conv2"
app = FastAPI()
@app.post("/classify")
async def classify(image: UploadFile = File(...)):
if image is not None:
img = Image.open(io.BytesIO(await image.read()))
img = img.resize((64,64))
img_array = np.array(img) / 255.0
img_array = np.expand_dims(img_array, axis=0)
predictions = model.predict(img_array)
predicted_class_idx = np.argmax(predictions)
predicted_class_idx = int(predicted_class_idx)
heatmap = make_gradcam_heatmap(img_array, model, last_conv_layer_name)
base64_image = display_gradcam( np.array(img), heatmap)
# Return the base64 encoded image in the response
# Calculate and print the activation ratio
ratio = calculate_activation_ratio(heatmap)
return {"prediction": predicted_class_idx,"gradcam": base64_image,"ration":ratio}
else:
return {"error": "No image provided"}
@app.post("/palmclassify")
async def palmclassify(image: UploadFile = File(...)):
if image is not None:
img = Image.open(io.BytesIO(await image.read()))
img = img.resize((64,64))
img_array = np.array(img) / 255.0
img_array = np.expand_dims(img_array, axis=0)
predictions = modelPalm.predict(img_array)
predicted_class_idx = np.argmax(predictions)
predicted_class_idx = int(predicted_class_idx)
last_mb = "Conv_1"
heatmap = make_gradcam_heatmap(img_array, modelPalm, last_mb)
base64_image = display_gradcam( np.array(img), heatmap)
# Return the base64 encoded image in the response
# Calculate and print the activation ratio
ratio = calculate_activation_ratio(heatmap)
return {"prediction": predicted_class_idx,"gradcam": base64_image,"ration":ratio}
else:
return {"error": "No image provided"}
yolomodel = YOLO("yolo11m.pt")
@app.post("/multiclassify")
async def classify(image: UploadFile = File(...)):
if image is not None:
img = Image.open(io.BytesIO(await image.read()))
results = yolomodel(img)
output=[]
for i, box in enumerate(results[0].boxes):
# Extract box coordinates
x1, y1, x2, y2 = box.xyxy[0].tolist()
confidence = box.conf[0].item()
class_id = box.cls[0].item()
# Crop the image based on bounding box
cropped_image = results[0].orig_img[int(y1):int(y2), int(x1):int(x2)]
cropped_image = Image.fromarray(cropped_image)
cropped_image = cropped_image.resize((64,64))
img_array = np.array(cropped_image) / 255.0
img_array = np.expand_dims(img_array, axis=0)
predictions = model.predict(img_array)
predicted_class_idx = np.argmax(predictions)
predicted_class_idx = int(predicted_class_idx)
print("HIIIIIIIIIIIIIIIIII")
output.append({
"box": {"x1": x1, "y1": y1, "x2": x2, "y2": y2},
"confidence": round(confidence, 2),
"predicted_class": predicted_class_idx
})
return {"output": output}
else:
return {"error": "No image provided"}
class DiseaseQuery(BaseModel):
disease: str
severity: str # "normal" or "severe"
language: str
@app.post("/RAG")
async def rag_classify(query: DiseaseQuery):
try:
prompt_template = create_prompt(query.disease, query.severity,query.language)
prompt = PromptTemplate(
input_variables=["context", "question"],
template=prompt_template,
)
qa = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
chain_type_kwargs={"prompt": prompt},
verbose=True,
)
final_query = f"What is the best treatment plan for {query.disease} in a {query.severity} case?"
result = qa.run(final_query)
return {"answer": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
class ChatQuery(BaseModel):
question: str
language: str
@app.post("/RAGChat")
async def rag_chat(query: ChatQuery):
try:
prompt_template = create_prompt_chat(query.question,query.language)
prompt = PromptTemplate(
input_variables=["context", "question"],
template=prompt_template,
)
qa = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
chain_type_kwargs={"prompt": prompt},
verbose=True,
)
final_query = f"Answer the Provided questino like a human, and remember the chat history {query.question}"
result = qa.run(final_query)
return {"answer": result}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
class GeoSenQuery(BaseModel):
region: str
crop_type: str
soil_moisture: float
soil_pH: float
temperature_C: float
rainfall_mm: float
humidity: float
sunlight_hours: float
irrigation_type: str
fertilizer_type: str
pesticide_usage_ml: float
total_days: int
yield_kg_per_hectare: float
latitude: float
longitude: float
NDVI_index: float
@app.post("/GeoSensor")
async def geo_sensor_classify(query: GeoSenQuery):
try:
logits = predict_crop_disease([query.region,query.crop_type,query.soil_moisture,query.soil_pH,query.temperature_C,
query.rainfall_mm,query.humidity,query.sunlight_hours,query.irrigation_type,query.fertilizer_type,
query.pesticide_usage_ml,query.total_days,query.yield_kg_per_hectare,query.latitude,query.longitude,
query.NDVI_index])
return {
"answer": int(np.argmax(logits)),
"logits": logits.tolist() if isinstance(logits, np.ndarray) else float(logits)
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))