clifs / app.py
ncoop57
add initial code
021b099
raw
history blame
No virus
2.5 kB
from torch._C import device
import ffmpeg
import youtube_dl
import numpy as np
from PIL import Image
import requests
import torch
from sentence_transformers import SentenceTransformer, util, models
from clip import CLIPModel
# from sentence_transformers.models import CLIPModel
from PIL import Image
# Build the CLIP module once at import time and wrap it in a
# SentenceTransformer so text and images go through the same `encode` call.
clip = CLIPModel()

# Pin the wrapper to float32 on the CPU; every `encode` call in this file
# also passes device='cpu'.
model = SentenceTransformer(modules=[clip]).to(
    dtype=torch.float32,
    device=torch.device('cpu'),
)
def get_embedding(query, video):
    """Embed a text query and a sequence of frames with the module-level model.

    Args:
        query: Text handed to ``model.encode``.
        video: Iterable of frames; each one is converted with
            ``Image.fromarray``. The caller in this file passes uint8 arrays
            of shape (height, width, 3).

    Returns:
        A ``(text_emb, img_embs)`` tuple: the result of encoding ``query``
        and the result of encoding the list of converted frames.
    """
    text_emb = model.encode(query, device='cpu')

    # Frames are converted to PIL images before being handed to the model.
    frames_as_images = [Image.fromarray(frame) for frame in video]
    img_embs = model.encode(frames_as_images, device='cpu')

    return text_emb, img_embs
# # Encode an image:
# url = "http://images.cocodataset.org/val2017/000000039769.jpg"
# img = Image.fromarray(np.array(Image.open(requests.get(url, stream=True).raw))).convert('RGB')
# img_emb = model.encode([img, img], device='cpu')
# # Encode text descriptions
# text_emb = model.encode(['Two dogs in the snow', 'Two cats laying on a sofa',
# 'A picture of London at night'], device='cpu')
# # Compute cosine similarities
# cos_scores = util.cos_sim(img_emb, text_emb)
# print(cos_scores)
def my_hook(d, query="two white puppies", frame_step=10):
    """Progress hook: when a download finishes, score its frames against a query.

    Registered through the ``progress_hooks`` download option, so it is
    called with a single status dict. Any call whose ``d['status']`` is not
    ``'finished'`` is ignored. For a finished download, the file at
    ``d['filename']`` is probed for its frame size and decoded to raw RGB
    frames. Every ``frame_step``-th frame is kept and embedded together with
    ``query``, and the cosine similarities are printed.

    Args:
        d: Status dict; reads ``d['status']`` and, once finished,
            ``d['filename']``.
        query: Text to compare the frames against. Defaults to the value that
            was previously hard-coded.
        frame_step: Keep one frame out of every ``frame_step`` decoded frames.
            Defaults to 10, the previously hard-coded stride.

    Raises:
        ValueError: If the probed file contains no video stream.
    """
    if d['status'] != 'finished':
        return

    print(d)
    print('Done downloading, now extracting frames ...')
    filename = d["filename"]

    # Raw video output carries no header, so the frame size must come from
    # the container metadata before the byte buffer can be reshaped.
    probe = ffmpeg.probe(filename)
    video_stream = next(
        (stream for stream in probe['streams'] if stream['codec_type'] == 'video'),
        None,
    )
    if video_stream is None:
        # Without this check the subscript below would fail with an opaque
        # "'NoneType' object is not subscriptable" TypeError.
        raise ValueError("no video stream found in {!r}".format(filename))
    width = int(video_stream['width'])
    height = int(video_stream['height'])

    # NOTE: the whole decoded video is captured in memory before subsampling.
    out, _ = (
        ffmpeg
        .input(filename)
        .output('pipe:', format='rawvideo', pix_fmt='rgb24')
        .run(capture_stdout=True)
    )

    # rgb24 is 3 bytes per pixel; -1 lets numpy infer the frame count.
    frames = np.frombuffer(out, np.uint8).reshape([-1, height, width, 3])
    video = frames[::frame_step]
    print(video.shape)

    txt_embd, img_embds = get_embedding(query, video)
    cos_scores = util.cos_sim(txt_embd, img_embds)
    print(cos_scores)
# Download options: request the mp4 format and register `my_hook` as a
# progress hook.
ydl_opts = {
    "format": "mp4",
    "progress_hooks": [my_hook],
}

# Runs at import time: download the sample video; the registered hook does
# the rest.
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
    ydl.download(['https://youtu.be/I3AaW9ZevIU'])
# # out, _ = (
# # ffmpeg
# # .input('in.mp4')
# # .output('pipe:', format='rawvideo', pix_fmt='rgb24')
# # .run(capture_stdout=True)
# # )
# # video = (
# # np
# # .frombuffer(out, np.uint8)
# # .reshape([-1, height, width, 3])
# # )