import os
import json
import re
import logging
from typing import List, Dict, Any

# Ensure langchain is available for paddlex/paddleocr
try:
    import langchain
    import langchain_community
except ImportError:
    logging.warning("LangChain modules not found. PaddleOCR might fail.")

from core.ocr_engine import OCREngine
from core.vlm_engine import GroqVLMEngine
from core.ner_engine import NEREngine

# Global engine instances, constructed lazily on first use so that importing
# this module stays cheap and heavy model loads happen only when needed.
_ocr = None
_vlm = None
_ner = None


def get_ocr():
    """Return the process-wide OCREngine, creating it on first call."""
    global _ocr
    if _ocr is None:  # 'is None' (not truthiness) so a falsy engine isn't rebuilt
        _ocr = OCREngine()
    return _ocr


def get_vlm():
    """Return the process-wide GroqVLMEngine, creating it on first call."""
    global _vlm
    if _vlm is None:
        _vlm = GroqVLMEngine()
    return _vlm


def get_ner():
    """Return the process-wide NEREngine, creating it on first call."""
    global _ner
    if _ner is None:
        _ner = NEREngine()
    return _ner


def process_image_pipeline(image_paths: List[str]) -> Dict[str, Any]:
    """Extract structured contact data from a list of image files.

    Strategy per image:
      1. Primary: Groq VLM structured extraction.
      2. Fallback: OCR raw text + NER entity extraction when the VLM
         returns nothing.

    Args:
        image_paths: Filesystem paths of the images to process.

    Returns:
        Dict with list-valued fields (name, contact_number, Designation,
        email, Location, Link, Company), an ``extracted_text`` dict mapping
        each path to its raw VLM-JSON or OCR text, and a ``status_message``
        indicating which engine produced the bulk of the data.
    """
    logging.info(f"Pipeline: Starting processing for {len(image_paths)} images.")
    vlm = get_vlm()
    ocr = get_ocr()
    ner = get_ner()

    final_results = {
        "name": [],
        "contact_number": [],
        "Designation": [],
        "email": [],
        "Location": [],
        "Link": [],
        "Company": [],
        "extracted_text": {},
        "status_message": "Primary: Groq VLM",
    }
    all_raw_text = {}

    for path in image_paths:
        img_name = os.path.basename(path)

        # 1. Primary: VLM
        logging.info(f"Pipeline: Attempting VLM extraction for {img_name}")
        vlm_data = vlm.process(path)
        if vlm_data:
            merge_structured_data(final_results, vlm_data)
            all_raw_text[path] = json.dumps(vlm_data)
            logging.info(f"Pipeline: VLM success for {img_name}")
        else:
            # 2. Fallback: OCR + NER
            logging.warning(
                f"Pipeline: VLM failed or skipped for {img_name}. Falling back to OCR+NER."
            )
            raw_text = ocr.extract_text(path)
            all_raw_text[path] = raw_text
            if raw_text:
                logging.info(f"Pipeline: OCR success for {img_name}, attempting NER.")
                ner_data = ner.extract_entities(raw_text)
                if ner_data:
                    merge_structured_data(final_results, ner_data)
                    logging.info(f"Pipeline: NER success for {img_name}")
                else:
                    logging.warning(f"Pipeline: NER failed to extract entities for {img_name}")
                # Any fallback use downgrades the overall status message.
                final_results["status_message"] = "Fallback: OCR+NER"
            else:
                logging.error(f"Pipeline: Both VLM and OCR failed for {img_name}")

    final_results["extracted_text"] = all_raw_text
    cleaned = cleanup_results(final_results)
    logging.info(
        f"Pipeline: Completed. Extracted data for "
        f"{sum(1 for v in cleaned.values() if isinstance(v, list) and v)} fields."
    )
    return cleaned


def merge_structured_data(main_data: Dict, new_data: Dict):
    """Merge engine output into the accumulated results, in place.

    Engine keys are normalized via ``mapping`` (e.g. "Contact" ->
    "contact_number"); unmapped keys fall through lower-cased. Values are
    appended (lists extended) onto the matching list in ``main_data``;
    keys that don't resolve to a known field are silently dropped.
    """
    mapping = {
        "Name": "name",
        "Contact": "contact_number",
        "Designation": "Designation",
        "Email": "email",
        "Address": "Location",
        "Link": "Link",
        "Company": "Company",
    }
    for key, val in new_data.items():
        canonical_key = mapping.get(key.capitalize(), key.lower())
        if canonical_key in main_data:
            if isinstance(val, list):
                main_data[canonical_key].extend(val)
            elif val:  # skip falsy scalars (None, "", 0)
                main_data[canonical_key].append(val)


def cleanup_results(results: Dict) -> Dict:
    """Deduplicate and sanitize every list-valued field, in place.

    Removes case-insensitive duplicates and junk placeholders
    ("", "not found", "none", "null", "[]"); preserves first-seen order
    and the original casing of the first occurrence. Non-list values
    (e.g. ``extracted_text``, ``status_message``) are left untouched.
    """
    for key, val in results.items():
        if isinstance(val, list):
            # Remove duplicates, empty strings, and 'not found'
            seen = set()
            unique = []
            for item in val:
                item_str = str(item).strip()
                if item_str.lower() not in seen and item_str.lower() not in {
                    "", "not found", "none", "null", "[]",
                }:
                    unique.append(item_str)
                    seen.add(item_str.lower())
            results[key] = unique
    return results


def extract_contact_details(text: str) -> Dict[str, List[str]]:
    """Regex fallback: pull emails and phone numbers out of raw text.

    Returns:
        {"emails": [...], "phone_numbers": [...]} — full matched strings.
    """
    email_regex = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
    # NOTE: the country-code group must be NON-capturing; with a capturing
    # group, re.findall would return only the group captures (country-code
    # fragments / empty strings) instead of the full phone numbers.
    phone_regex = re.compile(
        r'(?:\+?\d{1,3}[-.\s()]?)?\(?\d{3,5}\)?[-.\s()]?\d{3,5}[-.\s()]?\d{3,5}'
    )
    return {
        "emails": email_regex.findall(text),
        "phone_numbers": phone_regex.findall(text),
    }