Source code for datto.CleanDataframe

import os
import re
import string

import numpy as np
import pandas as pd
import spacy
from sklearn.linear_model import ElasticNet
from spacy.cli import download

try:
    nlp = spacy.load("en_core_web_sm")
except Exception:
    download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")


class CleanDataframe:
    """
    Clean data using NLP, regex, calculations, etc.
    """

    def remove_names(self, text):
        """
        Remove names from a string of text using a bundled list of names.

        Parameters
        --------
        text: str

        Returns
        --------
        cleaned_text: str

        """
        all_names = pd.read_pickle(
            os.path.join(os.path.dirname(__file__), "data/all_names")
        )

        cleaned_text = text
        for _, row in all_names.iterrows():
            # Matches name as long as it is not followed by lowercase characters
            # Removing names that are a part of another word
            cleaned_text = re.sub(row["name"] + "(?![a-z])", " ", cleaned_text)

        return cleaned_text
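    # Usage sketch (illustrative, not part of the library). The exact output depends on
    # the bundled data/all_names pickle; here we assume "John" appears in that name list.
    #
    #   cleaner = CleanDataframe()
    #   cleaner.remove_names("Ask John to review, but keep Johnson as-is")
    #   # -> roughly "Ask   to review, but keep Johnson as-is"
    #   # ("John" is replaced because it is not followed by a lowercase letter;
    #   #  "Johnson" is kept by the negative lookahead, assuming it is not itself in the list)
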
    def remove_pii(self, text):
        """
        Remove common patterns of personally identifiable information (PII)

        Parameters
        --------
        text: str

        Returns
        --------
        cleaned_text: str

        """
        regex_dict = {
            "credit_card_numbers": r"(?:\d[ -]*?){13,16}",
            "phone_numbers": r"[\+]?[\d]{0,3}[\s]?[\(]?\d{3}[\)]?[\s\-\.]{0,1}\d{3}[\s\-\.]{0,1}\d{4}",
            "social_security_numbers": r"(\d{3}[-\s]?\d{2}[-\s]?\d{4})",
            "ip_addresses": r"((?:[0-9]{1,3}\.){3}[0-9]{1,3})",
            "email_addresses": r"([a-zA-Z0-9_\.-]+)@([1-9a-zA-Z\.-]+)\.([a-zA-Z\.]{2,6})",
            "urls": r"((https?:\/\/)?(www\.)?[-a-zA-Z0-9@:%._\+~#=]{2,255}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&//=]*))",
        }

        cleaned_text = text
        for this_pii_item in regex_dict:
            cleaned_text = re.sub(
                regex_dict[this_pii_item],
                "",
                cleaned_text,
            )

        return cleaned_text
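    # Usage sketch (illustrative): each regex match is replaced with an empty string,
    # so stray whitespace can remain where the PII used to be.
    #
    #   cleaner = CleanDataframe()
    #   cleaner.remove_pii("Reach me at jane.doe@example.com or 555-123-4567.")
    #   # -> "Reach me at  or ."
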
    def lematize(self, text):
        """
        Lemmatize text with spaCy, dropping whitespace tokens.

        Parameters
        --------
        text: str

        Returns
        --------
        list of lemmatized tokens (str)

        """
        spacy_text = nlp(text)
        return [token.lemma_ for token in spacy_text if not token.is_space]
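    # Usage sketch (illustrative), using the en_core_web_sm model loaded at the top of
    # this module; exact lemmas can vary slightly by spaCy/model version.
    #
    #   cleaner = CleanDataframe()
    #   cleaner.lematize("The cats were running")
    #   # -> something like ['the', 'cat', 'be', 'run']
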
    def remove_email_greetings_signatures(self, text):
        """
        In order to obtain the main text of an email only, this method removes
        greetings, signoffs, and signatures by identifying sentences with less
        than 5% verbs to drop. Does not replace links.

        Inspiration from: https://github.com/mynameisvinn/EmailParser

        Parameters
        --------
        text: str

        Returns
        --------
        text: str

        """
        sentences = text.strip().split("\n")

        non_sentences = []
        for sentence in sentences:
            spacy_text = nlp(sentence.strip())

            verb_count = np.sum(
                [
                    (
                        token.pos_ == "VERB"
                        or token.pos_ == "AUX"
                        or token.pos_ == "ROOT"
                        or token.pos_ == "pcomp"
                    )
                    for token in spacy_text
                ]
            )
            try:
                prob = float(verb_count) / len(spacy_text)
            except Exception:
                prob = 1.0

            # If 5% or less of a sentence is verbs, it's probably not a real sentence
            if prob <= 0.05:
                non_sentences.append(sentence)

        for non_sentence in non_sentences:
            # Don't replace links
            if "http" not in non_sentence and non_sentence not in string.punctuation:
                text = text.replace(non_sentence, "")

        return text
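    # Usage sketch (illustrative): lines with almost no verbs, such as greetings and
    # signature blocks, are removed; lines containing links are kept. Exact output
    # depends on the spaCy model's tagging.
    #
    #   email = "Hi Jane,\nWe shipped the fix and the tests are passing now.\nBest,\nSam"
    #   cleaner.remove_email_greetings_signatures(email)
    #   # -> roughly "\nWe shipped the fix and the tests are passing now.\n\n"
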
    def clean_column_names(self, df):
        """
        Rename all columns to use underscores to reference columns without
        bracket formatting

        Parameters
        --------
        df: DataFrame

        Returns
        --------
        df: DataFrame

        """
        df.rename(
            columns=lambda x: str(x).strip().replace(" ", "_").lower(), inplace=True
        )

        return df
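    # Usage sketch (illustrative):
    #
    #   df = pd.DataFrame({"First Name": ["a"], " Last Name ": ["b"]})
    #   cleaner.clean_column_names(df).columns.tolist()
    #   # -> ['first_name', 'last_name']
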
    def remove_duplicate_columns(self, df):
        """
        Remove columns with the same name

        Parameters
        --------
        df: DataFrame

        Returns
        --------
        df: DataFrame

        """
        df = df.loc[:, ~df.columns.duplicated()]

        return df
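    # Usage sketch (illustrative): keeps the first occurrence of each column name.
    #
    #   df = pd.DataFrame([[1, 2, 3]], columns=["a", "b", "a"])
    #   cleaner.remove_duplicate_columns(df).columns.tolist()
    #   # -> ['a', 'b']
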
    def fix_col_data_type(self, df, col, desired_dt):
        """
        Change column datatype using the best method for each type.

        Parameters
        --------
        df: DataFrame
        col: str
            Column to change the dtype for
        desired_dt: str
            {'float', 'int', 'datetime', 'str'}

        Returns
        --------
        df: DataFrame

        """
        if desired_dt in ("float", "int"):
            df[col] = pd.to_numeric(df[col], errors="coerce")
        elif desired_dt == "datetime":
            df[col] = pd.to_datetime(df[col], errors="coerce")
        elif desired_dt == "str":
            df[col] = df[col].astype(str)

        return df
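    # Usage sketch (illustrative): unparseable values become NaN/NaT because of errors="coerce".
    #
    #   df = pd.DataFrame({"amount": ["1.5", "oops"]})
    #   cleaner.fix_col_data_type(df, "amount", "float")["amount"].tolist()
    #   # -> [1.5, nan]
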
    def compress_df(self, df):
        """
        Compresses each dataframe column as much as possible depending on type and values.

        Parameters
        --------
        df: DataFrame

        Returns
        --------
        df: DataFrame

        """
        for col in df.columns:
            if df[col].dtype == "O":
                unique_vals = df[col].nunique()
                count_vals = df[col].shape[0]
                if unique_vals < (count_vals * 0.5):
                    df[col] = df[col].astype("category")
            elif df[col].dtype == "int64":
                df[col] = pd.to_numeric(df[col], downcast="unsigned")
            elif df[col].dtype == "float64":
                df[col] = pd.to_numeric(df[col], downcast="float")

        return df
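    # Usage sketch (illustrative): object columns with many repeated values become
    # categoricals, and 64-bit numeric columns are downcast where the values allow it.
    #
    #   df = pd.DataFrame({"plan": ["basic"] * 1000, "count": range(1000)})
    #   cleaner.compress_df(df).dtypes
    #   # -> plan: category, count: an unsigned integer dtype such as uint16
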
    def make_uuid(self, id_num):
        """
        Make a UUID from a text string

        Parameters
        --------
        id_num: str

        Returns
        --------
        uuid: str

        """
        return (
            id_num[:8]
            + "-"
            + id_num[8:12]
            + "-"
            + id_num[12:16]
            + "-"
            + id_num[16:20]
            + "-"
            + id_num[20:]
        )
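    # Usage sketch (illustrative): expects a 32-character hex string, e.g. uuid.uuid4().hex.
    #
    #   cleaner.make_uuid("0123456789abcdef0123456789abcdef")
    #   # -> "01234567-89ab-cdef-0123-456789abcdef"
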
    def batch_pandas_operation(self, df, num_splits, identifier_col, func):
        """
        Use a function on a Pandas DataFrame in chunks for faster processing.

        Parameters
        --------
        df: DataFrame
        num_splits: int
        identifier_col: str
        func: Function

        Returns
        --------
        new_df: DataFrame

        """
        ids = df[identifier_col].unique()
        division = len(ids) / float(num_splits)
        part_ids = [
            ids[int(round(division * i)) : int(round(division * (i + 1)))]
            for i in range(num_splits)
        ]

        # DataFrame.append was removed in pandas 2.0, so collect the chunks and concat them
        chunks = []
        for lst in part_ids:
            temp_df = df[df[identifier_col].isin(lst)]
            chunks.append(func(temp_df))
        new_df = pd.concat(chunks)

        return new_df
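    # Usage sketch (illustrative; the column names and helper below are hypothetical):
    # `func` takes a DataFrame chunk and returns a DataFrame, so an expensive groupby
    # can be applied one chunk of identifier values at a time.
    #
    #   def add_totals(chunk):
    #       chunk = chunk.copy()
    #       chunk["total"] = chunk.groupby("account_id")["amount"].transform("sum")
    #       return chunk
    #
    #   result = cleaner.batch_pandas_operation(
    #       df, num_splits=10, identifier_col="account_id", func=add_totals
    #   )
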
    def batch_merge_operation(self, df_1, df_2, num_splits, identifier_col, merge_col):
        """
        Merge two Pandas DataFrames in chunks for faster processing.

        Parameters
        --------
        df_1: DataFrame
        df_2: DataFrame
        num_splits: int
        identifier_col: str
        merge_col: str

        Returns
        --------
        new_df: DataFrame

        """
        ids = df_1[identifier_col].unique()
        division = len(ids) / float(num_splits)
        part_ids = [
            ids[int(round(division * i)) : int(round(division * (i + 1)))]
            for i in range(num_splits)
        ]

        # DataFrame.append was removed in pandas 2.0, so collect the merged chunks and concat them
        merged_chunks = []
        for lst in part_ids:
            temp_df = df_1[df_1[identifier_col].isin(lst)]
            temp_df = pd.merge(temp_df, df_2, how="left", on=merge_col)
            merged_chunks.append(temp_df)
        new_df = pd.concat(merged_chunks)

        return new_df
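    # Usage sketch (illustrative; DataFrame and column names are hypothetical):
    #
    #   merged = cleaner.batch_merge_operation(
    #       orders_df, customers_df, num_splits=10,
    #       identifier_col="order_id", merge_col="customer_id",
    #   )
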
    def fill_nulls_using_regression_model(self, X_train, X_test):
        """
        Trains a regression model on non-null data and predicts values to fill in nulls

        Parameters
        --------
        X_train: pd.DataFrame
        X_test: pd.DataFrame

        Returns
        --------
        X_train: pd.DataFrame
        X_test: pd.DataFrame

        """
        numerical_vals = X_train.select_dtypes(exclude=["object", "bool"])

        # Filling null values using a smaller regression model
        for column in numerical_vals.columns:
            # Getting indices in which the given column is not null
            filled = list(X_train[column].dropna().index)

            # Getting all the X train data for the rows in which the given column is not blank
            mini_training_data = (
                X_train.drop(column, axis=1)
                .apply(lambda x: x.fillna(x.mean()), axis=0)
                .loc[filled]
            )

            # Getting the column to be filled data where it is not null
            mini_training_target = X_train[column].loc[filled]

            # Instantiating model to predict values
            model = ElasticNet(alpha=0.1, l1_ratio=0.5)
            model.fit(mini_training_data, mini_training_target)

            # Getting indices in which the given column is null
            nulls = [x for x in X_train.index if x not in filled]

            # Getting all the X train data for the rows in which the given column has blank values
            mini_testing_data1 = (
                X_train.drop(column, axis=1)
                .apply(lambda x: x.fillna(x.mean()), axis=0)
                .loc[nulls]
            )

            if not mini_testing_data1.empty:
                # Predicting the values of the given column where it is blank
                predictions1 = model.predict(mini_testing_data1)

                # Filling in the values that are blank using the predictions
                X_train.loc[nulls, column] = predictions1

            # Repeating the process for X test (but just using the already trained model)
            # Getting indices in which the given column of X test is null
            nulls = list(X_test[X_test[column].isnull()].index)

            # Getting all the X test data for the rows in which the given column has blank values
            mini_testing_data2 = (
                X_test.drop(column, axis=1)
                .apply(lambda x: x.fillna(x.mean()), axis=0)
                .loc[nulls]
            )

            if not mini_testing_data2.empty:
                # Predicting the values of the given column where it is blank
                predictions2 = model.predict(mini_testing_data2)

                # Filling in the values that are blank using the predictions
                X_test.loc[nulls, column] = predictions2

        return X_train, X_test
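    # Usage sketch (illustrative): works best when both frames are entirely numeric,
    # since the helper ElasticNet model is fit on all of the remaining columns.
    #
    #   X_train = pd.DataFrame({"a": [1.0, 2.0, None, 4.0], "b": [10.0, 20.0, 30.0, 40.0]})
    #   X_test = pd.DataFrame({"a": [5.0, None], "b": [50.0, 60.0]})
    #   X_train, X_test = cleaner.fill_nulls_using_regression_model(X_train, X_test)
    #   # Nulls in "a" are now filled with ElasticNet predictions based on "b".
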