File size: 5,337 Bytes
c6a9f21
 
bcee635
c6a9f21
fb24ad1
c6a9f21
1654acc
c6a9f21
 
 
 
1654acc
 
c6a9f21
 
bcee635
c6a9f21
 
 
 
 
 
bcee635
 
c6a9f21
 
 
 
bcee635
 
c6a9f21
 
bcee635
 
 
 
 
 
 
 
 
 
 
 
 
c6a9f21
 
bcee635
 
c6a9f21
bcee635
 
 
c6a9f21
bcee635
c6a9f21
fb24ad1
 
bcee635
 
 
 
 
fb24ad1
 
 
bcee635
 
 
 
 
fb24ad1
c6a9f21
 
bcee635
 
c6a9f21
bcee635
1654acc
bcee635
 
 
1654acc
 
c6a9f21
 
 
bcee635
 
 
 
 
c6a9f21
 
bcee635
 
 
 
 
c6a9f21
 
 
 
 
bcee635
 
c6a9f21
7a0e77d
bcee635
fb24ad1
 
bcee635
fb24ad1
bcee635
 
 
 
fb24ad1
bcee635
fb24ad1
bcee635
 
 
 
 
 
fb24ad1
bcee635
1654acc
bcee635
 
 
 
 
 
1654acc
 
bcee635
1654acc
 
bcee635
 
1654acc
bcee635
1654acc
 
 
bcee635
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
from fastapi.testclient import TestClient
from fastapi.routing import APIRoute
from app.routers.video import updateArtifact, createThumbnail, inferenceFrame
from app.main import app
from app.constants import deviceId
from app import db
import mmcv
import os
import pytest
import requests
import json
import cv2
import shutil
from google.cloud.firestore_v1.base_query import FieldFilter


def endpoints():
    endpoints = []
    for route in app.routes:
        if isinstance(route, APIRoute):
            endpoints.append(route.path)
    return endpoints


@pytest.fixture
def client():
    client = TestClient(app, "http://0.0.0.0:3000")
    yield client


@pytest.fixture
def user():
    url = (
        "https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key="
        + os.environ.get("FIREBASE_API_KEY")
    )

    payload = json.dumps(
        {
            "email": "test_video@gmail.com",
            "password": "testing",
            "returnSecureToken": True,
        }
    )
    headers = {"Content-Type": "application/json"}
    response = requests.request("POST", url, headers=headers, data=payload)
    data = response.json()
    user = {"id": data["localId"], "token": data["idToken"]}
    db.collection("user").document(user["id"]).set({"deviceId": deviceId})
    yield user
    db.collection("user").document(user["id"]).delete()


class TestVideoAPI:
    @pytest.mark.skipif("/video" not in endpoints(), reason="Route not defined")
    def test_video_API(self, user, client):
        # Test when no token is pass to route
        payload = {}
        files = [("file", ("demo.mp4", open("demo.mp4", "rb"), "video/mp4"))]
        headers = {}
        response = client.request(
            "POST", "video", headers=headers, data=payload, files=files
        )
        assert response.status_code == 403
        # Test when a dummy (not valid) token passed
        payload = {}
        files = [("file", ("demo.mp4", open("demo.mp4", "rb"), "video/mp4"))]
        headers = {"Authorization": "Bearer saikoljncaskljnfckjnasckjna"}
        response = client.request(
            "POST", "video", headers=headers, data=payload, files=files
        )
        assert response.status_code == 401
        # Test when sent file is not a video
        payload = {}
        files = [
            ("file", ("demo.jpg", open("demo.jpg", "rb"), "application/octet-stream"))
        ]
        headers = {"Authorization": "Bearer " + user["token"]}
        while True:
            response = client.request(
                "POST", "video", headers=headers, data=payload, files=files
            )
            if response.status_code != 401:
                break
        assert response.status_code == 400
        # Test when all requirements have been fulfilled
        payload = {}
        files = [("file", ("demo.mp4", open("demo.mp4", "rb"), "video/mp4"))]
        headers = {"Authorization": "Bearer " + user["token"]}
        response = client.request(
            "POST", "video", headers=headers, data=payload, files=files
        )
        assert response.status_code == 200
        artifactName = response.text
        docs = (
            db.collection("artifacts")
            .where(filter=FieldFilter("name", "==", artifactName))
            .stream()
        )
        index = 0
        for doc in docs:
            # For each document in docs. Verify name and status of the artifact
            index += 1
            data = doc.get().to_dict()
            assert data["name"] == artifactName
            assert data["status"] == "pending"
            assert index == 1
            doc.delete()

    def test_update_artifact(self):
        # Check and preprocess test data before testing
        test_artifact = db.collection("artifacts").document("test")
        if not test_artifact.get().exists:
            db.collection("artifacts").document("test").set(
                {"name": "test", "path": "", "status": "testing", "thumbnailURL": ""}
            )
            test_artifact = db.collection("artifacts").document("test")
        else:
            test_artifact.update({"status": "testing", "path": "", "thumbnailURL": ""})
        # Testing update on each field
        updateArtifact(test_artifact.id, {"status": "test_done"})
        assert (
            db.collection("artifacts").document("test").get().to_dict()["status"]
            == "test_done"
        )
        # Delete data for next time test
        test_artifact.delete()

    def test_inference_frame(self):
        if not os.path.exists("test_vid"):
            os.mkdir("test_vid")
        shutil.copyfile("demo.mp4", "test_vid/input.mp4")
        thumbnail = inferenceFrame("test_vid")
        assert os.path.exists("test_vid/out.mp4") and os.path.isfile("test_vid/out.mp4")
        vidcap = cv2.VideoCapture("test_vid/input.mp4")
        success, image = vidcap.read()
        if success:
            assert image.shape == thumbnail.shape
        vidcap.release()
        del vidcap
        shutil.rmtree("test_vid")

    def test_create_thumbnal(self):
        vidcap = cv2.VideoCapture("demo.mp4")
        success, image = vidcap.read()
        if success:
            createThumbnail(image, "")
            result = mmcv.imread("thumbnail.jpg", channel_order="rgb")
            assert result.shape == (160, 160, 3)