File size: 5,577 Bytes
319a292
 
 
f84a20c
319a292
 
f84a20c
319a292
f173552
319a292
 
 
 
 
 
 
 
 
f84a20c
319a292
09dbcd2
f84a20c
 
 
 
09dbcd2
f84a20c
 
 
 
 
 
 
 
 
09dbcd2
 
319a292
f84a20c
319a292
 
 
09dbcd2
319a292
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f84a20c
319a292
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f173552
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f84a20c
f173552
319a292
 
 
 
f173552
319a292
 
 
 
f173552
319a292
f173552
 
 
 
 
 
 
319a292
 
 
 
 
 
 
 
 
 
 
d84cd10
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import re
import requests
import json
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from google.auth import exceptions
from google.cloud import storage
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from io import BytesIO
from dotenv import load_dotenv
import uvicorn

# Merge variables from a .env file (when one is found) into the process
# environment before the settings below are read.
load_dotenv()

# Runtime settings taken from the environment; each is None when unset.
API_KEY = os.environ.get("API_KEY")
GCS_BUCKET_NAME = os.environ.get("GCS_BUCKET_NAME")
GOOGLE_APPLICATION_CREDENTIALS_JSON = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS_JSON"
)
HF_API_TOKEN = os.environ.get("HF_API_TOKEN")

def validate_bucket_name(bucket_name):
    """Validate a bucket name and return it unchanged.

    A name is accepted when it is a string of 3 to 63 characters made only of
    lowercase letters, digits, hyphens and periods, starts and ends with a
    letter or digit, and contains none of ``--``, ``..``, ``.-`` or ``-.``.

    Args:
        bucket_name: Candidate bucket name.

    Returns:
        The same ``bucket_name`` that was passed in.

    Raises:
        ValueError: If the value is not a string or breaks any rule above.
    """
    if not isinstance(bucket_name, str):
        raise ValueError("Bucket name must be a string.")
    if not 3 <= len(bucket_name) <= 63:
        raise ValueError("Bucket name must be between 3 and 63 characters long.")
    # fullmatch instead of match + "$": "$" also matches just before a trailing
    # newline, which let names such as "abc\n" slip through.
    if not re.fullmatch(r"[a-z0-9][a-z0-9\-\.]*[a-z0-9]", bucket_name):
        raise ValueError(
            f"Invalid bucket name '{bucket_name}'. Bucket names must:"
            " - Use only lowercase letters, numbers, hyphens (-), and periods (.)"
            " - Start and end with a letter or number."
        )
    forbidden_pairs = ("--", "..", ".-", "-.")
    if any(pair in bucket_name for pair in forbidden_pairs):
        raise ValueError(
            f"Invalid bucket name '{bucket_name}'. Bucket names cannot contain consecutive periods, hyphens, or use '.-' or '-.'"
        )
    return bucket_name

# Build the shared GCS client and bucket handle at import time. Any problem
# with the bucket name or the service-account JSON stops the process with
# exit status 1 after printing the reason.
try:
    GCS_BUCKET_NAME = validate_bucket_name(GCS_BUCKET_NAME)
    credentials_info = json.loads(GOOGLE_APPLICATION_CREDENTIALS_JSON)
    storage_client = storage.Client.from_service_account_info(credentials_info)
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
except (
    exceptions.DefaultCredentialsError,
    json.JSONDecodeError,
    KeyError,
    # json.loads(None) raises TypeError when the credentials variable is unset.
    TypeError,
    ValueError,
) as e:
    print(f"Error al cargar credenciales o bucket: {e}")
    # SystemExit instead of exit(): exit() is an interactive helper installed
    # by the site module and is not guaranteed to exist.
    raise SystemExit(1) from e

# FastAPI application object; the route handlers in this module register on it
# through its decorators, and it is what uvicorn serves.
app = FastAPI()

# Request body of POST /predict/.
# NOTE(review): some pydantic v2 releases warn that fields starting with
# "model_" clash with a protected namespace — confirm against the installed
# pydantic version.
class DownloadModelRequest(BaseModel):
    # Used both as the object prefix inside the GCS bucket and as the
    # repository id when the model is fetched from huggingface.co.
    model_name: str
    # Task name handed to transformers.pipeline().
    pipeline_task: str
    # Text the pipeline is invoked with.
    input_text: str

class GCSStreamHandler:
    """Helper around a single GCS bucket for reading and writing model files.

    The bucket handle is obtained from the module-level ``storage_client``.
    """

    def __init__(self, bucket_name):
        """Bind the handler to the bucket called *bucket_name*."""
        self.bucket = storage_client.bucket(bucket_name)

    def file_exists(self, blob_name) -> bool:
        """Return whether an object named *blob_name* exists in the bucket."""
        return self.bucket.blob(blob_name).exists()

    def stream_file_from_gcs(self, blob_name) -> bytes:
        """Return the whole content of *blob_name* as bytes.

        Raises:
            HTTPException: With status 404 when the object does not exist.
        """
        blob = self.bucket.blob(blob_name)
        if not blob.exists():
            raise HTTPException(status_code=404, detail=f"File '{blob_name}' not found in GCS.")
        return blob.download_as_bytes()

    def upload_file_to_gcs(self, blob_name, data_stream) -> None:
        """Upload the file-like *data_stream* to the object *blob_name*."""
        blob = self.bucket.blob(blob_name)
        blob.upload_from_file(data_stream)

    def ensure_bucket_structure(self, model_prefix) -> None:
        """Create empty-JSON placeholders for missing config/tokenizer files.

        For ``config.json`` and ``tokenizer.json`` under *model_prefix*, an
        object containing ``{}`` is uploaded when none exists yet.
        """
        required_files = ["config.json", "tokenizer.json"]
        for filename in required_files:
            blob_name = f"{model_prefix}/{filename}"
            if not self.file_exists(blob_name):
                self.bucket.blob(blob_name).upload_from_string("{}", content_type="application/json")

    def stream_model_files(self, model_prefix, model_patterns) -> dict:
        """Download every object under *model_prefix* whose file name matches.

        Args:
            model_prefix: Object-name prefix; ``"<model_prefix>/"`` is listed.
            model_patterns: Regular expressions tried with ``re.match`` (so
                anchored at the start only) against the last path component
                of each object name.

        Returns:
            Dict mapping file name (last path component) to a ``BytesIO``
            holding that object's content, in bucket listing order.
        """
        compiled = [re.compile(pattern) for pattern in model_patterns]
        if not compiled:
            return {}
        model_files = {}
        # List the prefix once and fetch each matching blob once; listing per
        # pattern repeated the API call and re-downloaded blobs that matched
        # more than one pattern.
        for blob in self.bucket.list_blobs(prefix=f"{model_prefix}/"):
            filename = blob.name.split('/')[-1]
            if any(regex.match(filename) for regex in compiled):
                model_files[filename] = BytesIO(blob.download_as_bytes())
        return model_files

def download_model_from_huggingface(model_name):
    """Copy a Hugging Face model repository's core files into the GCS bucket.

    Tries ``pytorch_model.bin``, ``model.safetensors``, ``config.json`` and
    ``tokenizer.json`` first. Sharded checkpoints are then resolved through
    the repository's ``*.index.json`` files: every shard named in their
    ``weight_map`` is copied as well. Objects are stored in the module-level
    ``bucket`` as ``<model_name>/<filename>``.

    The copy is best-effort: a file that is absent (non-200 response) is
    skipped silently, a file whose transfer fails is reported on stdout and
    skipped, and nothing is raised to the caller.

    Args:
        model_name: Repository id on huggingface.co; also the GCS prefix.
    """
    base_url = f"https://huggingface.co/{model_name}/resolve/main"
    # Only send credentials when a token is configured; a literal
    # "Bearer None" header would be sent otherwise.
    headers = {"Authorization": f"Bearer {HF_API_TOKEN}"} if HF_API_TOKEN else {}

    def _mirror(filename):
        """Copy one repository file to GCS; return its bytes, or None if not copied."""
        try:
            # (connect, read) timeouts: the read timeout bounds the gap between
            # received bytes, not the total time, so large files still complete.
            with requests.get(
                f"{base_url}/{filename}",
                headers=headers,
                stream=True,
                timeout=(10, 120),
            ) as response:
                if response.status_code != 200:
                    return None
                # NOTE: the whole file is buffered in memory before upload.
                payload = response.content
            bucket.blob(f"{model_name}/{filename}").upload_from_file(BytesIO(payload))
            return payload
        except Exception as e:
            # Keep going with the remaining files, but say what went wrong.
            print(f"Failed to copy '{filename}' of '{model_name}' to GCS: {e}")
            return None

    for filename in (
        "pytorch_model.bin",
        "model.safetensors",
        "config.json",
        "tokenizer.json",
    ):
        _mirror(filename)

    # Shard names carry an extension and a repository-specific shard count
    # (e.g. "model-00001-of-00003.safetensors"), so they are read from the
    # index files rather than guessed.
    for index_name in ("pytorch_model.bin.index.json", "model.safetensors.index.json"):
        index_bytes = _mirror(index_name)
        if index_bytes is None:
            continue
        try:
            shard_names = sorted(set(json.loads(index_bytes)["weight_map"].values()))
        except (ValueError, KeyError, TypeError, AttributeError) as e:
            print(f"Could not read shard list from '{index_name}' of '{model_name}': {e}")
            continue
        for shard_name in shard_names:
            _mirror(shard_name)

@app.post("/predict/")
async def predict(request: DownloadModelRequest):
    """Run a transformers pipeline for the requested model on the input text.

    Model weights are looked up in GCS under ``request.model_name``; when none
    are found the model is first copied from Hugging Face. The weights,
    ``config.json`` and ``tokenizer.json`` are then written to a temporary
    directory, loaded from there, and the pipeline for
    ``request.pipeline_task`` is applied to ``request.input_text``.

    Returns:
        ``{"response": <pipeline output>}``.

    Raises:
        HTTPException: 404 when weights, ``config.json`` or ``tokenizer.json``
            cannot be found; 500 for any other failure.
    """
    # NOTE(review): everything below is blocking work inside an async handler,
    # so the event loop is held for the duration of the request.
    import tempfile  # function-scope: the module does not import tempfile

    try:
        gcs_handler = GCSStreamHandler(GCS_BUCKET_NAME)
        model_prefix = request.model_name
        model_patterns = [
            r"pytorch_model-\d+-of-\d+",
            r"model-\d+",
            r"pytorch_model.bin",
            r"model.safetensors",
        ]
        # These are regexes, so presence is decided by matching the bucket
        # listing; probing for a blob literally named like a pattern can never
        # find sharded weights.
        model_files = gcs_handler.stream_model_files(model_prefix, model_patterns)
        if not model_files:
            download_model_from_huggingface(model_prefix)
            model_files = gcs_handler.stream_model_files(model_prefix, model_patterns)
        if not model_files:
            raise HTTPException(
                status_code=404,
                detail=f"No model weights found for '{model_prefix}'.",
            )
        config_bytes = gcs_handler.stream_file_from_gcs(f"{model_prefix}/config.json")
        tokenizer_bytes = gcs_handler.stream_file_from_gcs(f"{model_prefix}/tokenizer.json")

        # from_pretrained() loads from a repository id or a local directory,
        # not from in-memory streams, so the files are staged on disk first.
        with tempfile.TemporaryDirectory() as model_dir:

            def _write(filename, payload):
                with open(os.path.join(model_dir, filename), "wb") as fh:
                    fh.write(payload)

            _write("config.json", config_bytes)
            _write("tokenizer.json", tokenizer_bytes)
            # popitem() drops each in-memory copy as soon as it is on disk.
            while model_files:
                filename, stream = model_files.popitem()
                _write(filename, stream.getbuffer())

            model = AutoModelForCausalLM.from_pretrained(model_dir)
            tokenizer = AutoTokenizer.from_pretrained(model_dir)
            pipeline_ = pipeline(request.pipeline_task, model=model, tokenizer=tokenizer)
            # Run inside the with-block so the staged files outlive every use
            # of the loaded model.
            result = pipeline_(request.input_text)
        return {"response": result}
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. 404) instead of masking them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error: {e}") from e

# Script entry point: when the file is executed directly, serve the app with
# uvicorn on all network interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)