Spaces:
Sleeping
Sleeping
from typing import Union | |
import asyncio | |
import pytesseract | |
from .config import IS_PROD, ORIGINS | |
from .utils.mark import mark_image, get_url_image | |
# Outside production, point pytesseract at a locally installed Windows
# Tesseract binary instead of relying on whatever is on PATH.
if not IS_PROD:
    # tessdata_dir_config = '--tessdata-dir "E:/Programming/python/image-demo/tessdata"'
    # Raw string so every backslash is literal: "\P" and "\T" are not valid
    # escape sequences in a normal string literal and trigger a warning.
    pytesseract.pytesseract.tesseract_cmd = (
        r"C:\Program Files\Tesseract-OCR\tesseract.exe"
    )
from fastapi import FastAPI, UploadFile | |
from fastapi.requests import Request | |
from fastapi.responses import StreamingResponse, JSONResponse | |
from fastapi.middleware.cors import CORSMiddleware | |
import cv2 | |
import io | |
import numpy as np | |
from .utils.cache import create_cache, retrieve_cache | |
import time | |
# ASGI application object.
app = FastAPI()

# CORS policy: only the configured ORIGINS may call the API, credentials are
# allowed, and only GET/POST are permitted. Preflight responses may be cached
# by the browser for up to 31536000 seconds (365 days).
_cors_settings = {
    "allow_origins": ORIGINS,
    "allow_credentials": True,
    "allow_methods": ["GET", "POST"],
    "allow_headers": ["*"],
    "expose_headers": ["*"],
    "max_age": 31536000,
}
app.add_middleware(CORSMiddleware, **_cors_settings)
async def read_bulk_upload(files: list[UploadFile] = []):
    """Run OCR over every uploaded file and return the recognised text per file.

    Each upload is read fully into memory, decoded with OpenCV as a colour
    image, and handed to ``pytesseract.image_to_string`` on the event loop's
    default executor so the blocking OCR call does not stall the loop.
    ``"ben+eng"`` is passed as the second positional argument of
    ``image_to_string``.

    Args:
        files: Uploaded files to process. The ``[]`` default is never mutated
            here and is kept so the declared parameter stays unchanged.

    Returns:
        A ``JSONResponse`` with status 400 and ``{"message": "No files
        uploaded"}`` when no files were supplied; otherwise
        ``{"results": [{"text": ..., "file": <filename>}, ...]}`` in the same
        order as ``files``.
    """
    if not files:
        return JSONResponse(status_code=400, content={"message": "No files uploaded"})

    loop = asyncio.get_running_loop()

    # Read and decode sequentially; each OCR job starts running on the
    # executor as soon as it is submitted, so the OCR work itself overlaps.
    # NOTE(review): cv2.imdecode yields None for data it cannot decode, which
    # pytesseract will then reject -- confirm whether a 4xx is wanted there.
    ocr_jobs = []
    for file in files:
        raw = await file.read()
        # np.frombuffer is the supported way to view raw bytes as uint8;
        # np.fromstring's binary mode is deprecated.
        image = cv2.imdecode(np.frombuffer(raw, np.uint8), cv2.IMREAD_COLOR)
        ocr_jobs.append(
            loop.run_in_executor(None, pytesseract.image_to_string, image, "ben+eng")
        )

    texts = await asyncio.gather(*ocr_jobs)

    # gather() preserves submission order, so texts line up with files.
    return {
        "results": [
            {"text": text, "file": file.filename}
            for file, text in zip(files, texts)
        ]
    }
async def read_image_to_text(file: UploadFile):
    """Run OCR on a single uploaded image and return the recognised text.

    The upload is read fully into memory, decoded with OpenCV as a colour
    image, and passed to ``pytesseract.image_to_string`` on the event loop's
    default executor so the blocking OCR call does not stall the loop.
    ``"ben+eng"`` is passed as the second positional argument of
    ``image_to_string``.

    Args:
        file: The uploaded image file.

    Returns:
        ``{"text": <recognised text>}``.
    """
    data = await file.read()
    # np.frombuffer is the supported way to view raw bytes as uint8;
    # np.fromstring's binary mode is deprecated.
    # NOTE(review): cv2.imdecode yields None for data it cannot decode, which
    # pytesseract will then reject -- confirm whether a 4xx is wanted there.
    img = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)

    loop = asyncio.get_running_loop()
    text = await loop.run_in_executor(
        None,
        pytesseract.image_to_string,
        img,
        "ben+eng",
    )
    return {"text": text}
async def read_marked_image(
    req: Request,
    q: Union[str, None] = None,
    image_url: Union[str, None] = None,
):
    """Fetch an image by URL, optionally process it with ``q``, and stream it as JPEG.

    Flow:
      1. Without ``image_url`` an empty ``image/jpeg`` body is returned.
      2. The full request URL is used as cache key; a cache hit is served
         directly together with its ``Age``.
      3. Otherwise the image is fetched via ``get_url_image``. With an empty
         or missing ``q`` it is re-encoded and returned without caching or
         extra headers.
      4. With a ``q`` the image goes through ``mark_image`` (presumably
         highlighting ``q`` in the image -- confirm against the helper), is
         encoded as JPEG, stored in the cache for 864000 seconds (10 days)
         and returned with caching headers.

    Args:
        req: Incoming request; only its URL is used, as the cache key.
        q: Value forwarded to ``mark_image``; ``None``/``""`` skips marking.
        image_url: URL of the source image.

    Returns:
        A ``StreamingResponse`` with media type ``image/jpeg``.
    """
    # Imported locally so this function does not depend on a file-level import.
    from email.utils import formatdate

    if image_url is None:
        return StreamingResponse(io.BytesIO(), media_type="image/jpeg")

    # NOTE(review): "CF-Cache-Status: HIT" is also sent on the freshly
    # generated response below, not only on real cache hits -- confirm intent.
    headers = {
        "Cache-Control": "public, max-age=31536000, s-maxage=864000",
        # HTTP dates must be in GMT in IMF-fixdate form, e.g.
        # "Mon, 01 Jan 2024 12:00:00 GMT"; time.ctime() gives local time in a
        # different layout.
        "Date": formatdate(usegmt=True),
        "accept-ranges": "bytes",
        "Connection": "keep-alive",
        "CF-Cache-Status": "HIT",
    }

    cache_key = str(req.url)
    stored_cache, age = await retrieve_cache(cache_key)
    if stored_cache:
        return StreamingResponse(
            io.BytesIO(stored_cache),
            media_type="image/jpeg",
            headers={
                **headers,
                "Content-Length": str(len(stored_cache)),
                "Age": str(age),
            },
        )

    img, img_headers = await get_url_image(image_url)

    if not q:
        # Nothing to mark: hand back the re-encoded source image as-is.
        _, image_data = cv2.imencode(".jpg", img)
        return StreamingResponse(
            io.BytesIO(image_data.tobytes()), media_type="image/jpeg"
        )

    resized = await mark_image(img, q)
    _, image_data = cv2.imencode(".jpg", resized)
    image_bytes = image_data.tobytes()

    # Drop upstream headers that no longer describe this response. A default
    # is passed so a missing header does not raise KeyError.
    for name in ("Content-Type", "Date", "Server"):
        img_headers.pop(name, None)

    # Our own headers win over whatever the upstream server sent.
    headers = {
        **img_headers,
        **headers,
        "Content-Length": str(len(image_bytes)),
    }

    await create_cache(image_bytes, cache_key, 864000)
    return StreamingResponse(
        io.BytesIO(image_bytes),
        media_type="image/jpeg",
        headers=headers,
    )