| import json |
| import zipfile |
| import pandas as pd |
| from huggingface_hub import hf_hub_download, list_repo_files |
| from llama_index.core import Document |
| from llama_index.core.text_splitter import SentenceSplitter |
| from logger.my_logging import log_message |
| from config import CHUNK_SIZE, CHUNK_OVERLAP, MAX_CHARS_TABLE, MAX_ROWS_TABLE |
| import re |
|
|
def normalize_text(text):
    """Normalize Cyrillic steel-grade letters to their Latin look-alikes.

    Two substitutions are applied:
    - the Cyrillic prefix 'С-' becomes Latin 'C' (the hyphen is dropped —
      presumably to collapse 'С-245'-style spellings into 'C245';
      TODO confirm this is the intended behaviour),
    - a Cyrillic 'С' at a word boundary immediately followed by a digit
      becomes Latin 'C' (e.g. 'С245' -> 'C245').

    Args:
        text: string to normalize; falsy values are returned unchanged.

    Returns:
        The normalized string (or the original falsy input).
    """
    if not text:
        return text

    text = text.replace('С-', 'C')
    # Bug fix: the replacement previously used the Cyrillic 'С' again,
    # making this substitution a no-op; it must emit the Latin 'C'.
    text = re.sub(r'\bС(\d)', r'C\1', text)
    return text
|
|
def normalize_steel_designations(text):
    """Convert Cyrillic look-alike letters inside steel-grade tokens to Latin.

    Finds tokens that look like steel designations (digits interleaved with
    uppercase letters, e.g. '12Х18Н10Т', and wire grades prefixed 'СВ-')
    and maps the Cyrillic letters Х,Н,Т,С,В,К,М,А,Р to their Latin
    counterparts. Text outside matching tokens is left untouched.

    Args:
        text: string to normalize; falsy input short-circuits.

    Returns:
        Tuple of (normalized_text, number_of_tokens_changed,
        list of "original → converted" strings describing each change).
    """
    if not text:
        return text, 0, []

    changes_count = 0
    changes_list = []

    # Cyrillic -> Latin look-alike map used inside matched grade tokens.
    replacements = {
        'Х': 'X',
        'Н': 'H',
        'Т': 'T',
        'С': 'C',
        'В': 'B',
        'К': 'K',
        'М': 'M',
        'А': 'A',
        'Р': 'P',
    }

    # Generic grade: 1-3 digits followed by letter/digit groups (12Х18Н10Т).
    pattern = r'\b\d{1,3}(?:[A-ZА-ЯЁ]\d*)+\b'
    # Welding-wire grade: 'СВ-' (either alphabet) plus a generic grade tail.
    pattern_wire = r'\b[СC][ВB]-\d{1,3}(?:[A-ZА-ЯЁ]\d*)+\b'

    def replace_in_steel_grade(match):
        # Convert one matched token character-by-character, recording changes.
        nonlocal changes_count, changes_list
        original = match.group(0)
        converted = ''.join(replacements.get(ch, ch) for ch in original)
        if converted != original:
            changes_count += 1
            changes_list.append(f"{original} → {converted}")
        return converted

    normalized_text = re.sub(pattern, replace_in_steel_grade, text)
    normalized_text = re.sub(pattern_wire, replace_in_steel_grade, normalized_text)

    return normalized_text, changes_count, changes_list
|
|
def extract_preamble(text):
    """Return a short lead-in for *text*.

    Preference order: everything up to (and including) the first colon,
    then the first complete sentence, then a hard 300-character prefix
    with an ellipsis. Candidates of 300+ characters are rejected and the
    next fallback is tried.
    """
    if not text:
        return ""

    # Candidate 1: preamble ending at the first colon.
    up_to_colon = re.match(r'^.*?:', text, re.DOTALL)
    if up_to_colon and len(up_to_colon.group(0)) < 300:
        return up_to_colon.group(0).strip()

    # Candidate 2: first sentence terminated by . ? or ! plus whitespace/EOL.
    first_sentence = re.match(r'^.*?(?:\.|\?|!)(?:\s|$)', text, re.DOTALL)
    if first_sentence and len(first_sentence.group(0)) < 300:
        return first_sentence.group(0).strip()

    # Fallback: truncate.
    return text[:300] + "..."
|
|
def chunk_text_documents(documents):
    """Split text documents into overlapping chunks and normalize steel grades.

    Every chunk after the first is prefixed with the document's preamble
    (so continuation chunks keep their section context), then Cyrillic
    look-alike letters in steel designations are converted to Latin and
    chunk metadata (id, total, size) is attached.

    Args:
        documents: iterable of llama_index Document objects.

    Returns:
        list of text nodes produced by SentenceSplitter, mutated in place.
    """
    text_splitter = SentenceSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )
    total_normalizations = 0
    chunks_with_changes = 0

    chunked = []
    for doc in documents:
        # Lead-in reused to re-anchor continuation chunks.
        parent_context = extract_preamble(doc.text)

        chunks = text_splitter.get_nodes_from_documents([doc])

        for i, chunk in enumerate(chunks):
            # Prepend the preamble to every chunk except the first,
            # unless the chunk already starts with it.
            if i > 0 and parent_context:
                if not chunk.text.strip().startswith(parent_context[:20]):
                    chunk.text = f"[Текст из начала п. {parent_context}] {chunk.text}"

            chunk.text, changes, _ = normalize_steel_designations(chunk.text)

            if changes > 0:
                chunks_with_changes += 1
                total_normalizations += changes

            chunk.metadata.update({
                'chunk_id': i,
                'total_chunks': len(chunks),
                'chunk_size': len(chunk.text)
            })
            chunked.append(chunk)

    if chunked:
        sizes = [len(c.text) for c in chunked]
        avg_size = sum(sizes) / len(sizes)
        log_message(f"✓ Text: {len(documents)} docs → {len(chunked)} chunks")
        log_message(f" Size stats: avg={avg_size:.0f}, min={min(sizes)}, max={max(sizes)} chars")
        log_message(" Steel designation normalization:")
        log_message(f" - Chunks with changes: {chunks_with_changes}/{len(chunked)}")
        log_message(f" - Total steel grades normalized: {total_normalizations}")
        if chunks_with_changes > 0:
            log_message(f" - Avg per affected chunk: {total_normalizations/chunks_with_changes:.1f}")
        else:
            log_message(" - No normalizations needed")

    log_message("="*60)

    return chunked
|
|
def chunk_table_by_content(table_data, doc_id, max_chars=MAX_CHARS_TABLE, max_rows=MAX_ROWS_TABLE):
    """Convert one table (sheet) dict into retrieval-ready Document chunks.

    Small tables become a single complete-table Document; larger ones are
    split into row-range chunks. Every chunk carries the table header
    (title + data marker) and a footer with the table number, section and
    document id.

    Args:
        table_data: dict with 'data', 'table_number', 'table_title',
            'section' and 'sheet_name' keys.
        doc_id: source document identifier for metadata and the footer.
        max_chars: soft character budget per chunk.
        max_rows: maximum number of rows per chunk.

    Returns:
        list of llama_index Documents (empty when the table has no rows).
    """
    rows = table_data.get('data', [])
    table_num = table_data.get('table_number', 'unknown')
    table_title = table_data.get('table_title', '')
    section = table_data.get('section', '')
    sheet_name = table_data.get('sheet_name', '')

    # Normalize Cyrillic look-alikes in the descriptive fields.
    table_title, _, _ = normalize_steel_designations(str(table_title))
    section, _, _ = normalize_steel_designations(section)
    table_num_clean = str(table_num).strip()

    # Derive a human-readable table identifier, covering appendix tables
    # and tables without a usable number.
    if table_num_clean in ['-', '', 'unknown', 'nan']:
        if 'приложени' in sheet_name.lower() or 'приложени' in section.lower():
            appendix_match = re.search(r'приложени[еия]\s*[№]?\s*(\d+)', (sheet_name + ' ' + section).lower())
            table_identifier = f"Приложение {appendix_match.group(1)}" if appendix_match else "Приложение"
        else:
            if table_title:
                # Fall back to the first few words of the title.
                table_identifier = ' '.join(table_title.split()[:5])
            else:
                table_identifier = section.split(',')[0] if section else "БезНомера"
    else:
        if 'приложени' in section.lower():
            appendix_match = re.search(r'приложени[еия]\s*[№]?\s*(\d+)', section.lower())
            table_identifier = f"{table_num_clean} Приложение {appendix_match.group(1)}" if appendix_match else table_num_clean
        else:
            table_identifier = table_num_clean

    if not rows:
        return []

    # Normalize steel designations inside every cell value of dict rows.
    normalized_rows = []
    for row in rows:
        if isinstance(row, dict):
            normalized_row = {}
            for k, v in row.items():
                normalized_val, _, _ = normalize_steel_designations(str(v))
                normalized_row[k] = normalized_val
            normalized_rows.append(normalized_row)
        else:
            normalized_rows.append(row)

    intro_content = format_table_header(table_title)

    # Bug fix: arguments were previously passed as (section, doc_id,
    # table_identifier), which swapped the table-number and section lines
    # in the rendered footer; format_table_footer expects
    # (table_identifier, doc_id, section).
    context_content = format_table_footer(table_identifier, doc_id, section)

    # Character budget left for row data in a split chunk (50-char margin
    # for the separator/row-range line).
    static_size = len(intro_content) + len(context_content)
    available_space = max_chars - static_size - 50

    full_rows_content = format_table_rows([{**row, '_idx': i + 1} for i, row in enumerate(normalized_rows)])

    if static_size + len(full_rows_content) <= max_chars and len(normalized_rows) <= max_rows:
        # The whole table fits into one complete chunk.
        content = intro_content + full_rows_content + "\n" + context_content

        metadata = {
            'type': 'table',
            'document_id': doc_id,
            'table_number': table_num_clean if table_num_clean not in ['-', 'unknown'] else table_identifier,
            'table_identifier': table_identifier,
            'table_title': table_title,
            'section': section,
            'sheet_name': sheet_name,
            'total_rows': len(normalized_rows),
            'chunk_size': len(content),
            'is_complete_table': True
        }

        return [Document(text=content, metadata=metadata)]

    # Otherwise split into row-range chunks.
    chunks = []
    current_rows = []
    current_size = 0
    chunk_num = 0

    for i, row in enumerate(normalized_rows):
        row_text = format_single_row(row, i + 1)
        row_size = len(row_text)

        # Flush the accumulated rows when adding this row would exceed
        # either the character budget or the row limit.
        should_split = (current_size + row_size > available_space or
                        len(current_rows) >= max_rows) and current_rows

        if should_split:
            rows_content = format_table_rows(current_rows)

            content = f"{intro_content}{rows_content}{'='*5}\nСтроки: {current_rows[0]['_idx']}-{current_rows[-1]['_idx']}\n{context_content}"

            metadata = {
                'type': 'table',
                'document_id': doc_id,
                'table_identifier': table_identifier,
                'table_title': table_title,
                'section': section,
                'chunk_id': chunk_num,
                'row_start': current_rows[0]['_idx'] - 1,
                'row_end': current_rows[-1]['_idx'],
                'total_rows': len(normalized_rows),
                'chunk_size': len(content),
                'is_complete_table': False
            }

            chunks.append(Document(text=content, metadata=metadata))

            chunk_num += 1
            current_rows = []
            current_size = 0

        # Stash the row with its 1-based index for the range footer.
        row_copy = row.copy() if isinstance(row, dict) else {'data': row}
        row_copy['_idx'] = i + 1
        current_rows.append(row_copy)
        current_size += row_size

    # Flush the trailing partial chunk.
    if current_rows:
        rows_content = format_table_rows(current_rows)
        content = f"{intro_content}{rows_content}{'='*5}\nСтроки: {current_rows[0]['_idx']}-{current_rows[-1]['_idx']}\n{context_content}"

        metadata = {
            'type': 'table',
            'document_id': doc_id,
            'table_identifier': table_identifier,
            'table_title': table_title,
            'section': section,
            'chunk_id': chunk_num,
            'row_start': current_rows[0]['_idx'] - 1,
            'row_end': current_rows[-1]['_idx'],
            'total_rows': len(normalized_rows),
            'chunk_size': len(content),
            'is_complete_table': False
        }

        chunks.append(Document(text=content, metadata=metadata))

    return chunks
|
|
def format_table_header(table_title):
    """Build the leading lines of a table chunk: optional normalized title
    followed by the data marker line."""
    lines = []
    if table_title:
        lines.append(f"ТАБЛИЦА {normalize_text(table_title)}\n")
    lines.append("ДАННЫЕ:\n")
    return "".join(lines)
|
|
def format_single_row(row, idx):
    """Render one table row as a numbered line.

    Dict rows become 'idx. key: value | key: value', list rows become
    'idx. value | value'. Cells that are empty, blank, 'nan' or 'none'
    are skipped; an empty string is returned when nothing remains (or
    the row is neither a dict nor a list).
    """
    skip_values = ['nan', 'none', '']

    if isinstance(row, dict):
        cells = [f"{key}: {val}" for key, val in row.items()
                 if val and str(val).strip() and str(val).lower() not in skip_values]
    elif isinstance(row, list):
        cells = [str(val) for val in row
                 if val and str(val).strip() and str(val).lower() not in skip_values]
    else:
        return ""

    if not cells:
        return ""
    return f"{idx}. {' | '.join(cells)}\n"
|
|
def format_table_rows(rows):
    """Concatenate formatted table rows, numbering each by its stored
    '_idx' key (0 when absent)."""
    return "".join(format_single_row(row, row.get('_idx', 0)) for row in rows)
|
|
def format_table_footer(table_identifier, doc_id, section):
    """Build the trailing context lines of a table chunk: table number,
    section and document id — each emitted only when non-empty."""
    parts = []
    if table_identifier:
        parts.append(f"НОМЕР ТАБЛИЦЫ: {normalize_text(table_identifier)}\n")
    if section:
        parts.append(f"РАЗДЕЛ: {normalize_text(section)}\n")
    if doc_id:
        parts.append(f"ДОКУМЕНТ: {doc_id}\n")
    return "".join(parts)
|
|
def load_json_documents(repo_id, hf_token, json_dir):
    """Download and parse JSON documents (plain and zipped) from a HF dataset repo.

    Plain .json files are parsed directly; .zip archives are scanned for
    JSON members, which are decoded with a small list of candidate
    encodings, written to a temp file and parsed the same way.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: access token for the repository.
        json_dir: path prefix inside the repo where JSON/ZIP files live.

    Returns:
        list of llama_index Documents extracted from every readable file.
    """
    import tempfile
    import os

    log_message("Loading JSON documents...")

    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    json_files = [f for f in files if f.startswith(json_dir) and f.endswith('.json')]
    zip_files = [f for f in files if f.startswith(json_dir) and f.endswith('.zip')]

    log_message(f"Found {len(json_files)} JSON files and {len(zip_files)} ZIP files")

    documents = []
    stats = {'success': 0, 'failed': 0, 'empty': 0}

    for file_path in json_files:
        try:
            log_message(f" Loading: {file_path}")
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token
            )

            docs = extract_sections_from_json(local_path)
            if docs:
                documents.extend(docs)
                stats['success'] += 1
                log_message(f" ✓ Extracted {len(docs)} sections")
            else:
                stats['empty'] += 1
                log_message(f" ⚠ No sections found")

        except Exception as e:
            stats['failed'] += 1
            log_message(f" ✗ Error: {e}")

    for zip_path in zip_files:
        try:
            log_message(f" Processing ZIP: {zip_path}")
            local_zip = hf_hub_download(
                repo_id=repo_id,
                filename=zip_path,
                repo_type="dataset",
                token=hf_token
            )

            with zipfile.ZipFile(local_zip, 'r') as zf:
                # Skip macOS resource forks and hidden files.
                json_files_in_zip = [f for f in zf.namelist()
                                     if f.endswith('.json')
                                     and not f.startswith('__MACOSX')
                                     and not f.startswith('.')
                                     and not '._' in f]

                log_message(f" Found {len(json_files_in_zip)} JSON files in ZIP")

                for json_file in json_files_in_zip:
                    try:
                        file_content = zf.read(json_file)

                        if len(file_content) < 10:
                            log_message(f" ✗ Skipping: {json_file} (file too small)")
                            stats['failed'] += 1
                            continue

                        # Try a cascade of likely encodings; previously this
                        # was a 4-deep nested try/except pyramid.
                        text_content = None
                        for encoding in ('utf-8', 'utf-8-sig', 'utf-16', 'windows-1251'):
                            try:
                                text_content = file_content.decode(encoding)
                                break
                            except UnicodeDecodeError:
                                continue
                        if text_content is None:
                            log_message(f" ✗ Skipping: {json_file} (encoding failed)")
                            stats['failed'] += 1
                            continue

                        # Cheap sanity check before writing a temp file.
                        if not text_content.strip().startswith('{') and not text_content.strip().startswith('['):
                            log_message(f" ✗ Skipping: {json_file} (not valid JSON)")
                            stats['failed'] += 1
                            continue

                        with tempfile.NamedTemporaryFile(mode='w', delete=False,
                                                         suffix='.json', encoding='utf-8') as tmp:
                            tmp.write(text_content)
                            tmp_path = tmp.name

                        docs = extract_sections_from_json(tmp_path)
                        if docs:
                            documents.extend(docs)
                            stats['success'] += 1
                            log_message(f" ✓ {json_file}: {len(docs)} sections")
                        else:
                            stats['empty'] += 1
                            log_message(f" ⚠ {json_file}: No sections")

                        os.unlink(tmp_path)

                    except json.JSONDecodeError:
                        stats['failed'] += 1
                        log_message(f" ✗ {json_file}: Invalid JSON")
                    except Exception as e:
                        stats['failed'] += 1
                        log_message(f" ✗ {json_file}: {str(e)[:100]}")

        except Exception as e:
            log_message(f" ✗ Error with ZIP: {e}")

    log_message(f"="*60)
    log_message(f"JSON Loading Stats:")
    log_message(f" Success: {stats['success']}")
    log_message(f" Empty: {stats['empty']}")
    log_message(f" Failed: {stats['failed']}")
    log_message(f"="*60)

    return documents
|
|
def extract_sections_from_json(json_path):
    """Read a structured JSON file and emit one Document per non-empty
    section, subsection and sub-subsection text.

    Args:
        json_path: path to a JSON file with 'document_metadata' and a
            nested 'sections' hierarchy.

    Returns:
        list of llama_index Documents; empty (with a logged message) on
        any read/parse error.
    """
    documents = []

    def _add_document(node, text_key, id_key, doc_id):
        # Append a Document for `node` when its text field is non-blank.
        if node.get(text_key, '').strip():
            documents.append(Document(
                text=node[text_key],
                metadata={
                    'type': 'text',
                    'document_id': doc_id,
                    'section_id': node.get(id_key, '')
                }
            ))

    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        doc_id = data.get('document_metadata', {}).get('document_id', 'unknown')

        # The three hierarchy levels share one append helper instead of
        # three copy-pasted blocks.
        for section in data.get('sections', []):
            _add_document(section, 'section_text', 'section_id', doc_id)

            for subsection in section.get('subsections', []):
                _add_document(subsection, 'subsection_text', 'subsection_id', doc_id)

                for sub_sub in subsection.get('sub_subsections', []):
                    _add_document(sub_sub, 'sub_subsection_text', 'sub_subsection_id', doc_id)

    except Exception as e:
        log_message(f"Error extracting from {json_path}: {e}")

    return documents
|
|
def load_table_documents(repo_id, hf_token, table_dir):
    """Download table files (JSON/Excel) from a HF dataset repo and chunk
    every sheet into table Documents.

    Excel files are converted to JSON in a temporary directory first.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: access token for the repository.
        table_dir: path prefix inside the repo where table files live.

    Returns:
        list of table-chunk Documents from all processed sheets.
    """
    log_message("Loading tables...")
    log_message("="*60)
    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    table_files = [f for f in files if f.startswith(table_dir) and (f.endswith('.json') or f.endswith('.xlsx') or f.endswith('.xls'))]

    all_chunks = []
    tables_processed = 0

    for file_path in table_files:
        try:
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token
            )

            if file_path.endswith(('.xlsx', '.xls')):
                from converters.converter import convert_single_excel_to_json
                import tempfile

                # Bug fix: the converted JSON must be read while the
                # temporary directory still exists — previously the file
                # was opened after TemporaryDirectory cleanup had already
                # deleted it.
                with tempfile.TemporaryDirectory() as temp_dir:
                    json_path = convert_single_excel_to_json(local_path, temp_dir)
                    with open(json_path, 'r', encoding='utf-8') as f:
                        data = json.load(f)
            else:
                with open(local_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)

            file_doc_id = data.get('document_id', data.get('document', 'unknown'))

            for sheet in data.get('sheets', []):
                # Sheet-level document id overrides the file-level one.
                sheet_doc_id = sheet.get('document_id', sheet.get('document', file_doc_id))
                tables_processed += 1

                chunks = chunk_table_by_content(sheet, sheet_doc_id,
                                                max_chars=MAX_CHARS_TABLE,
                                                max_rows=MAX_ROWS_TABLE)
                all_chunks.extend(chunks)

        except Exception as e:
            log_message(f"Error loading {file_path}: {e}")

    log_message(f"✓ Loaded {len(all_chunks)} table chunks from {tables_processed} tables")
    log_message("="*60)

    return all_chunks
|
|
|
|
def load_image_documents(repo_id, hf_token, image_dir):
    """Download image-description tables (CSV/Excel) from a HF dataset repo
    and wrap each row as an image Document.

    Excel files are converted to CSV in a temporary directory first.

    Args:
        repo_id: Hugging Face dataset repository id.
        hf_token: access token for the repository.
        image_dir: path prefix inside the repo where image tables live.

    Returns:
        list of image Documents, one per spreadsheet row.
    """
    log_message("Loading images...")

    files = list_repo_files(repo_id=repo_id, repo_type="dataset", token=hf_token)
    csv_files = [f for f in files if f.startswith(image_dir) and (f.endswith('.csv') or f.endswith('.xlsx') or f.endswith('.xls'))]

    documents = []
    for file_path in csv_files:
        try:
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=file_path,
                repo_type="dataset",
                token=hf_token
            )

            if file_path.endswith(('.xlsx', '.xls')):
                from converters.converter import convert_single_excel_to_csv
                import tempfile

                # Bug fix: read the converted CSV while the temporary
                # directory still exists — previously pd.read_csv ran after
                # TemporaryDirectory cleanup had already deleted the file.
                with tempfile.TemporaryDirectory() as temp_dir:
                    csv_path = convert_single_excel_to_csv(local_path, temp_dir)
                    df = pd.read_csv(csv_path)
            else:
                df = pd.read_csv(local_path)

            for _, row in df.iterrows():
                content = f"Документ: {row.get('Обозначение документа', 'unknown')}\n"
                content += f"Рисунок: {row.get('№ Изображения', 'unknown')}\n"
                content += f"Название: {row.get('Название изображения', '')}\n"
                content += f"Описание: {row.get('Описание изображение', '')}\n"
                content += f"Раздел: {row.get('Раздел документа', '')}\n"

                documents.append(Document(
                    text=content,
                    metadata={
                        'type': 'image',
                        'document_id': str(row.get('Обозначение документа', 'unknown')),
                        'image_number': str(row.get('№ Изображения', 'unknown')),
                        'section': str(row.get('Раздел документа', '')),
                        'chunk_size': len(content)
                    }
                ))
        except Exception as e:
            log_message(f"Error loading {file_path}: {e}")

    if documents:
        avg_size = sum(d.metadata['chunk_size'] for d in documents) / len(documents)
        log_message(f"✓ Loaded {len(documents)} images (avg size: {avg_size:.0f} chars)")

    return documents
|
|
def load_all_documents(repo_id, hf_token, json_dir, table_dir, image_dir):
    """Main loader - combines all document types"""
    log_message("="*60)
    log_message("STARTING DOCUMENT LOADING")
    log_message("="*60)

    # Gather the three document sources: chunked text, tables, images.
    text_chunks = chunk_text_documents(load_json_documents(repo_id, hf_token, json_dir))
    table_chunks = load_table_documents(repo_id, hf_token, table_dir)
    image_docs = load_image_documents(repo_id, hf_token, image_dir)

    combined = text_chunks + table_chunks + image_docs

    log_message("="*60)
    log_message(f"TOTAL DOCUMENTS: {len(combined)}")
    log_message(f" Text chunks: {len(text_chunks)}")
    log_message(f" Table chunks: {len(table_chunks)}")
    log_message(f" Images: {len(image_docs)}")
    log_message("="*60)

    return combined