# A7med-Ame3's picture
# Upload 7 files
# 4fd9791 verified
"""
cli.py
──────
Command-line interface for ClearPath Scene Description.
Usage examples:
python cli.py --image photo.jpg
python cli.py --image photo.jpg --speak
python cli.py --video footage.mp4 --interval 3 --speak
python cli.py --camera --speak # live webcam loop (press q to quit)
"""
import argparse
import sys
import logging
import time
from PIL import Image
import cv2
from scene_captioner import SceneCaptioner
from safety_classifier import SafetyClassifier
from tts_engine import TTSEngine
# Configure the root logger once at import time: INFO and above, with a
# short wall-clock timestamp in front of every record.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

# Module-level logger used by the CLI entry point.
logger = logging.getLogger(__name__)

# ── ANSI colours ──────────────────────────────────────────────────────────────
# Text attributes first, then the bright foreground colours used in the output.
RESET = "\033[0m"
BOLD = "\033[1m"
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
def print_result(caption: str, result, timestamp: str = ""):
    """Print one caption and its safety verdict to stdout, framed by rules.

    Args:
        caption: Scene description text to display.
        result: Classification object; this function reads its
            ``is_dangerous`` flag and, when that flag is truthy, joins its
            ``hazards`` and ``matches`` string iterables with ", ".
        timestamp: Optional label printed in cyan above the caption; an
            empty string suppresses the line.
    """
    divider = "─" * 60

    # Header: blank line, rule, optional timestamp.
    print()
    print(divider)
    if timestamp:
        print(f"{CYAN}{timestamp}{RESET}")

    # Caption section.
    print(f"{BOLD}📝 Caption:{RESET}")
    print(f" {caption}")
    print()

    # Verdict section: green for safe, red (with details) for dangerous.
    if not result.is_dangerous:
        print(f"{GREEN}{BOLD}✅ CLASSIFICATION : SAFE{RESET}")
    else:
        print(f"{RED}{BOLD}⚠️ CLASSIFICATION : DANGEROUS{RESET}")
        hazard_text = ", ".join(result.hazards)
        print(f"{RED} Hazard categories : {hazard_text}{RESET}")
        token_text = ", ".join(result.matches)
        print(f"{RED} Matched tokens : {token_text}{RESET}")

    # Footer: rule and trailing blank line.
    print(divider)
    print()
def _build_parser():
    """Build the argument parser: one required input mode plus shared options."""
    parser = argparse.ArgumentParser(
        description="ClearPath — Real-Time Scene Description for Visually-Impaired People",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    # Exactly one input source must be chosen.
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--image", metavar="PATH", help="Path to an image file")
    source.add_argument("--video", metavar="PATH", help="Path to a video file")
    source.add_argument("--camera", action="store_true", help="Use webcam (live loop)")
    parser.add_argument("--speak", action="store_true", help="Read description aloud via TTS")
    parser.add_argument("--interval", type=float, default=3.0,
                        help="Seconds between captures in video/camera mode (default: 3)")
    parser.add_argument("--model", default=None,
                        help="Override Qwen model ID (e.g. Qwen/Qwen2-VL-7B-Instruct)")
    return parser


def _process_image(path, run):
    """Load the image at *path* as RGB and pass it to *run* once.

    Exits with a readable message if the file cannot be opened or decoded.
    """
    try:
        # The context manager closes the underlying file; convert() returns
        # a separate image object that is used after the file is closed.
        with Image.open(path) as source:
            image = source.convert("RGB")
    except OSError as exc:
        # FileNotFoundError is an OSError; Pillow's UnidentifiedImageError is
        # documented as an OSError subclass as well.
        sys.exit(f"❌ Could not open image '{path}': {exc}")
    run(image)


def _process_video(path, interval, run):
    """Pass one frame of the video at *path* to *run* every *interval* seconds.

    Exits with a readable message if the video cannot be opened. The capture
    is released even when *run* raises.
    """
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        cap.release()
        sys.exit(f"❌ Could not open video '{path}'.")
    try:
        # Fall back to 25 when the reported FPS is falsy (e.g. 0).
        fps = cap.get(cv2.CAP_PROP_FPS) or 25
        # Sample every `step` frames; never less than one frame apart.
        step = max(1, int(fps * interval))
        print(f"{CYAN}Processing video — capturing every {interval}s …{RESET}")
        idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break  # no further frame could be read
            if idx % step == 0:
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                pil = Image.fromarray(rgb)
                ts = f"Frame {idx} / {round(idx / fps, 1)}s"
                run(pil, ts)
            idx += 1
    finally:
        cap.release()
    print(f"{GREEN}Video processing complete.{RESET}")


def _process_camera(interval, run):
    """Show a live webcam preview and pass a frame to *run* every *interval* seconds.

    The loop ends when "q" is pressed in the preview window or on Ctrl+C.
    The capture and the preview window are always cleaned up on exit.
    """
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        sys.exit("❌ Could not open webcam.")
    print(f"{CYAN}Live camera mode — capturing every {interval}s. "
          f"Press q in the preview window or Ctrl+C to quit.{RESET}")
    # Monotonic deadline for the next capture; the first one happens at once.
    next_capture = time.monotonic()
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                # Back off briefly instead of spinning on failed reads.
                time.sleep(0.05)
                continue
            cv2.imshow("ClearPath — press q to quit", frame)
            # Poll the keyboard on every frame so the preview keeps
            # refreshing and "q" is checked between captures, rather than
            # once per interval.
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
            if time.monotonic() < next_capture:
                continue
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil = Image.fromarray(rgb)
            run(pil, ts=time.strftime("%H:%M:%S"))
            # Count the interval from the end of this capture.
            next_capture = time.monotonic() + interval
    except KeyboardInterrupt:
        print("\nStopped.")
    finally:
        cap.release()
        cv2.destroyAllWindows()


def main():
    """Parse the command line, load the pipeline and run the selected mode.

    Exactly one of ``--image``, ``--video`` or ``--camera`` must be given.
    ``--interval`` must be a finite, non-negative number of seconds; anything
    else is rejected with a usage error before any model is loaded.
    """
    parser = _build_parser()
    args = parser.parse_args()
    # Written as a chained comparison so that NaN (which fails every
    # comparison) is rejected together with negative and infinite values.
    if not 0 <= args.interval < float("inf"):
        parser.error("--interval must be a finite, non-negative number of seconds")

    # ── Load modules ──────────────────────────────────────────────────────────
    logger.info("Loading captioning model …")
    captioner = SceneCaptioner(model_id=args.model) if args.model else SceneCaptioner()
    classifier = SafetyClassifier()
    tts = TTSEngine() if args.speak else None

    def run(image: Image.Image, ts: str = ""):
        """Caption *image*, classify the caption, print it and optionally speak it."""
        caption = captioner.describe(image)
        result = classifier.classify(caption)
        print_result(caption, result, ts)
        if tts:
            prefix = "Danger detected. " if result.is_dangerous else "Safe. "
            tts.speak(prefix + caption)
        return caption, result

    # ── Dispatch on the selected input mode ──────────────────────────────────
    if args.image:
        _process_image(args.image, run)
    elif args.video:
        _process_video(args.video, args.interval, run)
    elif args.camera:
        _process_camera(args.interval, run)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()