import html
import re
import great_expectations as ge
from loguru import logger
import pandas as pd
from syntetic_issue_report_data_generation.config import (
INTERIM_DATA_DIR,
RAW_DATA_DIR,
DATASET_CONFIGs,
)
class DataPreprocessing:
"""
Class used to check dataset integrity and clean it
"""
def __init__(self, dataset_name):
"""
Initialize the class with the dataset configuration
Args:
dataset_name (str): Name of the dataset that needs to be processed
"""
self.__dataset_conf = DATASET_CONFIGs[dataset_name]
self.__dataset_name = dataset_name
self.__body_col = self.__dataset_conf["body_col"]
self.__label_col = self.__dataset_conf["label_col"]
self.__title_col = self.__dataset_conf["title_col"]
self.__unmeaningful_body_length = None
self.__validation_definition = None
def load_dataset(self):
"""
        Load the dataset into memory and initialize the Great Expectations (GE) context
"""
# load dataset
self.__df = pd.read_csv(
RAW_DATA_DIR / f"{self.__dataset_conf['data_path']}",
sep=self.__dataset_conf.get("sep", ","),
encoding="utf-8",
)
# load the dataset in GE
self.__context = ge.get_context()
# set data source
self.__data_source = self.__context.data_sources.add_pandas(
name="df",
)
self.__data_asset = self.__data_source.add_dataframe_asset(name="df_asset")
# batch definition
self.__batch_definition = self.__data_asset.add_batch_definition_whole_dataframe(
"batch definition"
)
self.__batch = self.__batch_definition.get_batch(batch_parameters={"dataframe": self.__df})
self.__suite = self.__context.suites.add(
ge.core.expectation_suite.ExpectationSuite(
name="Dataset expectation suite",
)
)
logger.info(f"Dataset loaded: {self.__dataset_conf['data_path']}")
def basic_stats(self):
"""
        Print the basic statistics of the dataset
"""
print(self.__df.describe(include="all"))
def __get_unmeaningful_body_length(self):
"""
        Compute the body-length threshold (the 3% quantile of the body length distribution) at or below which a body is considered unmeaningful
"""
# get the 3% quantile of the body length distribution
lengths = self.__df[self.__body_col].fillna("").astype(str).str.len()
q_vals = lengths.quantile([0.03])
unmeaningful_body_length = int(q_vals.iloc[0])
logger.info(
f"Maximum body length to be considered unmeaningful: {unmeaningful_body_length}"
)
return unmeaningful_body_length
def __clean_text(
self,
text,
lower=False,
remove_html=False,
        remove_urls=False,  # yes
        remove_code_blocks=False,  # maybe
        replace_digits=False,
        remove_paths=False,  # yes
remove_hex=False,
url_token="<URL>",
digits_token="<NUM>",
path_token="<PATH>",
hex_token="<HEX>",
):
"""
        Cleans the input text by removing or replacing the components enabled via the flags
Args:
text (str): The text to be cleaned
lower (bool): Whether to convert the text to lowercase
remove_html (bool): Whether to remove HTML tags
remove_urls (bool): Whether to remove URLs
remove_code_blocks (bool): Whether to remove code blocks
replace_digits (bool): Whether to replace digits with a token
remove_paths (bool): Whether to remove paths
remove_hex (bool): Whether to remove hex values
url_token (str): The token to replace URLs with
digits_token (str): The token to replace digits with
path_token (str): The token to replace paths with
hex_token (str): The token to replace hex values with
Returns:
str: The cleaned text
"""
        # treat None and NaN values as empty text
        if text is None or (isinstance(text, float) and pd.isna(text)):
            return ""
s = str(text)
if remove_html:
s = html.unescape(s)
# remove simple html tags
s = re.sub(r"<[^>]+>", " ", s)
if remove_code_blocks:
# remove fenced code blocks ```...``` and inline `...`
s = re.sub(r"```[\s\S]*?```", " ", s)
s = re.sub(r"`[^`]+`", " ", s)
if remove_urls:
s = re.sub(r"https?://\S+|www\.\S+", url_token, s)
if replace_digits:
s = re.sub(r"\d+", f" {digits_token} ", s)
if remove_hex:
# hex with 0x prefix, e.g. 0x1a2f
s = re.sub(r"\b0x[0-9a-fA-F]+\b", hex_token, s)
if remove_paths:
# Windows drive paths (e.g. C:\path\to\file.txt) and UNC paths (\\server\share\file)
s = re.sub(r"\b[A-Za-z]:\\(?:[^\\\s]+\\)*[^\\\s]*\b", path_token, s)
s = re.sub(r"\\\\(?:[^\\\s]+\\)*[^\\\s]*\b", path_token, s)
# Relative paths: ./file, ../dir/file, .\file, ..\dir\file, ~/something
s = re.sub(r"(?<!\S)(?:\./|\.\./|\.\\|(?:\.\.\\)|~/)[^\s]+", path_token, s)
# Unix absolute paths (e.g. /usr/bin/file) — require at least one non-slash segment
s = re.sub(r"(?<!\S)/(?:[^/\s]+/)*[^/\s]+", path_token, s)
# Repo-style or long slash-separated paths without leading slash (e.g. home/travis/build/.../file.c)
# require at least two '/' to avoid matching ordinary text with a single slash
s = re.sub(r"(?<!\S)(?:[A-Za-z0-9_.~-]+/(?:[^/\s]+/){1,}[^/\s]+)", path_token, s)
# Fallback: file-like tokens with common extensions
s = re.sub(
r"(?<!\S)[\w\-/\\]+?\.(?:py|txt|md|log|json|yml|yaml|cfg|ini|csv|sql|java|cpp|c|h|js|ts|rb|go)(?=\s|$)",
path_token,
s,
)
# collapse whitespace and strip
# s = re.sub(r"\s+", " ", s).strip()
if lower:
s = s.lower()
return s
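    # Illustrative example of the flags above (comment only, not executed; the input string is
    # made up): with remove_hex=True and remove_paths=True,
    #   "Crash at 0xdeadbeef, see /var/log/app.log"
    # becomes
    #   "Crash at <HEX>, see <PATH>"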
def __check_columns_type_integrity_expectation(self):
"""
Adds the check for column type integrity to the GE suite
"""
# check if all columns are of the same type
for column in self.__df.columns:
most_frequent_type_in_column = self.__df[column].apply(type).mode()[0]
logger.info(f"Most frequent type in column {column}: {most_frequent_type_in_column}")
            expectation = ge.expectations.ExpectColumnValuesToBeOfType(
                column=column,
                type_=most_frequent_type_in_column.__name__,
                meta={"tag": "type_integrity_column_" + column},
            )
self.__suite.add_expectation(expectation)
logger.info("Columns type integrity checks set")
def __check_missing_values_expectation(self):
"""
Adds the check for missing values in the dataset columns to the GE suite
"""
# check if all columns have no missing values
for column in self.__df.columns:
            expectation = ge.expectations.ExpectColumnValuesToNotBeNull(
                column=column, meta={"tag": "missing_values_column_" + column}
            )
self.__suite.add_expectation(expectation)
logger.info("Missing values checks set")
def __check_duplicates_expectation(self):
"""
Adds the check for duplicated rows in the dataset to the GE suite
"""
# check if there are no duplicated rows
expectation = ge.expectations.ExpectCompoundColumnsToBeUnique(
column_list=list(self.__df.columns),
meta={"tag":"duplicates"}
)
self.__suite.add_expectation(expectation)
logger.info("Duplicates checks set")
def __check_unmeaningful_bodies_expectation(self):
"""
Adds the check for unmeaningful bodies in the dataset to the GE suite
"""
# check if there are unmeaningful bodies
        # an unmeaningful body is a body whose length is less than the 3% quantile of the body length distribution
if not self.__unmeaningful_body_length:
self.__unmeaningful_body_length = self.__get_unmeaningful_body_length()
expectation = ge.expectations.ExpectColumnValueLengthsToBeBetween(
            column=self.__body_col,
            min_value=self.__unmeaningful_body_length,
            max_value=None,
            meta={"tag": "unmeaningful_bodies"},
)
self.__suite.add_expectation(expectation)
logger.info("Unmeaningful bodies checks set")
def check_dataset(self, checks, save_report=False, report_path="Raw data"):
"""
Checks the dataset integrity and returns the GE suite result
Args:
            checks (list): List of checks to be performed. Possible values are:
                - "column_types": checks if all the columns' values are of the same type
                - "missing_values": checks if there are missing values in the dataset
                - "duplicates": checks if there are duplicated rows in the dataset
                - "unmeaningful_bodies": checks if there are unmeaningful bodies in the dataset
            save_report (bool): Whether to save the validation results as an HTML report
            report_path (str): Sub-folder of the reports directory where the report is saved
        Returns:
            list: One dict per expectation with its success flag, configuration and failure statistics
        """
if "column_types" in checks:
self.__check_columns_type_integrity_expectation()
if "missing_values" in checks:
self.__check_missing_values_expectation()
if "duplicates" in checks:
self.__check_duplicates_expectation()
if "unmeaningful_bodies" in checks:
self.__check_unmeaningful_bodies_expectation()
# run the suite
if not self.__validation_definition:
self.__validation_definition = self.__context.validation_definitions.add(
ge.core.validation_definition.ValidationDefinition(
name="Validation definition",
data=self.__batch_definition,
suite=self.__suite,
)
)
res = self.__validation_definition.run(
batch_parameters={"dataframe": self.__df},
result_format={
"result_format": "COMPLETE",
"unexpected_index_column_names": [self.__body_col],
"return_unexpected_index_query": True,
},
)
# save the results on html file
if save_report:
document_model = ge.render.renderer.ValidationResultsPageRenderer().render(res)
html_content = ge.render.view.DefaultJinjaPageView().render(document_model)
with open("../reports/Great Expectation Results/"+report_path+"/"+self.__dataset_name+"_results.html", "w", encoding="utf-8") as f:
f.write(html_content)
results = [
{
"success": r["success"],
"config": r["expectation_config"],
"num_of_failed_rows": r["result"]["unexpected_count"]
if "unexpected_count" in r["result"].keys()
else None,
"percent_of_failed_rows": r["result"]["unexpected_percent"]
if "unexpected_percent" in r["result"].keys()
else None,
}
for r in res["results"]
]
logger.info("Dataset checking completed!")
return results
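    # Illustrative shape of the list returned by check_dataset (numbers are hypothetical;
    # "config" holds the expectation's configuration object):
    # [
    #     {"success": False, "config": <expectation configuration>,
    #      "num_of_failed_rows": 42, "percent_of_failed_rows": 1.7},
    #     ...
    # ]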
def automated_cleaning(self):
"""
Automatically cleans the dataset by running the cleaning functions in the following order:
- clean_columns_integrity
- clean_missing_values
- clean_duplicates
- clean_bodies
- clean_unmeaningful_bodies
"""
self.clean_columns_integrity()
self.clean_missing_values()
self.clean_duplicates()
self.clean_bodies()
self.clean_unmeaningful_bodies()
def clean_columns_integrity(self):
"""
        Cleans the dataset by removing rows whose value type, in any column, differs from the most frequent type in that column
"""
logger.info("Solving columns integrity issues...")
# get the most common type in the dataset, column by column
for column in self.__df.columns:
most_frequent_type_in_column = self.__df[column].apply(type).mode()[0]
            # keep only rows whose value in the current column has the most frequent type
self.__df = self.__df[
self.__df[column].apply(lambda x: type(x) is most_frequent_type_in_column)
]
logger.info("Columns integrity issues solved!")
logger.info(
"Number of samples after cleaning columns integrity: {}".format(self.__df.shape[0])
)
def clean_missing_values(self):
"""
Cleans the dataset by removing rows with missing values and empty body strings
"""
logger.info("Cleaning missing values...")
# remove missing values and empty body strings
str_cols = self.__df.select_dtypes(include=["object"]).columns
self.__df = self.__df.dropna().reset_index(drop=True)
        # keep only rows where every string column is non-empty (axis=1 applies the check row-wise)
        self.__df = self.__df[self.__df[str_cols].apply(lambda row: (row != "").all(), axis=1)]
logger.info("Missing values cleaned!")
logger.info(
"Number of samples after cleaning missing values: {}".format(self.__df.shape[0])
)
def clean_duplicates(self):
"""
Cleans the dataset by removing duplicate rows
"""
logger.info("Cleaning duplicates...")
# remove duplicate rows
self.__df = self.__df.drop_duplicates(subset=self.__df.columns, keep="first")
logger.info("Duplicates cleaned!")
logger.info("Number of samples after cleaning duplicates: {}".format(self.__df.shape[0]))
def clean_unmeaningful_bodies(self):
"""
Cleans the dataset by removing unmeaningful bodies
"""
logger.info("Cleaning unmeaningful bodies...")
# remove unmeaningful bodies
if not self.__unmeaningful_body_length:
self.__unmeaningful_body_length = self.__get_unmeaningful_body_length()
self.__df = self.__df[
self.__df[self.__body_col]
.astype(str)
.apply(lambda b: len(b) > self.__unmeaningful_body_length)
]
logger.info("Unmeaningful bodies cleaned!")
logger.info(
"Number of samples after cleaning unmeaningful bodies: {}".format(self.__df.shape[0])
)
def clean_bodies(self):
"""
        Cleans the dataset bodies by removing HTML tags and replacing URLs, paths and hex values with placeholder tokens
"""
logger.info("Cleaning bodies...")
# clean the bodies
self.__df[self.__body_col] = self.__df[self.__body_col].map(
lambda x: self.__clean_text(
x, remove_html=True, remove_urls=True, remove_paths=True, remove_hex=True
)
)
logger.info("Bodies cleaned!")
logger.info("Number of samples after cleaning bodies: {}".format(self.__df.shape[0]))
def get_dataset(self):
"""
Returns the dataset
"""
return self.__df
def save_dataset(self, save_path=INTERIM_DATA_DIR):
"""
        Saves the dataset to the given folder (the interim data folder by default)
        Args:
            save_path (Path): Directory where the dataset CSV is written
"""
self.__df.to_csv(save_path / f"{self.__dataset_conf['data_path']}", index=False)
logger.info(f"Dataset saved: {self.__dataset_conf['data_path']}")