diff --git a/Working Models/.gitkeep b/Working Models/.gitkeep
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/Working Models/symbolicRegressor.py b/Working Models/symbolicRegressor.py
new file mode 100644
index 0000000000000000000000000000000000000000..b77eb7f83657775a41abed313400ae3b49942620
--- /dev/null
+++ b/Working Models/symbolicRegressor.py
@@ -0,0 +1,302 @@
+import os
+import sys
+import re
+
+import numpy as np
+import pandas as pd
+from gplearn.genetic import SymbolicRegressor
+from sklearn.impute import SimpleImputer
+from sklearn.metrics import mean_squared_error, r2_score
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import LabelEncoder, StandardScaler
+
+### IMPORTANT: When working on these models, be careful what is printed, as stdout is captured and stored as the model's response.
+
+# Load the data
+file_path = sys.argv[2]
+data = pd.read_csv(file_path)
+
+# The target column is supplied as the first command-line argument
+target_col = sys.argv[1]
+X = data.drop(target_col, axis=1)
+y = data[target_col]
+
+# Convert a time-period value (e.g. "5 months" -> 150) to a number of days
+def convert_time_period(value):
+    if pd.isna(value):
+        return np.nan
+
+    try:
+        # Numeric values pass through unchanged
+        if isinstance(value, (int, float)):
+            return value
+
+        # Convert string to lowercase for consistency
+        value = str(value).lower()
+
+        # Extract number and alphabetic unit; the unit must be letters
+        # ([a-z]+) so that a bare number such as "10" is not split into
+        # digit groups and mis-parsed
+        match = re.search(r'(\d+)\s*([a-z]+)', value)
+        if not match:
+            # Fall back to extracting just a number
+            number_match = re.search(r'(\d+)', value)
+            if number_match:
+                return int(number_match.group(1))
+            return np.nan
+
+        number = int(match.group(1))
+        unit = match.group(2)
+
+        # Convert to days (months and years are approximated)
+        if 'day' in unit:
+            return number
+        elif 'week' in unit:
+            return number * 7
+        elif 'month' in unit:
+            return number * 30
+        elif 'year' in unit:
+            return number * 365
+        else:
+            # If the unit is not recognized, just return the number
+            return number
+    except Exception as e:
+        # Report to stderr so stdout stays clean (see note above)
+        print(f"Error converting '{value}': {e}", file=sys.stderr)
+        return np.nan
+
+# Categorize columns by data type
+numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist()
+categorical_cols = X.select_dtypes(include=['object']).columns.tolist()
+
+# Process each categorical column appropriately
+for col in categorical_cols:
+    # Check if the column contains time periods (e.g., "5 months")
+    if col == 'Injury_Prognosis' or any(
+            re.search(r'\d+\s*(?:day|week|month|year)', str(val)) for val in X[col].dropna().iloc[:20]):
+        X[col] = X[col].apply(convert_time_period)
+        # Fill missing values with the median after conversion
+        median_value = X[col].median()
+        X[col] = X[col].fillna(median_value)
+    else:
+        # For regular categorical variables, use label encoding with a
+        # special category for missing values
+        X[col] = X[col].fillna("MISSING_VALUE")
+
+        # Then apply label encoding
+        le = LabelEncoder()
+        X[col] = le.fit_transform(X[col])
+
+        # Mapping from category to integer code, kept for reference
+        mapping = dict(zip(le.classes_, le.transform(le.classes_)))
+
+# Drop any remaining non-numeric columns so the matrix is fully numeric
+non_numeric_cols = X.select_dtypes(exclude=[np.number]).columns.tolist()
+if non_numeric_cols:
+    X = X.drop(columns=non_numeric_cols)
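+
+# Sketch of an alternative, pipeline-based preprocessing (not used here; the
+# sklearn classes named below exist but this exact wiring is illustrative):
+#   from sklearn.compose import ColumnTransformer
+#   from sklearn.pipeline import Pipeline
+#   from sklearn.preprocessing import OneHotEncoder
+#   preprocess = ColumnTransformer([
+#       ('num', SimpleImputer(strategy='median'), numeric_cols),
+#       ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols),
+#   ])
+# Label encoding is used above instead, so the feature count and column
+# order stay fixed, which the hard-coded expression further below relies on.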
+
+# Analyze missing values (kept for inspection)
+missing_values = X.isna().sum()
+
+# Check for missing values in the target column
+target_missing = y.isna().sum()
+
+# Handle missing values in the features with imputation instead of dropping rows
+num_imputer = SimpleImputer(strategy='median')
+X_imputed = pd.DataFrame(num_imputer.fit_transform(X), columns=X.columns)
+
+# Handle missing values in the target (if any) by dropping those rows
+if target_missing > 0:
+    mask = y.notna()
+    X_imputed = X_imputed[mask]
+    y_clean = y[mask]
+else:
+    y_clean = y.copy()
+
+# Redefine X with the imputed data
+X_clean = X_imputed
+
+# Split the data
+X_train, X_test, y_train, y_test = train_test_split(X_clean, y_clean, test_size=0.2, random_state=42)
+
+# Scale the features
+scaler = StandardScaler()
+X_train_scaled = scaler.fit_transform(X_train)
+X_test_scaled = scaler.transform(X_test)
+
+# Configure and train the symbolic regressor
+symbolic_reg = SymbolicRegressor(
+    population_size=2000,
+    generations=30,
+    tournament_size=20,
+    p_crossover=0.7,
+    p_subtree_mutation=0.1,
+    p_hoist_mutation=0.05,
+    p_point_mutation=0.1,
+    max_samples=0.8,
+    verbose=0,
+    parsimony_coefficient=0.05,
+    random_state=42,
+    function_set=('add', 'sub', 'mul', 'div', 'sqrt', 'log', 'sin', 'cos')
+)
+
+symbolic_reg.fit(X_train_scaled, y_train)
+
+# Make predictions
+y_pred_train = symbolic_reg.predict(X_train_scaled)
+y_pred_test = symbolic_reg.predict(X_test_scaled)
+
+# Evaluate the model
+train_rmse = np.sqrt(mean_squared_error(y_train, y_pred_train))
+test_rmse = np.sqrt(mean_squared_error(y_test, y_pred_test))
+train_r2 = r2_score(y_train, y_pred_train)
+test_r2 = r2_score(y_test, y_pred_test)
+
+### OUTPUTS
+# Kept commented out: stdout is captured as the model's response (see note above)
+
+# print(f"Train RMSE: {train_rmse:.2f}")
+# print(f"Test RMSE: {test_rmse:.2f}")
+# print(f"Train R² Score: {train_r2:.4f}")
+# print(f"Test R² Score: {test_r2:.4f}")
+
+# Display the learned expression
+# print("\nBest symbolic expression:")
+# print(symbolic_reg._program)
+
+# Frankenstein time! Re-implement the evolved program as a standalone model below.
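+
+# NOTE: the class below hard-codes one expression found during a particular
+# training run. One possible alternative (a sketch, not used here) would be
+# to persist the fitted estimator itself with joblib:
+#   import joblib
+#   joblib.dump(symbolic_reg, "symbolic_reg.joblib")   # after fitting
+#   symbolic_reg = joblib.load("symbolic_reg.joblib")  # in a later run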
+
+class TrainedSymbolicRegressor:
+    def __init__(self):
+        self.model = None
+        # Keep a reference to the evolved program from the training run above
+        self._program = symbolic_reg._program
+        self._setup_model()
+
+    def _setup_model(self):
+        # Bind the predict function directly, without printing anything
+        self.model = self._predict_sample
+
+    def _predict_sample(self, X):
+        predictions = np.zeros(X.shape[0])
+
+        # Numerically guarded versions of sqrt, log and division
+        def safe_sqrt(x):
+            return np.sqrt(max(0, x))
+
+        def safe_log(x):
+            return np.log(max(1e-10, abs(x)))
+
+        def safe_div(a, b):
+            return a / (b if abs(b) > 1e-10 else 1e-10)
+
+        try:
+            for i in range(X.shape[0]):
+                # Extract features by 1-based position; columns missing
+                # from the input default to 0
+                x_vals = {}
+                for j in range(min(19, X.shape[1])):
+                    x_vals[f'X{j + 1}'] = X[i, j]
+
+                # X1, X8 and X13 are extracted but unused by the final expression
+                X1 = x_vals.get('X1', 0)
+                X5 = x_vals.get('X5', 0)
+                X6 = x_vals.get('X6', 0)
+                X7 = x_vals.get('X7', 0)
+                X8 = x_vals.get('X8', 0)
+                X13 = x_vals.get('X13', 0)
+                X15 = x_vals.get('X15', 0)
+
+                term1 = safe_sqrt(abs(X5 - X7))
+                term2 = safe_sqrt(safe_div(term1, X6))
+                # term3 appears in the evolved program but not in the final prediction
+                term3 = safe_log(abs(np.cos(X7)))
+                term4 = np.sin(safe_sqrt(safe_log(safe_sqrt(abs(np.cos(X15))))))
+
+                predictions[i] = safe_div(term2, term4)
+        except Exception:
+            # Leave the remaining predictions at 0 if evaluation fails
+            pass
+
+        return predictions
+
+    def predict(self, X):
+        return self.model(X)
+
+def main():
+    # Load the data
+    file_path = sys.argv[2]
+    data = pd.read_csv(file_path)
+
+    # Target column from the first argument
+    target_col = sys.argv[1]
+    X = data.drop(target_col, axis=1)
+    y = data[target_col]
+
+    # Categorize columns by data type
+    numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist()
+    categorical_cols = X.select_dtypes(include=['object']).columns.tolist()
+
+    # Process categorical columns (mirrors the training-time preprocessing)
+    for col in categorical_cols:
+        # Check if the column contains time periods
+        if col == 'Injury_Prognosis' or any(
+                re.search(r'\d+\s*(?:day|week|month|year)', str(val)) for val in X[col].dropna().iloc[:20]):
+            X[col] = X[col].apply(convert_time_period)
+            # Fill missing values with the median
+            median_value = X[col].median()
+            X[col] = X[col].fillna(median_value)
+        else:
+            # Label encoding for categorical variables
+            X[col] = X[col].fillna("MISSING_VALUE")
+            le = LabelEncoder()
+            X[col] = le.fit_transform(X[col])
+
+    # Drop any remaining non-numeric columns
+    non_numeric_cols = X.select_dtypes(exclude=[np.number]).columns.tolist()
+    if non_numeric_cols:
+        X = X.drop(columns=non_numeric_cols)
+
+    # Handle missing values with imputation
+    num_imputer = SimpleImputer(strategy='median')
+    X_imputed = pd.DataFrame(num_imputer.fit_transform(X), columns=X.columns)
+
+    # Drop rows whose target is missing
+    target_missing = y.isna().sum()
+    if target_missing > 0:
+        mask = y.notna()
+        X_clean = X_imputed[mask]
+        y_clean = y[mask]
+    else:
+        X_clean = X_imputed
+        y_clean = y.copy()
+
+    # Scale the features (note: the scaler is refit on this dataset,
+    # mirroring the training-time preprocessing)
+    scaler = StandardScaler()
+    X_scaled = scaler.fit_transform(X_clean)
+
+    # Create and use the pre-trained model
+    model = TrainedSymbolicRegressor()
+    predictions = model.predict(X_scaled)
+
+    # Create the output dataframe and save it to CSV
+    result_df = pd.DataFrame({
+        f"Predicted_{target_col}": predictions
+    })
+
+    # Output file path: same directory as the input file, with a _predictions suffix
+    input_dir = os.path.dirname(file_path)
+    input_name = os.path.splitext(os.path.basename(file_path))[0]
+    output_path = os.path.join(input_dir, f"{input_name}_predictions.csv")
+
+    result_df.to_csv(output_path, index=False)
+    print(output_path)
+
+if __name__ == "__main__":
+    main()
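+
+# Usage sketch (the column and file names here are hypothetical):
+#   python "Working Models/symbolicRegressor.py" Settlement_Value data/claims.csv
+# On success the script prints only the path of the generated predictions
+# CSV, e.g. data/claims_predictions.csv.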