💡 모듈 불러오기
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout
from keras.callbacks import EarlyStopping
💡 데이터 불러오기
# 데이터 불러오기
data = pd.read_csv("./data/tr_eod_data_rounded.csv", index_col=0, parse_dates=True)
aapl_data = data[['AAPL.O']].dropna()
# 데이터 전처리 (표준화)
scaler = StandardScaler()
aapl_data_scaled = scaler.fit_transform(aapl_data)
aapl_data_scaled = pd.DataFrame(aapl_data_scaled, index=aapl_data.index, columns=['AAPL.O'])
스케일링 한 다음에 날짜를 인덱스로 다시 설정하고 칼럼명도 원래대로 돌린다.
💡 시퀀스 생성 & 교차 검증 설정
# 시계열 데이터 생성 함수
def create_sequences(data, window_size):
X, y = [], []
for i in range(len(data) - window_size):
X.append(data[i:(i + window_size), 0])
y.append(data[i + window_size, 0])
return np.array(X), np.array(y)
# 하이퍼파라미터 설정
window_size = 30 # 30일의 데이터를 사용하여 다음 값을 예측
epochs = 100 # 성능 향상을 위해 에포크 수를 늘림
batch_size = 32 # 기본 배치 사이즈
# TimeSeriesSplit 사용
tscv = TimeSeriesSplit(n_splits=5)
fold = 1 # 교차 검증 fold를 추적하기 위한 변수
X의 k번째 요소가 k일~k+29일의 값이 있다면 y의 k번째 요소로는 k+30번째 요소를 넣는다. 그렇게 X와 y를 쌍으로 잇는 시계열 시퀀스를 만든다.
epoch는 나중에 early_stopping을 쓸 거라서 크게 상관없다.
batch_size는 한 번의 학습에 사용하는 데이터의 개수를 말한다. 요약해서 말하자면 배치 사이즈가 크면 속도가 빠르고, 배치 사이즈가 자르면 성능이 좋다.
TimeSeriesSplit(n_splits=5)를 하면 5개의 항목이 나오는데 아예 분절되는 건 아니고 이전 값이 포함돼서 나온다. 예시로 2020년 1월 1일부터 100일간의 시계열 데이터에 대해 이를 적용하면,
💡 모델 학습 및 평가
# 교차 검증을 통한 모델 학습 및 평가
mse_list, mae_list, r2_list = [], [], []
for train_index, test_index in tscv.split(aapl_data_scaled):
train, test = aapl_data_scaled.iloc[train_index], aapl_data_scaled.iloc[test_index]
# 시계열 데이터 생성
X_train, y_train = create_sequences(train.values, window_size)
X_test, y_test = create_sequences(test.values, window_size)
# LSTM 입력 형태에 맞게 데이터 차원 변경
X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))
# LSTM 모델 구축 (성능 개선)
lstm_model = Sequential()
lstm_model.add(LSTM(50, activation='relu', return_sequences=True, input_shape=(window_size, 1)))
lstm_model.add(Dropout(0.2)) # 과적합 방지를 위한 드롭아웃
lstm_model.add(LSTM(50, activation='relu')) # 두 번째 LSTM 레이어 추가
lstm_model.add(Dropout(0.2))
lstm_model.add(Dense(1))
lstm_model.compile(optimizer='adam', loss='mse')
# Early Stopping (조기 종료) 추가
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
# 모델 학습
lstm_model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(X_test, y_test),
callbacks=[early_stopping], verbose=1)
# 테스트 데이터 예측
lstm_forecast = lstm_model.predict(X_test)
# 예측 결과를 원래 스케일로 복원
lstm_forecast_unscaled = scaler.inverse_transform(lstm_forecast)
y_test_unscaled = scaler.inverse_transform(y_test.reshape(-1, 1))
# 성능 평가
mse = mean_squared_error(y_test_unscaled, lstm_forecast_unscaled)
mae = mean_absolute_error(y_test_unscaled, lstm_forecast_unscaled)
r2 = r2_score(y_test_unscaled, lstm_forecast_unscaled)
mse_list.append(mse)
mae_list.append(mae)
r2_list.append(r2)
print(f"Fold {fold}:")
print(f"LSTM Mean Squared Error: {mse}")
print(f"LSTM Mean Absolute Error: {mae}")
print(f"LSTM R^2 Score: {r2}")
# 예측 결과 시각화 (LSTM)
plt.figure(figsize=(12, 6))
plt.plot(test.index[window_size:], y_test_unscaled, label='Test Data')
plt.plot(test.index[window_size:], lstm_forecast_unscaled, label='LSTM Forecast', color='red')
plt.legend()
plt.title(f'AAPL Price Forecasting using LSTM (Fold {fold})')
plt.xlabel('Date')
plt.ylabel('Price')
plt.show()
fold += 1
# 교차 검증 결과 출력
print("Cross Validation Results:")
print(f"Average MSE: {np.mean(mse_list)}")
print(f"Average MAE: {np.mean(mae_list)}")
print(f"Average R^2: {np.mean(r2_list)}")
엄청 기니까 쪼개서 볼 거다. 함수 하나에 들어가서 그냥 다 넣었음.
# 교차 검증을 통한 모델 학습 및 평가
mse_list, mae_list, r2_list = [], [], []
for train_index, test_index in tscv.split(aapl_data_scaled):
train, test = aapl_data_scaled.iloc[train_index], aapl_data_scaled.iloc[test_index]
# 시계열 데이터 생성
X_train, y_train = create_sequences(train.values, window_size)
X_test, y_test = create_sequences(test.values, window_size)
# LSTM 입력 형태에 맞게 데이터 차원 변경
X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))
각 폴드마다 평가 지표를 저장할 리스트를 만든다.
train과 test를 각각 시계열 시퀀스로 변환한다.
LSTM은 3차원 입력을 받기 때문에 세 번째 값으로 임의로 1을 넣어준다.
# LSTM 모델 구축 (성능 개선)
lstm_model = Sequential()
lstm_model.add(LSTM(50, activation='relu', return_sequences=True, input_shape=(window_size, 1)))
lstm_model.add(Dropout(0.2)) # 과적합 방지를 위한 드롭아웃
lstm_model.add(LSTM(50, activation='relu')) # 두 번째 LSTM 레이어 추가
lstm_model.add(Dropout(0.2))
lstm_model.add(Dense(1))
lstm_model.compile(optimizer='adam', loss='mse')
# Early Stopping (조기 종료) 추가
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
# 모델 학습
lstm_model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(X_test, y_test),
callbacks=[early_stopping], verbose=1)
LSTM의 첫 번째 인자로 입력된 50은 총 50개의 뉴런이 있다는 것이다. 그래서 30일짜리 시퀀스가 입력이 될 때 각각의 뉴런을 거쳐 50개의 시퀀스가 출력되는 것이다. return_sequences가 True기 때문에 출력값도 시퀀스가 되는 것이다. 설정하지 않았다면 마지막 값만 출력되므로 시퀀스가 아니라 단일한 값으로 나온다.
반면 두 번째 레이어에는 return_sequences가 따로 설정되어 있지 않기 때문에 기본값이 False가 되는데 이 때문에 출력값이 시퀀스가 아니라 마지막 타임스텝 하나만, 즉, 단일한 값이 나오는 것이다.
이걸 마지막 출력 레이어 Dense(1)로 뽑아서 값을 구한다. 최적화는 Adam을 사용하고 MSE를 손실 함수로 사용한다.
early_stopping을 통해 validation_loss가 개선되지 않으면 멈추고, restore_best_weights를 True로 설정해 가장 성능이 좋았던 시점의 모델 가중치로 복원한다..
# 테스트 데이터 예측
lstm_forecast = lstm_model.predict(X_test)
# 예측 결과를 원래 스케일로 복원
lstm_forecast_unscaled = scaler.inverse_transform(lstm_forecast)
y_test_unscaled = scaler.inverse_transform(y_test.reshape(-1, 1))
# 성능 평가
mse = mean_squared_error(y_test_unscaled, lstm_forecast_unscaled)
mae = mean_absolute_error(y_test_unscaled, lstm_forecast_unscaled)
r2 = r2_score(y_test_unscaled, lstm_forecast_unscaled)
mse_list.append(mse)
mae_list.append(mae)
r2_list.append(r2)
print(f"Fold {fold}:")
print(f"LSTM Mean Squared Error: {mse}")
print(f"LSTM Mean Absolute Error: {mae}")
print(f"LSTM R^2 Score: {r2}")
예측을 하고 평가 지표를 우리가 아까 만든 리스트에 차곡차곡 쌓는다.
# 예측 결과 시각화 (LSTM)
plt.figure(figsize=(12, 6))
plt.plot(test.index[window_size:], y_test_unscaled, label='Test Data')
plt.plot(test.index[window_size:], lstm_forecast_unscaled, label='LSTM Forecast', color='red')
plt.legend()
plt.title(f'AAPL Price Forecasting using LSTM (Fold {fold})')
plt.xlabel('Date')
plt.ylabel('Price')
plt.show()
fold += 1
시각화를 하고 다음 fold로 넘어간다.
💡 교차 검증 결과
# 교차 검증 결과 출력
print("Cross Validation Results:")
print(f"Average MSE: {np.mean(mse_list)}")
print(f"Average MAE: {np.mean(mae_list)}")
print(f"Average R^2: {np.mean(r2_list)}")
fold 1은 epoch 12에서 멈췄다.
전체 데이터가 2138개가 있고 batch_size는 32이기 때문에 66/66가 떠야 하지만 n_splits=5로 나눴기 때문에 첫 번째 fold에서는 11개까지 두 번째에는 22개까지.. 5번째 폴드에는 55개까지 학습했다. 그다음 11개가 test다.
아무튼 R^2 0.83 정도가 나왔다.
fold2는 당연히 22/22로 나왔고, 에러는 줄어들었지만 R^2는 줄지 않았다.
fold3에서는 R^2가 0.93까지 늘어났다.
fold4에는 많이 늘어났다.
최종적으로 결과는 이렇게 나왔다.
이제 부자가 되면 된다!!
'분명 전산학부 졸업 했는데 코딩 개못하는 조준호 > AI, ML, DL' 카테고리의 다른 글
한국은행 들어갈 때까지만 합니다
조만간 티비에서 봅시다