"""FastAPI service exposing OCR endpoints and a cached marked-image endpoint."""

import asyncio
import io
import time
from email.utils import formatdate
from typing import Union

import cv2
import numpy as np
import pytesseract
from fastapi import FastAPI, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.requests import Request
from fastapi.responses import JSONResponse, StreamingResponse

from .config import IS_PROD, ORIGINS
from .utils.cache import create_cache, retrieve_cache
from .utils.mark import get_url_image, mark_image

if not IS_PROD:
    # Outside production, point pytesseract at a fixed Windows install path.
    # A raw string keeps the backslashes literal without invalid escapes.
    pytesseract.pytesseract.tesseract_cmd = (
        r"C:\Program Files\Tesseract-OCR\tesseract.exe"
    )

# Language argument handed to pytesseract.image_to_string for every OCR call.
OCR_LANGUAGES = "ben+eng"

# Upstream response headers that must not be forwarded to the client.
_DROPPED_UPSTREAM_HEADERS = ("Content-Type", "Date", "Server")

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST"],
    allow_headers=["*"],
    expose_headers=["*"],
    max_age=31536000,
)


def _decode_image(data: bytes):
    """Decode encoded image bytes with OpenCV; return None if not decodable."""
    if not data:
        # cv2.imdecode raises on an empty buffer, so treat it as undecodable.
        return None
    # np.frombuffer replaces the deprecated binary mode of np.fromstring.
    return cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)


async def _ocr(img) -> str:
    """Run pytesseract on *img* in the default executor and return the text."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        pytesseract.image_to_string,
        img,
        OCR_LANGUAGES,
    )


def _jpeg_bytes(img) -> bytes:
    """Encode an OpenCV image as JPEG and return the raw bytes."""
    _, encoded = cv2.imencode(".jpg", img)
    return encoded.tobytes()


@app.post("/bulk-upload")
async def read_bulk_upload(files: list[UploadFile] = []):
    """OCR every uploaded file and return the extracted text per filename.

    Responds with HTTP 400 when no files are uploaded or when any upload
    cannot be decoded as an image. On success the body is
    ``{"results": [{"text": ..., "file": ...}, ...]}`` in upload order.
    """
    # NOTE: the default list is never mutated in this handler.
    if not files:
        return JSONResponse(status_code=400, content={"message": "No files uploaded"})

    images = [_decode_image(await file.read()) for file in files]
    undecodable = [
        file.filename for file, img in zip(files, images) if img is None
    ]
    if undecodable:
        return JSONResponse(
            status_code=400,
            content={"message": "Invalid image file", "files": undecodable},
        )

    # OCR jobs run concurrently on the executor; gather preserves order.
    texts = await asyncio.gather(*(_ocr(img) for img in images))
    results = [
        {
            "text": text,
            "file": file.filename,
        }
        for file, text in zip(files, texts)
    ]
    return {"results": results}


@app.post("/image-to-text")
async def read_image_to_text(file: UploadFile):
    """OCR a single uploaded image and return ``{"text": ...}``.

    Responds with HTTP 400 when the upload cannot be decoded as an image.
    """
    img = _decode_image(await file.read())
    if img is None:
        return JSONResponse(
            status_code=400, content={"message": "Invalid image file"}
        )
    return {"text": await _ocr(img)}


@app.get("/marked-image")
async def read_marked_image(
    req: Request,
    q: Union[str, None] = None,
    image_url: Union[str, None] = None,
):
    """Fetch the image at *image_url*, mark it using *q*, and stream a JPEG.

    - Without ``image_url`` an empty JPEG response is returned.
    - A cached result for the full request URL is served when available.
    - Without ``q`` the fetched image is re-encoded and returned unmarked,
      with no caching headers and without being stored in the cache.
    - Otherwise the marked image is cached under the request URL and
      returned together with the upstream headers and caching headers.
    """
    if image_url is None:
        return StreamingResponse(io.BytesIO(), media_type="image/jpeg")

    headers = {
        "Cache-Control": "public, max-age=31536000, s-maxage=864000",
        # HTTP requires an RFC-compliant GMT date, which time.ctime is not.
        "Date": formatdate(time.time(), usegmt=True),
        "accept-ranges": "bytes",
        "Connection": "keep-alive",
        "CF-Cache-Status": "HIT",
    }

    cache_key = str(req.url)
    stored_cache, age = await retrieve_cache(cache_key)
    if stored_cache:
        return StreamingResponse(
            io.BytesIO(stored_cache),
            media_type="image/jpeg",
            headers={
                **headers,
                "Content-Length": str(len(stored_cache)),
                "Age": str(age),
            },
        )

    img, img_headers = await get_url_image(image_url)

    if not q:
        return StreamingResponse(
            io.BytesIO(_jpeg_bytes(img)), media_type="image/jpeg"
        )

    marked = await mark_image(img, q)
    image_bytes = _jpeg_bytes(marked)

    # Drop upstream headers that no longer describe this response; a default
    # avoids a KeyError when the upstream server did not send one of them.
    for name in _DROPPED_UPSTREAM_HEADERS:
        img_headers.pop(name, None)

    headers = {
        **img_headers,
        **headers,
        "Content-Length": str(len(image_bytes)),
    }

    # Third argument is presumably the cache lifetime in seconds (it matches
    # s-maxage above) -- TODO confirm against create_cache.
    await create_cache(image_bytes, cache_key, 864000)
    return StreamingResponse(
        io.BytesIO(image_bytes),
        media_type="image/jpeg",
        headers=headers,
    )