我想利用各种实验设计 (DOE) 中最好的方法来预测我的数据。为此,我使用了 Optuna 超参数优化器并按照代码中的描述对其进行了编程。它执行交叉验证,然后每步进行额外 10 次迭代,以提供准确的均方误差 (MSE) 并对其进行优化。结果,我得到了一个包含大约 25 个神经元的层。在我的实际模型中,此配置产生良好的 MSE 和 R_test / R_total 值。然而,当对不同的数据集进行预测时,我观察到偏差超过 50%。我已附上我的模型和超参数优化 (HPO) 详细信息。我非常感谢建设性的指导。我的数据集由 16 个数据点组成,具有 5 个特征 (X) 和一个输出值 (y)。
磷酸二氢钾:
from sklearn.model_selection import train_test_split , cross_val_score
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import MinMaxScaler
import optuna
import numpy as np
import sqlite3
import pandas as pd
# Read data from "Versuchspläne_final.xlsx" for modeling
data_df = pd.read_excel(r"cVersuche_V01_FF.xlsx", sheet_name="Probentabelle", header=1)
# Assuming 'X' columns are ['Temperatur', 'Anteil PP505', 'Drehzahl', 'Anteil Peroxid']
X_noscalar = data_df[['M%1', 'M%2','M%3', 'Drehzahl n/min-1', 'Endzonentemperatur °C']]
y = data_df[['MVR MW']].values.ravel()
# Read scaling parameters from "alle_Werte.xlsx"
scaling_data = pd.read_excel(r"c:.xlsx", sheet_name="Probentabelle", header=1)
scaler_y = MinMaxScaler()
scaler_x = MinMaxScaler()
scaler_y.fit(scaling_data[['MVR MW']]) # Assuming 'MVR' is the target variable in alle_Werte.xlsx
scaler_x.fit(scaling_data[['M%1', 'M%2','M%3', 'Drehzahl n/min-1', 'Endzonentemperatur °C']])
# Apply the previously calculated scaling to 'X' data
X = scaler_x.transform(X_noscalar)
y_scaled = scaler_y.transform(y.reshape(-1, 1)).ravel()
num_splits = 5
shuffled_indices = [train_test_split(range(len(X)), test_size=0.2, shuffle=True) for _ in range(num_splits)]
def objective(trial):
hidden_layer_sizes = tuple([trial.suggest_int(f'n_units_layer_{i}', 1, 100) for i in range(trial.suggest_int('n_layers', 1, 2))])
alpha = trial.suggest_float('alpha', 0.0001, 0.1, log=True)
learning_rate_init = trial.suggest_float('learning_rate_init', 0.001, 0.1, log=True)
learning_rate = trial.suggest_categorical('learning_rate', ['constant', 'invscaling', 'adaptive'])
mean_mse_test = 0
mean_mse_value = 0
num_iterations = 10
for _ in range(num_iterations):
for indices in shuffled_indices:
train_indices, test_indices = indices
X_train, X_test = X[train_indices], X[test_indices]
y_train, y_test = y_scaled[train_indices], y_scaled[test_indices]
model = MLPRegressor(
hidden_layer_sizes=hidden_layer_sizes,
alpha=alpha, max_iter=1000,
learning_rate=learning_rate,
learning_rate_init=learning_rate_init,
solver='lbfgs',
early_stopping=True,
validation_fraction=0.1, # Der Anteil der Daten, der für die Validierung verwendet wird
n_iter_no_change=10, # Anzahl der Iterationen ohne Verbesserung auf der Validierungsmetrik, bevor das Training gestoppt wird
tol=1e-3, # Toleranz für frühzeitiges Stoppen, wenn die Verbesserung kleiner als die Toleranz ist
verbose=True )
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
mse_test = mean_squared_error(y_test, y_pred)
mean_mse_test += mse_test
mean_mse_test /= num_splits
return mean_mse_test
mean_mse_value /= num_iterations
return mean_mse_value
if __name__ == "__main__":
study = optuna.create_study(study_name="mlp_hyperparam_opt4", storage="sqlite:///mlp_optuna.db", load_if_exists=True)
study.optimize(objective, n_trials=100)
# Beste Hyperparameter und Ergebnisse anzeigen
print("Best trial:")
trial = study.best_trial
print("Value: ", trial.value)
print("Params: ")
for key, value in trial.params.items():
print(f" {key}: {value}")
if __name__ == "__main__":
study = optuna.create_study(study_name="mlp_hyperparam_opt_ff", storage="sqlite:///mlp_optuna.db", load_if_exists=True)
study.optimize(objective, n_trials=300)
# Die besten 10 Trials erhalten
top_trials = study.trials[:10]
# Die besten 10 Trials und ihre Parameter anzeigen
for i, trial in enumerate(top_trials, 1):
print(f"Top {i} trial:")
print("Value:", trial.value)
print("Params:")
for key, value in trial.params.items():
print(f" {key}: {value}")
模型:
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
import numpy as np
import joblib
import pandas as pd
import numpy as np
# Read data from "alle_Werte.xlsx" to calculate scaling parameters
scaling_data = pd.read_excel(r"c:\Versuche_V01_gesamt.xlsx", sheet_name="Probentabelle", header=1)
scaler_y = MinMaxScaler()
scaler_x = MinMaxScaler()
scaler_y.fit(scaling_data[['MVR MW']]) # Assuming 'MVR' is the target variable in alle_Werte.xlsx
scaler_x.fit(scaling_data[['M%1', 'M%2','M%3', 'Drehzahl n/min-1', 'Endzonentemperatur °C']])
# Read data from "Versuchspläne_final.xlsx" for modeling
data_df = pd.read_excel(r"c:Versuche_V01_FF.xlsx", sheet_name="Probentabelle", header=1)
# Assuming 'X' columns are ['Temperatur', 'Anteil PP505', 'Drehzahl', 'Anteil Peroxid']
X_noscalar = data_df[['M%1', 'M%2','M%3', 'Drehzahl n/min-1', 'Endzonentemperatur °C']]
y = data_df[['MVR MW']].values.ravel()
#print(X_noscalar)
# Apply the previously calculated scaling to 'X' data from "Versuchspläne_final.xlsx"
X = scaler_x.transform(X_noscalar)
# Scale the target variable 'y' using the scaler for 'y'
y_scaled = scaler_y.transform(y.reshape(-1, 1)).ravel()
# Aufteilen der Daten in Trainings- und Testsets
X_train, X_test, y_train, y_test = train_test_split(X, y_scaled, test_size=0.2, random_state=12)
# Anzahl der Aufteilungen für die Cross-Validation
num_splits = 5
# Initialisiere Listen, um die Ergebnisse für jedes Split zu speichern
mse_scores = []
r2_test_scores = []
# Führe die Cross-Validation durch
for _ in range(num_splits):
# Aufteilen der Daten in Trainings- und Testsets
X_train, X_test, y_train, y_test = train_test_split(X, y_scaled, test_size=0.2, random_state=1) # Random_state=None für zufällige Aufteilung
# Initialize and train the MLP model
model = MLPRegressor(
hidden_layer_sizes=(3),
#max_iter=2000,
alpha=0.019149003457879503,
learning_rate='constant',
learning_rate_init=0.0451257591571734,
solver='lbfgs',
early_stopping=True,
validation_fraction=0.1, # Der Anteil der Daten, der für die Validierung verwendet wird
n_iter_no_change=10, # Anzahl der Iterationen ohne Verbesserung auf der Validierungsmetrik, bevor das Training gestoppt wird
tol=1e-3, # Toleranz für frühzeitiges Stoppen, wenn die Verbesserung kleiner als die Toleranz ist
verbose=True
)
model.fit(X_train, y_train)
# Predictions für den Testdatensatz
y_pred_test = model.predict(X_test)
y_pred_train = model.predict(X_train)
y_pred = model.predict(X)
# Berechnung des Mean Squared Error (MSE)
#mse = mean_squared_error(y_test, y_pred_test)
#mse_scores.append(mse)
# Berechnung des R2-Werts für den Testdatensatz hier mit train test
r2_test = r2_score(y_test, y_pred_test)
r2_test_scores.append(r2_test)
print('r2_test',r2_test)
r2_train = r2_score(y_train, y_pred_train)
#r2_train_scores.append(r2_train)
print('R2_train: ',r2_train)
r2 = r2_score(y_scaled, y_pred)
#r2_train_scores.append(r2_train)
print('R2: ',r2)
# Ergebnisse ausgeben
mse_scores=mean_squared_error(y_test, y_pred_test)
print("MSE Scores:", mse_scores)
# Berechnung der Durchschnittswerte für MSE und R2
mean_mse = np.mean(mse_scores)
mean_r2 = np.mean(r2_test_scores)
print("Mean MSE across splits:", mean_mse)
print("Mean R2 across splits:", mean_r2)
model_filename = "FF_test_22_1.joblib"
joblib.dump(model, model_filename)
new_df = pd.read_excel(r"c:\Users\janbu\Desktop\Bachelorarbeit\Versuchspläne_L\Versuche_V01_stat.xlsx",
sheet_name="Probentabelle", header=1)
X_new_unscaled = new_df[['M%1', 'M%2', 'M%3', 'Drehzahl n/min-1', 'Endzonentemperatur °C']]
# Skaliere die neuen Daten
X_new = scaler_x.transform(X_new_unscaled)
# Durchführung der Vorhersage für die neuen Daten
predicted_scaled_y = model.predict(X_new)
# Umkehrung der Skalierung, um den eigentlichen y-Wert zu erhalten
predicted_y = scaler_y.inverse_transform(predicted_scaled_y.reshape(-1, 1)).ravel()
print("Predicted y:", predicted_y)
new_df = pd.read_excel(r"c:\Users\janbu\Desktop\Bachelorarbeit\Versuchspläne_L\Versuche_V01_stat.xlsx",
sheet_name="Probentabelle", header=1)
# Extrahiere die tatsächlichen y-Werte der neuen Daten
actual_y_new = new_df[['MVR MW']].values.ravel()
print("Actual y:", actual_y_new)
# Skaliere die neuen Daten
X_new = scaler_x.transform(X_new_unscaled)
# Durchführung der Vorhersage für die neuen Daten
predicted_scaled_y = model.predict(X_new)
# Umkehrung der Skalierung, um den eigentlichen y-Wert zu erhalten
predicted_y = scaler_y.inverse_transform(predicted_scaled_y.reshape(-1, 1)).ravel()
# Berechnung der Abweichung zwischen den vorhergesagten Werten und den tatsächlichen Werten
#deviation = actual_y_new - predicted_y
# Hinzufügen der Abweichung zur DataFrame der neuen Daten
new_df['Predicted MVR MW'] = predicted_y
最大层数和神经元数量的限制,用于交叉验证的额外迭代循环。
对于使用神经网络来说,数据太少了。您应该使用参数较少的东西,例如线性回归。
还有其他问题,例如交叉验证未正确实现:
这允许单个点最终出现在多次折叠的测试集中。交叉验证应该将数据分为 k 个部分,然后选择这 k 个部分之一作为测试集,而无需在每次折叠之间重新整理这些部分之间的数据。
如果您有更多数据,这并不重要,但事实上,您的测试数据集最终将包含 3 个数据点。对于这么小的测试数据集,运气非常重要。
这可能是模型过度拟合的结果,也可能是分布变化的结果。从提供的细节来看很难说。