# Storage-layer helpers: S3 image upload/download, Qdrant vector upsert/search,
# and Neo4j graph operations for houses, objects, locations, and owners.
| import uuid | |
| from io import BytesIO | |
| from PIL import Image | |
| import numpy as np | |
| from qdrant_client.http.models import Filter, FieldCondition, MatchValue, ScrollRequest | |
| from qdrant_client.models import SearchParams | |
| from .clients import get_s3, get_qdrant, get_neo4j, get_s3_session | |
| from .config import S3_BUCKET, QDRANT_COLLECTION, AWS_REGION | |
| from .processing import embed_image_dino_large | |
| from .image_processing import encode_image_to_base64 | |
def upload_image_to_s3(image_np: np.ndarray, key: str) -> str:
    """Encode an image array as PNG and upload it to the configured S3 bucket.

    :param image_np: Image data as a numpy array (any mode PIL accepts).
    :param key: Destination object key inside ``S3_BUCKET``.
    :return: Public virtual-hosted-style URL of the uploaded object.
    """
    png_buffer = BytesIO()
    Image.fromarray(image_np).save(png_buffer, format="PNG")
    png_buffer.seek(0)  # rewind so upload_fileobj reads from the start
    get_s3().upload_fileobj(
        png_buffer, S3_BUCKET, key, ExtraArgs={"ContentType": "image/png"}
    )
    return f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/{key}"
def download_image_from_s3(key: str) -> np.ndarray:
    """Fetch an object from S3 and decode it into an RGB numpy array.

    :param key: S3 object key within ``S3_BUCKET``.
    :return: Decoded image as an RGB array.
    """
    raw = BytesIO()
    get_s3().download_fileobj(S3_BUCKET, key, raw)
    raw.seek(0)  # rewind before handing the buffer to PIL
    return np.array(Image.open(raw).convert("RGB"))
| from qdrant_client.http.models import PointStruct | |
| import uuid | |
| import numpy as np | |
def add_vector_to_qdrant(vectors: dict, payload: dict, view_id: str = None):
    """Upsert a single point carrying one or more named vectors into Qdrant.

    :param vectors: Dict of named vectors, e.g.,
        {"text_embedding": np.ndarray, "image_embedding": np.ndarray}
    :param payload: Metadata dictionary stored alongside the vectors.
    :param view_id: Optional explicit point ID; a fresh UUID is used otherwise.
    :return: The point ID used for storage.
    """
    point_id = view_id if view_id is not None else str(uuid.uuid4())
    # numpy arrays are not serializable by the client; convert them to lists.
    named_vectors = {}
    for name, vec in vectors.items():
        named_vectors[name] = vec.tolist() if isinstance(vec, np.ndarray) else vec
    get_qdrant().upsert(
        collection_name=QDRANT_COLLECTION,
        points=[PointStruct(id=point_id, vector=named_vectors, payload=payload)],
    )
    return point_id
def query_vector_db_by_mask(query_image: np.ndarray, k: int = 5):
    """Embed a query image with DINOv2 and return its k nearest Qdrant points.

    :param query_image: Image array to embed and search with.
    :param k: Maximum number of hits to return.
    :return: Qdrant search results against the "dinov2_embedding" vector space.
    """
    query_vec = embed_image_dino_large(query_image).tolist()
    return get_qdrant().search(
        QDRANT_COLLECTION,
        query_vector=("dinov2_embedding", query_vec),
        limit=k,
    )
def query_vector_db_by_image_embedding(embeddings: np.ndarray, k: int = 5, house_id: str = None):
    """Exact k-NN search over the "dinov2_embedding" vector space.

    :param embeddings: Precomputed image embedding vector.
    :param k: Maximum number of hits to return.
    :param house_id: When given, restrict hits to points with this house_id payload.
    :return: Qdrant search results.
    """
    house_filter = None
    if house_id:
        # Restrict the search to a single house's points.
        house_filter = Filter(
            must=[FieldCondition(key="house_id", match=MatchValue(value=house_id))]
        )
    return get_qdrant().search(
        QDRANT_COLLECTION,
        query_vector=("dinov2_embedding", embeddings.tolist()),
        limit=k,
        query_filter=house_filter,
        # exact=True bypasses the ANN index for a full scan.
        search_params=SearchParams(exact=True),
    )
def query_vector_db_by_text_embedding(embeddings: np.ndarray, k: int = 5, house_id: str = None):
    """k-NN search over the "clip_text_embedding" vector space.

    :param embeddings: Precomputed text embedding vector.
    :param k: Maximum number of hits to return.
    :param house_id: When given, restrict hits to points with this house_id payload.
    :return: Qdrant search results.
    """
    house_filter = None
    if house_id:
        # Restrict the search to a single house's points.
        house_filter = Filter(
            must=[FieldCondition(key="house_id", match=MatchValue(value=house_id))]
        )
    return get_qdrant().search(
        QDRANT_COLLECTION,
        query_vector=("clip_text_embedding", embeddings.tolist()),
        limit=k,
        query_filter=house_filter,
    )
def add_object_to_neo4j(object_id, house_id, description, qdrant_object_id) -> None:
    """Idempotently register an object under a house in the graph.

    MERGEs both nodes, refreshes the object's description and Qdrant back-reference,
    and ensures the CONTAINS edge exists.
    """
    cypher = (
        "MERGE (h:House {house_id:$house_id}) "
        "MERGE (o:Object {object_id:$object_id}) "
        "SET o.description=$desc, o.qdrant_object_id=$qdrant_object_id "
        "MERGE (h)-[:CONTAINS]->(o)"
    )
    params = {
        "house_id": house_id,
        "object_id": object_id,
        "desc": description,
        "qdrant_object_id": qdrant_object_id,
    }
    with get_neo4j().session() as s:
        s.run(cypher, params)
def get_object_info_from_graph(house_id: str, object_id: str) -> str:
    """Return the stored description of an object in a house.

    Returns None when no matching object exists under the house.
    """
    cypher = """
        MATCH (h:House {house_id: $household_id})-[:CONTAINS]->(o:Object {object_id: $object_id})
        RETURN o.description AS description
    """
    with get_neo4j().session() as s:
        record = s.run(
            cypher, {"household_id": house_id, "object_id": object_id}
        ).single()
    return record.get("description") if record else None
def set_object_primary_location_hierarchy(
    object_id: str,
    house_id: str,
    location_hierarchy: list  # Example: ["Kitchen", "Left Upper Cabinet", "Middle Shelf"]
) -> None:
    """Materialize a nested Location chain under a house and re-point the
    object's PRIMARY_LOCATION edge at the deepest (last) location.

    :param object_id: ID of the Object node whose primary location is set.
    :param house_id: ID of the House node rooting the hierarchy.
    :param location_hierarchy: Location names ordered root-first; an empty list
        only ensures the house node exists and changes no edges.
    """
    with get_neo4j().session() as s:
        # Ensure the house node exists
        s.run(
            "MERGE (h:House {house_id: $house_id})",
            {"house_id": house_id}
        )
        # Build nested location hierarchy: each iteration links the current
        # location under the node matched in the previous iteration.
        parent_label = "House"
        parent_key = "house_id"
        parent_value = house_id
        for idx, location_name in enumerate(location_hierarchy):
            # Label/property names are interpolated into the query text; they are
            # fixed internal values ("House"/"Location"), never user input. Node
            # values always travel as $parameters.
            # NOTE(review): Location is MERGEd by name alone, so identically
            # named locations in different houses resolve to one shared node —
            # confirm this is intended.
            s.run(
                f"""
                MATCH (parent:{parent_label} {{{parent_key}: $parent_value}})
                MERGE (loc:Location {{name: $location_name}})
                MERGE (parent)-[:CONTAINS]->(loc)
                """,
                {"parent_value": parent_value, "location_name": location_name}
            )
            parent_label = "Location"
            parent_key = "name"
            parent_value = location_name
        if location_hierarchy:
            final_location_name = location_hierarchy[-1]
            # Remove existing PRIMARY_LOCATION edges so the object has at most one.
            s.run(
                """
                MATCH (o:Object {object_id: $object_id})-[r:PRIMARY_LOCATION]->(:Location)
                DELETE r
                """,
                {"object_id": object_id}
            )
            # Add new PRIMARY_LOCATION edge to the deepest location in the chain.
            s.run(
                """
                MATCH (o:Object {object_id: $object_id})
                MATCH (loc:Location {name: $location_name})
                MERGE (o)-[:PRIMARY_LOCATION]->(loc)
                """,
                {"object_id": object_id, "location_name": final_location_name}
            )
def get_object_location_chain(house_id: str, object_id: str, include_images: bool = False):
    """Return the object's primary-location ancestry, ordered root-first.

    Walks upward from the Location the object's PRIMARY_LOCATION edge points at,
    following reverse CONTAINS links, prepending each ancestor.

    :param house_id: House the object belongs to.
    :param object_id: Object whose location chain is requested.
    :param include_images: When True, also fetch each location's image from S3
        and attach it base64-encoded under "image_base64".
    :return: List of location-info dicts (root first); empty list when the
        object has no primary location under this house.
    """
    with get_neo4j().session() as session:
        # Find the Location node targeted by the object's PRIMARY_LOCATION edge.
        result = session.run(
            """
            MATCH (h:House {house_id: $house_id})
            -[:CONTAINS*]->(loc:Location)<-[:PRIMARY_LOCATION]-(o:Object {object_id: $object_id})
            RETURN loc
            """,
            {"house_id": house_id, "object_id": object_id}
        )
        record = result.single()
        if not record:
            return []
        # Build location chain by walking parents; insert(0) keeps root first.
        locations = []
        current_name = record["loc"]["name"]
        while current_name:
            loc_record = session.run(
                """
                MATCH (h:House {house_id: $house_id})
                -[:CONTAINS*]->(loc:Location {name: $name})
                RETURN loc
                """,
                {"house_id": house_id, "name": current_name}
            ).single()
            if not loc_record:
                break
            loc_node = loc_record["loc"]
            loc_info = {
                "name": loc_node["name"],
                "image_uri": loc_node.get("image_uri"),
                "location_x": loc_node.get("location_x"),
                "location_y": loc_node.get("location_y"),
                "location_z": loc_node.get("location_z"),
                "shape": loc_node.get("shape"),
                "radius": loc_node.get("radius"),
                "height": loc_node.get("height"),
                "width": loc_node.get("width"),
                "depth": loc_node.get("depth"),
            }
            # Optionally include actual image data.
            if include_images and loc_node.get("image_uri"):
                try:
                    # NOTE(review): image_uri is used directly as the S3 key here,
                    # while get_object_details strips a full-URL prefix first —
                    # confirm which form Location nodes actually store.
                    img = download_image_from_s3(loc_node["image_uri"])
                    loc_info["image_base64"] = encode_image_to_base64(img)
                except Exception as e:
                    # Fix: failure path previously set key "image"; use
                    # "image_base64" to match the success path and
                    # get_all_locations_for_house, so callers see one schema.
                    loc_info["image_base64"] = None
                    print(f"Warning: Failed to load image from S3 for {loc_node['name']}: {e}")
            locations.insert(0, loc_info)
            parent_record = session.run(
                """
                MATCH (parent:Location)-[:CONTAINS]->(loc:Location {name: $name})
                RETURN parent.name AS parent_name
                """,
                {"name": current_name}
            ).single()
            current_name = parent_record["parent_name"] if parent_record else None
        return locations
def get_all_locations_for_house(house_id: str, include_images: bool = False):
    """List every Location reachable from a house, with its direct parent name.

    :param house_id: House whose location tree is enumerated.
    :param include_images: When True, fetch each location's image from S3 and
        attach it base64-encoded under "image_base64" (None on load failure).
    :return: List of location-info dicts, one per (location, parent) row.
    """
    cypher = """
        MATCH (h:House {house_id: $house_id})-[:CONTAINS*]->(loc:Location)
        OPTIONAL MATCH (parent:Location)-[:CONTAINS]->(loc)
        RETURN loc, parent.name AS parent_name
    """
    entries = []
    with get_neo4j().session() as session:
        for row in session.run(cypher, {"house_id": house_id}):
            loc_node = row["loc"]
            parent_name = row["parent_name"]
            entry = {
                "name": loc_node["name"],
                "parents": [parent_name] if parent_name else [],
                "image_uri": loc_node.get("image_uri"),
                "location_x": loc_node.get("location_x"),
                "location_y": loc_node.get("location_y"),
                "location_z": loc_node.get("location_z"),
            }
            if include_images and loc_node.get("image_uri"):
                try:
                    img = download_image_from_s3(loc_node["image_uri"])
                    entry["image_base64"] = encode_image_to_base64(img)
                except Exception as e:
                    print(f"Warning: Failed to load image for {loc_node['name']}: {e}")
                    entry["image_base64"] = None
            entries.append(entry)
    return entries
def get_object_owners(house_id: str, object_id: str):
    """Return the Person nodes that own an object, as plain dicts.

    :param house_id: House the object belongs to.
    :param object_id: Object whose owners are requested.
    :return: List of dicts with person_id, name, type (defaults to "person"
        when the property is missing), and image_uri.
    """
    cypher = """
        MATCH (h:House {house_id: $house_id})
        -[:CONTAINS*]->(o:Object {object_id: $object_id})
        -[:OWNED_BY]->(p:Person)
        RETURN p
    """
    with get_neo4j().session() as session:
        rows = session.run(cypher, {"house_id": house_id, "object_id": object_id})
        return [
            {
                "person_id": row["p"].get("person_id"),
                "name": row["p"].get("name"),
                "type": row["p"].get("type", "person"),
                "image_uri": row["p"].get("image_uri"),
            }
            for row in rows
        ]
def add_owner_by_person_id(house_id: str, object_id: str, person_id: str):
    """Link an existing Person to an object via OWNED_BY.

    :return: The Person node on success, or None when the object or person
        could not be matched.
    """
    cypher = """
        MATCH (h:House {house_id: $house_id})-[:CONTAINS*]->(o:Object {object_id: $object_id}),
        (p:Person {person_id: $person_id})
        MERGE (o)-[:OWNED_BY]->(p)
        RETURN p
    """
    params = {"house_id": house_id, "object_id": object_id, "person_id": person_id}
    with get_neo4j().session() as session:
        row = session.run(cypher, params).single()
    return row["p"] if row else None
def add_owner_by_person_name(house_id: str, object_id: str, name: str, type: str = "person"):
    """Create a brand-new Person (fresh UUID) and link it to an object.

    Note: uses CREATE, so each call makes a new Person node even if one with
    the same name already exists.

    :return: The created Person node, or None when the object was not matched.
    """
    cypher = """
        MATCH (h:House {house_id: $house_id})-[:CONTAINS*]->(o:Object {object_id: $object_id})
        CREATE (p:Person {person_id: $person_id, name: $name, type: $type})
        MERGE (o)-[:OWNED_BY]->(p)
        RETURN p
    """
    params = {
        "house_id": house_id,
        "object_id": object_id,
        "person_id": str(uuid.uuid4()),
        "name": name,
        "type": type,
    }
    with get_neo4j().session() as session:
        row = session.run(cypher, params).single()
    return row["p"] if row else None
def _s3_key_from_url(url: str) -> str:
    """Strip the bucket's public URL prefix, leaving the bare S3 object key."""
    return url.replace(f"https://{S3_BUCKET}.s3.{AWS_REGION}.amazonaws.com/", "")


def _scroll_object_points(object_id: str):
    """Page through Qdrant and return every point payload-tagged with object_id."""
    client = get_qdrant()
    object_filter = Filter(
        must=[FieldCondition(key="object_id", match=MatchValue(value=object_id))]
    )
    all_points = []
    offset = None
    while True:
        points, offset = client.scroll(
            collection_name=QDRANT_COLLECTION,
            scroll_filter=object_filter,
            limit=100,
            offset=offset,
        )
        all_points.extend(points)
        if offset is None:  # Qdrant signals the final page with a null next-offset
            break
    return all_points


def get_object_details(house_id: str, object_id: str):
    """
    Collects and returns:
    - Description from Neo4j
    - All image views from Qdrant and S3
    - All text descriptions from Qdrant
    - Location hierarchy from Neo4j with S3-loaded images
    - Owners from Neo4j
    """
    # Fetch description from Neo4j.
    description = get_object_info_from_graph(house_id, object_id)

    # Separate the object's Qdrant points into image views and text descriptions.
    images = []
    texts = []
    for point in _scroll_object_points(object_id):
        payload = point.payload
        if payload.get("type") == "image" and payload.get("image_url"):
            try:
                img_np = download_image_from_s3(_s3_key_from_url(payload["image_url"]))
                images.append({"image": img_np, "url": payload["image_url"]})
            except Exception as e:
                # Best-effort: skip images that fail to load rather than abort.
                print(f"Failed to load image: {e}")
        elif payload.get("type") == "text" and payload.get("description"):
            texts.append(payload["description"])

    # Fetch location hierarchy WITHOUT embedded images; load them separately below.
    locations = get_object_location_chain(house_id, object_id, include_images=False)
    location_images = []
    for loc in locations:
        uri = loc.get("image_uri")
        if not uri:
            continue
        try:
            location_images.append(download_image_from_s3(_s3_key_from_url(uri)))
        except Exception as e:
            print(f"Failed to load location image {uri}: {e}")

    # Fetch owners.
    owners = get_object_owners(house_id, object_id)

    return {
        "description": description,
        "images": images,
        "texts": texts,
        "locations": locations,
        "location_images": location_images,
        "owners": owners,
    }