lora / 0_prepare_data.py
Dmitriy-Egorov's picture
Update 0_prepare_data.py
f5de412 verified
import pandas as pd
import config as cfg
import json # Используем json вместо pandas.io.json для большей стандартности
def get_target_signal(current_close, next_day_close, threshold):
"""
Определяет сигнал (BUY, SELL, HOLD) на основе next_day_close.
"""
if pd.isna(next_day_close) or pd.isna(current_close) or current_close == 0:
return "HOLD" # Недостаточно данных или некорректные данные
price_change_ratio = (next_day_close - current_close) / current_close
if price_change_ratio > threshold:
return "BUY"
elif price_change_ratio < -threshold:
return "SELL"
else:
return "HOLD"
def format_input_for_llm(row, all_indicator_columns):
"""
Форматирует текущее состояние рынка и индикаторы в текстовый промпт для LLM.
"""
# Базовая информация
prompt_parts = [
f"Current open: {row['open']:.2f}",
f"high: {row['high']:.2f}",
f"low: {row['low']:.2f}",
f"close: {row['close']:.2f}",
f"volume: {row['volume']:.0f}."
]
# Добавляем все остальные индикаторы
indicator_descs = []
for col in all_indicator_columns:
# Исключаем уже добавленные и целевую переменную
if col not in ['date', 'open', 'high', 'low', 'close', 'volume', 'next_day_close'] and col in row and pd.notna(row[col]):
# Убираем префиксы rsi_, cci_, sma_, ema_, atr_ для более естественного языка, если хотим
# cleaned_col_name = col.replace("rsi_", "RSI ").replace("cci_", "CCI ").replace("sma_", "SMA ").replace("ema_", "EMA ").replace("atr_", "ATR ")
# indicator_descs.append(f"{cleaned_col_name.replace('_', ' ')}: {row[col]:.2f}")
indicator_descs.append(f"{col.replace('_', ' ')}: {row[col]:.2f}") # Простой вариант
if indicator_descs:
prompt_parts.append("Technical indicators: " + ", ".join(indicator_descs) + ".")
return " ".join(prompt_parts)
def main():
try:
df = pd.read_csv(cfg.CSV_FILE_PATH)
except FileNotFoundError:
print(f"Ошибка: CSV файл не найден по пути: {cfg.CSV_FILE_PATH}")
print("Пожалуйста, убедитесь, что файл существует и путь в config.py указан верно.")
return
# Преобразуем 'date' в datetime и установим как индекс
if 'date' not in df.columns:
print("Ошибка: колонка 'date' отсутствует в CSV файле.")
return
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)
df.sort_index(inplace=True) # Убедимся, что данные отсортированы по времени
# Получаем список всех колонок-индикаторов (кроме date и next_day_close)
# 'open', 'high', 'low', 'close', 'volume' будут обработаны отдельно в format_input_for_llm
all_feature_columns = [col for col in df.columns if col not in ['next_day_close']]
# Удаляем строки, где есть NaN в фичах или в 'close'/'next_day_close'
# (кроме 'next_day_close' для последней строки, это обработается в цикле)
df.dropna(subset=[col for col in all_feature_columns if col != 'next_day_close'], inplace=True) # Удаляем NaN в фичах
# df.dropna(subset=['close', 'next_day_close'], inplace=True) # Удаляем если нет таргета или текущей цены
if df.empty:
print("Датафрейм пуст после удаления NaN. Проверьте данные.")
return
print(f"Количество строк после начальной обработки: {len(df)}")
processed_data = []
# Идем почти до конца, так как для последней строки может не быть next_day_close
# или next_day_close может быть NaN, что обработает get_target_signal
for i in range(len(df)):
current_row = df.iloc[i]
# Проверяем, есть ли next_day_close для текущей строки (особенно актуально для последней строки файла)
if 'next_day_close' not in current_row or pd.isna(current_row['next_day_close']):
if i == len(df) - 1: # Если это последняя строка и нет next_day_close, просто пропускаем ее
print(f"Пропускаем последнюю строку {current_row.name}, т.к. отсутствует 'next_day_close'.")
continue
else: # Если это не последняя строка, но 'next_day_close' NaN, это странно, но get_target_signal вернет HOLD
pass
market_description = format_input_for_llm(current_row, all_feature_columns)
current_close_price = current_row['close']
next_day_close_price = current_row['next_day_close']
signal = get_target_signal(current_close_price, next_day_close_price, cfg.PRICE_CHANGE_THRESHOLD_SIGNAL)
# Формат для SFTTrainer
formatted_text = f"<s>[INST] Анализ рынка BTC/USDT на основе следующих данных: {market_description} Какое торговое действие (BUY, SELL, или HOLD) следует предпринять? [/INST] {signal}</s>"
processed_data.append({"text": formatted_text})
# Сохраняем в формате JSONL
with open(cfg.TRAINING_DATA_JSONL, 'w', encoding='utf-8') as f:
for item in processed_data:
f.write(json.dumps(item) + '\n')
print(f"Данные подготовлены и сохранены в {cfg.TRAINING_DATA_JSONL}. Количество примеров: {len(processed_data)}")
if processed_data:
print("Пример первой строки данных для обучения:")
print(processed_data[0]['text'])
else:
print("Не было сгенерировано ни одного примера для обучения. Проверьте логику и данные.")
if __name__ == "__main__":
main()