Bachstelze committed on
Commit
9e2e72e
·
1 Parent(s): 2e9c848

connect app.py with pose_estimator.py

Browse files
Files changed (3) hide show
  1. app.py +314 -141
  2. open_pose_app.py +235 -0
  3. requirements.txt +0 -2
app.py CHANGED
@@ -1,75 +1,72 @@
1
  from PIL import Image
2
  import gradio as gr
3
- from controlnet_aux import OpenposeDetector
4
  import json
5
  import csv
6
  import os
7
  from datetime import datetime
8
  from typing import Dict, List, Any, Optional
9
  import numpy as np
10
-
11
- # Load OpenPose detector
12
- openpose = OpenposeDetector.from_pretrained("lllyasviel/ControlNet")
13
-
14
- # OpenPose joint mapping (COCO format - 18 joints)
15
- JOINT_NAMES = [
16
- "Nose", # 0
17
- "Neck", # 1
18
- "RShoulder", # 2
19
- "RElbow", # 3
20
- "RWrist", # 4
21
- "LShoulder", # 5
22
- "LElbow", # 6
23
- "LWrist", # 7
24
- "RHip", # 8
25
- "RKnee", # 9
26
- "RAnkle", # 10
27
- "LHip", # 11
28
- "LKnee", # 12
29
- "LAnkle", # 13
30
- "REye", # 14
31
- "LEye", # 15
32
- "REar", # 16
33
- "LEar" # 17
 
 
34
  ]
35
 
36
- def extract_joint_positions_from_detect_poses(pose_results: List[Any]) -> Dict[str, Any]:
37
- """Extract joint positions from OpenPose detect_poses result."""
38
- all_poses = []
39
-
40
- for idx, pose in enumerate(pose_results):
41
- body = pose.body
42
- keypoints = []
43
-
44
- for joint_idx, keypoint in enumerate(body.keypoints):
45
- if keypoint is not None:
46
- keypoints.append({
47
- "x": keypoint.x,
48
- "y": keypoint.y,
49
- "score": getattr(keypoint, 'score', 0.0),
50
- "name": JOINT_NAMES[joint_idx] if joint_idx < len(JOINT_NAMES) else f"Joint_{joint_idx}"
51
- })
52
- else:
53
- keypoints.append({
54
- "x": None,
55
- "y": None,
56
- "score": None,
57
- "name": JOINT_NAMES[joint_idx] if joint_idx < len(JOINT_NAMES) else f"Joint_{joint_idx}"
58
- })
59
-
60
- all_poses.append({
61
- "pose_id": idx,
62
- "total_score": body.total_score,
63
- "total_parts": body.total_parts,
64
- "keypoints": keypoints
65
  })
66
 
67
  return {
68
- "poses": all_poses,
 
 
 
 
 
69
  "timestamp": datetime.now().isoformat(),
70
- "joint_names": JOINT_NAMES
 
71
  }
72
 
 
73
  def save_to_csv(joint_data: Dict[str, Any], filename: str = None) -> str:
74
  """Save joint positions to CSV file."""
75
  if filename is None:
@@ -105,9 +102,11 @@ def save_to_csv(joint_data: Dict[str, Any], filename: str = None) -> str:
105
 
106
  writer.writerow([])
107
  writer.writerow(["Timestamp", joint_data.get("timestamp", "")])
 
108
 
109
  return filepath
110
 
 
111
  def save_to_json(joint_data: Dict[str, Any], filename: str = None) -> str:
112
  """Save joint positions to JSON file."""
113
  if filename is None:
@@ -122,114 +121,288 @@ def save_to_json(joint_data: Dict[str, Any], filename: str = None) -> str:
122
 
123
  return filepath
124
 
125
- def generate_pose(image, use_openpose=True, save_outputs=True, include_hands=False, include_face=False):
126
- """Generate pose estimation and extract joint positions."""
127
- img = image.convert("RGB")
128
-
129
- if use_openpose:
130
- # Convert PIL Image to numpy array for detect_poses
131
- img_array = np.array(img)
132
-
133
- # Use detect_poses to get structured data
134
- pose_results = openpose.detect_poses(
135
- img_array,
136
- include_hand=include_hands,
137
- include_face=include_face
138
- )
139
-
140
- # Extract joint positions from pose results
141
- joint_data = extract_joint_positions_from_detect_poses(pose_results)
142
-
143
- # Generate the annotated image
144
- result = openpose(img)
145
-
146
- # Save pose data if requested
147
- if save_outputs:
148
- csv_path = save_to_csv(joint_data)
149
- json_path = save_to_json(joint_data)
150
- joint_data["csv_path"] = csv_path
151
- joint_data["json_path"] = json_path
 
 
 
 
 
 
152
  else:
153
- result = img
154
- joint_data = {
155
- "poses": [],
156
- "timestamp": datetime.now().isoformat(),
157
- "note": "OpenPose disabled - no pose data extracted"
158
- }
159
 
160
- if not isinstance(result, Image.Image):
161
- result = Image.fromarray(result)
162
 
163
- return result, joint_data
164
 
165
  def format_pose_output(joint_data: Dict[str, Any]) -> str:
166
  """Format pose data for display in Gradio."""
167
- if not joint_data.get("poses"):
168
- return "No pose data available.\n\n" + \
169
- f"**Timestamp:** {joint_data.get('timestamp', 'N/A')}\n" + \
170
- f"**CSV File:** `{joint_data.get('csv_path', 'N/A')}`\n" + \
171
- f"**JSON File:** `{joint_data.get('json_path', 'N/A')}`"
172
-
173
  output = "### Detected Poses\n\n"
174
- output += f"**Timestamp:** {joint_data.get('timestamp', 'N/A')}\n\n"
 
175
 
176
- for pose in joint_data.get("poses", []):
177
- output += f"#### Pose #{pose.get('pose_id', 0)}\n"
178
- output += f"- **Total Score:** {pose.get('total_score', 0):.3f}\n"
179
- output += f"- **Total Parts:** {pose.get('total_parts', 0)}\n\n"
 
 
 
180
 
181
- output += "| Joint | X | Y | Confidence | Visible |\n"
182
- output += "|-------|---|---|------------|---------|\n"
183
 
184
- for kp in pose.get("keypoints", []):
185
- name = kp.get("name", "Unknown")
186
- x = kp.get("x")
187
- y = kp.get("y")
188
- score = kp.get("score")
189
 
190
- x_str = f"{x:.1f}" if x is not None else "N/A"
191
- y_str = f"{y:.1f}" if y is not None else "N/A"
192
- score_str = f"{score:.3f}" if score is not None else "N/A"
193
- visible = "Yes" if x is not None and y is not None else "No"
194
 
195
- output += f"| {name} | {x_str} | {y_str} | {score_str} | {visible} |\n"
196
 
197
- output += "\n"
198
 
199
  output += f"**CSV File:** `{joint_data.get('csv_path', 'N/A')}`\n"
200
  output += f"**JSON File:** `{joint_data.get('json_path', 'N/A')}`\n"
201
 
202
  return output
203
 
204
- def process_and_display(image, use_openpose=True, include_hands=False, include_face=False):
205
- """Process image and return pose output with data files."""
206
- result, joint_data = generate_pose(
207
- image,
208
- use_openpose=use_openpose,
209
- save_outputs=True,
210
- include_hands=include_hands,
211
- include_face=include_face
212
- )
213
 
 
 
 
214
  pose_info = format_pose_output(joint_data)
215
  return result, pose_info
216
 
217
- # Gradio UI
218
- demo = gr.Interface(
219
- fn=process_and_display,
220
- inputs=[
221
- gr.Image(type="pil", label="Upload Image"),
222
- gr.Checkbox(value=True, label="Use OpenPose (default: true)"),
223
- gr.Checkbox(value=False, label="Include Hands"),
224
- gr.Checkbox(value=False, label="Include Face"),
225
- ],
226
- outputs=[
227
- gr.Image(type="pil", label="Pose Output"),
228
- gr.Textbox(label="Pose Data", lines=15)
229
- ],
230
- title="Pose Estimation and Export",
231
- description="Generate full body pose including face and hands. Extracts and stores joint positions in CSV and JSON formats."
232
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
233
 
234
  if __name__ == "__main__":
235
  demo.launch(server_name="0.0.0.0", server_port=7860)
 
1
  from PIL import Image
2
  import gradio as gr
3
+ from A8.pose_estimator import MoveNetPoseEstimator
4
  import json
5
  import csv
6
  import os
7
  from datetime import datetime
8
  from typing import Dict, List, Any, Optional
9
  import numpy as np
10
+ import cv2
11
+ import tempfile
12
+ import time
13
+
14
+ # Initialize MoveNet pose estimator
15
+ pose_estimator = MoveNetPoseEstimator(model_name='lightning')
16
+
17
+ # COCO Keypoint definitions (17 keypoints)
18
+ KEYPOINT_NAMES = [
19
+ 'nose',
20
+ 'left_eye',
21
+ 'right_eye',
22
+ 'left_ear',
23
+ 'right_ear',
24
+ 'left_shoulder',
25
+ 'right_shoulder',
26
+ 'left_elbow',
27
+ 'right_elbow',
28
+ 'left_wrist',
29
+ 'right_wrist',
30
+ 'left_hip',
31
+ 'right_hip',
32
+ 'left_knee',
33
+ 'right_knee',
34
+ 'left_ankle',
35
+ 'right_ankle'
36
  ]
37
 
38
+
39
+ def extract_joint_positions_from_movenet(pose_result: Dict[str, Any]) -> Dict[str, Any]:
40
+ """Extract joint positions from MoveNet pose result."""
41
+ keypoints = pose_result.get('keypoints', {})
42
+ all_keypoints = []
43
+
44
+ for joint_name in KEYPOINT_NAMES:
45
+ kp = keypoints.get(joint_name, {})
46
+ x = kp.get('x')
47
+ y = kp.get('y')
48
+ score = kp.get('confidence')
49
+
50
+ all_keypoints.append({
51
+ "x": x,
52
+ "y": y,
53
+ "score": score,
54
+ "name": joint_name
 
 
 
 
 
 
 
 
 
 
 
 
55
  })
56
 
57
  return {
58
+ "poses": [{
59
+ "pose_id": 0,
60
+ "total_score": 0.0,
61
+ "total_parts": len([k for k in all_keypoints if k['x'] is not None]),
62
+ "keypoints": all_keypoints
63
+ }],
64
  "timestamp": datetime.now().isoformat(),
65
+ "joint_names": KEYPOINT_NAMES,
66
+ "inference_time_ms": pose_result.get('inference_time_ms', 0)
67
  }
68
 
69
+
70
  def save_to_csv(joint_data: Dict[str, Any], filename: str = None) -> str:
71
  """Save joint positions to CSV file."""
72
  if filename is None:
 
102
 
103
  writer.writerow([])
104
  writer.writerow(["Timestamp", joint_data.get("timestamp", "")])
105
+ writer.writerow(["Inference_Time_ms", joint_data.get("inference_time_ms", 0)])
106
 
107
  return filepath
108
 
109
+
110
  def save_to_json(joint_data: Dict[str, Any], filename: str = None) -> str:
111
  """Save joint positions to JSON file."""
112
  if filename is None:
 
121
 
122
  return filepath
123
 
124
+
125
+ def process_single_image(image: Image.Image, confidence_threshold: float = 0.3) -> tuple:
126
+ """Process a single image and return annotated image with pose data."""
127
+ img_array = np.array(image.convert("RGB"))
128
+ img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)
129
+
130
+ pose_result = pose_estimator.detect_pose(img_bgr)
131
+ joint_data = extract_joint_positions_from_movenet(pose_result)
132
+
133
+ result_bgr = pose_estimator.draw_keypoints(img_bgr, pose_result, confidence_threshold=confidence_threshold)
134
+ result_rgb = cv2.cvtColor(result_bgr, cv2.COLOR_BGR2RGB)
135
+ result_image = Image.fromarray(result_rgb)
136
+
137
+ csv_path = save_to_csv(joint_data)
138
+ json_path = save_to_json(joint_data)
139
+ joint_data["csv_path"] = csv_path
140
+ joint_data["json_path"] = json_path
141
+
142
+ return result_image, joint_data
143
+
144
+
145
+ def process_video_frame(frame: np.ndarray, confidence_threshold: float = 0.3) -> np.ndarray:
146
+ """Process a single video frame and return annotated frame."""
147
+ # Handle frame format - OpenCV videos are BGR with 3 channels
148
+ # If frame has 3 channels, assume BGR. If 4 channels, convert BGRA to BGR.
149
+ # If grayscale (2D), convert to BGR.
150
+ if len(frame.shape) == 3:
151
+ if frame.shape[2] == 3:
152
+ img_bgr = frame # Already BGR
153
+ elif frame.shape[2] == 4:
154
+ img_bgr = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR) # Convert BGRA to BGR
155
+ else:
156
+ img_bgr = frame # Fallback
157
  else:
158
+ img_bgr = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) # Convert grayscale to BGR
159
+
160
+ pose_result = pose_estimator.detect_pose(img_bgr)
161
+ annotated_bgr = pose_estimator.draw_keypoints(img_bgr, pose_result, confidence_threshold=confidence_threshold)
 
 
162
 
163
+ return annotated_bgr
 
164
 
 
165
 
166
  def format_pose_output(joint_data: Dict[str, Any]) -> str:
167
  """Format pose data for display in Gradio."""
 
 
 
 
 
 
168
  output = "### Detected Poses\n\n"
169
+ output += f"**Timestamp:** {joint_data.get('timestamp', 'N/A')}\n"
170
+ output += f"**Inference Time:** {joint_data.get('inference_time_ms', 0):.2f} ms\n\n"
171
 
172
+ poses = joint_data.get("poses", [])
173
+ if not poses:
174
+ output += "No pose data available.\n\n"
175
+ else:
176
+ for pose in poses:
177
+ output += f"#### Pose #{pose.get('pose_id', 0)}\n"
178
+ output += f"- **Total Parts:** {pose.get('total_parts', 0)}\n\n"
179
 
180
+ output += "| Joint | X | Y | Confidence | Visible |\n"
181
+ output += "|-------|---|---|------------|---------|\n"
182
 
183
+ for kp in pose.get("keypoints", []):
184
+ name = kp.get("name", "Unknown")
185
+ x = kp.get("x")
186
+ y = kp.get("y")
187
+ score = kp.get("score")
188
 
189
+ x_str = f"{x:.1f}" if x is not None else "N/A"
190
+ y_str = f"{y:.1f}" if y is not None else "N/A"
191
+ score_str = f"{score:.3f}" if score is not None else "N/A"
192
+ visible = "Yes" if x is not None and y is not None else "No"
193
 
194
+ output += f"| {name} | {x_str} | {y_str} | {score_str} | {visible} |\n"
195
 
196
+ output += "\n"
197
 
198
  output += f"**CSV File:** `{joint_data.get('csv_path', 'N/A')}`\n"
199
  output += f"**JSON File:** `{joint_data.get('json_path', 'N/A')}`\n"
200
 
201
  return output
202
 
 
 
 
 
 
 
 
 
 
203
 
204
+ def process_and_display(image: Image.Image, confidence_threshold: float = 0.3) -> tuple:
205
+ """Process image and return pose output with data files."""
206
+ result, joint_data = process_single_image(image, confidence_threshold)
207
  pose_info = format_pose_output(joint_data)
208
  return result, pose_info
209
 
210
+
211
+ def process_webcam_video(
212
+ video_path: str,
213
+ confidence_threshold: float = 0.3,
214
+ progress=gr.Progress()
215
+ ) -> tuple:
216
+ """Process uploaded video with pose estimation."""
217
+ if video_path is None:
218
+ return None, "No video uploaded."
219
+
220
+ cap = cv2.VideoCapture(video_path)
221
+ if not cap.isOpened():
222
+ return None, "Could not open video."
223
+
224
+ # Get video properties
225
+ fps = cap.get(cv2.CAP_PROP_FPS)
226
+ width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
227
+ height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
228
+ total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
229
+
230
+ print(f"Video properties: FPS={fps}, Width={width}, Height={height}, TotalFrames={total_frames}")
231
+
232
+ # Validate FPS - if it's extremely high or invalid, use a reasonable default
233
+ if fps <= 0 or fps > 240: # 240 FPS is unrealistically high for normal videos
234
+ print(f"Invalid FPS ({fps}), using default 30 FPS")
235
+ fps = 30
236
+ else:
237
+ print(f"Using FPS: {fps}")
238
+
239
+ # Create output video
240
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
241
+ output_path = os.path.join("pose_outputs", f"annotated_video_{timestamp}.mp4")
242
+ os.makedirs("pose_outputs", exist_ok=True)
243
+
244
+ fourcc = cv2.VideoWriter_fourcc(*'mp4v')
245
+ out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
246
+
247
+ # Verify video writer opened successfully
248
+ if not out.isOpened():
249
+ print(f"Error: Video writer failed to open. Output path: {output_path}")
250
+ return None, "Failed to create output video. Please check the video format and try again."
251
+
252
+ all_keypoints = []
253
+ frame_count = 0
254
+
255
+ progress(0, desc="Processing video...")
256
+
257
+ while True:
258
+ ret, frame = cap.read()
259
+ if not ret:
260
+ print(f"Frame read failed at frame {frame_count}")
261
+ break
262
+
263
+ # Debug: Check frame properties
264
+ print(f"Frame {frame_count}: shape={frame.shape if frame is not None else None}")
265
+
266
+ # Process frame
267
+ annotated_frame = process_video_frame(frame, confidence_threshold)
268
+
269
+ # Verify frame dimensions match video writer
270
+ if annotated_frame.shape[1] != width or annotated_frame.shape[0] != height:
271
+ print(f"Resizing frame from {annotated_frame.shape[1]}x{annotated_frame.shape[0]} to {width}x{height}")
272
+ annotated_frame = cv2.resize(annotated_frame, (width, height))
273
+
274
+ out.write(annotated_frame)
275
+
276
+ # Extract keypoints for this frame
277
+ img_bgr = frame if frame.shape[2] == 3 else cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
278
+ pose_result = pose_estimator.detect_pose(img_bgr)
279
+ joint_data = extract_joint_positions_from_movenet(pose_result)
280
+ joint_data['frame_id'] = frame_count
281
+ joint_data['timestamp'] = frame_count / fps if fps > 0 else 0
282
+ all_keypoints.append(joint_data)
283
+
284
+ frame_count += 1
285
+
286
+ # Update progress
287
+ if frame_count % 30 == 0:
288
+ progress(frame_count / total_frames if total_frames > 0 else 0, desc=f"Processing frame {frame_count}/{total_frames if total_frames > 0 else '?'}...")
289
+
290
+ cap.release()
291
+ out.release()
292
+
293
+ print(f"Total frames processed: {frame_count}")
294
+
295
+ # Save keypoints to CSV
296
+ csv_path = os.path.join("pose_outputs", f"video_keypoints_{timestamp}.csv")
297
+ with open(csv_path, 'w', newline='') as csvfile:
298
+ writer = csv.writer(csvfile)
299
+ writer.writerow(["Frame_ID", "Joint", "X", "Y", "Confidence", "Visible"])
300
+
301
+ for frame_data in all_keypoints:
302
+ frame_id = frame_data.get('frame_id', 0)
303
+ for kp in frame_data['poses'][0]['keypoints']:
304
+ x = kp.get('x')
305
+ y = kp.get('y')
306
+ score = kp.get('score')
307
+ name = kp.get('name', 'Unknown')
308
+
309
+ visible = "Yes" if x is not None and y is not None else "No"
310
+ writer.writerow([
311
+ frame_id,
312
+ name,
313
+ f"{x:.2f}" if x is not None else "N/A",
314
+ f"{y:.2f}" if y is not None else "N/A",
315
+ f"{score:.3f}" if score is not None else "N/A",
316
+ visible
317
+ ])
318
+
319
+ avg_inference = np.mean([k.get('inference_time_ms', 0) for k in all_keypoints]) if all_keypoints else 0
320
+
321
+ result_text = f"""### Video Processing Complete
322
+
323
+ - **Frames processed:** {frame_count}
324
+ - **Average inference time:** {avg_inference:.2f} ms/frame
325
+ - **Output video:** `{output_path}`
326
+ - **Keypoints CSV:** `{csv_path}`
327
+ """
328
+
329
+ return output_path, result_text
330
+
331
+
332
+ # Gradio UI with Tabs
333
+ with gr.Blocks(title="MoveNet Pose Estimation") as demo:
334
+ gr.Markdown("# 🏃 MoveNet Pose Estimation")
335
+ gr.Markdown("Estimate human poses using Google's MoveNet model. Supports single images and video files.")
336
+
337
+ with gr.Tabs():
338
+ # Image Processing Tab
339
+ with gr.TabItem("📸 Image Processing"):
340
+ with gr.Row():
341
+ with gr.Column():
342
+ gr.Markdown("### Upload Image")
343
+ image_input = gr.Image(type="pil", label="Input Image")
344
+ confidence_slider = gr.Slider(
345
+ minimum=0.0,
346
+ maximum=1.0,
347
+ value=0.3,
348
+ step=0.05,
349
+ label="Confidence Threshold"
350
+ )
351
+ process_btn = gr.Button("🚀 Process Image", variant="primary")
352
+
353
+ with gr.Column():
354
+ gr.Markdown("### Results")
355
+ image_output = gr.Image(type="pil", label="Annotated Output")
356
+ pose_text = gr.Textbox(label="Pose Data", lines=15)
357
+
358
+ process_btn.click(
359
+ fn=process_and_display,
360
+ inputs=[image_input, confidence_slider],
361
+ outputs=[image_output, pose_text]
362
+ )
363
+
364
+ # Video Processing Tab
365
+ with gr.TabItem("🎥 Video Processing"):
366
+ with gr.Row():
367
+ with gr.Column():
368
+ gr.Markdown("### Upload Video")
369
+ video_input = gr.Video(label="Input Video")
370
+ video_confidence = gr.Slider(
371
+ minimum=0.0,
372
+ maximum=1.0,
373
+ value=0.3,
374
+ step=0.05,
375
+ label="Confidence Threshold"
376
+ )
377
+ process_video_btn = gr.Button("🎬 Process Video", variant="primary")
378
+
379
+ with gr.Column():
380
+ gr.Markdown("### Results")
381
+ video_output = gr.Video(label="Annotated Video")
382
+ video_result = gr.Textbox(label="Processing Results", lines=15)
383
+
384
+ process_video_btn.click(
385
+ fn=process_webcam_video,
386
+ inputs=[video_input, video_confidence],
387
+ outputs=[video_output, video_result]
388
+ )
389
+
390
+ # Example section
391
+ with gr.Accordion("ℹ️ Information", open=False):
392
+ gr.Markdown("""
393
+ ### Features
394
+ - **Single Image Processing**: Upload and process static images
395
+ - **Video Processing**: Upload video files for pose estimation
396
+ - **17 COCO Keypoints**: Detects nose, eyes, ears, shoulders, elbows, wrists, hips, knees, and ankles
397
+ - **Confidence Threshold**: Adjust detection sensitivity
398
+ - **CSV/JSON Export**: Download pose data for further analysis
399
+
400
+ ### Model Details
401
+ - Model: MoveNet SinglePose (Lightning)
402
+ - Input size: 192x192 pixels
403
+ - Fast and efficient real-time pose estimation
404
+ """)
405
+
406
 
407
  if __name__ == "__main__":
408
  demo.launch(server_name="0.0.0.0", server_port=7860)
open_pose_app.py ADDED
@@ -0,0 +1,235 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from PIL import Image
2
+ import gradio as gr
3
+ from controlnet_aux import OpenposeDetector
4
+ import json
5
+ import csv
6
+ import os
7
+ from datetime import datetime
8
+ from typing import Dict, List, Any, Optional
9
+ import numpy as np
10
+
11
+ # Load OpenPose detector
12
+ openpose = OpenposeDetector.from_pretrained("lllyasviel/ControlNet")
13
+
14
+ # OpenPose joint mapping (COCO format - 18 joints)
15
+ JOINT_NAMES = [
16
+ "Nose", # 0
17
+ "Neck", # 1
18
+ "RShoulder", # 2
19
+ "RElbow", # 3
20
+ "RWrist", # 4
21
+ "LShoulder", # 5
22
+ "LElbow", # 6
23
+ "LWrist", # 7
24
+ "RHip", # 8
25
+ "RKnee", # 9
26
+ "RAnkle", # 10
27
+ "LHip", # 11
28
+ "LKnee", # 12
29
+ "LAnkle", # 13
30
+ "REye", # 14
31
+ "LEye", # 15
32
+ "REar", # 16
33
+ "LEar" # 17
34
+ ]
35
+
36
+ def extract_joint_positions_from_detect_poses(pose_results: List[Any]) -> Dict[str, Any]:
37
+ """Extract joint positions from OpenPose detect_poses result."""
38
+ all_poses = []
39
+
40
+ for idx, pose in enumerate(pose_results):
41
+ body = pose.body
42
+ keypoints = []
43
+
44
+ for joint_idx, keypoint in enumerate(body.keypoints):
45
+ if keypoint is not None:
46
+ keypoints.append({
47
+ "x": keypoint.x,
48
+ "y": keypoint.y,
49
+ "score": getattr(keypoint, 'score', 0.0),
50
+ "name": JOINT_NAMES[joint_idx] if joint_idx < len(JOINT_NAMES) else f"Joint_{joint_idx}"
51
+ })
52
+ else:
53
+ keypoints.append({
54
+ "x": None,
55
+ "y": None,
56
+ "score": None,
57
+ "name": JOINT_NAMES[joint_idx] if joint_idx < len(JOINT_NAMES) else f"Joint_{joint_idx}"
58
+ })
59
+
60
+ all_poses.append({
61
+ "pose_id": idx,
62
+ "total_score": body.total_score,
63
+ "total_parts": body.total_parts,
64
+ "keypoints": keypoints
65
+ })
66
+
67
+ return {
68
+ "poses": all_poses,
69
+ "timestamp": datetime.now().isoformat(),
70
+ "joint_names": JOINT_NAMES
71
+ }
72
+
73
+ def save_to_csv(joint_data: Dict[str, Any], filename: str = None) -> str:
74
+ """Save joint positions to CSV file."""
75
+ if filename is None:
76
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
77
+ filename = f"pose_data_{timestamp}.csv"
78
+
79
+ filepath = os.path.join("pose_outputs", filename)
80
+ os.makedirs("pose_outputs", exist_ok=True)
81
+
82
+ with open(filepath, 'w', newline='') as csvfile:
83
+ writer = csv.writer(csvfile)
84
+ writer.writerow(["Pose_ID", "Joint", "X", "Y", "Confidence", "Visible"])
85
+
86
+ poses = joint_data.get("poses", [])
87
+ for pose in poses:
88
+ pose_id = pose.get("pose_id", 0)
89
+ for kp in pose.get("keypoints", []):
90
+ x = kp.get("x")
91
+ y = kp.get("y")
92
+ score = kp.get("score")
93
+ name = kp.get("name", "Unknown")
94
+
95
+ visible = "Yes" if x is not None and y is not None else "No"
96
+
97
+ writer.writerow([
98
+ pose_id,
99
+ name,
100
+ f"{x:.2f}" if x is not None else "N/A",
101
+ f"{y:.2f}" if y is not None else "N/A",
102
+ f"{score:.3f}" if score is not None else "N/A",
103
+ visible
104
+ ])
105
+
106
+ writer.writerow([])
107
+ writer.writerow(["Timestamp", joint_data.get("timestamp", "")])
108
+
109
+ return filepath
110
+
111
+ def save_to_json(joint_data: Dict[str, Any], filename: str = None) -> str:
112
+ """Save joint positions to JSON file."""
113
+ if filename is None:
114
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
115
+ filename = f"pose_data_{timestamp}.json"
116
+
117
+ filepath = os.path.join("pose_outputs", filename)
118
+ os.makedirs("pose_outputs", exist_ok=True)
119
+
120
+ with open(filepath, 'w') as jsonfile:
121
+ json.dump(joint_data, jsonfile, indent=2)
122
+
123
+ return filepath
124
+
125
+ def generate_pose(image, use_openpose=True, save_outputs=True, include_hands=False, include_face=False):
126
+ """Generate pose estimation and extract joint positions."""
127
+ img = image.convert("RGB")
128
+
129
+ if use_openpose:
130
+ # Convert PIL Image to numpy array for detect_poses
131
+ img_array = np.array(img)
132
+
133
+ # Use detect_poses to get structured data
134
+ pose_results = openpose.detect_poses(
135
+ img_array,
136
+ include_hand=include_hands,
137
+ include_face=include_face
138
+ )
139
+
140
+ # Extract joint positions from pose results
141
+ joint_data = extract_joint_positions_from_detect_poses(pose_results)
142
+
143
+ # Generate the annotated image
144
+ result = openpose(img)
145
+
146
+ # Save pose data if requested
147
+ if save_outputs:
148
+ csv_path = save_to_csv(joint_data)
149
+ json_path = save_to_json(joint_data)
150
+ joint_data["csv_path"] = csv_path
151
+ joint_data["json_path"] = json_path
152
+ else:
153
+ result = img
154
+ joint_data = {
155
+ "poses": [],
156
+ "timestamp": datetime.now().isoformat(),
157
+ "note": "OpenPose disabled - no pose data extracted"
158
+ }
159
+
160
+ if not isinstance(result, Image.Image):
161
+ result = Image.fromarray(result)
162
+
163
+ return result, joint_data
164
+
165
+ def format_pose_output(joint_data: Dict[str, Any]) -> str:
166
+ """Format pose data for display in Gradio."""
167
+ if not joint_data.get("poses"):
168
+ return "No pose data available.\n\n" + \
169
+ f"**Timestamp:** {joint_data.get('timestamp', 'N/A')}\n" + \
170
+ f"**CSV File:** `{joint_data.get('csv_path', 'N/A')}`\n" + \
171
+ f"**JSON File:** `{joint_data.get('json_path', 'N/A')}`"
172
+
173
+ output = "### Detected Poses\n\n"
174
+ output += f"**Timestamp:** {joint_data.get('timestamp', 'N/A')}\n\n"
175
+
176
+ for pose in joint_data.get("poses", []):
177
+ output += f"#### Pose #{pose.get('pose_id', 0)}\n"
178
+ output += f"- **Total Score:** {pose.get('total_score', 0):.3f}\n"
179
+ output += f"- **Total Parts:** {pose.get('total_parts', 0)}\n\n"
180
+
181
+ output += "| Joint | X | Y | Confidence | Visible |\n"
182
+ output += "|-------|---|---|------------|---------|\n"
183
+
184
+ for kp in pose.get("keypoints", []):
185
+ name = kp.get("name", "Unknown")
186
+ x = kp.get("x")
187
+ y = kp.get("y")
188
+ score = kp.get("score")
189
+
190
+ x_str = f"{x:.1f}" if x is not None else "N/A"
191
+ y_str = f"{y:.1f}" if y is not None else "N/A"
192
+ score_str = f"{score:.3f}" if score is not None else "N/A"
193
+ visible = "Yes" if x is not None and y is not None else "No"
194
+
195
+ output += f"| {name} | {x_str} | {y_str} | {score_str} | {visible} |\n"
196
+
197
+ output += "\n"
198
+
199
+ output += f"**CSV File:** `{joint_data.get('csv_path', 'N/A')}`\n"
200
+ output += f"**JSON File:** `{joint_data.get('json_path', 'N/A')}`\n"
201
+
202
+ return output
203
+
204
+ def process_and_display(image, use_openpose=True, include_hands=False, include_face=False):
205
+ """Process image and return pose output with data files."""
206
+ result, joint_data = generate_pose(
207
+ image,
208
+ use_openpose=use_openpose,
209
+ save_outputs=True,
210
+ include_hands=include_hands,
211
+ include_face=include_face
212
+ )
213
+
214
+ pose_info = format_pose_output(joint_data)
215
+ return result, pose_info
216
+
217
+ # Gradio UI
218
+ demo = gr.Interface(
219
+ fn=process_and_display,
220
+ inputs=[
221
+ gr.Image(type="pil", label="Upload Image"),
222
+ gr.Checkbox(value=True, label="Use OpenPose (default: true)"),
223
+ gr.Checkbox(value=False, label="Include Hands"),
224
+ gr.Checkbox(value=False, label="Include Face"),
225
+ ],
226
+ outputs=[
227
+ gr.Image(type="pil", label="Pose Output"),
228
+ gr.Textbox(label="Pose Data", lines=15)
229
+ ],
230
+ title="Pose Estimation and Export",
231
+ description="Generate full body pose including face and hands. Extracts and stores joint positions in CSV and JSON formats."
232
+ )
233
+
234
+ if __name__ == "__main__":
235
+ demo.launch(server_name="0.0.0.0", server_port=7860)
requirements.txt CHANGED
@@ -15,5 +15,3 @@ opencv-python>=4.10.0
15
 
16
  pytest==8.3.4
17
  pytest-cov==6.0.0
18
-
19
- controlnet-aux==0.0.6
 
15
 
16
  pytest==8.3.4
17
  pytest-cov==6.0.0