import re from dataclasses import dataclass from typing import List, Tuple UUID_RE = re.compile(r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}\b") IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b") EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}") PATH_RE = re.compile(r"(?:[A-Za-z]:)?(?:/|\\)[\w\-/\\\.]+") TIMESTAMP_RE = re.compile(r"\b\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:\.\d+)?\b") @dataclass class PreprocessResult: """ Результат предобработки: очищенный текст, сигнатуры и замаскированные значения. """ cleaned_text: str signatures: List[str] masked: List[str] def detect_signatures(text: str) -> List[str]: """ Ищет в тексте характерные маркеры (стектрейсы, уровни логов и т.д.). """ signatures = [] if re.search(r"Traceback|Exception|Error:|Caused by:", text, re.IGNORECASE): signatures.append("stacktrace") if TIMESTAMP_RE.search(text): signatures.append("timestamps") if re.search(r"\bINFO\b|\bWARN\b|\bERROR\b|\bDEBUG\b|\bTRACE\b", text): signatures.append("log_levels") if re.search(r"CrashLoopBackOff|OOMKilled|Back-off restarting", text, re.IGNORECASE): signatures.append("k8s") if re.search(r"OutOfMemoryError|Java heap space", text, re.IGNORECASE): signatures.append("oom") if re.search(r"timeout|timed out|Connection timed out", text, re.IGNORECASE): signatures.append("timeout") return signatures def mask_sensitive(text: str) -> Tuple[str, List[str]]: """ Маскирует UUID/IP/почты/пути, возвращая новый текст и список найденных значений. """ masked = [] def _mask(pattern: re.Pattern, placeholder: str, value: str) -> str: matches = pattern.findall(value) if matches: masked.extend(f"{placeholder}:{m}" for m in matches) return pattern.sub(placeholder, value) text = _mask(UUID_RE, "", text) text = _mask(IP_RE, "", text) text = _mask(EMAIL_RE, "", text) text = _mask(PATH_RE, "", text) return text, masked def truncate_logs(text: str, head_lines: int = 120, tail_lines: int = 80, max_lines: int = 400) -> str: """ Обрезает длинные логи, сохраняя головы/хвост и вставляя разделитель. """ lines = text.splitlines() if len(lines) <= max_lines: return text head = "\n".join(lines[:head_lines]) tail = "\n".join(lines[-tail_lines:]) return head + "\n...\n" + tail def preprocess_logs(raw_text: str) -> PreprocessResult: """ Комплексная подготовка логов к классификации: нормализация, маскировка, сигнатуры. """ normalized = raw_text.strip() truncated = truncate_logs(normalized) masked_text, masked = mask_sensitive(truncated) signatures = detect_signatures(masked_text) return PreprocessResult(cleaned_text=masked_text, signatures=signatures, masked=list(masked))