Spaces:
Sleeping
Sleeping
| from ultralytics import YOLO | |
| import time | |
| import numpy as np | |
| import mediapipe as mp | |
| from flask import Flask, request, jsonify, send_file | |
| import uvicorn | |
| from socketio import ASGIApp | |
| import cv2 | |
| from flask import Flask, render_template, request, Response, session, redirect, url_for, make_response | |
| from flask_socketio import SocketIO | |
| from flask_socketio import emit | |
| from flask_cors import CORS | |
| from flask_socketio import SocketIO | |
| import yt_dlp as youtube_dl | |
| import uvicorn | |
| import base64 | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import base64 | |
| from io import BytesIO | |
| from PIL import Image | |
| def plot_base64_image(image_base64): | |
| # Decode base64 string | |
| image_data = base64.b64decode(image_base64) | |
| # Convert bytes to PIL Image | |
| image = Image.open(BytesIO(image_data)) | |
| # Convert PIL Image to numpy array | |
| image_array = np.array(image) | |
| # Plot image | |
| plt.imshow(image_array) | |
| plt.axis('off') | |
| plt.show() | |
| # Example usage: | |
| # base64_image = "..." # Your base64 encoded image string | |
| # plot_base64_image(base64_image) | |
| model_object_detection = YOLO("bisindov2.pt") | |
| app = Flask(__name__) | |
| socketio = SocketIO(app, cors_allowed_origins="*") | |
| CORS(app) | |
| app.secret_key = 'flask-sockets-builds' | |
| ###################################################### | |
| classes_translation = { | |
| "all": "الكل", | |
| "A": "أ", | |
| "B": "ب", | |
| "C": "ج", | |
| "D": "د", | |
| "F": "ف", | |
| "H": "هـ", | |
| "I": "أنا", | |
| "J": "جيم", | |
| "L": "إل", | |
| "M": "إم", | |
| "O": "أو", | |
| "R": "ر", | |
| "T": "ت", | |
| "U": "يو", | |
| "V": "في", | |
| "W": "دبليو", | |
| "Z": "زد", | |
| "additional": "إضافي", | |
| "alcohol": "مدرسة", | |
| "allergy": "حساسية", | |
| "bacon": "لحم المقدد", | |
| "bag": "حقيبة", | |
| "barbecue": "شواء", | |
| "bill": "فاتورة", | |
| "biscuit": "بسكويت", | |
| "bitter": "مر", | |
| "bread": "خبز", | |
| "burger": "برغر", | |
| "bye": "وداعاً", | |
| "cheese": "جبن", | |
| "chicken": "دجاج", | |
| "coke": "كوكاكولا", | |
| "cold": "بارد", | |
| "cost": "تكلفة", | |
| "coupon": "كوبون", | |
| "cup": "كوب", | |
| "dessert": "حلوى", | |
| "drink": "شراب", | |
| "drive": "قيادة", | |
| "eat": "تناول الطعام", | |
| "eggs": "بيض", | |
| "enjoy": "استمتع", | |
| "fork": "شوكة", | |
| "french fries": "بطاطس مقلية", | |
| "fresh": "طازج", | |
| "hello": "مرحبا", | |
| "hot": "ساخن", | |
| "icecream": "آيس كريم", | |
| "ingredients": "مكونات", | |
| "juicy": "عصيري", | |
| "ketchup": "كاتشب", | |
| "lactose": "لاكتوز", | |
| "lettuce": "خس", | |
| "lid": "غطاء", | |
| "manager": "مدير", | |
| "menu": "قائمة الطعام", | |
| "milk": "حليب", | |
| "mustard": "خردل", | |
| "napkin": "منديل", | |
| "no": "لا", | |
| "order": "طلب", | |
| "pepper": "فلفل", | |
| "pickle": "مخلل", | |
| "pizza": "بيتزا", | |
| "please": "من فضلك", | |
| "ready": "جاهز", | |
| "refill": "إعادة ملء", | |
| "repeat": "كرر", | |
| "safe": "آمن", | |
| "salt": "ملح", | |
| "sandwich": "ساندويتش", | |
| "sauce": "صلصة", | |
| "small": "صغير", | |
| "soda": "صودا", | |
| "sorry": "آسف", | |
| "spicy": "حار", | |
| "spoon": "ملعقة", | |
| "straw": "قش", | |
| "sugar": "سكر", | |
| "sweet": "حلو", | |
| "tissues": "مناديل", | |
| "total": "مجموع", | |
| "urgent": "عاجل", | |
| "vegetables": "خضروات", | |
| "warm": "دافئ", | |
| "water": "ماء", | |
| "what": "ماذا", | |
| "yoghurt": "زبادي", | |
| "your": "لك", | |
| "ILoveYou":"أحبك", | |
| "Halo":"مرحبًا" | |
| } | |
| ###################################################### | |
| class VideoStreaming(object): | |
| def __init__(self): | |
| super(VideoStreaming, self).__init__() | |
| print ("===== Video Streaming =====") | |
| self._preview = False | |
| self._flipH = False | |
| self._detect = False | |
| self._model = False | |
| self._mediaPipe = False | |
| self._confidence = 75.0 | |
| self.mp_hands = mp.solutions.hands | |
| self.hands = self.mp_hands.Hands() | |
| def confidence(self): | |
| return self._confidence | |
| def confidence(self, value): | |
| self._confidence = int(value) | |
| def preview(self): | |
| return self._preview | |
| def preview(self, value): | |
| self._preview = bool(value) | |
| def flipH(self): | |
| return self._flipH | |
| def flipH(self, value): | |
| self._flipH = bool(value) | |
| def detect(self): | |
| return self._detect | |
| def detect(self, value): | |
| self._detect = bool(value) | |
| def mediaPipe(self): | |
| return self._mediaPipe | |
| def mediaPipe(self, value): | |
| self._mediaPipe = bool(value) | |
| def show(self, url): | |
| print(url) | |
| self._preview = False | |
| self._flipH = False | |
| self._detect = False | |
| self._mediaPipe = False | |
| self._confidence = 75.0 | |
| ydl_opts = { | |
| "quiet": True, | |
| "no_warnings": True, | |
| "format": "best", | |
| "forceurl": True, | |
| } | |
| if url == '4': | |
| print("am here with 0 to start cam") | |
| cap = cv2.VideoCapture(0) | |
| else: | |
| ydl = youtube_dl.YoutubeDL(ydl_opts) | |
| info = ydl.extract_info("https://www.youtube.com/watch?v=j4YZBRwVFFo", download=False) | |
| url = info["url"] | |
| cap = cv2.VideoCapture(url) | |
| while True: | |
| if self._preview: | |
| if stop_flag: | |
| print("Process Stopped") | |
| return | |
| grabbed, frame = cap.read() | |
| if not grabbed: | |
| break | |
| if self.flipH: | |
| print("flip part :") | |
| frame = cv2.flip(frame, 1) | |
| if self.detect: | |
| frame_yolo = frame.copy() | |
| results_yolo = model_object_detection.predict(frame_yolo, conf=self._confidence / 100) | |
| frame_yolo, labels = results_yolo[0].plot() | |
| list_labels = [] | |
| # labels_confidences | |
| for label in labels: | |
| confidence = label.split(" ")[-1] | |
| label_name = " ".join(label.split(" ")[:-1]) | |
| # Translate the label if it exists in the translation dictionary | |
| translated_label = classes_translation.get(label_name, label_name) | |
| list_labels.append(translated_label) | |
| list_labels.append(confidence) | |
| socketio.emit('label', list_labels) | |
| if self.mediaPipe: | |
| # Convert the image to RGB for processing with MediaPipe | |
| image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| results = self.hands.process(image) | |
| if results.multi_hand_landmarks: | |
| for hand_landmarks in results.multi_hand_landmarks: | |
| mp.solutions.drawing_utils.draw_landmarks( | |
| frame, | |
| hand_landmarks, | |
| self.mp_hands.HAND_CONNECTIONS, | |
| landmark_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(255, 0, 0), thickness=4, circle_radius=3), | |
| connection_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(255, 255, 255), thickness=2, circle_radius=2), | |
| ) | |
| print("frame information in here : ") | |
| frame = cv2.imencode(".jpg", frame)[1].tobytes() | |
| yield ( | |
| b'--frame\r\n' | |
| b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n' | |
| ) | |
| else: | |
| snap = np.zeros(( | |
| 1000, | |
| 1000 | |
| ), np.uint8) | |
| label = "Streaming Off" | |
| H, W = snap.shape | |
| font = cv2.FONT_HERSHEY_PLAIN | |
| color = (255, 255, 255) | |
| cv2.putText(snap, label, (W//2 - 100, H//2), | |
| font, 2, color, 2) | |
| frame = cv2.imencode(".jpg", snap)[1].tobytes() | |
| yield (b'--frame\r\n' | |
| b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') | |
| def show1(self, url): | |
| print("url") | |
| self._preview = False | |
| self._flipH = False | |
| self._detect = False | |
| self._mediaPipe = False | |
| self._confidence = 75.0 | |
| ydl_opts = { | |
| "quiet": True, | |
| "no_warnings": True, | |
| "format": "best", | |
| "forceurl": True, | |
| } | |
| while True: | |
| # Decoding the Base64 string to get the frame data | |
| frame_bytes = base64.b64decode(url) | |
| # Converting the frame data to an OpenCV image | |
| frame_np = np.frombuffer(frame_bytes, np.uint8) | |
| frame = cv2.imdecode(frame_np, cv2.IMREAD_COLOR) | |
| # Encode the frame data to bytes | |
| _, frame_encoded = cv2.imencode(".jpg", frame) | |
| frame_bytes = frame_encoded.tobytes() | |
| yield ( | |
| b'--frame\r\n' | |
| b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n' | |
| ) | |
| # check_settings() | |
| VIDEO = VideoStreaming() | |
| def homepage(): | |
| return render_template('hompage.html') | |
| def index(): | |
| print("index") | |
| global stop_flag | |
| stop_flag = False | |
| if request.method == 'POST': | |
| print("Index post request") | |
| url = request.form['url'] | |
| print("index: ", url) | |
| # Create a response object | |
| resp = make_response(redirect(url_for('index'))) | |
| # Set a cookie containing the URL | |
| resp.set_cookie('url', url) | |
| return resp | |
| return render_template('index.html') | |
| def video_feed(): | |
| #url = session.get('url', None) | |
| #print("video feed: ", url) | |
| # Retrieve the URL from the cookie | |
| url = request.cookies.get('url') | |
| url = '0' | |
| print("video feed: ", url) | |
| if url is None: | |
| return redirect(url_for('homepage')) | |
| print("video feed: ", url) | |
| return Response(VIDEO.show(url), mimetype='multipart/x-mixed-replace; boundary=frame') | |
| # * Button requests | |
| def request_preview_switch(): | |
| VIDEO.preview = not VIDEO.preview | |
| print("*"*10, VIDEO.preview) | |
| return "nothing" | |
| def request_flipH_switch(): | |
| VIDEO.flipH = not VIDEO.flipH | |
| print("*"*10, VIDEO.flipH) | |
| return "nothing" | |
| def request_run_model_switch(): | |
| VIDEO.detect = not VIDEO.detect | |
| print("*"*10, VIDEO.detect) | |
| return "nothing" | |
| def request_mediapipe_switch(): | |
| VIDEO.mediaPipe = not VIDEO.mediaPipe | |
| print("*"*10, VIDEO.mediaPipe) | |
| return "nothing" | |
| def update_slider_value(): | |
| slider_value = request.form['sliderValue'] | |
| VIDEO.confidence = slider_value | |
| return 'OK' | |
| def stop_process(): | |
| print("Process stop Request") | |
| global stop_flag | |
| stop_flag = True | |
| return 'Process Stop Request' | |
| def test_connect(): | |
| print('Connected') | |
| #emit('message', data, broadcast=True) | |
| ###################### | |
| def preprocess_frame(frame_data): | |
| if frame_data is None: | |
| return None # Return None if frame_data is None | |
| try: | |
| # Convert base64 image string to numpy array | |
| # Split frame_data to extract base64 part | |
| base64_data = frame_data | |
| # Convert base64 image string to numpy array | |
| imgdata = base64.b64decode(base64_data) | |
| imgarray = np.frombuffer(imgdata, np.uint8) | |
| # Decode the image using cv2.imdecode | |
| frame = cv2.imdecode(imgarray, cv2.IMREAD_COLOR) | |
| print("rani dezte hna koolchi mezian") | |
| # Apply image processing here (example: grayscale conversion) | |
| processed_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
| # Convert processed frame back to base64 image string | |
| _, buffer = cv2.imencode('.jpg', processed_frame) | |
| processed_frame_data = base64.b64encode(buffer).decode('utf-8') | |
| #plot_base64_image(processed_frame_data) | |
| return processed_frame_data | |
| except Exception as e: | |
| print("Error processing frame:", e) | |
| return None | |
| #@socketio.on('stream_frame') | |
| #def handle_stream_frame(frame_data): | |
| # processed_frame_data = preprocess_frame(frame_data) | |
| # #emit('receive_frame', processed_frame_data, broadcast=True) | |
| # return Response(VIDEO.show1(frame_data), mimetype='multipart/x-mixed-replace; boundary=frame') | |
| # Route to receive a frame for processing | |
| def process_frame(): | |
| if 'frame' in request.files: | |
| frame = request.files['frame'] | |
| # Process the frame here | |
| # For example, you can save the frame to a file | |
| frame_path = 'result_frame.jpg' | |
| frame.save(frame_path) | |
| # Return the processed frame | |
| return send_file(frame_path, mimetype='image/jpeg') | |
| else: | |
| return 'No frame data received' | |
| if __name__ == '__main__': | |
| socketio.run(app, host="0.0.0.0", allow_unsafe_werkzeug=True,port=7860) | |