File size: 5,990 Bytes
11f96c1
63df3f2
11f96c1
63df3f2
 
 
 
 
43c94a5
 
63df3f2
 
 
 
 
11f96c1
 
 
 
 
63df3f2
 
 
 
 
 
 
 
 
 
 
 
 
11f96c1
 
63df3f2
 
11f96c1
 
 
63df3f2
11f96c1
63df3f2
de5a712
63df3f2
 
 
 
 
 
 
 
 
 
 
11f96c1
63df3f2
de5a712
63df3f2
 
 
 
 
 
 
 
 
 
 
 
 
 
11f96c1
63df3f2
11f96c1
 
 
 
 
 
63df3f2
 
 
 
 
 
 
 
 
 
11f96c1
63df3f2
11f96c1
 
 
 
 
 
 
63df3f2
 
 
 
 
 
 
 
 
 
43c94a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63df3f2
 
de5a712
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from fastapi import FastAPI, HTTPException, Query, Path
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List
import json
import os
import logging
from txtai.embeddings import Embeddings
import pandas as pd
import glob

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Embeddings API",
    description="An API for creating and querying text embeddings indexes.",
    version="1.0.0"
)

# Enable CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

embeddings = Embeddings({"path": "avsolatorio/GIST-all-MiniLM-L6-v2"})

class DocumentRequest(BaseModel):
    index_id: str = Field(..., description="Unique identifier for the index")
    documents: List[str] = Field(..., description="List of documents to be indexed")

class QueryRequest(BaseModel):
    index_id: str = Field(..., description="Unique identifier for the index to query")
    query: str = Field(..., description="The search query")
    num_results: int = Field(..., description="Number of results to return", ge=1)

def save_embeddings(index_id: str, document_list: List[str]):
    try:
        folder_path = f"/app/indexes/{index_id}"
        os.makedirs(folder_path, exist_ok=True)
        # Save embeddings
        embeddings.save(f"{folder_path}/embeddings")
        # Save document_list
        with open(f"{folder_path}/document_list.json", "w") as f:
            json.dump(document_list, f)
        logger.info(f"Embeddings and document list saved for index_id: {index_id}")
    except Exception as e:
        logger.error(f"Error saving embeddings for index_id {index_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error saving embeddings: {str(e)}")

def load_embeddings(index_id: str) -> List[str]:
    try:
        folder_path = f"/app/indexes/{index_id}"
        if not os.path.exists(folder_path):
            logger.error(f"Index not found for index_id: {index_id}")
            raise HTTPException(status_code=404, detail="Index not found")
        # Load embeddings
        embeddings.load(f"{folder_path}/embeddings")
        # Load document_list
        with open(f"{folder_path}/document_list.json", "r") as f:
            document_list = json.load(f)
        logger.info(f"Embeddings and document list loaded for index_id: {index_id}")
        return document_list
    except Exception as e:
        logger.error(f"Error loading embeddings for index_id {index_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error loading embeddings: {str(e)}")

@app.post("/create_index/", response_model=dict, tags=["Index Operations"])
async def create_index(request: DocumentRequest):
    """
    Create a new index with the given documents.
    
    - **index_id**: Unique identifier for the index
    - **documents**: List of documents to be indexed
    """
    try:
        document_list = [(i, text, None) for i, text in enumerate(request.documents)]
        embeddings.index(document_list)
        save_embeddings(request.index_id, request.documents)  # Save the original documents
        logger.info(f"Index created successfully for index_id: {request.index_id}")
        return {"message": "Index created successfully"}
    except Exception as e:
        logger.error(f"Error creating index: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error creating index: {str(e)}")

@app.post("/query_index/", response_model=dict, tags=["Index Operations"])
async def query_index(request: QueryRequest):
    """
    Query an existing index with the given search query.
    
    - **index_id**: Unique identifier for the index to query
    - **query**: The search query
    - **num_results**: Number of results to return
    """
    try:
        document_list = load_embeddings(request.index_id)
        results = embeddings.search(request.query, request.num_results)
        queried_texts = [document_list[idx[0]] for idx in results]
        logger.info(f"Query executed successfully for index_id: {request.index_id}")
        return {"queried_texts": queried_texts}
    except Exception as e:
        logger.error(f"Error querying index: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error querying index: {str(e)}")

def process_csv_file(file_path):
    try:
        df = pd.read_csv(file_path)
        df_rows = df.apply(lambda row: ' '.join(row.values.astype(str)), axis=1)
        txtai_data = [(i, row, None) for i, row in enumerate(df_rows)]
        return txtai_data, df_rows.tolist()
    except Exception as e:
        logger.error(f"Error processing CSV file {file_path}: {str(e)}")
        return None, None

def check_and_index_csv_files():
    index_data_folder = "/app/index_data"
    if not os.path.exists(index_data_folder):
        logger.warning(f"index_data folder not found: {index_data_folder}")
        return

    csv_files = glob.glob(os.path.join(index_data_folder, "*.csv"))
    for csv_file in csv_files:
        index_id = os.path.splitext(os.path.basename(csv_file))[0]
        if not os.path.exists(f"/app/indexes/{index_id}"):
            logger.info(f"Processing CSV file: {csv_file}")
            txtai_data, documents = process_csv_file(csv_file)
            if txtai_data and documents:
                embeddings.index(txtai_data)
                save_embeddings(index_id, documents)
                logger.info(f"CSV file indexed successfully: {csv_file}")
            else:
                logger.warning(f"Failed to process CSV file: {csv_file}")
        else:
            logger.info(f"Index already exists for: {csv_file}")

@app.on_event("startup")
async def startup_event():
    check_and_index_csv_files()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)