File size: 2,313 Bytes
544fd88
 
 
eb20ed3
 
544fd88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eb20ed3
 
 
 
 
 
 
544fd88
 
 
eb20ed3
544fd88
eb20ed3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
544fd88
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import cv2
import mediapipe as mp
import numpy as np
import threading

mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose
import gradio as gr

def detect_pose(im):
    """Detect a human pose in a frame and draw its landmarks on a copy.

    Args:
        im: Image array in OpenCV's BGR channel order, or ``None`` when no
            frame is available yet.

    Returns:
        A new BGR image with the pose landmarks and connections drawn on
        it, or ``None`` if ``im`` is ``None``.
    """
    # No frame yet (e.g. the webcam has not started): nothing to process.
    # Without this guard cv2.cvtColor raises on a None input.
    if im is None:
        return None

    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        # Convert the image from BGR to RGB (OpenCV uses BGR by default)
        # before handing it to the pose model.
        image = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
        # Mark the frame read-only while the pose instance processes it.
        image.flags.writeable = False
        results = pose.process(image)
        image.flags.writeable = True
        # Back to BGR so the returned frame matches the input's channel order.
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # Render pose landmarks and their connections on the image.
        # NOTE(review): results.pose_landmarks is presumably None when no
        # person is detected; draw_landmarks is expected to no-op then.
        mp_drawing.draw_landmarks(
            image,
            results.pose_landmarks,
            mp_pose.POSE_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(51, 255, 51), thickness=2, circle_radius=2),
            mp_drawing.DrawingSpec(color=(51, 153, 255), thickness=2, circle_radius=2),
        )

    return image

# Latest value returned by detect_pose; stays None until
# update_pose_results has been called at least once.
pose_results = None


def update_pose_results(im):
    """Run detect_pose on ``im`` and store the result in ``pose_results``.

    Args:
        im: Frame forwarded unchanged to detect_pose.
    """
    global pose_results
    annotated = detect_pose(im)
    pose_results = annotated


with gr.Blocks() as app:
    gr.Markdown("# Webcam Testing")
    with gr.Row():
        inp = gr.Image(source="webcam", streaming=True)
        out = gr.Image()

    def process_frame(frame):
        """Annotate one streamed webcam frame and return it for display.

        Args:
            frame: RGB array delivered by the streaming Image component,
                or ``None`` when no frame is available.

        Returns:
            The most recent annotated frame in RGB order, or ``None`` if
            no frame has been processed yet.
        """
        if frame is not None:
            # Gradio hands over RGB arrays while detect_pose works on
            # OpenCV-style BGR frames, so convert on the way in.
            update_pose_results(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if pose_results is None:
            return None
        # Convert back to RGB so the output component shows true colours.
        return cv2.cvtColor(pose_results, cv2.COLOR_BGR2RGB)

    # Component `.value` is only the static initial value, so polling it from
    # background threads never sees live frames (and assigning to it never
    # refreshes the UI). The stream event is what delivers each webcam frame
    # to the callback and pushes the returned image to `out`.
    inp.stream(process_frame, inputs=inp, outputs=out)

app.launch(debug=True)