|
import sys |
|
|
|
import os |
|
import base64 |
|
import json |
|
import uuid |
|
import cv2 |
|
import numpy as np |
|
import gradio as gr |
|
import shutil |
|
from time import gmtime, strftime |
|
from pydantic import BaseModel |
|
from fastapi import FastAPI, File, UploadFile |
|
from fastapi.responses import JSONResponse |
|
from typing import Dict |
|
|
|
from engine.header import * |
|
|
|
file_path = os.path.abspath(__file__) |
|
root_path = os.path.dirname(file_path) |
|
dump_path = os.path.join(root_path, "dump2/") |
|
|
|
device_id = get_deviceid().decode('utf-8') |
|
print_info('\t <Hardware ID> \t\t {}'.format(device_id)) |
|
|
|
def activate_sdk(): |
|
online_key = os.environ.get("LICENSE_KEY") |
|
offline_key_path = os.path.join(root_path, "license.txt") |
|
|
|
dict_path = os.path.join(root_path, "engine/bin") |
|
|
|
ret = -1 |
|
if online_key is None: |
|
print_warning("Online license key not found!") |
|
else: |
|
activate_ret = set_activation(online_key.encode('utf-8')).decode('utf-8') |
|
ret = json.loads(activate_ret).get("errorCode", None) |
|
|
|
if ret == 0: |
|
print_log("Successfully online activation SDK!") |
|
else: |
|
print_error(f"Failed to online activation SDK, Error code {ret}\n Trying offline activation SDK..."); |
|
if os.path.exists(offline_key_path) is False: |
|
print_warning("Offline license key file not found!") |
|
print_error(f"Falied to offline activation SDK, Error code {ret}") |
|
return ret |
|
else: |
|
file=open(offline_key_path,"r") |
|
offline_key = file.read() |
|
file.close() |
|
activate_ret = set_activation(offline_key.encode('utf-8')).decode('utf-8') |
|
ret = json.loads(activate_ret).get("errorCode", None) |
|
if ret == 0: |
|
print_log("Successfully offline activation SDK!") |
|
else: |
|
print_error(f"Falied to offline activation SDK, Error code {ret}") |
|
return ret |
|
|
|
init_ret = init_sdk(dict_path.encode('utf-8')).decode('utf-8') |
|
ret = json.loads(activate_ret).get("errorCode", None) |
|
print_log(f"Init SDK: {ret}") |
|
sys.stdout.flush() |
|
return ret |
|
|
|
|
|
async def save_upload_file(upload_file: UploadFile) -> str: |
|
file_name = uuid.uuid4().hex[:6] + "_" + upload_file.filename |
|
save_path = os.path.join(dump_path, file_name) |
|
with open(save_path, "wb") as buffer: |
|
shutil.copyfileobj(upload_file.file, buffer) |
|
return os.path.abspath(save_path) |
|
|
|
async def save_base64_file(base64_string: str) -> str: |
|
file_name = uuid.uuid4().hex[:6] + ".jpg" |
|
save_path = os.path.join(dump_path, file_name) |
|
|
|
with open(save_path, "wb") as buffer: |
|
buffer.write(base64.b64decode(base64_string)) |
|
|
|
return os.path.abspath(save_path) |
|
|
|
app = FastAPI() |
|
|
|
class ImageBase64Request(BaseModel): |
|
image: str |
|
|
|
@app.get("/") |
|
def read_root(): |
|
return {"status": "API is running"} |
|
|
|
@app.post("/api/read_idcard") |
|
async def read_idcard(image: UploadFile = File(...), image2: UploadFile = File(None)): |
|
try: |
|
file_path1 = await save_upload_file(image) |
|
|
|
file_path2 = "" |
|
if image2 is not None: |
|
file_path2 = await save_upload_file(image2) |
|
|
|
|
|
ocrResult = ocr_id_card(file_path1.encode('utf-8'), file_path2.encode('utf-8')) |
|
ocrResDict = json.loads(ocrResult) |
|
|
|
os.remove(file_path1) |
|
if file_path2: |
|
os.remove(file_path2) |
|
|
|
return JSONResponse(content={"status": "ok", "data": ocrResDict}, status_code=200) |
|
|
|
except Exception as e: |
|
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) |
|
|
|
|
|
@app.post("/api/read_idcard_base64") |
|
async def read_idcard_base64(request_data: ImageBase64Request): |
|
try: |
|
file_path = await save_base64_file(request_data.image) |
|
|
|
|
|
ocr_result = ocr_id_card(file_path.encode('utf-8'), ''.encode('utf-8')) |
|
ocr_res_dict = json.loads(ocr_result) |
|
|
|
|
|
os.remove(file_path) |
|
|
|
return JSONResponse(content={"status": "ok", "data": ocr_res_dict}, status_code=200) |
|
|
|
except Exception as e: |
|
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) |
|
|
|
@app.post("/api/read_credit") |
|
async def read_credit(image: UploadFile = File(...)): |
|
try: |
|
file_path = await save_upload_file(image) |
|
|
|
ocrResult = ocr_credit_card(file_path.encode('utf-8')) |
|
ocrResDict = json.loads(ocrResult) |
|
|
|
os.remove(file_path) |
|
|
|
return JSONResponse(content={"status": "ok", "data": ocrResDict}, status_code=200) |
|
|
|
except Exception as e: |
|
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) |
|
|
|
@app.post("/api/read_credit_base64") |
|
async def read_credit_base64(request_data: ImageBase64Request): |
|
try: |
|
file_path = await save_base64_file(request_data.image) |
|
|
|
|
|
ocr_result = ocr_credit_card(file_path.encode('utf-8')) |
|
ocr_res_dict = json.loads(ocr_result) |
|
|
|
|
|
os.remove(file_path) |
|
|
|
return JSONResponse(content={"status": "ok", "data": ocr_res_dict}, status_code=200) |
|
|
|
except Exception as e: |
|
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) |
|
|
|
|
|
@app.post("/api/read_barcode") |
|
async def read_barcode(image: UploadFile = File(...)): |
|
try: |
|
file_path = await save_upload_file(image) |
|
|
|
ocrResult = ocr_barcode(file_path.encode('utf-8')) |
|
ocrResDict = json.loads(ocrResult) |
|
|
|
os.remove(file_path) |
|
|
|
return JSONResponse(content={"status": "ok", "data": ocrResDict}, status_code=200) |
|
|
|
except Exception as e: |
|
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) |
|
|
|
@app.post("/api/read_barcode_base64") |
|
async def read_barcode_base64(request_data: ImageBase64Request): |
|
try: |
|
file_path = await save_base64_file(request_data.image) |
|
|
|
|
|
ocr_result = ocr_barcode(file_path.encode('utf-8')) |
|
ocr_res_dict = json.loads(ocr_result) |
|
|
|
|
|
os.remove(file_path) |
|
|
|
return JSONResponse(content={"status": "ok", "data": ocr_res_dict}, status_code=200) |
|
|
|
except Exception as e: |
|
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) |
|
|
|
|
|
if __name__ == '__main__': |
|
ret = activate_sdk() |
|
if ret != 0: |
|
exit(-1) |
|
|
|
dummy_interface = gr.Interface( |
|
fn=lambda x: "API ready.", |
|
inputs=gr.Textbox(label="Info"), |
|
outputs=gr.Textbox(label="Response"), |
|
flagging_mode="auto" |
|
) |
|
|
|
gr_app = gr.mount_gradio_app(app, dummy_interface, path="/gradio") |
|
|
|
import uvicorn |
|
uvicorn.run(gr_app, host="0.0.0.0", port=7860) |
|
|
|
|