# NOTE: the text below is page-scrape residue, not part of the program:
#   "Spaces: Runtime error / Runtime error"
import asyncio
import functools
from io import BytesIO
from urllib.parse import urljoin

import httpx  # Asynchronous HTTP client
import numpy as np
import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, File, UploadFile, Depends, Form
from fastapi.responses import FileResponse
from PIL import Image, ImageChops
from playwright.async_api import async_playwright, TimeoutError
from pydantic import BaseModel
# FastAPI application object, created once at import time.
app = FastAPI()
async def read_root():
    """Return a static greeting payload describing what the service does.

    NOTE(review): no route decorator is visible on this coroutine in this
    view of the file — confirm how (or whether) it is registered on ``app``.

    Returns:
        A dict with a single ``"message"`` key.
    """
    return {"message": "Comparing preview with live model"}
class LoginException(Exception):
    """Custom exception raised when a login step fails.

    Attributes:
        message: Human-readable description of the failure.
        status_code: Numeric status attached to the failure (defaults to 420).
    """

    def __init__(self, message: str, status_code: int = 420):
        self.message = message
        self.status_code = status_code
        # Pass the message to Exception so str(exc) and exc.args carry it.
        super().__init__(self.message)
# Pydantic model for the incoming JSON data
class UserData(BaseModel):
    """Input payload consumed by ``admin_funct``."""

    # Credentials typed into the admin login form.
    username_admin: str
    password_admin: str
    # True -> sandbox URL is used as-is; False -> '&status=in_review' is appended.
    status: bool
    # Model identifier interpolated into both the admin search URL and the sandbox URL.
    intiaro_model_id: int
# Download an image from a URL and return it as a PIL.Image object.
async def get_image_from_url(url):
    """Fetch *url* and decode the response body into a PIL image.

    The blocking ``requests`` call runs in the default executor so the event
    loop keeps serving other work while the download is in flight.

    Args:
        url: Address of the image to download.

    Returns:
        The decoded ``PIL.Image.Image`` with its pixel data already loaded.

    Raises:
        HTTPException: 400 when the download fails, when the response is not
            declared as an image, or when the payload cannot be decoded.
    """
    loop = asyncio.get_running_loop()
    try:
        # requests applies no timeout by default; without one a stalled
        # server would block this request indefinitely.
        response = await loop.run_in_executor(
            None, functools.partial(requests.get, url, timeout=30)
        )
        response.raise_for_status()  # surfaces HTTP errors (e.g. 404, 500)

        # Check that the URL actually returned image data. A missing header
        # is treated as "not an image" instead of raising KeyError.
        content_type = response.headers.get('Content-Type', '')
        if 'image' not in content_type:
            raise ValueError(f"URL {url} nie zawiera obrazu. Typ zawartości: {content_type}")

        # Image.open() is lazy; load() forces decoding here so that corrupt
        # or truncated data is reported by the handlers below.
        image = Image.open(BytesIO(response.content))
        image.load()
        return image
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=400, detail=f"Nie udało się pobrać obrazu: {str(e)}") from e
    except (ValueError, Image.UnidentifiedImageError, OSError) as e:
        raise HTTPException(status_code=400, detail=f"Błąd w pliku obrazu: {str(e)}") from e
# Compare two images given by URL.
async def compare_images(image1_url, image2_url):
    """Download two images and report whether their pixels differ.

    Both images are normalised to RGBA and, if their sizes differ, padded onto
    a transparent canvas of the larger size so that every pixel takes part in
    the comparison.

    NOTE: the diff is always written to ``diff_image.png`` in the current
    working directory, so concurrent calls overwrite each other's output.

    Args:
        image1_url: URL of the first image.
        image2_url: URL of the second image.

    Returns:
        The path of the saved difference image, or ``None`` when no colour or
        alpha difference was found.

    Raises:
        HTTPException: propagated from ``get_image_from_url``.
    """
    first = await get_image_from_url(image1_url)
    second = await get_image_from_url(image2_url)

    # ImageChops.difference() rejects images whose modes differ, so bring
    # both to a common mode first.
    first = first.convert("RGBA")
    second = second.convert("RGBA")

    # difference() only covers the overlapping area of differently sized
    # images; padding makes the non-overlapping region count as well.
    if first.size != second.size:
        canvas_size = (
            max(first.width, second.width),
            max(first.height, second.height),
        )
        padded = []
        for image in (first, second):
            canvas = Image.new("RGBA", canvas_size)
            canvas.paste(image, (0, 0))
            padded.append(canvas)
        first, second = padded

    delta = ImageChops.difference(first, second)

    # On an image with an alpha band getbbox() looks at alpha only by default,
    # which would hide pure colour changes (and the saved RGBA diff would be
    # transparent). Fold the alpha delta into an opaque RGB picture instead.
    alpha_delta = delta.getchannel("A")
    visible_delta = ImageChops.lighter(
        delta.convert("RGB"),
        Image.merge("RGB", (alpha_delta, alpha_delta, alpha_delta)),
    )

    if visible_delta.getbbox() is None:
        return None

    diff_path = "diff_image.png"
    visible_delta.save(diff_path)
    return diff_path
async def admin_funct(user_data: UserData):
    """Compare the admin-side product image with the image shown in the sandbox.

    Launches headless Chromium, obtains one image URL via ``admin_img`` and one
    via ``portal_funct`` (sharing a single page), then diffs them with
    ``compare_images``.

    Args:
        user_data: Credentials, review-status flag and model id for the lookup.

    Returns:
        A ``FileResponse`` serving the difference image when the images differ,
        otherwise a dict with a ``"message"`` key.

    Raises:
        LoginException, HTTPException: propagated from the helper coroutines.
    """
    async with async_playwright() as playwright:
        browser = await playwright.chromium.launch(headless=True)
        try:
            context = await browser.new_context()
            page = await context.new_page()
            img_pc_link = await admin_img(
                user_data.username_admin,
                user_data.password_admin,
                user_data.intiaro_model_id,
                page,
            )
            img_slider = await portal_funct(page, user_data.status, user_data.intiaro_model_id)
            diff_path = await compare_images(img_pc_link, img_slider)
        finally:
            # Close the browser even when a helper raises, so a failed request
            # does not leave a Chromium process behind.
            await browser.close()

    if diff_path:
        return FileResponse(diff_path, media_type="image/png", filename="diff_image.png")
    return {"message": "No differences found between images."}
async def _page_soup(page):
    """Parse the page's current HTML into a BeautifulSoup tree."""
    return BeautifulSoup(await page.content(), 'html.parser')


async def admin_img(username_admin, password_admin, intiaro_model_id, page):
    """Log in to the admin site and return the product configuration image URL.

    Steps: open the product-variant list filtered by ``intiaro_model_id``,
    submit the login form, follow the first ``th.field-id`` link, read the
    configuration id from the first ``input.vForeignKeyRawIdAdminField``, open
    that configuration page and read the link inside ``p.file-upload``.

    Args:
        username_admin: Value typed into the ``username`` field.
        password_admin: Value typed into the ``password`` field.
        intiaro_model_id: Model id used in the list filter.
        page: Playwright page used for all navigation.

    Returns:
        The image link's ``href``, resolved against the current page URL.

    Raises:
        LoginException: a login form control could not be filled/clicked, or
            the product link could not be located after submitting the form.
        HTTPException: 500 on any other error during login/navigation; 404
            when the configuration id or the image link is missing.
    """
    admin_base = 'https://intiaro.agitest.pl'
    try:
        # Open the filtered list; the fields filled below are expected on the
        # page that loads.
        await page.goto(
            f'{admin_base}/admin/catalog_catalog/productvariant/?q=intiaro_model_id%3D{intiaro_model_id}')
        await page.wait_for_timeout(3300)

        # Fill in the username field.
        try:
            await page.fill('input[name="username"]', username_admin)
        except TimeoutError as e:
            raise LoginException("Nie udało się wpisać nazwy w pole username") from e

        # Fill in the password field.
        try:
            await page.fill('input[name="password"]', password_admin)
        except TimeoutError as e:
            raise LoginException("Nie udało się wpisać hasła w pole password") from e

        # Click the submit button.
        try:
            await page.click('input[type=submit]')
        except TimeoutError as e:
            raise LoginException("Nie udało się kliknąć przycisku 'submit'") from e

        # Give the post-login navigation time to finish.
        await page.wait_for_timeout(5000)

        soup = await _page_soup(page)

        # Locate the product link inside the <th> cell.
        prod_link = soup.find('th', class_='field-id')
        if prod_link is None:
            raise HTTPException(status_code=404, detail="Nie znaleziono produktu dla podanego ID.")

        tag = prod_link.find('a')
        if tag is None:
            raise HTTPException(status_code=404, detail="Nie znaleziono linku do produktu.")

        link = tag.get('href')
        if not link:
            raise HTTPException(status_code=404, detail="Link do produktu nie istnieje.")

        # Resolve the href the way a browser would, so both relative and
        # absolute links work.
        full_link = urljoin(page.url, link)
        await page.goto(full_link)
        print(full_link)
    except LoginException:
        # Must come before the generic handler below, which would otherwise
        # turn these specific login failures into a 500.
        raise
    except HTTPException as e:
        # NOTE(review): existing behaviour kept — a missing product row/link
        # after submitting the form is reported as a login failure, which
        # hides the 404 details above. Confirm this is intended.
        raise LoginException("Nie udało się zalogować") from e
    except Exception as e:
        # Generic fallback for anything unexpected.
        raise HTTPException(status_code=500, detail=f"Wystąpił niespodziewany błąd: {str(e)}") from e

    await page.wait_for_timeout(1500)
    soup = await _page_soup(page)

    # The first matching input carries the configuration id. Guard the empty
    # result (previously an IndexError) and a missing value attribute.
    input_elements = soup.find_all('input', class_='vForeignKeyRawIdAdminField')
    config_id = input_elements[0].get('value') if input_elements else None
    if not config_id:
        raise HTTPException(status_code=404, detail="Product configuration not found")

    await page.goto(f'{admin_base}/admin/catalog_catalog/productconfiguration/{config_id}')
    await page.wait_for_timeout(3000)
    soup = await _page_soup(page)

    img_link = soup.find('p', class_='file-upload')
    tag = img_link.find('a') if img_link is not None else None
    href = tag.get('href') if tag is not None else None
    if not href:
        raise HTTPException(status_code=404, detail="Product image not found link img")

    # Return an absolute URL so the image can be fetched outside the browser.
    return urljoin(page.url, href)
async def portal_funct(page, status, intiaro_model_id):
    """Open the 360 sandbox page for a model and return its slider image URL.

    Args:
        page: Playwright page used for navigation.
        status: When falsy, ``&status=in_review`` is appended to the URL.
        intiaro_model_id: Model id passed as the ``id`` query parameter.

    Returns:
        The ``src`` of the first ``img.slider-image`` element, resolved against
        the current page URL.

    Raises:
        HTTPException: 404 when no slider image with a ``src`` is present in
            the page HTML after the wait.
    """
    link_status = '' if status else '&status=in_review'
    await page.goto(f"https://libs.intiaro.com/tests/360/pages/sandbox_new.html?build-url=https://libs.intiaro.com/360/configurator/2.5&id={intiaro_model_id}{link_status}")
    await page.wait_for_timeout(2000)

    soup = BeautifulSoup(await page.content(), 'html.parser')
    slider = soup.find('img', class_='slider-image')
    # The element may be missing after the fixed wait; report that explicitly
    # rather than failing with AttributeError on None.
    src = slider.get('src') if slider is not None else None
    if not src:
        raise HTTPException(status_code=404, detail="Slider image not found")

    # Resolve relative sources so the image can be fetched outside the browser.
    return urljoin(page.url, src)