import streamlit as st |
import os |
from PIL import Image, ImageOps |
from PIL import ImageDraw, ImageFont |
import random |
from io import BytesIO |
from requests.auth import HTTPBasicAuth |
import boto3 |
import json |
from datetime import datetime |
import requests |
# Recognition service: POST endpoint, the "base_url" form field that is sent
# with each request, and the HTTP basic-auth credentials.
remote_url = os.getenv("remote_url")
base_url = os.getenv("base_url")
authEmail = os.getenv("authEmail")
authPass = os.getenv("authPass")

# AWS credentials and region, read from the standard AWS variable names.
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
aws_default_region = os.getenv("AWS_DEFAULT_REGION")

# Destination bucket for uploads and the id embedded in uploaded object keys.
bucket_name = os.getenv("bucket_name")
user_id = os.getenv("user_id")

# Session built explicitly from the values above, plus an S3 client bound to it.
session = boto3.Session(
    region_name=aws_default_region,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)
s3 = session.client("s3")
def calculate_font_size(box, max_font_size=30, min_font_size=10, font_path="NotoSansKR-Medium.otf"):
    """Pick a label font size proportional to the width of ``box``.

    The size is ``max_font_size * width / 100`` truncated to an int and then
    clamped to the range ``[min_font_size, max_font_size]``.

    Args:
        box: Indexable whose items 0 and 2 are the left and right x coordinates.
        max_font_size: Upper bound for the returned size.
        min_font_size: Lower bound for the returned size (applied last).
        font_path: Font file handed to ``ImageFont.truetype``.

    Returns:
        A ``(font, font_size)`` tuple: the loaded font and the size used.
    """
    scaled = int(max_font_size * (box[2] - box[0]) / 100)
    # The lower bound is applied after the upper bound, so it wins on conflict.
    font_size = max(min_font_size, min(max_font_size, scaled))
    return ImageFont.truetype(font_path, font_size), font_size
def estimate_text_size(text, font_size):
    """Roughly estimate the rendered size of ``text`` without measuring glyphs.

    Every character is assumed to be half the font size wide, and the text is
    assumed to be one line whose height equals the font size.

    Args:
        text: The string to be drawn.
        font_size: Font size the text will be drawn with.

    Returns:
        A ``(text_width, text_height)`` tuple.
    """
    half_em = font_size * 0.5
    return len(text) * half_em, font_size
def draw_annotated_box(draw, box, text, font_path="NotoSansKR-Medium.otf"):
    """Outline ``box`` in green and draw ``text`` on a green label above it.

    Args:
        draw: Drawing object exposing ``rectangle`` and ``text`` (an
            ``ImageDraw.Draw`` instance in this file).
        box: ``(x1, y1, x2, y2)`` corners of the rectangle to outline.
        text: Label drawn in white starting at the box's left edge, just
            above its top edge.
        font_path: Font file used for the label; forwarded to
            ``calculate_font_size`` so a caller-supplied font is honoured.
    """
    x1, y1, x2, y2 = box
    draw.rectangle([x1, y1, x2, y2], outline="green", width=3)

    font, font_size = calculate_font_size(box, font_path=font_path)
    text_width, text_height = estimate_text_size(text, font_size)

    # The label background is a strip 80% of the estimated text height
    # (plus 2px of padding) sitting directly on top of the box.
    reduced_text_height = int(text_height * 0.8)
    background_box = [x1, y1 - reduced_text_height - 2, x1 + text_width + 2, y1]
    draw.rectangle(background_box, fill="green")
    draw.text((x1, y1 - text_height), text, font=font, fill="white")
def upload_to_s3(image_data, region):
    """Store JPEG bytes in the configured S3 bucket under a per-region prefix.

    The object key has the form
    ``image_<REGION>/<user_id>_<YYYYMMDD>_<HHMMSS>.jpeg`` where ``<REGION>``
    is ``region`` upper-cased and the timestamp is the current local time.

    Args:
        image_data: Body of the object to upload (JPEG bytes).
        region: Region code (for example ``"kr"``) used to build the prefix.

    Returns:
        ``True`` when the upload succeeded, ``False`` when it raised. The
        failure is printed rather than propagated, so an upload problem never
        aborts the caller.
    """
    path_prefix = f"image_{region.upper()}"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_name = f"{path_prefix}/{user_id}_{timestamp}.jpeg"
    try:
        # Use the module-level client, which is bound to the explicitly
        # configured session, instead of building a new client on every call.
        s3.put_object(
            Body=image_data,
            Bucket=bucket_name,
            Key=file_name,
            ContentType='image/jpeg',
        )
    except Exception as e:  # best-effort upload: report and carry on
        print(f"Error uploading to S3: {e}")
        return False
    return True
# Pillow image modes the JPEG encoder can write directly; any other mode
# (for example "RGBA", "P" or "LA" from PNG uploads) is converted to RGB first.
_JPEG_WRITABLE_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})

# Bundled sample images offered for each licence type.
_DEFAULT_IMAGES = {"Korean": ["kr1.jpg"], "EU": ["eu1.jpg"]}

# Region code sent to the recognition service for each licence type.
_REGION_BY_COUNTRY = {"Korean": "kr", "EU": "eu"}


def _encode_jpeg(image):
    """Return ``image`` encoded as JPEG bytes, converting unsupported modes to RGB."""
    if image.mode not in _JPEG_WRITABLE_MODES:
        image = image.convert("RGB")
    with BytesIO() as buffer:
        image.save(buffer, format="JPEG")
        return buffer.getvalue()


def _error_detail(res):
    """Return the "detail" field of an error response, tolerating non-JSON bodies."""
    try:
        payload = res.json()
    except ValueError:
        # Error pages from proxies/gateways are often HTML, not JSON.
        return "Unknown error"
    if isinstance(payload, dict):
        return payload.get("detail", "Unknown error")
    return "Unknown error"


def _recognize_and_render(country, use_default_image, img_file_buffer, show_image=True):
    """Send one image to the recognition service and render the outcome."""
    if use_default_image:
        image = Image.open(os.path.join('./', random.choice(_DEFAULT_IMAGES[country])))
    else:
        image = Image.open(img_file_buffer)
    # Apply the EXIF orientation so the pixels that are sent, annotated and
    # displayed are all in the same orientation.
    image = ImageOps.exif_transpose(image)

    jpeg_bytes = _encode_jpeg(image)
    region = _REGION_BY_COUNTRY.get(country, "kr")

    try:
        res = requests.post(
            remote_url,
            headers={'Accept': 'application/json'},
            files={'file': ('image.jpg', jpeg_bytes, 'image/jpeg')},
            data={'base_url': base_url, 'region': region},
            auth=HTTPBasicAuth(authEmail, authPass),
        )
    except requests.RequestException as exc:
        st.code(f"Failed to get result, request error: {exc}")
        return

    # Only user-supplied images are archived; bundled samples are skipped.
    if not use_default_image and upload_to_s3(jpeg_bytes, region):
        print("Upload successful!")

    if res.status_code != 200:
        st.code(f"Failed to get result, Status Code: {res.status_code}, Detail: {_error_detail(res)}")
        return

    output_res = res.json()
    draw = ImageDraw.Draw(image) if show_image else None
    for item in output_res['results']:
        x1, y1, x2, y2 = item['ltrb']
        ocr_text = item['ocr']
        if show_image:
            draw_annotated_box(draw, (x1, y1, x2, y2), ocr_text)
    if show_image:
        st.image(image, caption='Annotated Image', use_column_width=True)
    st.header('π¦ Result Json')
    st.code(json.dumps(output_res, indent=4, ensure_ascii=False), language="json")


def main():
    """Render the ANPR demo page and run one recognition per "Submit" click.

    The page lets the user pick a licence type and either upload an image or
    use a bundled sample. On submit the image is JPEG-encoded, posted to the
    recognition service, archived to S3 when it was user-supplied, and the
    annotated image plus the raw JSON result (or an error line) is shown.
    """
    st.title("π ANPR - Ver.1.0.leon")
    country = st.radio("π¦ Type of License", options=['Korean', 'EU'])
    st.write("π§ͺ Options")
    use_default_image = st.checkbox("Use default server image?", value=False)
    # The uploader is only rendered when the bundled sample is not selected.
    img_file_buffer = (
        None
        if use_default_image
        else st.file_uploader("π¦ Upload your image", type=['jpg', 'jpeg', 'png'])
    )
    submit = st.button("Submit")

    if not submit:
        return

    if use_default_image or img_file_buffer is not None:
        gif_runner = st.image('wait-waiting.gif')
        try:
            _recognize_and_render(country, use_default_image, img_file_buffer)
        finally:
            # Always remove the "waiting" animation, even when processing fails.
            gif_runner.empty()
    st.write("")
# Run the app only when this file is executed as the main script.
if __name__ == "__main__":
    main()