Skip to content
Snippets Groups Projects
Commit e09998f5 authored by c72-taylor's avatar c72-taylor
Browse files

model

parent 43fb9371
Branches
No related tags found
No related merge requests found
import os
import sys
import pandas as pd
import numpy as np
import re
import json
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.impute import SimpleImputer
class PiecewiseLinearRegressor:
    """Decision tree whose leaves each carry their own linear regression.

    A ``DecisionTreeRegressor`` partitions the feature space, then a
    ``LinearRegression`` is fitted on the training rows that fall into each
    leaf. Prediction routes every row to its leaf and applies that leaf's
    linear model.
    """

    def __init__(self, max_depth=5, min_samples_leaf=20):
        """Store the tree hyper-parameters; no fitting happens here.

        Args:
            max_depth: Maximum depth passed to the partitioning tree.
            min_samples_leaf: Minimum number of samples per leaf passed to
                the partitioning tree.
        """
        self.max_depth = max_depth
        self.min_samples_leaf = min_samples_leaf
        self.tree = None  # fitted DecisionTreeRegressor, set by fit()
        self.linear_models = {}  # leaf id -> fitted LinearRegression
        self.leaf_ids = None  # leaf id of every training row from the last fit()
        self._program = None  # human-readable description of the fitted model

    def fit(self, X, y):
        """Fit the partitioning tree and one linear model per leaf.

        Args:
            X: Feature matrix. It is indexed with a boolean row mask, so a
                NumPy array is assumed (main() passes scaled arrays).
            y: Target values, indexable with the same boolean mask.

        Returns:
            self, so calls can be chained.
        """
        # First, use a decision tree to partition the space. The fixed seed
        # keeps the partition reproducible between runs.
        self.tree = DecisionTreeRegressor(
            max_depth=self.max_depth,
            min_samples_leaf=self.min_samples_leaf,
            random_state=42,
        )
        self.tree.fit(X, y)

        # Leaf node assignment for each training sample
        self.leaf_ids = self.tree.apply(X)

        # Start from an empty mapping so that a refit never keeps leaf models
        # belonging to a previously fitted tree.
        self.linear_models = {}
        for leaf_id in np.unique(self.leaf_ids):
            mask = self.leaf_ids == leaf_id
            if np.sum(mask) > 1:  # a single sample cannot support a regression
                leaf_model = LinearRegression()
                leaf_model.fit(X[mask], y[mask])
                self.linear_models[leaf_id] = leaf_model

        # Generate a readable representation of the model
        self._create_program_representation(X)
        return self

    def predict(self, X):
        """Predict targets for the rows of X.

        Rows that land in a leaf with a linear model use that model. Rows in
        a leaf without one (a leaf that held a single training sample) fall
        back to the tree's own prediction instead of silently staying at 0.

        Args:
            X: Feature matrix with the same columns used in fit().

        Returns:
            1-D NumPy array with one prediction per row.
        """
        leaf_ids = self.tree.apply(X)
        predictions = np.zeros(X.shape[0])
        covered = np.zeros(X.shape[0], dtype=bool)
        for leaf_id, leaf_model in self.linear_models.items():
            mask = leaf_ids == leaf_id
            if mask.any():
                predictions[mask] = leaf_model.predict(X[mask])
                covered |= mask
        if not covered.all():
            missing = ~covered
            predictions[missing] = self.tree.predict(X[missing])
        return predictions

    def _create_program_representation(self, X):
        """Build the text description stored in ``self._program``.

        Args:
            X: Training feature matrix; only its column count is used, to
                cap the number of coefficients that are printed.
        """
        if not self.linear_models:
            self._program = "No valid model could be created"
            return

        model_str = ["Piecewise Linear Model with the following segments:"]
        # Sort leaf IDs for consistent output
        for i, leaf_id in enumerate(sorted(self.linear_models)):
            linear_model = self.linear_models[leaf_id]
            segment_str = f"\nSegment {i + 1} (Leaf {leaf_id}):"
            # Linear equation for this segment
            equation = f"y = {linear_model.intercept_:.4f}"
            for j, coef in enumerate(linear_model.coef_):
                if j < X.shape[1]:  # never label more terms than X has columns
                    if coef >= 0:
                        equation += f" + {coef:.4f} * x{j + 1}"
                    else:
                        equation += f" - {abs(coef):.4f} * x{j + 1}"
            segment_str += f"\n {equation}"
            model_str.append(segment_str)
        self._program = "\n".join(model_str)

    def get_model_params(self):
        """Export the per-leaf linear coefficients as JSON-friendly data.

        Returns:
            Dict with ``feature_count`` and ``segments``; ``segments`` maps
            the leaf id (as a string) to its ``intercept`` and
            ``coefficients``. When no leaf model exists, ``segments`` is
            empty and ``feature_count`` comes from the fitted tree if there
            is one, otherwise 0.

        Note:
            Only the leaf equations are exported; the tree's split rules are
            not part of the returned data.
        """
        if self.linear_models:
            first_model = next(iter(self.linear_models.values()))
            feature_count = int(first_model.coef_.shape[0])
        else:
            # Stay consistent with _create_program_representation, which also
            # treats "no leaf models" as a valid (if useless) state.
            feature_count = int(getattr(self.tree, "n_features_in_", 0))

        model_params = {"feature_count": feature_count, "segments": {}}
        for leaf_id, model in self.linear_models.items():
            model_params["segments"][str(leaf_id)] = {
                "intercept": float(model.intercept_),
                "coefficients": [float(x) for x in model.coef_],
            }
        return model_params
def convert_time_period(value):
    """Convert a duration such as ``"5 months"`` into a number of days.

    Args:
        value: A number, a string containing a number optionally followed by
            a unit (day, week, month, year), or a missing value.

    Returns:
        ``np.nan`` for missing values and for text without any digits.
        Numeric inputs are returned unchanged. For strings, the first number
        found is scaled by its unit (week = 7, month = 30, year = 365 days);
        an absent or unrecognised unit leaves the number as is. The result
        is an int unless the number was written with a decimal point.
    """
    if pd.isna(value):
        return np.nan

    # np.number is included because NumPy integer scalars are not instances
    # of the built-in int and would otherwise go through string parsing.
    if isinstance(value, (int, float, np.number)):
        return value

    try:
        # Lowercase for case-insensitive unit matching
        text = str(value).lower()
    except Exception:
        # Best effort: an object that cannot be rendered as text is treated
        # as missing rather than aborting the whole column conversion.
        return np.nan

    # The unit must start with a non-digit word character. Without that
    # restriction a pure number such as "10" is split into number "1" and
    # unit "0" by regex backtracking.
    match = re.search(r'(\d+(?:\.\d+)?)\s*([^\W\d]\w*)?', text)
    if match is None:
        return np.nan

    number_text = match.group(1)
    unit = match.group(2) or ""
    number = float(number_text) if "." in number_text else int(number_text)

    # Checked in this order; substring matching also covers plurals ("weeks").
    for keyword, days_per_unit in (("day", 1), ("week", 7), ("month", 30), ("year", 365)):
        if keyword in unit:
            return number * days_per_unit

    # If the unit is missing or not recognised, just return the number
    return number
def preprocess_data(data, target_col, time_period_cols=("Injury_Prognosis",)):
    """Turn a raw DataFrame into an all-numeric feature frame plus target.

    Object columns holding durations ("5 months") are converted to days with
    ``convert_time_period``; other object columns are label-encoded with
    missing values as their own category. Remaining non-numeric columns are
    dropped, rows with a missing target are removed, and remaining gaps are
    median-imputed.

    Args:
        data: Input DataFrame; it is not modified.
        target_col: Name of the column to predict.
        time_period_cols: Column names always treated as durations, in
            addition to columns detected from their first 20 non-null values.

    Returns:
        ``(X_imputed, y)`` where ``X_imputed`` is a numeric DataFrame without
        missing values and ``y`` is the target Series. Both share one index.
    """
    # Split features and target (drop returns a new frame, so the caller's
    # data is left untouched by the column assignments below)
    X = data.drop(target_col, axis=1)
    y = data[target_col]

    period_pattern = re.compile(r'\d+\s*(?:day|week|month|year)')

    # Process each object-typed column appropriately
    for col in X.select_dtypes(include=['object']).columns.tolist():
        sample = X[col].dropna().iloc[:20]
        is_time_period = col in time_period_cols or any(
            period_pattern.search(str(val)) for val in sample
        )
        if is_time_period:
            # to_numeric guarantees a numeric dtype so median() is defined
            # even when the converter returned a mix of value types.
            converted = pd.to_numeric(X[col].apply(convert_time_period), errors="coerce")
            # Fill missing values with the median after conversion
            X[col] = converted.fillna(converted.median())
        else:
            # Missing values become their own category. astype(str) keeps
            # LabelEncoder from failing on columns that mix strings with
            # other Python objects.
            filled = X[col].fillna("MISSING_VALUE").astype(str)
            X[col] = LabelEncoder().fit_transform(filled)

    # Drop any remaining non-numeric columns
    non_numeric_cols = X.select_dtypes(exclude=[np.number]).columns.tolist()
    if non_numeric_cols:
        X = X.drop(columns=non_numeric_cols)

    # Remove rows whose target is missing
    has_target = y.notna()
    if not has_target.all():
        X = X[has_target]
        y = y[has_target]

    # SimpleImputer discards columns with no observed values by default,
    # which would make the column labels below mismatch its output; remove
    # such columns explicitly first.
    X = X.dropna(axis=1, how="all")

    # Impute remaining gaps with the column median. The original index is
    # kept so X_imputed stays aligned with y after rows were removed.
    num_imputer = SimpleImputer(strategy='median')
    X_imputed = pd.DataFrame(
        num_imputer.fit_transform(X), columns=X.columns, index=X.index
    )
    return X_imputed, y
def main(argv=None):
    """Train a piecewise linear model on a CSV file and save the results.

    Writes two files next to the input CSV: ``<name>_model.json`` (scaler
    statistics plus per-leaf linear coefficients) and
    ``<name>_training_results.txt`` (metrics and the readable model).

    Args:
        argv: ``[target_column, csv_path]``. Defaults to ``sys.argv[1:]``.

    Raises:
        SystemExit: If fewer than two arguments are supplied.
    """
    # Imported here so the function is self-contained with respect to the
    # file's import block.
    import contextlib

    args = sys.argv[1:] if argv is None else list(argv)
    if len(args) < 2:
        raise SystemExit("Usage: <script> <target_column> <csv_path>")
    target_col, file_path = args[0], args[1]

    # Everything written to stderr during training is discarded; the
    # context managers restore stderr and close the null file even on error.
    with open(os.devnull, 'w') as null_file, contextlib.redirect_stderr(null_file):
        data = pd.read_csv(file_path)

        # Preprocess the data
        X_clean, y_clean = preprocess_data(data, target_col)

        # Split the data
        X_train, X_test, y_train, y_test = train_test_split(
            X_clean, y_clean, test_size=0.2, random_state=42
        )

        # Scale the features (statistics come from the training split only)
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # Configure and train the piecewise linear model
        piecewise_model = PiecewiseLinearRegressor(
            max_depth=5,  # Controls the number of segments
            min_samples_leaf=20,  # Minimum samples in each segment
        )
        piecewise_model.fit(X_train_scaled, y_train)

        # Make predictions
        y_pred_train = piecewise_model.predict(X_train_scaled)
        y_pred_test = piecewise_model.predict(X_test_scaled)

        # Evaluate the model
        train_rmse = np.sqrt(mean_squared_error(y_train, y_pred_train))
        test_rmse = np.sqrt(mean_squared_error(y_test, y_pred_test))
        train_r2 = r2_score(y_train, y_pred_train)
        test_r2 = r2_score(y_test, y_pred_test)

        # Parameters needed to reproduce the scaling and the leaf equations
        prediction_params = {
            "scaler": {
                "mean": scaler.mean_.tolist(),
                "scale": scaler.scale_.tolist(),
                "var": scaler.var_.tolist(),
                "feature_names": X_clean.columns.tolist(),
            },
            "model": piecewise_model.get_model_params(),
        }

        # Output files live next to the input file
        input_dir = os.path.dirname(file_path) or "."
        input_name = os.path.splitext(os.path.basename(file_path))[0]

        # Save parameters to JSON file
        model_path = os.path.join(input_dir, f"{input_name}_model.json")
        with open(model_path, 'w', encoding="utf-8") as f:
            json.dump(prediction_params, f, indent=2)

        # Save human-readable results. The text contains "²", so the
        # encoding is fixed rather than left to the platform default.
        output_path = os.path.join(input_dir, f"{input_name}_training_results.txt")
        report_lines = [
            f"Train RMSE: {train_rmse:.2f}\n",
            f"Test RMSE: {test_rmse:.2f}\n",
            f"Train R² Score: {train_r2:.4f}\n",
            f"Test R² Score: {test_r2:.4f}\n",
            # The newline after the heading keeps the model text off the heading line.
            "\nBest piecewise linear model:\n",
            str(piecewise_model._program),
        ]
        with open(output_path, "w", encoding="utf-8") as f:
            f.writelines(report_lines)

    # stderr is restored at this point
    print(f"Model parameters saved to: {model_path}")
    print(f"Training results saved to: {output_path}")
# Run the training pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment