import html
import re

import great_expectations as ge
import pandas as pd
from loguru import logger

from syntetic_issue_report_data_generation.config import (
    INTERIM_DATA_DIR,
    RAW_DATA_DIR,
    DATASET_CONFIGs,
)


class DataPreprocessing:
    """
    Class used to check dataset integrity and clean it
    """

    def __init__(self, dataset_name):
        """
        Initialize the class with the dataset configuration

        Args:
            dataset_name (str): Name of the dataset that needs to be processed
        """
        self.__dataset_conf = DATASET_CONFIGs[dataset_name]
        self.__dataset_name = dataset_name
        self.__body_col = self.__dataset_conf["body_col"]
        self.__label_col = self.__dataset_conf["label_col"]
        self.__title_col = self.__dataset_conf["title_col"]
        self.__unmeaningful_body_length = None
        self.__validation_definition = None

    def load_dataset(self):
        """
        Load the dataset in memory and initialize the GE context
        """
        self.__df = pd.read_csv(
            RAW_DATA_DIR / f"{self.__dataset_conf['data_path']}",
            sep=self.__dataset_conf.get("sep", ","),
            encoding="utf-8",
        )

        # Set up the Great Expectations context and register the dataframe:
        # data source -> dataframe asset -> whole-dataframe batch definition.
        self.__context = ge.get_context()
        self.__data_source = self.__context.data_sources.add_pandas(name="df")
        self.__data_asset = self.__data_source.add_dataframe_asset(name="df_asset")
        self.__batch_definition = self.__data_asset.add_batch_definition_whole_dataframe(
            "batch definition"
        )
        self.__batch = self.__batch_definition.get_batch(
            batch_parameters={"dataframe": self.__df}
        )

        self.__suite = self.__context.suites.add(
            ge.core.expectation_suite.ExpectationSuite(
                name="Dataset expectation suite",
            )
        )

        logger.info(f"Dataset loaded: {self.__dataset_conf['data_path']}")

    def basic_stats(self):
        """
        Get the basic statistics of the dataset
        """
        print(self.__df.describe(include="all"))

    def __get_unmeaningful_body_length(self):
        """
        Get the maximum length at or below which a body is considered unmeaningful
        (the 3rd percentile of body lengths)
        """
        lengths = self.__df[self.__body_col].fillna("").astype(str).str.len()
        unmeaningful_body_length = int(lengths.quantile(0.03))

        logger.info(
            f"Maximum body length to be considered unmeaningful: {unmeaningful_body_length}"
        )

        return unmeaningful_body_length
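
    # Illustrative reading (hypothetical numbers): if the 3rd percentile of body
    # lengths is 20 characters, bodies of 20 characters or fewer are later
    # removed as unmeaningful by clean_unmeaningful_bodies().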

    def __clean_text(
        self,
        text,
        lower=False,
        remove_html=False,
        remove_urls=False,
        remove_code_blocks=False,
        replace_digits=False,
        remove_paths=False,
        remove_hex=False,
        url_token="<URL>",
        digits_token="<NUM>",
        path_token="<PATH>",
        hex_token="<HEX>",
    ):
        """
        Cleans the input text by removing the components specified by the input flags

        Args:
            text (str): The text to be cleaned
            lower (bool): Whether to convert the text to lowercase
            remove_html (bool): Whether to remove HTML tags
            remove_urls (bool): Whether to remove URLs
            remove_code_blocks (bool): Whether to remove code blocks
            replace_digits (bool): Whether to replace digits with a token
            remove_paths (bool): Whether to remove paths
            remove_hex (bool): Whether to remove hex values
            url_token (str): The token to replace URLs with
            digits_token (str): The token to replace digits with
            path_token (str): The token to replace paths with
            hex_token (str): The token to replace hex values with

        Returns:
            str: The cleaned text
        """
        if text is None:
            return ""
        s = str(text)
        if remove_html:
            # Unescape HTML entities, then strip anything that looks like a tag
            s = html.unescape(s)
            s = re.sub(r"<[^>]+>", " ", s)
        if remove_code_blocks:
            # Fenced blocks (```...```) first, then inline code spans (`...`)
            s = re.sub(r"```[\s\S]*?```", " ", s)
            s = re.sub(r"`[^`]+`", " ", s)
        if remove_urls:
            s = re.sub(r"https?://\S+|www\.\S+", url_token, s)
        if replace_digits:
            s = re.sub(r"\d+", f" {digits_token} ", s)
        if remove_hex:
            s = re.sub(r"\b0x[0-9a-fA-F]+\b", hex_token, s)
        if remove_paths:
            # Windows drive paths (e.g. C:\dir\file) and UNC paths (\\server\share)
            s = re.sub(r"\b[A-Za-z]:\\(?:[^\\\s]+\\)*[^\\\s]*\b", path_token, s)
            s = re.sub(r"\\\\(?:[^\\\s]+\\)*[^\\\s]*\b", path_token, s)
            # Relative paths starting with ./, ../, .\, ..\ or ~/
            s = re.sub(r"(?<!\S)(?:\./|\.\./|\.\\|(?:\.\.\\)|~/)[^\s]+", path_token, s)
            # Absolute POSIX paths (/usr/local/bin)
            s = re.sub(r"(?<!\S)/(?:[^/\s]+/)*[^/\s]+", path_token, s)
            # Bare relative paths with at least two separators (dir/sub/file)
            s = re.sub(r"(?<!\S)(?:[A-Za-z0-9_.~-]+/(?:[^/\s]+/){1,}[^/\s]+)", path_token, s)
            # Standalone file names with a known source/config extension
            s = re.sub(
                r"(?<!\S)[\w\-/\\]+?\.(?:py|txt|md|log|json|yml|yaml|cfg|ini|csv|sql|java|cpp|c|h|js|ts|rb|go)(?=\s|$)",
                path_token,
                s,
            )
        if lower:
            s = s.lower()
        return s
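
    # Illustrative behavior with the flags used by clean_bodies() below
    # (hypothetical input, whitespace shown loosely):
    #   __clean_text("See <b>0xBEEF</b> at https://example.com in /var/log/app.log",
    #                remove_html=True, remove_urls=True, remove_paths=True, remove_hex=True)
    #   -> "See <HEX> at <URL> in <PATH>"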

    def __check_columns_type_integrity_expectation(self):
        """
        Adds the check for column type integrity to the GE suite
        """
        for column in self.__df.columns:
            most_frequent_type_in_column = self.__df[column].apply(type).mode()[0]
            logger.info(f"Most frequent type in column {column}: {most_frequent_type_in_column}")
            expectation = ge.expectations.ExpectColumnValuesToBeOfType(
                column=column,
                type_=most_frequent_type_in_column.__name__,
                meta={"tag": "type_integrity_column" + column},
            )
            self.__suite.add_expectation(expectation)

        logger.info("Columns type integrity checks set")

    def __check_missing_values_expectation(self):
        """
        Adds the check for missing values in the dataset columns to the GE suite
        """
        for column in self.__df.columns:
            expectation = ge.expectations.ExpectColumnValuesToNotBeNull(
                column=column, meta={"tag": "missing_values_column" + column}
            )
            self.__suite.add_expectation(expectation)

        logger.info("Missing values checks set")

    def __check_duplicates_expectation(self):
        """
        Adds the check for duplicated rows in the dataset to the GE suite
        """
        expectation = ge.expectations.ExpectCompoundColumnsToBeUnique(
            column_list=list(self.__df.columns),
            meta={"tag": "duplicates"},
        )
        self.__suite.add_expectation(expectation)

        logger.info("Duplicates checks set")

    def __check_unmeaningful_bodies_expectation(self):
        """
        Adds the check for unmeaningful bodies in the dataset to the GE suite
        """
        # Compute the length threshold lazily so it is shared with clean_unmeaningful_bodies()
        if not self.__unmeaningful_body_length:
            self.__unmeaningful_body_length = self.__get_unmeaningful_body_length()

        expectation = ge.expectations.ExpectColumnValueLengthsToBeBetween(
            column=self.__body_col,
            min_value=self.__unmeaningful_body_length,
            max_value=None,
            meta={"tag": "unmeaningful_bodies"},
        )
        self.__suite.add_expectation(expectation)

        logger.info("Unmeaningful bodies checks set")

    def check_dataset(self, checks, save_report=False, report_path="Raw data"):
        """
        Checks the dataset integrity and returns the GE suite results

        Args:
            checks (list): List of checks to be performed. Possible values are:
                - "column_types" checks if all the columns' values are of the same type
                - "missing_values" checks if there are missing values in the dataset
                - "duplicates" checks if there are duplicated rows in the dataset
                - "unmeaningful_bodies" checks if there are unmeaningful bodies in the dataset
            save_report (bool): Whether to save an HTML report of the validation results
            report_path (str): Subfolder of the reports directory where the report is saved
        """
        if "column_types" in checks:
            self.__check_columns_type_integrity_expectation()
        if "missing_values" in checks:
            self.__check_missing_values_expectation()
        if "duplicates" in checks:
            self.__check_duplicates_expectation()
        if "unmeaningful_bodies" in checks:
            self.__check_unmeaningful_bodies_expectation()

        if not self.__validation_definition:
            self.__validation_definition = self.__context.validation_definitions.add(
                ge.core.validation_definition.ValidationDefinition(
                    name="Validation definition",
                    data=self.__batch_definition,
                    suite=self.__suite,
                )
            )

        res = self.__validation_definition.run(
            batch_parameters={"dataframe": self.__df},
            result_format={
                "result_format": "COMPLETE",
                "unexpected_index_column_names": [self.__body_col],
                "return_unexpected_index_query": True,
            },
        )

        if save_report:
            document_model = ge.render.renderer.ValidationResultsPageRenderer().render(res)
            html_content = ge.render.view.DefaultJinjaPageView().render(document_model)
            report_file = (
                "../reports/Great Expectation Results/"
                + report_path
                + "/"
                + self.__dataset_name
                + "_results.html"
            )
            with open(report_file, "w", encoding="utf-8") as f:
                f.write(html_content)

        results = [
            {
                "success": r["success"],
                "config": r["expectation_config"],
                "num_of_failed_rows": r["result"]["unexpected_count"]
                if "unexpected_count" in r["result"]
                else None,
                "percent_of_failed_rows": r["result"]["unexpected_percent"]
                if "unexpected_percent" in r["result"]
                else None,
            }
            for r in res["results"]
        ]

        logger.info("Dataset checking completed!")

        return results
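
    # Illustrative call and return shape (hypothetical instance name `dp`):
    #   dp.check_dataset(["missing_values", "duplicates"])
    #   -> [{"success": True, "config": ..., "num_of_failed_rows": 0,
    #        "percent_of_failed_rows": 0.0}, ...]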

    def automated_cleaning(self):
        """
        Automatically cleans the dataset by running the cleaning functions in the following order:
            - clean_columns_integrity
            - clean_missing_values
            - clean_duplicates
            - clean_bodies
            - clean_unmeaningful_bodies
        """
        self.clean_columns_integrity()
        self.clean_missing_values()
        self.clean_duplicates()
        self.clean_bodies()
        self.clean_unmeaningful_bodies()

    def clean_columns_integrity(self):
        """
        Cleans the dataset columns by removing rows whose values' type differs from the
        most frequent type in each column
        """
        logger.info("Solving columns integrity issues...")

        for column in self.__df.columns:
            most_frequent_type_in_column = self.__df[column].apply(type).mode()[0]
            # Keep only the rows whose value in this column has the dominant type
            self.__df = self.__df[
                self.__df[column].apply(lambda x: type(x) is most_frequent_type_in_column)
            ]

        logger.info("Columns integrity issues solved!")
        logger.info(
            "Number of samples after cleaning columns integrity: {}".format(self.__df.shape[0])
        )

    def clean_missing_values(self):
        """
        Cleans the dataset by removing rows with missing values and empty body strings
        """
        logger.info("Cleaning missing values...")

        # Drop NaN rows first, then drop rows where any string column is empty
        str_cols = self.__df.select_dtypes(include=["object"]).columns
        self.__df = self.__df.dropna().reset_index(drop=True)
        self.__df = self.__df[self.__df[str_cols].apply(lambda col: (col != "").all(), axis=1)]

        logger.info("Missing values cleaned!")
        logger.info(
            "Number of samples after cleaning missing values: {}".format(self.__df.shape[0])
        )

    def clean_duplicates(self):
        """
        Cleans the dataset by removing duplicate rows
        """
        logger.info("Cleaning duplicates...")

        self.__df = self.__df.drop_duplicates(subset=self.__df.columns, keep="first")

        logger.info("Duplicates cleaned!")
        logger.info("Number of samples after cleaning duplicates: {}".format(self.__df.shape[0]))

    def clean_unmeaningful_bodies(self):
        """
        Cleans the dataset by removing unmeaningful bodies
        """
        logger.info("Cleaning unmeaningful bodies...")

        if not self.__unmeaningful_body_length:
            self.__unmeaningful_body_length = self.__get_unmeaningful_body_length()

        self.__df = self.__df[
            self.__df[self.__body_col]
            .astype(str)
            .apply(lambda b: len(b) > self.__unmeaningful_body_length)
        ]

        logger.info("Unmeaningful bodies cleaned!")
        logger.info(
            "Number of samples after cleaning unmeaningful bodies: {}".format(self.__df.shape[0])
        )

    def clean_bodies(self):
        """
        Cleans the dataset bodies by removing HTML tags, URLs, paths and hex values
        """
        logger.info("Cleaning bodies...")

        self.__df[self.__body_col] = self.__df[self.__body_col].map(
            lambda x: self.__clean_text(
                x, remove_html=True, remove_urls=True, remove_paths=True, remove_hex=True
            )
        )

        logger.info("Bodies cleaned!")
        logger.info("Number of samples after cleaning bodies: {}".format(self.__df.shape[0]))

    def get_dataset(self):
        """
        Returns the dataset
        """
        return self.__df

    def save_dataset(self, save_path=INTERIM_DATA_DIR):
        """
        Saves the dataset to the given folder (the interim data folder by default)
        """
        self.__df.to_csv(save_path / f"{self.__dataset_conf['data_path']}", index=False)

        logger.info(f"Dataset saved: {self.__dataset_conf['data_path']}")
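

# Minimal usage sketch. The dataset name and check list below are illustrative
# assumptions, not part of this module; any key present in DATASET_CONFIGs works.
if __name__ == "__main__":
    preprocessing = DataPreprocessing("github_issues")
    preprocessing.load_dataset()
    preprocessing.basic_stats()
    preprocessing.check_dataset(
        ["column_types", "missing_values", "duplicates", "unmeaningful_bodies"],
        save_report=True,
    )
    preprocessing.automated_cleaning()
    preprocessing.save_dataset()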