Spaces:
Sleeping
Sleeping
File size: 3,266 Bytes
29fdac9 d76ef9a 29fdac9 d76ef9a 29fdac9 d76ef9a 29fdac9 d76ef9a 29fdac9 d76ef9a 29fdac9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
import re
from dataclasses import dataclass
from typing import List, Tuple
UUID_RE = re.compile(r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}\b")
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
PATH_RE = re.compile(r"(?:[A-Za-z]:)?(?:/|\\)[\w\-/\\\.]+")
TIMESTAMP_RE = re.compile(r"\b\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:\.\d+)?\b")
@dataclass
class PreprocessResult:
"""
Результат предобработки: очищенный текст, сигнатуры и замаскированные значения.
"""
cleaned_text: str
signatures: List[str]
masked: List[str]
def detect_signatures(text: str) -> List[str]:
"""
Ищет в тексте характерные маркеры (стектрейсы, уровни логов и т.д.).
"""
signatures = []
if re.search(r"Traceback|Exception|Error:|Caused by:", text, re.IGNORECASE):
signatures.append("stacktrace")
if TIMESTAMP_RE.search(text):
signatures.append("timestamps")
if re.search(r"\bINFO\b|\bWARN\b|\bERROR\b|\bDEBUG\b|\bTRACE\b", text):
signatures.append("log_levels")
if re.search(r"CrashLoopBackOff|OOMKilled|Back-off restarting", text, re.IGNORECASE):
signatures.append("k8s")
if re.search(r"OutOfMemoryError|Java heap space", text, re.IGNORECASE):
signatures.append("oom")
if re.search(r"timeout|timed out|Connection timed out", text, re.IGNORECASE):
signatures.append("timeout")
return signatures
def mask_sensitive(text: str) -> Tuple[str, List[str]]:
"""
Маскирует UUID/IP/почты/пути, возвращая новый текст и список найденных значений.
"""
masked = []
def _mask(pattern: re.Pattern, placeholder: str, value: str) -> str:
matches = pattern.findall(value)
if matches:
masked.extend(f"{placeholder}:{m}" for m in matches)
return pattern.sub(placeholder, value)
text = _mask(UUID_RE, "<UUID>", text)
text = _mask(IP_RE, "<IP>", text)
text = _mask(EMAIL_RE, "<EMAIL>", text)
text = _mask(PATH_RE, "<PATH>", text)
return text, masked
def truncate_logs(text: str, head_lines: int = 120, tail_lines: int = 80, max_lines: int = 400) -> str:
"""
Обрезает длинные логи, сохраняя головы/хвост и вставляя разделитель.
"""
lines = text.splitlines()
if len(lines) <= max_lines:
return text
head = "\n".join(lines[:head_lines])
tail = "\n".join(lines[-tail_lines:])
return head + "\n...\n" + tail
def preprocess_logs(raw_text: str) -> PreprocessResult:
"""
Комплексная подготовка логов к классификации: нормализация, маскировка, сигнатуры.
"""
normalized = raw_text.strip()
truncated = truncate_logs(normalized)
masked_text, masked = mask_sensitive(truncated)
signatures = detect_signatures(masked_text)
return PreprocessResult(cleaned_text=masked_text, signatures=signatures, masked=list(masked))
|