UFPs-Deploy / utils.py
Chayanat's picture
Create utils.py
2e02ac0 verified
import numpy as np
import pandas as pd
import torch
import json
import time
import firebase_admin
from firebase_admin import credentials, db
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
import pickle
import io
import base64
def prepare_input_data(data, sequence_length, input_size):
"""
เตรียมข้อมูลนำเข้าให้อยู่ในรูปแบบที่เหมาะสมสำหรับโมเดล GRU
Args:
data (numpy.ndarray): ข้อมูลนำเข้า
sequence_length (int): ความยาวของลำดับเวลา
input_size (int): จำนวนคุณลักษณะนำเข้า
Returns:
tensor: ข้อมูลในรูปแบบ [batch_size, sequence_length, input_size]
"""
# ตรวจสอบรูปร่างของข้อมูล
if len(data.shape) == 1: # ถ้าเป็น 1D array
# สมมติว่ามี input_size features ในแต่ละ timestep
data = data.reshape(-1, input_size)
# ตรวจสอบว่ามีข้อมูลพอสำหรับ sequence_length หรือไม่
if data.shape[0] < sequence_length:
# ถ้าไม่พอ ให้เพิ่มข้อมูลโดยการทำซ้ำข้อมูลแรก
repeats_needed = sequence_length - data.shape[0]
first_row = np.tile(data[0:1], (repeats_needed, 1))
data = np.vstack([first_row, data])
# ถ้ามีข้อมูลมากกว่า sequence_length ให้ใช้แค่ sequence_length ล่าสุด
if data.shape[0] > sequence_length:
data = data[-sequence_length:]
# เพิ่มมิติ batch_size (=1)
data = data.reshape(1, sequence_length, -1)
return torch.FloatTensor(data)
def create_sequences(data, seq_length):
"""
สร้างลำดับ (sequences) จากข้อมูล
Args:
data (numpy.ndarray): ข้อมูลต้นฉบับ
seq_length (int): ความยาวของลำดับเวลา
Returns:
numpy.ndarray: ข้อมูลในรูปแบบลำดับเวลา
"""
xs = []
for i in range(len(data) - seq_length + 1):
x = data[i:(i + seq_length)]
xs.append(x)
return np.array(xs)
def init_firebase(credentials_json, database_url):
"""
เริ่มต้นการเชื่อมต่อกับ Firebase
Args:
credentials_json (str): ข้อมูล JSON ของ Firebase credentials
database_url (str): URL ของ Firebase Realtime Database
Returns:
bool: True ถ้าเชื่อมต่อสำเร็จ, False ถ้าไม่สำเร็จ
"""
if not firebase_admin._apps:
try:
# แปลง JSON string เป็น dictionary
cred_dict = json.loads(credentials_json)
cred = credentials.Certificate(cred_dict)
firebase_admin.initialize_app(cred, {
'databaseURL': database_url
})
return True
except Exception as e:
print(f"เกิดข้อผิดพลาดในการเชื่อมต่อกับ Firebase: {str(e)}")
return False
return True
def get_data_from_firebase(ref_path='input_data'):
"""
ดึงข้อมูลจาก Firebase Realtime Database
Args:
ref_path (str): พาธสำหรับดึงข้อมูลจาก Firebase
Returns:
dict/list: ข้อมูลที่ดึงมาจาก Firebase
"""
try:
ref = db.reference(ref_path)
data = ref.get()
return data
except Exception as e:
print(f"เกิดข้อผิดพลาดในการดึงข้อมูลจาก Firebase: {str(e)}")
return None
def save_data_to_firebase(data, ref_path='prediction_results'):
"""
บันทึกข้อมูลลงใน Firebase Realtime Database
Args:
data (dict/list): ข้อมูลที่ต้องการบันทึก
ref_path (str): พาธสำหรับบันทึกข้อมูลลงใน Firebase
Returns:
bool: True ถ้าบันทึกสำเร็จ, False ถ้าไม่สำเร็จ
"""
try:
ref = db.reference(ref_path)
ref.set(data)
return True
except Exception as e:
print(f"เกิดข้อผิดพลาดในการบันทึกข้อมูลลงใน Firebase: {str(e)}")
return False
def load_scalers_and_encoders(model_path):
"""
โหลด scalers และ encoders จากไฟล์โมเดล
Args:
model_path (str): พาธไปยังไฟล์โมเดล
Returns:
tuple: (numeric_scaler, label_encoders, y_scaler)
"""
try:
checkpoint = torch.load(model_path, map_location='cpu')
# ตรวจสอบแต่ละกรณี
numeric_scaler = None
label_encoders = None
y_scaler = None
if isinstance(checkpoint, dict):
# กรณีที่มี key โดยตรง
numeric_scaler = checkpoint.get('numeric_scaler')
label_encoders = checkpoint.get('label_encoders')
y_scaler = checkpoint.get('y_scaler')
# กรณีที่เก็บไว้ใน key อื่น
if numeric_scaler is None and 'scalers' in checkpoint:
numeric_scaler = checkpoint['scalers'].get('numeric_scaler')
if y_scaler is None and 'scalers' in checkpoint:
y_scaler = checkpoint['scalers'].get('y_scaler')
if label_encoders is None and 'encoders' in checkpoint:
label_encoders = checkpoint['encoders'].get('label_encoders')
return numeric_scaler, label_encoders, y_scaler
except Exception as e:
print(f"เกิดข้อผิดพลาดในการโหลด scalers และ encoders: {str(e)}")
return None, None, None
def create_default_scaler():
"""
สร้าง MinMaxScaler เริ่มต้น
"""
scaler = MinMaxScaler(feature_range=(0, 1))
# กำหนดค่า min และ max เริ่มต้น
scaler.min_ = np.zeros(1)
scaler.scale_ = np.ones(1)
scaler.data_min_ = np.zeros(1)
scaler.data_max_ = np.ones(1)
scaler.data_range_ = np.ones(1)
scaler.n_samples_seen_ = 1
return scaler
def create_default_encoders(n_categories=2):
"""
สร้าง LabelEncoder เริ่มต้น
"""
encoders = []
for i in range(n_categories):
le = LabelEncoder()
# กำหนดค่าเริ่มต้น
le.classes_ = np.array(['class0', 'class1'])
encoders.append(le)
return encoders
def preprocess_data(data, numeric_features, categorical_features, numeric_scaler, label_encoders):
"""
ประมวลผลข้อมูลก่อนการทำนาย
Args:
data (dict/list): ข้อมูลนำเข้า
numeric_features (list): รายชื่อคุณลักษณะตัวเลข
categorical_features (list): รายชื่อคุณลักษณะเชิงกลุ่ม
numeric_scaler (MinMaxScaler): scaler สำหรับข้อมูลตัวเลข
label_encoders (list): encoders สำหรับข้อมูลเชิงกลุ่ม
Returns:
numpy.ndarray: ข้อมูลที่ผ่านการประมวลผลแล้ว
"""
try:
# ตรวจสอบรูปแบบข้อมูล
if isinstance(data, list) and all(isinstance(item, dict) for item in data):
# กรณีที่ข้อมูลเป็นลิสต์ของ dict (หลาย timestep)
X_numeric = np.array([[item[feature] for feature in numeric_features] for item in data])
X_categorical = np.array([[item[feature] for feature in categorical_features] for item in data])
elif isinstance(data, dict):
# กรณีที่ข้อมูลเป็น dict เดียว (single timestep)
X_numeric = np.array([[data[feature] for feature in numeric_features]])
X_categorical = np.array([[data[feature] for feature in categorical_features]])
else:
raise ValueError("รูปแบบข้อมูลไม่ถูกต้อง ต้องเป็น dict หรือ list ของ dict")
# ตรวจสอบ scaler และ encoders
if numeric_scaler is None:
print("Warning: ไม่พบ numeric_scaler จะสร้างใหม่")
numeric_scaler = create_default_scaler()
if label_encoders is None or len(label_encoders) != len(categorical_features):
print("Warning: label_encoders ไม่ถูกต้อง จะสร้างใหม่")
label_encoders = create_default_encoders(len(categorical_features))
# ปรับสเกลข้อมูลตัวเลข
X_numeric_scaled = numeric_scaler.transform(X_numeric)
# Encode ข้อมูลเชิงกลุ่ม
X_categorical_encoded = []
for i, encoder in enumerate(label_encoders):
try:
# พยายาม transform ข้อมูล
encoded_col = encoder.transform(X_categorical[:, i])
except (ValueError, IndexError) as e:
# ถ้าเกิดข้อผิดพลาด (เช่น พบค่าที่ไม่เคยเห็น)
print(f"Warning: เกิดข้อผิดพลาดในการ encode คุณลักษณะที่ {i}: {str(e)}")
print(f"จะใช้ค่า 0 แทน")
# ใช้ค่า 0 แทน
encoded_col = np.zeros(X_categorical.shape[0], dtype=np.int64)
X_categorical_encoded.append(encoded_col)
# รวมข้อมูล
X_categorical_encoded = np.column_stack(X_categorical_encoded) if X_categorical_encoded else np.array([])
if X_categorical_encoded.size > 0:
# ถ้ามีข้อมูลเชิงกลุ่ม ให้รวมกับข้อมูลตัวเลข
X_encoded = np.concatenate([X_numeric_scaled, X_categorical_encoded], axis=1)
else:
# ถ้าไม่มีข้อมูลเชิงกลุ่ม ใช้เฉพาะข้อมูลตัวเลข
X_encoded = X_numeric_scaled
return X_encoded
except Exception as e:
print(f"เกิดข้อผิดพลาดในการประมวลผลข้อมูล: {str(e)}")
raise e
def get_file_download_link(data, filename, text="Download File"):
"""
สร้างลิงก์สำหรับดาวน์โหลดไฟล์
Args:
data: ข้อมูลที่ต้องการให้ดาวน์โหลด
filename (str): ชื่อไฟล์
text (str): ข้อความที่แสดงบนลิงก์
Returns:
str: HTML ลิงก์สำหรับดาวน์โหลด
"""
b64 = base64.b64encode(data).decode()
href = f'<a href="data:application/octet-stream;base64,{b64}" download="{filename}">{text}</a>'
return href
def save_scaler_to_bytes(scaler):
"""
แปลง scaler เป็น bytes สำหรับดาวน์โหลด
"""
bytes_io = io.BytesIO()
pickle.dump(scaler, bytes_io)
bytes_io.seek(0)
return bytes_io.read()
def save_encoders_to_bytes(encoders):
"""
แปลง encoders เป็น bytes สำหรับดาวน์โหลด
"""
bytes_io = io.BytesIO()
pickle.dump(encoders, bytes_io)
bytes_io.seek(0)
return bytes_io.read()