File size: 3,266 Bytes
29fdac9
 
 
 
 
 
 
 
 
 
 
 
 
 
d76ef9a
 
 
29fdac9
 
 
 
 
 
d76ef9a
 
 
29fdac9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d76ef9a
 
 
29fdac9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d76ef9a
 
 
29fdac9
 
 
 
 
 
 
 
 
d76ef9a
 
 
29fdac9
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import re
from dataclasses import dataclass
from typing import List, Tuple


UUID_RE = re.compile(r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[1-5][0-9a-fA-F]{3}-[89abAB][0-9a-fA-F]{3}-[0-9a-fA-F]{12}\b")
IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
PATH_RE = re.compile(r"(?:[A-Za-z]:)?(?:/|\\)[\w\-/\\\.]+")
TIMESTAMP_RE = re.compile(r"\b\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:\.\d+)?\b")


@dataclass
class PreprocessResult:
    """
    Результат предобработки: очищенный текст, сигнатуры и замаскированные значения.
    """
    cleaned_text: str
    signatures: List[str]
    masked: List[str]


def detect_signatures(text: str) -> List[str]:
    """
    Ищет в тексте характерные маркеры (стектрейсы, уровни логов и т.д.).
    """
    signatures = []
    if re.search(r"Traceback|Exception|Error:|Caused by:", text, re.IGNORECASE):
        signatures.append("stacktrace")
    if TIMESTAMP_RE.search(text):
        signatures.append("timestamps")
    if re.search(r"\bINFO\b|\bWARN\b|\bERROR\b|\bDEBUG\b|\bTRACE\b", text):
        signatures.append("log_levels")
    if re.search(r"CrashLoopBackOff|OOMKilled|Back-off restarting", text, re.IGNORECASE):
        signatures.append("k8s")
    if re.search(r"OutOfMemoryError|Java heap space", text, re.IGNORECASE):
        signatures.append("oom")
    if re.search(r"timeout|timed out|Connection timed out", text, re.IGNORECASE):
        signatures.append("timeout")
    return signatures


def mask_sensitive(text: str) -> Tuple[str, List[str]]:
    """
    Маскирует UUID/IP/почты/пути, возвращая новый текст и список найденных значений.
    """
    masked = []

    def _mask(pattern: re.Pattern, placeholder: str, value: str) -> str:
        matches = pattern.findall(value)
        if matches:
            masked.extend(f"{placeholder}:{m}" for m in matches)
        return pattern.sub(placeholder, value)

    text = _mask(UUID_RE, "<UUID>", text)
    text = _mask(IP_RE, "<IP>", text)
    text = _mask(EMAIL_RE, "<EMAIL>", text)
    text = _mask(PATH_RE, "<PATH>", text)
    return text, masked


def truncate_logs(text: str, head_lines: int = 120, tail_lines: int = 80, max_lines: int = 400) -> str:
    """
    Обрезает длинные логи, сохраняя головы/хвост и вставляя разделитель.
    """
    lines = text.splitlines()
    if len(lines) <= max_lines:
        return text
    head = "\n".join(lines[:head_lines])
    tail = "\n".join(lines[-tail_lines:])
    return head + "\n...\n" + tail


def preprocess_logs(raw_text: str) -> PreprocessResult:
    """
    Комплексная подготовка логов к классификации: нормализация, маскировка, сигнатуры.
    """
    normalized = raw_text.strip()
    truncated = truncate_logs(normalized)
    masked_text, masked = mask_sensitive(truncated)
    signatures = detect_signatures(masked_text)
    return PreprocessResult(cleaned_text=masked_text, signatures=signatures, masked=list(masked))