from typing import Dict, List, Any import time from dotenv import load_dotenv from langchain_openai import ChatOpenAI from langgraph_supervisor import create_supervisor from langgraph.prebuilt import create_react_agent load_dotenv() model = ChatOpenAI(model="gpt-4o") def extract_events_from_rdb( table_name: str, start_date: str, end_date: str, event_types: List[str] = None ) -> Dict[str, Any]: """ RDB 테이블에서 이벤트 레코드를 추출하고 텍스트 형식으로 변환합니다. Args: table_name: RDB 테이블 이름 start_date: 시작 날짜 (YYYY-MM-DD 형식) end_date: 종료 날짜 (YYYY-MM-DD 형식) event_types: 필터링할 이벤트 타입 목록 (선택사항) Returns: 추출된 데이터 통계 및 파일 경로를 포함한 딕셔너리 """ time.sleep(0.5) return { "status": "success", "records_extracted": 125847, "output_file": f"/data/events/{table_name}_{start_date}_{end_date}.txt", "total_size_mb": 482.3, "event_type_distribution": { "user_action": 45230, "system_event": 32145, "error_log": 18472, "transaction": 30000 }, "processing_time_seconds": 12.5 } def prepare_pretraining_data( input_file: str, tokenizer: str = "gpt2", max_length: int = 512, min_length: int = 50 ) -> Dict[str, Any]: """ 토크나이제이션과 포매팅을 통해 사전학습을 위한 텍스트 데이터를 준비합니다. Args: input_file: 입력 텍스트 파일 경로 tokenizer: 사용할 토크나이저 max_length: 최대 시퀀스 길이 min_length: 최소 시퀀스 길이 Returns: 준비된 데이터 통계를 포함한 딕셔너리 """ time.sleep(0.5) return { "status": "success", "output_file": "/data/pretraining/tokenized_data.bin", "total_sequences": 89234, "total_tokens": 45623890, "avg_sequence_length": 511.2, "vocab_size": 50257, "processing_time_seconds": 34.2 } def pretrain_model( data_file: str, model_architecture: str = "gpt2-small", num_epochs: int = 3, batch_size: int = 32, learning_rate: float = 5e-5 ) -> Dict[str, Any]: """ 준비된 데이터로 언어모델을 사전학습시킵니다. Args: data_file: 토크나이즈된 데이터 파일 경로 model_architecture: 사용할 모델 아키텍처 num_epochs: 학습 에포크 수 batch_size: 학습 배치 크기 learning_rate: 학습률 Returns: 학습 지표 및 모델 경로를 포함한 딕셔너리 """ time.sleep(0.5) return { "status": "success", "model_path": "/models/pretrained/model_checkpoint_epoch3", "final_loss": 2.341, "perplexity": 10.39, "training_time_hours": 4.5, "total_steps": 8340, "best_checkpoint": "checkpoint-7800", "gpu_hours": 36.0, "metrics": { "epoch_1_loss": 3.245, "epoch_2_loss": 2.789, "epoch_3_loss": 2.341 } } def create_finetuning_data( source_data: str, task_type: str = "classification", num_classes: int = 5, train_ratio: float = 0.8, augmentation: bool = True ) -> Dict[str, Any]: """ 분류 작업을 위한 파인튜닝 데이터셋을 생성합니다. Args: source_data: 소스 데이터 경로 task_type: 작업 유형 (classification, regression 등) num_classes: 분류 클래스 수 train_ratio: 학습 데이터 비율 augmentation: 데이터 증강 적용 여부 Returns: 데이터셋 통계 및 파일 경로를 포함한 딕셔너리 """ time.sleep(0.5) return { "status": "success", "train_file": "/data/finetuning/train.jsonl", "val_file": "/data/finetuning/val.jsonl", "test_file": "/data/finetuning/test.jsonl", "train_samples": 12456, "val_samples": 3114, "test_samples": 3114, "class_distribution": { "class_0": 2489, "class_1": 3201, "class_2": 2845, "class_3": 2134, "class_4": 1787 }, "augmentation_applied": True, "processing_time_seconds": 8.3 } def train_classification_model( pretrained_model: str, train_data: str, val_data: str, num_classes: int = 5, num_epochs: int = 10, batch_size: int = 16, learning_rate: float = 2e-5 ) -> Dict[str, Any]: """ 파인튜닝 데이터를 사용하여 분류 모델을 학습시킵니다. Args: pretrained_model: 사전학습된 모델 경로 train_data: 학습 데이터 경로 val_data: 검증 데이터 경로 num_classes: 클래스 수 num_epochs: 학습 에포크 수 batch_size: 배치 크기 learning_rate: 학습률 Returns: 학습 결과 및 모델 경로를 포함한 딕셔너리 """ time.sleep(0.5) return { "status": "success", "model_path": "/models/finetuned/classification_model", "best_checkpoint": "checkpoint-epoch8", "final_train_loss": 0.234, "final_val_loss": 0.312, "best_val_accuracy": 0.923, "training_time_hours": 1.2, "total_steps": 7785, "early_stopping_epoch": 8, "metrics_per_epoch": { "epoch_1": {"train_loss": 0.892, "val_loss": 0.845, "val_acc": 0.712}, "epoch_5": {"train_loss": 0.345, "val_loss": 0.389, "val_acc": 0.887}, "epoch_8": {"train_loss": 0.234, "val_loss": 0.312, "val_acc": 0.923} } } def evaluate_model( model_path: str, test_data: str, metrics: List[str] = None ) -> Dict[str, Any]: """ 테스트 데이터로 학습된 모델을 종합적인 지표로 평가합니다. Args: model_path: 학습된 모델 경로 test_data: 테스트 데이터 경로 metrics: 계산할 지표 목록 Returns: 평가 지표를 포함한 딕셔너리 """ time.sleep(0.5) if metrics is None: metrics = ["precision", "recall", "f1", "accuracy"] return { "status": "success", "test_samples": 3114, "overall_accuracy": 0.918, "macro_precision": 0.912, "macro_recall": 0.908, "macro_f1": 0.910, "weighted_precision": 0.916, "weighted_recall": 0.918, "weighted_f1": 0.917, "per_class_metrics": { "class_0": {"precision": 0.935, "recall": 0.921, "f1": 0.928, "support": 623}, "class_1": {"precision": 0.948, "recall": 0.952, "f1": 0.950, "support": 640}, "class_2": {"precision": 0.899, "recall": 0.887, "f1": 0.893, "support": 569}, "class_3": {"precision": 0.887, "recall": 0.901, "f1": 0.894, "support": 427}, "class_4": {"precision": 0.891, "recall": 0.879, "f1": 0.885, "support": 357} }, "confusion_matrix": [ [574, 12, 18, 10, 9], [8, 609, 11, 7, 5], [15, 9, 505, 28, 12], [11, 8, 22, 385, 1], [14, 6, 18, 5, 314] ], "inference_time_ms": 1247.5 } data_extraction_agent = create_react_agent( model=model, tools=[extract_events_from_rdb], name="data_extraction_expert", prompt=( "당신은 SQL과 RDB 작업에 특화된 데이터 추출 전문가입니다. " "데이터베이스 테이블에서 이벤트 레코드를 추출하고 텍스트 형식으로 변환하는 역할을 합니다. " "테이블 이름, 날짜 범위, 이벤트 타입에 대한 명확한 정보를 제공해야 합니다. " "레코드 수와 파일 크기를 포함한 추출 통계를 보고하세요." ) ) pretraining_agent = create_react_agent( model=model, tools=[prepare_pretraining_data, pretrain_model], name="pretraining_expert", prompt=( "당신은 언어모델 사전학습 전문가입니다. " "토큰화된 데이터를 준비하고 모델을 처음부터 학습시키는 책임을 맡고 있습니다. " "Loss와 Perplexity 같은 학습 지표를 모니터링하세요. " "데이터 준비 및 모델 학습 진행 상황에 대한 자세한 통계를 보고하세요. " "한 번에 하나의 도구만 사용하세요." ) ) finetuning_agent = create_react_agent( model=model, tools=[create_finetuning_data, train_classification_model], name="finetuning_expert", prompt=( "당신은 분류 작업에 특화된 파인튜닝 전문가입니다. " "고품질의 파인튜닝 데이터셋을 만들고 분류 모델을 학습시키는 역할을 합니다. " "적절한 데이터 분할과 클래스 분포를 보장하세요. " "파인튜닝 과정 전반에 걸쳐 학습 및 검증 지표를 모니터링하세요. " "한 번에 하나의 도구만 사용하세요." ) ) evaluation_agent = create_react_agent( model=model, tools=[evaluate_model], name="evaluation_expert", prompt=( "당신은 분류 지표에 특화된 모델 평가 전문가입니다. " "Precision, Recall, F1-score, Accuracy를 사용하여 학습된 모델을 철저히 평가하는 역할을 합니다. " "클래스별 세부 지표와 전체 성능 통계를 제공하세요. " "Confusion matrix를 분석하고 개선이 필요한 영역을 파악하세요." ) ) workflow = create_supervisor( [data_extraction_agent, pretraining_agent, finetuning_agent, evaluation_agent], model=model, prompt=( "당신은 ML 파이프라인 감독자입니다. " "사용자의 요청을 이해하고 목표를 달성하기 위해 필요한 전문가만 선택하세요.\n\n" "사용 가능한 전문가:\n" "- data_extraction_expert: RDB에서 이벤트 데이터 추출\n" "- pretraining_expert: 데이터 준비 및 언어모델 사전학습\n" "- finetuning_expert: 파인튜닝 데이터 생성 및 분류 모델 학습\n" "- evaluation_expert: 모델 평가 (Precision, Recall, F1 등)\n\n" "사용자가 요청한 작업만 수행하고, 요청하지 않은 추가 작업은 진행하지 마세요." ) ) ml_app = workflow.compile()