import csv
import datetime
import os
import random
import re
import string
import warnings
from html import unescape
import graphviz
import lime
import lime.lime_tabular
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shap
import spacy
import statsmodels.api as sm
from gensim.corpora import Dictionary
from gensim.models import CoherenceModel, nmf
from nltk.corpus import stopwords
from sklearn import set_config
from sklearn.decomposition import NMF
from sklearn.feature_extraction.text import (
ENGLISH_STOP_WORDS,
CountVectorizer,
TfidfVectorizer,
)
from sklearn.feature_selection import VarianceThreshold
from sklearn.metrics import (
accuracy_score,
classification_report,
f1_score,
mean_squared_error,
median_absolute_error,
multilabel_confusion_matrix,
precision_score,
r2_score,
recall_score,
)
from sklearn.model_selection import train_test_split
from sklearn.tree import export_graphviz
from spacy.cli import download
from tabulate import tabulate
from datto.CleanDataframe import CleanDataframe
from datto.utils import *
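# Note: datto.utils is expected to provide the stopword sets used below
# (numbers, times, months, weekdays, greetings_goodbyes, adverbs)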
# Hide precision/recall/f1 warnings for model scorings
warnings.filterwarnings("always")
try:
nlp = spacy.load("en_core_web_sm")
except Exception:
download("en_core_web_sm")
nlp = spacy.load("en_core_web_sm")
class ModelResults:
"""
Evaluate model performance & explore output
"""
def _jaccard_similarity(self, topic_1, topic_2):
"""
Derives the Jaccard similarity of two topics
Jaccard similarity:
- A statistic used for comparing the similarity and diversity of sample sets
        - J(A,B) = |A ∩ B| / |A ∪ B|
- Goal is low Jaccard scores for coverage of the diverse elements
Parameters
--------
topic_1: list
topic_2: list
Returns
--------
score: float
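        Example
        --------
        Hypothetical check: topics ["a", "b", "c"] and ["b", "c", "d"] share
        2 of their 4 unique words, so J = 2 / 4 = 0.5.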
"""
intersection = set(topic_1).intersection(set(topic_2))
union = set(topic_1).union(set(topic_2))
score = float(len(intersection)) / float(len(union))
return score
    def most_similar_texts(
self,
X,
text_column_name,
chosen_num_topics=None,
chosen_stopwords=set(),
exclude_numbers=False,
exclude_times=False,
exclude_months=False,
exclude_weekdays=False,
exclude_greetings_goodbyes=False,
exclude_adverbs=False,
num_examples=15,
min_df=3,
max_df=0.1,
min_ngrams=1,
max_ngrams=3,
):
"""
Uses NMF clustering to create n topics based on adjusted word frequencies
Parameters
--------
        X: DataFrame
        text_column_name: str
        chosen_num_topics: int
            Optional - if None, the algorithm will determine the best number of topics
chosen_stopwords: set
Option to add in your own unique stopwords
exclude_numbers: bool
Adding numbers 0-3000 (with & without commas) as additional stopwords
exclude_times: bool
Adding times as additional stopwords (e.g. `8:00`)
exclude_months: bool
Adding month names as additional stopwords
exclude_weekdays: bool
Adding weekday names as additional stopwords
exclude_greetings_goodbyes: bool
Adding common greetings & goodbyes as additional stopwords (e.g. `hello`)
exclude_adverbs: bool
Adding common adverbs as additional stopwords (e.g. `especially`)
        num_examples: int
            Number of example texts to return per topic
        min_df: float
            Minimum number (int) or proportion (float) of docs that must contain a term
        max_df: float
            Maximum number (int) or proportion (float) of docs that may contain a term
min_ngrams: int
Minimum number of words needed in phrases found
max_ngrams: int
Maximum number of words in phrases found
Returns
--------
all_topics: DataFrame
Top n words/phrases per topic
original_with_keywords: DataFrame
Original text with topic number assigned to each
model: NMF model
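        Example
        --------
        A minimal sketch; the DataFrame `df` and its `text` column are hypothetical:
            mr = ModelResults()
            all_topics, original_with_keywords, model = mr.most_similar_texts(
                df, text_column_name="text", chosen_num_topics=10
            )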
"""
X = X[~X[text_column_name].isna()]
X = X[X[text_column_name] != ""]
X = X[X[text_column_name] != " "]
# Remove HTML & unicode characters
X[text_column_name] = X[text_column_name].apply(
lambda x: unescape(x)
.encode("ascii", errors="ignore")
.decode("ascii")
.replace(" ", " ")
.replace(" ", " ")
.replace(" ", " ")
.strip()
)
with_stopword_params = chosen_stopwords
if exclude_numbers:
with_stopword_params = with_stopword_params | numbers
if exclude_times:
with_stopword_params = with_stopword_params | times
if exclude_months:
with_stopword_params = with_stopword_params | months
if exclude_weekdays:
with_stopword_params = with_stopword_params | weekdays
if exclude_greetings_goodbyes:
with_stopword_params = with_stopword_params | greetings_goodbyes
if exclude_adverbs:
with_stopword_params = with_stopword_params | adverbs
all_stop_words = (
# Combine stopwords from all the packages
set(ENGLISH_STOP_WORDS)
| set(stopwords.words("english"))
| nlp.Defaults.stop_words
| set(string.punctuation)
| set(string.ascii_lowercase)
| set(
[
"-PRON-",
" ",
"i.e",
"e.g.",
"-",
"--",
"---",
"----",
"..",
"...",
"....",
"w/",
"^^",
"^^^",
"^^^^",
"’",
"~~",
"~~~",
"~~~~",
"and/or",
]
)
| set(with_stopword_params)
)
ct = CleanDataframe()
vectorizer = TfidfVectorizer(
tokenizer=ct.lematize,
ngram_range=(min_ngrams, max_ngrams),
stop_words=all_stop_words,
min_df=min_df,
max_df=max_df,
)
        vectors = vectorizer.fit_transform(X[text_column_name]).toarray()
# Adding words/phrases used in text data frequencies back into the dataset (so we can see feature importances later)
vocab = list(vectorizer.get_feature_names_out())
vector_df = pd.DataFrame(vectors, columns=vocab, index=X.index)
if not chosen_num_topics:
# Inspired by: https://bit.ly/3CqH2Zw
if X.shape[0] <= 5:
return "Too few examples to categorize."
elif X.shape[0] <= 75:
topic_nums = list(np.arange(5, X.shape[0], 5))
else:
topic_nums = list(np.arange(5, 75 + 1, 5))
texts = X[text_column_name].apply(ct.lematize)
# In gensim a dictionary is a mapping between words and their integer id
dictionary = Dictionary(texts)
# Filter out extremes to limit the number of features
dictionary.filter_extremes(no_below=2, no_above=0.85, keep_n=5000)
# Create the bag-of-words format (list of (token_id, token_count))
corpus = [dictionary.doc2bow(text) for text in texts]
NMF_models = {}
NMF_topics = {}
# Iterate through possible topic numbers and make models for each
for i in topic_nums:
NMF_models[i] = nmf.Nmf(
corpus=corpus,
id2word=dictionary,
num_topics=i,
chunksize=2000,
passes=20,
random_state=42,
)
shown_topics = NMF_models[i].show_topics(
num_topics=i, num_words=15, formatted=False
)
NMF_topics[i] = [
[word[0] for word in topic[1]] for topic in shown_topics
]
# Measure Jaccard similarity between topics
# Verify that these scores are low, i.e. topics are distinct
NMF_stability = {}
for i in range(0, len(topic_nums) - 1):
jaccard_sims = []
for _, topic1 in enumerate(NMF_topics[topic_nums[i]]):
sims = []
for _, topic2 in enumerate(NMF_topics[topic_nums[i + 1]]):
sims.append(self._jaccard_similarity(topic1, topic2))
jaccard_sims.append(sims)
NMF_stability[topic_nums[i]] = jaccard_sims
mean_stabilities = [
np.array(NMF_stability[i]).mean() for i in topic_nums[:-1]
]
# Measure coherence scores
# Verify these are high, i.e. important words in topics appear consistently together in the texts
coherences = [
CoherenceModel(
model=NMF_models[i],
texts=texts,
dictionary=dictionary,
coherence="c_v",
).get_coherence()
for i in topic_nums[:-1]
]
coh_sta_diffs = [
coherences[i] - mean_stabilities[i]
for i in range(len(topic_nums[:-1]))[:-1]
]
coh_sta_max = max(coh_sta_diffs)
coh_sta_max_idxs = [
i for i, j in enumerate(coh_sta_diffs) if j == coh_sta_max
]
ideal_topic_num_index = coh_sta_max_idxs[0]
chosen_num_topics = topic_nums[ideal_topic_num_index]
model = NMF(n_components=chosen_num_topics, random_state=42)
model.fit(vectors)
component_loadings = model.transform(vectors)
top_topics = pd.DataFrame(
np.argmax(component_loadings, axis=1), columns=["top_topic_num"]
)
top_topics["second_topic"] = (-component_loadings).argsort(axis=1)[:, 1]
top_topics["third_topic"] = (-component_loadings).argsort(axis=1)[:, 2]
top_topic_loading = pd.DataFrame(
np.max(component_loadings, axis=1), columns=["top_topic_loading"]
)
X.reset_index(inplace=True, drop=False)
vector_df.reset_index(inplace=True, drop=True)
# Fix for duplicate text_column_name
vector_df.columns = [x + "_vector" for x in vector_df.columns]
        combined_df = pd.concat([X, vector_df, top_topics, top_topic_loading], axis=1)
        # Keep the strongest topic matches first, then drop the loading column
        combined_df.sort_values(by="top_topic_loading", ascending=False, inplace=True)
        combined_df.drop(columns=["top_topic_loading"], inplace=True)
topic_words = {}
sample_texts_lst = []
for topic, comp in enumerate(model.components_):
word_idx = np.argsort(comp)[::-1][:num_examples]
topic_words[topic] = [vocab[i] for i in word_idx]
examples_lst = [
x
for x in list(
combined_df[combined_df["top_topic_num"] == topic][
text_column_name
].values
)
if topic_words[topic][0] in x
or topic_words[topic][1] in x
or topic_words[topic][2] in x
or topic_words[topic][3] in x
or topic_words[topic][4] in x
]
# If not enough examples, check for second, third, etc. topic loading
if len(examples_lst) < num_examples:
extra_examples_lst_2 = [
x
for x in list(
combined_df[combined_df["second_topic"] == topic][
text_column_name
].values
)
if topic_words[topic][0] in x
or topic_words[topic][1] in x
or topic_words[topic][2] in x
or topic_words[topic][3] in x
or topic_words[topic][4] in x
]
examples_lst.extend(extra_examples_lst_2)
# If not enough examples still, check for third topic loading
if len(examples_lst) < num_examples:
extra_examples_lst_3 = [
x
for x in list(
combined_df[combined_df["third_topic"] == topic][
text_column_name
].values
)
if topic_words[topic][0] in x
or topic_words[topic][1] in x
or topic_words[topic][2] in x
or topic_words[topic][3] in x
or topic_words[topic][4] in x
]
examples_lst.extend(extra_examples_lst_3)
# Append examples that have one of the top keywords in topic
sample_texts_lst.append(examples_lst[:num_examples])
topic_words_df = pd.DataFrame(
columns=[
"topic_num",
"num_in_category",
"top_words_and_phrases",
"sample_texts",
]
)
topic_words_df["topic_num"] = [k for k, _ in topic_words.items()]
topic_words_df["num_in_category"] = (
combined_df.groupby("top_topic_num").count().iloc[:, 0]
)
topic_words_df["top_words_and_phrases"] = [x for x in topic_words.values()]
topic_words_df["sample_texts"] = sample_texts_lst
topic_words_explode = pd.DataFrame(
topic_words_df["sample_texts"].tolist(),
index=topic_words_df.index,
)
topic_words_explode.columns = [
"example{}".format(num) for num in range(len(topic_words_explode.columns))
]
all_topics = pd.concat(
[
topic_words_df[
["topic_num", "num_in_category", "top_words_and_phrases"]
],
topic_words_explode,
],
axis=1,
)
print("Topics created with top words & example texts:")
print(all_topics)
original_plus_topics = combined_df[list(X.columns) + ["index", "top_topic_num"]]
original_with_keywords = pd.merge(
original_plus_topics,
all_topics[["topic_num", "top_words_and_phrases"]],
left_on="top_topic_num",
right_on="topic_num",
how="left",
)[[text_column_name, "topic_num", "top_words_and_phrases"]]
all_topics.sort_values(by="num_in_category", ascending=False, inplace=True)
return (
all_topics,
original_with_keywords,
model,
)
    def coefficients_graph(
self,
X_train,
X_test,
model,
model_type,
filename="shap_graph",
path="../images/",
multiclass=False,
y_test=None,
):
"""
Displays graph of feature importances.
        * Position along the horizontal axis indicates the magnitude of the effect
          on the target variable (e.g. affected by 0.25)
        * Red/blue indicates the feature's value (whether increasing or decreasing
          the feature has _ effect)
        * Blue & red mixed together indicate there isn't a clear
          effect on the target variable
        * For classification - interpreting the magnitude on the x axis - changes the
          predicted probability of y on average by _ percentage points (axis value * 100)
Parameters
--------
X_train: pd.DataFrame
X_test: pd.DataFrame
model: fit model object
model_type: str
'classification' or 'regression'
filename: str
multiclass: bool
y_test: pd.DataFrame
Only needed for multiclass models
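        Returns
        --------
        shap_values: np.array
            SHAP values for the sampled test rows (one array per class if multiclass)
        Example
        --------
        A minimal sketch; the fit classifier `clf` and the train/test splits are hypothetical:
            mr = ModelResults()
            shap_values = mr.coefficients_graph(
                X_train, X_test, clf, model_type="classification"
            )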
"""
if not os.path.exists(path):
os.makedirs(path)
med = X_train.median().values.reshape((1, X_train.shape[1]))
# Runs too slow if X_test is huge, take a representative sample
if X_test.shape[0] > 1000:
X_test_sample = X_test.sample(1000)
else:
X_test_sample = X_test
if multiclass:
lst_all_shap_values = []
for class_num in range(len(y_test.columns)):
f = lambda x: model.predict_proba(x)[class_num][:, 1]
explainer = shap.KernelExplainer(f, med)
shap_values = explainer.shap_values(X_test_sample)
lst_all_shap_values.append(shap_values)
class_name = y_test.columns[class_num]
print(f"SHAP Summary Plot for Class: {class_name}")
shap.summary_plot(shap_values, X_test_sample)
plt.tight_layout()
plt.savefig(f"{path}{class_name}_{filename}.png")
return np.array(lst_all_shap_values)
elif model_type.lower() == "classification":
f = lambda x: model.predict_proba(x)[:, 1]
else:
f = lambda x: model.predict(x)
explainer = shap.KernelExplainer(f, med)
shap_values = explainer.shap_values(X_test_sample)
shap.summary_plot(shap_values, X_test_sample)
plt.tight_layout()
plt.savefig(f"{path}_{filename}.png")
return shap_values
    def most_common_words_by_group(
self,
X,
text_col_name,
group_col_name,
num_examples,
num_times_min,
min_ngram,
):
"""
        Get the most common words/phrases for defined groups.
Parameters
--------
X: DataFrame
text_col_name: str
group_col_name: str
num_examples: int
Number of text examples to include per group
num_times_min: int
Minimum number of times word/phrase must appear in texts
        min_ngram: int
            Minimum number of words in phrases found (maximum is 3)
Returns
--------
overall_counts_df: DataFrame
Has groups, top words, and counts
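        Example
        --------
        A minimal sketch; `df` and its column names are hypothetical:
            mr = ModelResults()
            overall_counts_df = mr.most_common_words_by_group(
                df, "text", "team", num_examples=10, num_times_min=5, min_ngram=1
            )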
"""
# Fix for when column name is the same as an ngram column name
X["group_column"] = X[group_col_name]
# Remove all other unneeded columns
X = X[[text_col_name, "group_column"]]
all_stop_words = (
set(ENGLISH_STOP_WORDS)
| set(["-PRON-"])
| set(string.punctuation)
| set([" "])
)
cv = CountVectorizer(
stop_words=all_stop_words,
ngram_range=(min_ngram, 3),
min_df=num_times_min,
max_df=0.4,
)
        vectors = cv.fit_transform(X[text_col_name]).toarray()
words = list(cv.get_feature_names_out())
vectors_df = pd.DataFrame(vectors, columns=words)
group_plus_vectors = pd.concat([vectors_df, X.reset_index(drop=False)], axis=1)
count_words = pd.DataFrame(
group_plus_vectors.groupby("group_column").count()["index"]
)
count_words = count_words.loc[:, ~count_words.columns.duplicated()]
# Fix for when "count" is an ngram column
count_words.columns = ["count_ngrams"]
group_plus_vectors = group_plus_vectors.merge(
count_words, on="group_column", how="left"
)
group_plus_vectors["count_ngrams"].fillna(0, inplace=True)
sums_by_col = (
group_plus_vectors[
group_plus_vectors.columns[
~group_plus_vectors.columns.isin(
[
text_col_name,
"index",
]
)
]
]
.groupby("group_column")
.sum()
)
sums_by_col.sort_values(by="count_ngrams", ascending=False, inplace=True)
sums_by_col.drop("count_ngrams", axis=1, inplace=True)
array_sums = np.array(sums_by_col)
sums_values_descending = -np.sort(-array_sums, axis=1)
sums_indices_descending = (-array_sums).argsort()
highest_sum = pd.DataFrame(sums_values_descending[:, 0])
highest_sum.columns = ["highest_sum"]
sums_by_col["highest_sum"] = highest_sum["highest_sum"].values
overall_counts_df = pd.DataFrame(columns=["group_name", "top_words_and_counts"])
i = 0
for row in sums_by_col.index:
temp_df = pd.DataFrame(columns=["group_name", "top_words_and_counts"])
temp_df["group_name"] = [row]
top_columns = sums_by_col.columns[
sums_indices_descending[i][:num_examples]
].values
top_counts = sums_values_descending[i][:num_examples]
            dict_scores = dict(zip(top_columns, top_counts))
            temp_df["top_words_and_counts"] = [dict_scores]
            overall_counts_df = pd.concat([overall_counts_df, temp_df])
print(f"Group Name: {row}\n")
for k, v in dict_scores.items():
print(k, v)
print("\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n")
i += 1
return overall_counts_df
    def score_final_model(
self,
model_type,
X_test,
y_test,
trained_model,
csv_file_name="final_model_results",
multiclass=False,
):
"""
        Score your model on the test dataset. Only run this once to get an idea of how your model will perform in real time.
Run it after you have chosen your model & parameters to avoid problems with overfitting.
Parameters
--------
model_type: str
X_test: DataFrame
y_test: DataFrame
trained_model: sklearn model
multiclass: bool
csv_file_name: str
Returns
--------
model: model
Fit model
y_predicted: array
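        Example
        --------
        A minimal sketch; the trained model and the test split are hypothetical:
            mr = ModelResults()
            model, y_predicted = mr.score_final_model(
                "classification", X_test, y_test, trained_model
            )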
"""
set_config(print_changed_only=True)
cleaned_date = (
datetime.datetime.today().isoformat(" ", "seconds").replace(" ", "-")
)
# Predict actual scores
y_predicted = trained_model.predict(X_test)
if multiclass:
pscore = round(precision_score(y_test, y_predicted, average="weighted"), 7)
rscore = round(recall_score(y_test, y_predicted, average="weighted"), 7)
ascore = round(accuracy_score(y_test, y_predicted), 7)
f1score = round(f1_score(y_test, y_predicted, average="weighted"), 7)
temp_df = pd.DataFrame(
[
[
trained_model,
pscore,
rscore,
ascore,
f1score,
]
],
columns=[
"model",
"precision",
"recall",
"accuracy",
"f1",
],
)
with open(
f"{'/'.join(csv_file_name.split('/')[:-1])}/model_scoring_crosstabs_{cleaned_date}.txt",
"w",
) as f:
print(f"Model: {trained_model}", file=f)
print("\n", file=f)
print("Overall scores:", file=f)
print("\n", file=f)
print(
tabulate(
temp_df[
[
"precision",
"recall",
"accuracy",
"f1",
]
],
headers="keys",
tablefmt="pipe",
numalign="left",
showindex=False,
),
file=f,
)
print("\n", file=f)
# Precision / recall / f1-score for each predicted class
report_df = (
pd.DataFrame(
classification_report(
y_test,
y_predicted,
target_names=y_test.columns,
output_dict=True,
)
)
.transpose()
.drop(["micro avg", "macro avg", "weighted avg", "samples avg"])
.drop("support", axis=1)
)
print(
tabulate(
report_df,
headers="keys",
tablefmt="pipe",
numalign="left",
),
file=f,
)
print("\n", file=f)
# Counts of predicted vs actuals + true vs false
confusion_matrix = multilabel_confusion_matrix(y_test, y_predicted)
matrix_dfs = [
pd.DataFrame(
matrix,
columns=["Predicted False", "Predicted True"],
index=["Actual False", "Actual True"],
)
for matrix in confusion_matrix
]
# Print separately so class name gets printed cleanly first
for i in range(len(y_test.columns)):
print(y_test.columns[i], file=f)
print(
tabulate(
matrix_dfs[i],
headers="keys",
tablefmt="pipe",
numalign="left",
),
file=f,
)
print("\n", file=f)
elif model_type.lower() == "classification":
pscore = round(precision_score(y_test, y_predicted), 7)
rscore = round(recall_score(y_test, y_predicted), 7)
ascore = round(accuracy_score(y_test, y_predicted), 7)
f1score = round(f1_score(y_test, y_predicted), 7)
temp_df = pd.DataFrame(
[
[
trained_model,
pscore,
rscore,
ascore,
f1score,
]
],
columns=[
"model",
"precision",
"recall",
"accuracy",
"f1",
],
)
with open(
f"{'/'.join(csv_file_name.split('/')[:-1])}/model_scoring_crosstabs_{cleaned_date}.txt",
"w",
) as f:
print(f"Model: {trained_model}", file=f)
print("\n", file=f)
print("Overall scores:", file=f)
print("\n", file=f)
print(
tabulate(
temp_df[
[
"precision",
"recall",
"accuracy",
"f1",
]
],
headers="keys",
tablefmt="pipe",
numalign="left",
showindex=False,
),
file=f,
)
print("\n", file=f)
crosstab = pd.crosstab(
np.array(y_test),
y_predicted,
)
class_values = crosstab.columns
crosstab.columns = [f"Predicted {val}" for val in class_values]
crosstab.index = [f"Actual {val}" for val in class_values]
print(
tabulate(
crosstab,
headers="keys",
tablefmt="pipe",
numalign="left",
),
file=f,
)
print("\n", file=f)
sum_crosstab = crosstab.to_numpy().sum()
prop_crosstab = pd.crosstab(
np.array(y_test),
y_predicted,
).apply(lambda r: round(r / sum_crosstab, 3))
class_values = prop_crosstab.columns
prop_crosstab.columns = [f"Predicted {val}" for val in class_values]
prop_crosstab.index = [f"Actual {val}" for val in class_values]
print(
tabulate(
prop_crosstab,
headers="keys",
tablefmt="pipe",
numalign="left",
),
file=f,
)
else:
mse = mean_squared_error(y_test, y_predicted)
mae = median_absolute_error(y_test, y_predicted)
r2 = round(r2_score(y_test, y_predicted), 7)
temp_df = pd.DataFrame(
[
[
trained_model,
                        round((mse ** 0.5) * -1, 7),
round((mae * -1), 7),
r2,
]
],
columns=["model", "negative_rmse", "negative_mae", "r2"],
)
temp_df["timestamp"] = cleaned_date
try:
previous_df = pd.read_csv(f"{csv_file_name}.csv")
if temp_df.shape[1] != previous_df.shape[1]:
print(
f"""Unable to save csv because columns do not match.
The existing model results csv has these columns: {previous_df.columns}.
The new model results csv has these columns: {temp_df.columns}."""
)
return trained_model, y_predicted
final_model_results_df = pd.concat([previous_df, temp_df], axis=0)
final_model_results_df.reset_index(inplace=True, drop=True)
except Exception:
final_model_results_df = temp_df
final_model_results_df = final_model_results_df.reindex(
sorted(final_model_results_df.columns), axis=1
)
with open(f"{csv_file_name}.csv", "w") as csvfile:
csvwriter = csv.writer(csvfile, delimiter=",")
csvwriter.writerow(final_model_results_df.columns)
for _, row in final_model_results_df.iterrows():
csvwriter.writerow(row)
return trained_model, y_predicted
    def coefficients_summary(
self,
X,
y,
num_repetitions,
num_coefficients,
model_type,
multiclass=False,
):
"""
Prints average coefficient values using a regression model.
Parameters
--------
X: DataFrame
y: DataFrame
num_repetitions: int
Number of times to create models
num_coefficients: int
Number of top coefficients to display
model_type: str
'classification' or 'regression'
multiclass: bool
Returns
--------
simplified_df: DataFrame
Has mean, median, and standard deviation for coefficients after several runs
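        Example
        --------
        A minimal sketch; `X` and `y` are hypothetical model-ready DataFrames:
            mr = ModelResults()
            simplified_df = mr.coefficients_summary(
                X, y, num_repetitions=5, num_coefficients=10, model_type="classification"
            )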
"""
coefficients_df = pd.DataFrame(
columns=["coeff", "pvals", "conf_lower", "conf_higher"]
)
X["intercept"] = 1
for _ in range(num_repetitions):
X_train, _, y_train, _ = train_test_split(X, y)
# Fix for Singular matrix error
vt = VarianceThreshold(0)
vt.fit(X_train)
            cols_to_keep = X_train.columns[vt.get_support()].values
X_train = X_train[cols_to_keep]
if multiclass:
model = sm.MNLogit(
np.array(y_train.astype(float)), X_train.astype(float)
)
elif model_type.lower() == "classification":
model = sm.Logit(np.array(y_train.astype(float)), X_train.astype(float))
else:
model = sm.OLS(np.array(y_train.astype(float)), X_train.astype(float))
results = model.fit()
features = results.params.index
if multiclass:
pvals = [x[0] for x in results.pvalues.values]
coeff = [x[0] for x in results.params.values]
conf_lower = results.conf_int()["lower"].values
conf_higher = results.conf_int()["upper"].values
else:
pvals = results.pvalues.values
coeff = results.params.values
conf_lower = results.conf_int()[0]
conf_higher = results.conf_int()[1]
temp_df = pd.DataFrame(
{
"features": features,
"pvals": pvals,
"coeff": coeff,
"conf_lower": conf_lower,
"conf_higher": conf_higher,
}
)
temp_df = temp_df[
["features", "coeff", "pvals", "conf_lower", "conf_higher"]
].reset_index(drop=True)
            coefficients_df = pd.concat([coefficients_df, temp_df])
summary_coefficients_df = pd.DataFrame(
coefficients_df.groupby("features").agg(
[
"mean",
"median",
]
)
).reset_index(drop=False)
summary_coefficients_df.columns = [
"_".join(col) for col in summary_coefficients_df.columns
]
summary_coefficients_df.sort_values("pvals_mean", inplace=True, ascending=True)
simplified_df = summary_coefficients_df.head(num_coefficients).round(3)
print("Coefficients summary (descending by mean abs se value):")
print(simplified_df)
return simplified_df
    def coefficients_individual_predictions(
self,
model,
df,
X_train,
X_test,
id_col,
num_id_examples,
num_feature_examples,
model_type,
class_names=["False", "True"],
path="../images/",
):
"""
Uses LIME to inspect an individual prediction and the features that influenced that prediction.
Parameters
--------
model: sklearn model
df: pd.DataFrame
Used for getting ids since they aren't typically in training data
X_train: pd.DataFrame
X_test: pd.DataFrame
id_col: str
num_id_examples: int
num_feature_examples: int
model_type: str
'classification' or 'regression'
        class_names: list
path: str
Returns
--------
features: list
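        Example
        --------
        A minimal sketch; `clf`, `df`, and the `user_id` column are hypothetical:
            mr = ModelResults()
            features = mr.coefficients_individual_predictions(
                clf, df, X_train, X_test, id_col="user_id", num_id_examples=3,
                num_feature_examples=5, model_type="classification",
            )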
"""
if not os.path.exists(path):
os.makedirs(path)
def model_preds_adjusted(data):
if model_type.lower() == "classification":
predictions = np.array(model.predict_proba(data))
else:
predictions = np.array(model.predict(data))
return predictions
if model_type.lower() == "classification":
explainer = lime.lime_tabular.LimeTabularExplainer(
np.array(X_train),
feature_names=X_train.columns,
class_names=class_names,
mode="classification",
)
else:
explainer = lime.lime_tabular.LimeTabularExplainer(
np.array(X_train),
feature_names=X_train.columns,
class_names=class_names,
mode="regression",
)
for _ in range(num_id_examples):
row_idx = random.sample(list(X_test.index), 1)[0]
exp = explainer.explain_instance(
np.array(X_test.loc[row_idx]),
model_preds_adjusted,
# Include all features
num_features=len(X_train.columns),
# Include all classes
top_labels=len(class_names),
)
if model_type.lower() == "classification":
prediction = class_names[
model.predict_proba(pd.DataFrame(X_test.loc[row_idx]).T).argmax()
]
else:
prediction = round(
model.predict(pd.DataFrame(X_test.loc[row_idx]).T)[0], 7
)
unique_id = df.loc[row_idx][id_col]
print(f"\nID: {unique_id}")
print(f"Prediction: {prediction}\n\n")
exp_list_all = exp.as_list()
raw_features = [x[0] for x in exp_list_all]
raw_values = [x[1] for x in exp_list_all]
cleaned_features = []
for feature in exp_list_all:
try:
feature_name = re.findall("<.*<|>.*>", feature[0])[0]
except Exception:
feature_name = re.findall(".*<|.*>", feature[0])[0]
cleaned_feature_name = (
feature_name.replace("<=", "")
.replace(">=", "")
.replace("<", "")
.replace(">", "")
.strip()
)
cleaned_features.append(cleaned_feature_name)
all_feature_types = X_test.dtypes
top_feature_types = [
all_feature_types[feature] for feature in cleaned_features
]
top_features_with_types = [
[raw_feature, cleaned_feature, feature_type, raw_value]
for raw_feature, cleaned_feature, feature_type, raw_value in zip(
raw_features, cleaned_features, top_feature_types, raw_values
)
]
i = 0
for (
raw_feature,
cleaned_feature,
feature_type,
raw_value,
) in top_features_with_types:
                if i >= num_feature_examples:
break
actual_feature_val = X_test.loc[row_idx][cleaned_feature]
# Things that decrease the likelihood of this class are less interesting
if raw_value < 0:
pass
# Note: uint8 is a bool
# False bools aren't super interesting
elif feature_type == "uint8" and actual_feature_val == 0:
pass
# Phrase true bools slightly differently
elif feature_type == "uint8":
print(f"For this id, {cleaned_feature} was true.")
print(
f"When {cleaned_feature} is true, this increases the likelihood of prediction: {prediction}."
)
print("\n--------\n")
i += 1
else:
print(f"For this id, {cleaned_feature} was {actual_feature_val}.\n")
print(
f"When {raw_feature}, this increases the likelihood of prediction: {prediction}."
)
print("\n--------\n")
i += 1
print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
fig = plt.figure()
# Up the image quality to avoid pixelated graphs
plt.rc("savefig", dpi=300)
# Limit to top features that can fit cleanly on graph
exp_list_graph = exp.as_list()[:20]
vals = [x[1] for x in exp_list_graph]
# Some labels are really long, shortening them a bit
names = [x[0][:40] for x in exp_list_graph]
vals.reverse()
names.reverse()
colors = ["green" if x > 0 else "red" for x in vals]
pos = np.arange(len(exp_list_graph)) + 0.5
plt.barh(pos, vals, align="center", color=colors)
plt.yticks(pos, names)
title = f"id: {unique_id} - Prediction: {prediction}"
plt.title(title)
plt.tight_layout()
# Need bbox to make sure title isn't cut off
plt.savefig(
f"../images/lime_graph_id_{unique_id}.png",
bbox_inches="tight",
facecolor="white",
)
return exp_list_all
[docs] def get_tree_diagram(self, model, X_train, path="../images/"):
"""
Save a diagram of a trained DecisionTree model
Parameters
--------
model: sklearn model (trained)
X_train: pd.DataFrame
path: str
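        Returns
        --------
        graph: graphviz.Source
        Example
        --------
        A minimal sketch; `tree_model` is a hypothetical trained DecisionTreeClassifier:
            mr = ModelResults()
            graph = mr.get_tree_diagram(tree_model, X_train, path="../images/")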
"""
        # Exporting text form of decision tree
        export_graphviz(
            model,
            out_file=f"{path}decision-tree.dot",
            feature_names=X_train.columns,
            filled=True,
            rounded=True,
            special_characters=True,
        )
        # export_graphviz returns None when out_file is given,
        # so read the .dot file back in to build the graphviz object
        with open(f"{path}decision-tree.dot") as f:
            dot_data = f.read()
        graph = graphviz.Source(dot_data)
# Converting text to a visual png file
os.system(f"dot -Tpng {path}decision-tree.dot -o {path}decision-tree.png")
        # If the file didn't write, graphviz may not be installed; try installing it (via Homebrew on macOS)
if not os.path.exists(f"{path}decision-tree.png"):
os.system("brew install graphviz")
os.system(f"dot -Tpng {path}decision-tree.dot -o {path}decision-tree.png")
return graph