Spaces:
Sleeping
Sleeping
File size: 3,728 Bytes
f40ca12 f879e6c f40ca12 5129d6c f879e6c f40ca12 5129d6c f40ca12 f879e6c 5129d6c f879e6c f40ca12 f0909a7 f40ca12 5129d6c f40ca12 5129d6c f40ca12 5129d6c f40ca12 5129d6c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
from typing import Union
import asyncio
import pytesseract
from .config import IS_PROD, ORIGINS
from .utils.mark import mark_image, get_url_image
if not IS_PROD:
    # Outside production, point pytesseract at a Windows install of the
    # Tesseract binary. A raw string is used so the backslashes in the path
    # are never interpreted as escape sequences.
    # tessdata_dir_config = '--tessdata-dir "E:/Programming/python/image-demo/tessdata"'
    pytesseract.pytesseract.tesseract_cmd = (
        r"C:\Program Files\Tesseract-OCR\tesseract.exe"
    )
from fastapi import FastAPI, UploadFile
from fastapi.requests import Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import cv2
import io
import numpy as np
from .utils.cache import create_cache, retrieve_cache
import time
# ASGI application object; the route decorators in this module register on it.
app = FastAPI()

# Cross-origin policy: only the configured ORIGINS may call the API, limited
# to GET and POST, with credentials allowed and all headers accepted/exposed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=ORIGINS,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
    expose_headers=["*"],
    allow_credentials=True,
    max_age=31536000,  # preflight cache lifetime in seconds (365 days)
)
@app.post("/bulk-upload")
async def read_bulk_upload(files: list[UploadFile] = []):
    """Run OCR on every uploaded image and return the recognised text per file.

    Args:
        files: Uploaded image files. The empty-list default only makes the
            field optional for FastAPI; it is never mutated here.

    Returns:
        ``{"results": [{"text": ..., "file": ...}, ...]}`` in upload order.
        A 400 ``JSONResponse`` is returned instead when no files were sent or
        when one of them cannot be decoded as an image.
    """
    if not files:
        return JSONResponse(status_code=400, content={"message": "No files uploaded"})

    images = []
    for file in files:
        data = await file.read()
        # np.frombuffer replaces the deprecated binary mode of np.fromstring.
        # Empty payloads are rejected before decoding, and cv2.imdecode yields
        # None for data it cannot decode; both would otherwise surface as an
        # unhandled error inside the OCR call.
        img = (
            cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
            if data
            else None
        )
        if img is None:
            return JSONResponse(
                status_code=400,
                content={"message": f"Could not decode image: {file.filename}"},
            )
        images.append(img)

    # OCR is blocking work, so each image is handed to the default executor
    # and the calls are awaited together. "ben+eng" is passed as the second
    # positional argument of pytesseract.image_to_string (its language spec).
    loop = asyncio.get_running_loop()
    texts = await asyncio.gather(
        *(
            loop.run_in_executor(None, pytesseract.image_to_string, img, "ben+eng")
            for img in images
        )
    )

    return {
        "results": [
            {"text": text, "file": file.filename}
            for file, text in zip(files, texts)
        ]
    }
@app.post("/image-to-text")
async def read_image_to_text(file: UploadFile):
    """Run OCR on a single uploaded image and return the recognised text.

    Args:
        file: The uploaded image file.

    Returns:
        ``{"text": ...}`` with the OCR output, or a 400 ``JSONResponse`` when
        the upload is empty or cannot be decoded as an image.
    """
    data = await file.read()
    # np.frombuffer replaces the deprecated binary mode of np.fromstring.
    # Empty payloads are rejected before decoding, and cv2.imdecode yields
    # None for data it cannot decode; both would otherwise surface as an
    # unhandled error inside the OCR call.
    img = (
        cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
        if data
        else None
    )
    if img is None:
        return JSONResponse(
            status_code=400,
            content={"message": f"Could not decode image: {file.filename}"},
        )

    # OCR is blocking work, so it runs in the default executor. "ben+eng" is
    # passed as the second positional argument of image_to_string (its
    # language spec).
    loop = asyncio.get_running_loop()
    text = await loop.run_in_executor(
        None,
        pytesseract.image_to_string,
        img,
        "ben+eng",
    )
    return {"text": text}
@app.get("/marked-image")
async def read_marked_image(
    req: Request,
    q: Union[str, None] = None,
    image_url: Union[str, None] = None,
):
    """Serve the image at ``image_url`` as JPEG, processed by ``mark_image`` for ``q``.

    Flow:
        1. Without ``image_url`` an empty JPEG response is returned.
        2. A cached result keyed by the full request URL is served if present.
        3. Otherwise the image is fetched; with an empty ``q`` it is re-encoded
           and returned as-is (not cached), else it is passed through
           ``mark_image``, cached, and returned.

    Args:
        req: Incoming request; its full URL is the cache key.
        q: Value forwarded to ``mark_image``; empty/None skips marking.
        image_url: Location of the source image.

    Returns:
        A ``StreamingResponse`` with media type ``image/jpeg``.
    """
    # Standard-library helper; imported locally so this function does not
    # depend on a module-level import being present.
    from email.utils import formatdate

    if image_url is None:
        return StreamingResponse(io.BytesIO(), media_type="image/jpeg")

    headers = {
        "Cache-Control": "public, max-age=31536000, s-maxage=864000",
        # HTTP requires the Date header in GMT HTTP-date format; time.ctime()
        # produced a local-time string in a different layout.
        "Date": formatdate(usegmt=True),
        "accept-ranges": "bytes",
        "Connection": "keep-alive",
        "CF-Cache-Status": "HIT",
    }

    cache_key = str(req.url)
    stored_cache, age = await retrieve_cache(cache_key)
    if stored_cache:
        return StreamingResponse(
            io.BytesIO(stored_cache),
            media_type="image/jpeg",
            headers={
                **headers,
                "Content-Length": str(len(stored_cache)),
                "Age": str(age),
            },
        )

    img, img_headers = await get_url_image(image_url)

    if not q:
        # Nothing to mark: return the fetched image re-encoded as JPEG.
        _, image_data = cv2.imencode(".jpg", img)
        return StreamingResponse(
            io.BytesIO(image_data.tobytes()), media_type="image/jpeg"
        )

    resized = await mark_image(img, q)
    _, image_data = cv2.imencode(".jpg", resized)
    image_bytes = image_data.tobytes()

    # Drop upstream headers that this response sets itself. A default is
    # supplied so a source response lacking one of them does not raise KeyError.
    for name in ("Content-Type", "Date", "Server"):
        img_headers.pop(name, None)

    # Remaining upstream headers are forwarded; the values defined above win
    # on conflict.
    headers = {
        **img_headers,
        **headers,
        "Content-Length": str(len(image_bytes)),
    }

    # Third argument presumably is the cache lifetime in seconds (it matches
    # s-maxage above) — TODO confirm against create_cache.
    await create_cache(image_bytes, cache_key, 864000)
    return StreamingResponse(
        io.BytesIO(image_bytes),
        media_type="image/jpeg",
        headers=headers,
    )
|