# malconv/src/tune_model.py
import os
import sys
import itertools
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
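# Add the repository root to sys.path so the `src.*` imports below resolve
# when this script is run directly (e.g. `python src/tune_model.py`)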
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.model import MalConv
from src.utils import preprocess_dataset


def hyperparameter_search(csv_path,
                          param_grid=None,
                          max_length=2**20,
                          epochs=5,
                          validation_split=0.2):
"""
๊ทธ๋ฆฌ๋“œ ์„œ์น˜๋ฅผ ํ†ตํ•œ ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐ ์ตœ์ ํ™”
Args:
csv_path: ํ›ˆ๋ จ ๋ฐ์ดํ„ฐ CSV ๊ฒฝ๋กœ
param_grid: ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐ ๊ทธ๋ฆฌ๋“œ
max_length: ์ตœ๋Œ€ ์ž…๋ ฅ ๊ธธ์ด
epochs: ํ›ˆ๋ จ ์—ํฌํฌ ์ˆ˜
validation_split: ๊ฒ€์ฆ ๋ฐ์ดํ„ฐ ๋น„์œจ
"""
    if param_grid is None:
        param_grid = {
            'embedding_size': [8, 16],
            'num_filters': [64, 128],
            'fc_size': [64, 128],
            'learning_rate': [0.001, 0.0001]
        }
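    # NOTE: the default grid yields 2 * 2 * 2 * 2 = 16 combinations, each
    # trained for `epochs` epochs, so runtime grows multiplicatively with
    # every value added to the grid.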
print("๋ฐ์ดํ„ฐ ๋กœ๋”ฉ ์ค‘...")
X, y = preprocess_dataset(csv_path, max_length)
X_train, X_val, y_train, y_val = train_test_split(
X, y, test_size=validation_split, random_state=42, stratify=y
)
    # Build every combination (Cartesian product) of the grid values
    param_names = list(param_grid.keys())
    param_values = list(param_grid.values())
    param_combinations = list(itertools.product(*param_values))

    best_score = 0.0
    best_params = None
    results = []

    print(f"Testing {len(param_combinations)} combinations in total.")
    for i, params in enumerate(param_combinations):
        param_dict = dict(zip(param_names, params))
        print(f"\n[{i+1}/{len(param_combinations)}] Testing: {param_dict}")
        try:
            # Create the model
            model = MalConv(
                max_input_length=max_length,
                embedding_size=param_dict['embedding_size'],
                num_filters=param_dict['num_filters'],
                fc_size=param_dict['fc_size']
            )

            # Compile
            model.compile(
                optimizer=tf.keras.optimizers.Adam(
                    learning_rate=param_dict['learning_rate']
                ),
                loss='binary_crossentropy',
                metrics=['accuracy']
            )

            # Subclassed Keras models have no static graph, so run one dummy
            # forward pass to build the layer weights before training
            dummy_input = np.zeros((1, max_length), dtype=np.uint8)
            _ = model(dummy_input)

            # Train
            history = model.fit(
                X_train, y_train,
                batch_size=16,
                epochs=epochs,
                validation_data=(X_val, y_val),
                verbose=0
            )

            # Evaluate
            val_loss, val_acc = model.evaluate(X_val, y_val, verbose=0)

            result = {
                'params': param_dict,
                'val_accuracy': val_acc,
                'val_loss': val_loss
            }
            results.append(result)
            print(f"Validation accuracy: {val_acc:.4f}")

            # Track the best-performing combination
            if val_acc > best_score:
                best_score = val_acc
                best_params = param_dict
                print(f"New best score! Accuracy: {best_score:.4f}")
        except Exception as e:
            # Skip combinations that fail (e.g. out-of-memory) and keep searching
            print(f"Error occurred: {e}")
            continue
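    # NOTE: when the grid is large, calling tf.keras.backend.clear_session()
    # at the top of each iteration can free graph memory held by the previous
    # model (not done here).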
print("\n" + "="*50)
print("ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐ ํŠœ๋‹ ์™„๋ฃŒ")
print("="*50)
print(f"์ตœ๊ณ  ์„ฑ๋Šฅ: {best_score:.4f}")
print(f"์ตœ์  ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐ: {best_params}")
# ๊ฒฐ๊ณผ ์ •๋ ฌ
results.sort(key=lambda x: x['val_accuracy'], reverse=True)
print("\n์ƒ์œ„ 5๊ฐœ ๊ฒฐ๊ณผ:")
for i, result in enumerate(results[:5]):
print(f"{i+1}. ์ •ํ™•๋„: {result['val_accuracy']:.4f}, "
f"ํŒŒ๋ผ๋ฏธํ„ฐ: {result['params']}")
return best_params, results


def main():
    csv_path = "Input/sample_data.csv"  # Change to your actual data path

    # Custom hyperparameter grid
    param_grid = {
        'embedding_size': [8, 16],
        'num_filters': [64, 128],
        'fc_size': [64, 128],
        'learning_rate': [0.001, 0.0001]
    }

    best_params, results = hyperparameter_search(
        csv_path=csv_path,
        param_grid=param_grid,
        epochs=3  # Fewer epochs for a quick test
    )

    print("\nRetrain the model with the best hyperparameters:")
    print(f"python src/train.py {csv_path} --epochs 10")


if __name__ == "__main__":
    main()