Spaces:
Sleeping
Sleeping
| import re | |
| from dataclasses import dataclass | |
| from typing import List, Tuple | |
# Precompiled patterns for values commonly found in log text.

# Hyphenated 8-4-4-4-12 hex UUID whose third group starts with 1-5 and whose
# fourth group starts with 8, 9, a or b (either case).
UUID_RE = re.compile(r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}\b")
# Four dot-separated groups of 1-3 digits; the 0-255 octet range is not checked.
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
# E-mail shape: local part, "@", domain, then a dot and at least two letters.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# Path shape: optional drive letter ("C:"), one slash or backslash, then a run
# of word characters, dashes, slashes, backslashes and dots.
PATH_RE = re.compile(r"(?:[A-Za-z]:)?(?:/|\\)[\w\-/\\\.]+")
# "YYYY-MM-DD HH:MM:SS" with a space or "T" separator and optional fractional
# seconds; field values are not range-checked.
TIMESTAMP_RE = re.compile(r"\b\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:\.\d+)?\b")
@dataclass
class PreprocessResult:
    """
    Result of log preprocessing: cleaned text, signatures and masked values.

    Attributes:
        cleaned_text: The log text after preprocessing.
        signatures: Labels of the markers detected in the text.
        masked: The values that were replaced by placeholders.
    """

    # The decorator generates __init__/__repr__/__eq__ from these annotations;
    # without it the class cannot be constructed with keyword arguments.
    cleaned_text: str
    signatures: List[str]
    masked: List[str]
def detect_signatures(text: str) -> List[str]:
    """
    Find characteristic markers in the text (stack traces, log levels, etc.).

    Args:
        text: Log text to inspect.

    Returns:
        Labels of every rule that matched, in the fixed order stacktrace,
        timestamps, log_levels, k8s, oom, timeout.
    """
    # Ordered (label, pattern) rules; the order defines the result order.
    rules = (
        ("stacktrace", re.compile(r"Traceback|Exception|Error:|Caused by:", re.IGNORECASE)),
        ("timestamps", TIMESTAMP_RE),
        # Log levels are matched case-sensitively, unlike the other rules.
        ("log_levels", re.compile(r"\bINFO\b|\bWARN\b|\bERROR\b|\bDEBUG\b|\bTRACE\b")),
        ("k8s", re.compile(r"CrashLoopBackOff|OOMKilled|Back-off restarting", re.IGNORECASE)),
        ("oom", re.compile(r"OutOfMemoryError|Java heap space", re.IGNORECASE)),
        ("timeout", re.compile(r"timeout|timed out|Connection timed out", re.IGNORECASE)),
    )
    return [label for label, matcher in rules if matcher.search(text)]
def mask_sensitive(text: str) -> Tuple[str, List[str]]:
    """
    Mask UUIDs, IP addresses, e-mails and paths in the text.

    Args:
        text: Text to mask.

    Returns:
        A tuple of the masked text and a list of "<PLACEHOLDER>:value"
        entries, one per replaced value, grouped by pattern in the order
        the patterns are applied.
    """
    found: List[str] = []
    # Applied in sequence: each pattern sees the output of the previous one.
    replacements = (
        (UUID_RE, "<UUID>"),
        (IP_RE, "<IP>"),
        (EMAIL_RE, "<EMAIL>"),
        (PATH_RE, "<PATH>"),
    )
    for regex, tag in replacements:
        # Record the raw matches before they are overwritten by the tag.
        found.extend(f"{tag}:{hit}" for hit in regex.findall(text))
        text = regex.sub(tag, text)
    return text, found
def truncate_logs(text: str, head_lines: int = 120, tail_lines: int = 80, max_lines: int = 400) -> str:
    """
    Shorten a long log, keeping its head and tail joined by a "..." separator.

    Args:
        text: Log text to shorten.
        head_lines: Number of leading lines to keep; negative values count as 0.
        tail_lines: Number of trailing lines to keep; negative values count as 0.
        max_lines: Texts with at most this many lines are returned unchanged.

    Returns:
        The original text when it is short enough or when head and tail
        together already cover every line; otherwise the kept head lines,
        a "..." line, and the kept tail lines, joined with newlines.
    """
    lines = text.splitlines()
    total = len(lines)
    if total <= max_lines:
        return text

    # A negative count would make the slices select from the wrong end.
    keep_head = max(head_lines, 0)
    keep_tail = max(tail_lines, 0)

    # Nothing would be dropped; slicing anyway would repeat the overlap.
    if keep_head + keep_tail >= total:
        return text

    head = "\n".join(lines[:keep_head])
    # Index from the front: lines[-0:] is the whole list, not an empty tail.
    tail = "\n".join(lines[total - keep_tail:])
    return head + "\n...\n" + tail
def preprocess_logs(raw_text: str) -> PreprocessResult:
    """
    Prepare logs for classification: normalize, truncate, mask, detect signatures.

    Args:
        raw_text: Raw log text.

    Returns:
        A PreprocessResult holding the masked text, the signatures detected
        in that masked text, and the list of masked values.
    """
    # Strip surrounding whitespace first, then shorten before masking.
    shortened = truncate_logs(raw_text.strip())
    cleaned, masked_values = mask_sensitive(shortened)
    return PreprocessResult(
        cleaned_text=cleaned,
        # Signatures are detected on the masked text, not the raw input.
        signatures=detect_signatures(cleaned),
        masked=list(masked_values),
    )