Skip to content
Snippets Groups Projects
Commit 537e50f3 authored by c72-taylor's avatar c72-taylor
Browse files

Upload New File

parent 0b5be123
No related branches found
No related tags found
No related merge requests found
import os
import pandas as pd
import numpy as np
import re
from gplearn.genetic import SymbolicRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
# Input location and target column are hard-coded; both need to be changed
# to run this script against a different CSV file.
file_path = r"C:\Users\Charlie1\PycharmProjects\shallowsinks\ActualProjectCode\DjangoProject\records\Synthetic_Data_For_Students.csv"
target_col = 'SettlementValue'

# Read the whole CSV, then separate the regression target from the features.
data = pd.read_csv(file_path)
X = data.drop(columns=[target_col])
y = data[target_col]
# Function to convert time periods to number of days
def convert_time_period(value):
    """Convert a time-period value such as ``"5 months"`` to a number of days.

    Args:
        value: A missing value, a number, or any object whose string form
            may contain a number optionally followed by a unit word.

    Returns:
        ``np.nan`` when ``value`` is missing or its text contains no number.
        Numeric input (``int``/``float``) is returned unchanged.
        Otherwise the first "number + unit" found in the lower-cased text is
        scaled by 1, 7, 30 or 365 when the unit contains "day", "week",
        "month" or "year" respectively. A number with no unit, or with an
        unrecognized unit, is returned as-is. Whole numbers are returned as
        ``int``; decimals such as ``"1.5 months"`` yield a ``float``.
    """
    if pd.isna(value):
        return np.nan

    def to_number(digits):
        # Keep whole numbers as int so existing integer results are unchanged.
        return float(digits) if '.' in digits else int(digits)

    try:
        # Handle numeric values (already in days or some other unit)
        if isinstance(value, (int, float)):
            return value
        # Convert string to lowercase for consistency
        text = str(value).lower()
        # Extract number and unit. The unit must not contain digits: with a
        # plain \w+ the regex backtracks on digit-only text and splits "15"
        # into number 1 and unit "5", silently returning 1.
        match = re.search(r'(\d+(?:\.\d+)?)\s*([^\W\d]+)', text)
        if not match:
            # Try to extract just a number
            number_match = re.search(r'\d+(?:\.\d+)?', text)
            if number_match:
                return to_number(number_match.group(0))
            return np.nan
        number = to_number(match.group(1))
        unit = match.group(2)
        # Convert to days; substring checks so plurals ("weeks") also match
        for keyword, days in (('day', 1), ('week', 7), ('month', 30), ('year', 365)):
            if keyword in unit:
                return number * days
        # If unit is not recognized, just return the number
        return number
    except Exception as e:
        # Best effort: one unparsable cell must not abort a whole-column
        # conversion, so report it and treat the value as missing.
        print(f"Error converting '{value}': {e}")
        return np.nan
# Categorize columns by data type
numeric_cols = X.select_dtypes(include=[np.number]).columns.tolist()
categorical_cols = X.select_dtypes(include=['object']).columns.tolist()
print(f"Numeric columns: {len(numeric_cols)}")
print(f"Categorical columns: {len(categorical_cols)}")

# Compiled once: used to sniff whether a text column holds durations
# such as "5 months".
time_period_pattern = re.compile(r'\d+\s*(?:day|week|month|year)')

# Process each categorical column appropriately
for col in categorical_cols:
    print(f"Processing column: {col}")
    # Report how much of the column is missing before it gets filled below
    missing_pct = X[col].isna().mean() * 100
    print(f" Missing values: {missing_pct:.1f}%")
    # Check if column contains time periods (e.g., "5 months"). Only the first
    # 20 non-missing values are sampled; 'Injury_Prognosis' is always treated
    # as a time-period column.
    holds_time_periods = col == 'Injury_Prognosis' or any(
        time_period_pattern.search(str(val)) for val in X[col].dropna().iloc[:20])
    if holds_time_periods:
        print(f" Converting time periods in {col} to days")
        X[col] = X[col].apply(convert_time_period)
        # Fill missing values with median after conversion.
        # Assign the result back instead of calling fillna(inplace=True) on
        # the column selection: the chained in-place form is deprecated and
        # does not update X under pandas Copy-on-Write.
        median_value = X[col].median()
        X[col] = X[col].fillna(median_value)
        print(f" Filled missing values with median: {median_value}")
    else:
        # For regular categorical variables, use label encoding with a special
        # category for missing values
        print(f" Label encoding {col}")
        # First, fill NaN with a placeholder string (assigned back, see above)
        X[col] = X[col].fillna("MISSING_VALUE")
        # Then apply label encoding
        le = LabelEncoder()
        X[col] = le.fit_transform(X[col])
        # Store mapping for reference; a class's code is its position in
        # le.classes_, and enumerate keeps the printed codes as plain ints
        mapping = {label: code for code, label in enumerate(le.classes_)}
        print(f" Mapping: {mapping}")

# Check for any remaining non-numeric columns
non_numeric_cols = X.select_dtypes(exclude=[np.number]).columns.tolist()
if non_numeric_cols:
    print(f"Remaining non-numeric columns: {non_numeric_cols}")
    # Drop any remaining non-numeric columns
    X = X.drop(columns=non_numeric_cols)
# Analyze missing values
missing_values = X.isna().sum()
print("\nMissing values per column:")
print(missing_values[missing_values > 0].sort_values(ascending=False))

# Check for missing values in target column
target_missing = y.isna().sum()
print(f"\nMissing values in target column '{target_col}': {target_missing}")

# Handle missing values with imputation instead of dropping
print("\nImputing missing values...")
# Median imputation over every feature column.
# NOTE(review): the imputer is fitted on all rows before the train/test split,
# so rows that later land in the test set contribute to the medians.
num_imputer = SimpleImputer(strategy='median')
# Keep X's index on the imputed frame so the boolean mask built from y below
# aligns row-for-row even when the index is not a default RangeIndex.
X_imputed = pd.DataFrame(
    num_imputer.fit_transform(X), columns=X.columns, index=X.index)

# Handle missing values in target (if any)
if target_missing > 0:
    print(f"Warning: {target_missing} missing values in target column will be dropped")
    # We can't impute target values as that would create artificial targets
    mask = y.notna()
    X_imputed = X_imputed[mask]
    y_clean = y[mask]
else:
    y_clean = y.copy()

# Final dataset size after handling missing values
print(
    f"Rows after handling missing values: {len(X_imputed)} out of {data.shape[0]} ({len(X_imputed) / data.shape[0] * 100:.1f}%)")

# Redefine X with imputed data
X_clean = X_imputed
# Hold out 20% of the rows for testing; the fixed seed makes the split repeatable
X_train, X_test, y_train, y_test = train_test_split(
    X_clean, y_clean, test_size=0.2, random_state=42)

# Standardize the features: the scaler is fitted on the training rows only and
# then applied unchanged to the test rows
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Column order of the feature matrix, kept for later interpretation of the
# fitted expression
feature_names = X_clean.columns.tolist()

# Hyper-parameters for the genetic-programming search, gathered in one place
gp_settings = dict(
    population_size=2000,
    generations=30,
    tournament_size=20,
    p_crossover=0.7,
    p_subtree_mutation=0.1,
    p_hoist_mutation=0.05,
    p_point_mutation=0.1,
    max_samples=0.8,
    verbose=1,
    parsimony_coefficient=0.05,
    random_state=42,
    function_set=('add', 'sub', 'mul', 'div', 'sqrt', 'log'),
)

# Configure and train the model on the scaled training data
print("Training the Symbolic Regressor...")
symbolic_reg = SymbolicRegressor(**gp_settings)
symbolic_reg.fit(X_train_scaled, y_train)
# Make predictions on both splits
y_pred_train = symbolic_reg.predict(X_train_scaled)
y_pred_test = symbolic_reg.predict(X_test_scaled)

# Evaluate the model: RMSE and R² on the training and the held-out rows
train_rmse = np.sqrt(mean_squared_error(y_train, y_pred_train))
test_rmse = np.sqrt(mean_squared_error(y_test, y_pred_test))
train_r2 = r2_score(y_train, y_pred_train)
test_r2 = r2_score(y_test, y_pred_test)
print(f"Train RMSE: {train_rmse:.2f}")
print(f"Test RMSE: {test_rmse:.2f}")
print(f"Train R² Score: {train_r2:.4f}")
print(f"Test R² Score: {test_r2:.4f}")

# Display the learned expression
print("\nBest symbolic expression:")
print(symbolic_reg._program)

# Plot actual vs predicted values for the test rows
plt.figure(figsize=(10, 6))
plt.scatter(y_test, y_pred_test, alpha=0.5)
# Dashed diagonal marks where predicted == actual
actual_range = [y_test.min(), y_test.max()]
plt.plot(actual_range, actual_range, 'r--')
# Axis labels follow target_col so they stay correct when the script is
# pointed at a different target column
plt.xlabel(f'Actual {target_col}')
plt.ylabel(f'Predicted {target_col}')
plt.title('Actual vs Predicted Values')
# Save before show()
plt.savefig('symbolic_regression_results.png')
plt.show()
# Create a more interpretable version of the formula with feature names
def convert_formula_with_feature_names(program, feature_names):
    """Return ``str(program)`` with ``X<i>`` placeholders replaced by quoted names.

    Args:
        program: Any object whose ``str()`` form refers to input variables
            as ``X0``, ``X1``, ...
        feature_names: Iterable of names; position ``i`` names variable ``X<i>``.

    Returns:
        The formula text where every whole ``X<i>`` token with a known index
        is replaced by ``"<name>"`` (in double quotes). Tokens whose index has
        no corresponding name are left unchanged.
    """
    names = list(feature_names)

    def quoted_name(match):
        index = int(match.group(1))
        if index < len(names):
            return f'"{names[index]}"'
        return match.group(0)

    # One pass over whole tokens. Sequential str.replace would rewrite the
    # "X1" prefix of "X10", and would also re-substitute inside feature names
    # that themselves look like "X<n>".
    return re.sub(r'\bX(\d+)\b', quoted_name, str(program))
interpretable_formula = convert_formula_with_feature_names(symbolic_reg._program, feature_names)
program_text = str(symbolic_reg._program)

# Calculate feature importance based on frequency in the program.
# Whole "X<i>" tokens are counted: a plain substring count of "X1" would also
# count every occurrence of "X10", "X11", ...
occurrences_by_index = {}
for index_text in re.findall(r'\bX(\d+)\b', program_text):
    index = int(index_text)
    occurrences_by_index[index] = occurrences_by_index.get(index, 0) + 1
feature_importance = {
    name: occurrences_by_index.get(i, 0) for i, name in enumerate(feature_names)
}

# Save the model expression to a file. The encoding is explicit because the
# report contains the non-ASCII character "²".
with open('symbolic_regression_formula.txt', 'w', encoding='utf-8') as f:
    f.write(program_text + '\n\n')
    f.write('Interpretable formula:\n')
    f.write(interpretable_formula + '\n\n')
    f.write('Feature importance:\n')
    # Sort by importance and write to file; unused features are omitted
    for name, importance in sorted(feature_importance.items(), key=lambda x: x[1], reverse=True):
        if importance > 0:
            f.write(f"{name}: {importance}\n")
    f.write('\nModel Performance:\n')
    f.write(f"Train RMSE: {train_rmse:.2f}\n")
    f.write(f"Test RMSE: {test_rmse:.2f}\n")
    f.write(f"Train R² Score: {train_r2:.4f}\n")
    f.write(f"Test R² Score: {test_r2:.4f}\n")
    # Index-to-name table for reading the raw X<i> expression
    f.write(f"\nFeatures used: {len(feature_names)}\n")
    for i, feature in enumerate(feature_names):
        f.write(f"{i}: {feature}\n")
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment