WebashalarForML's picture
Upload 42 files
fad436e verified
import os
import json
import re
import logging
from typing import List, Dict, Any
# Ensure langchain is available for paddlex/paddleocr
try:
import langchain
import langchain_community
except ImportError:
logging.warning("LangChain modules not found. PaddleOCR might fail.")
from core.ocr_engine import OCREngine
from core.vlm_engine import GroqVLMEngine
from core.ner_engine import NEREngine
# Global instances (Lazy load)
_ocr = None
_vlm = None
_ner = None
def get_ocr():
    """Return the process-wide OCREngine, constructing it on first call.

    Uses an explicit ``is None`` check (rather than truthiness) so a
    falsy-but-valid engine object would not be re-instantiated on every call.
    """
    global _ocr
    if _ocr is None:
        _ocr = OCREngine()
    return _ocr
def get_vlm():
    """Return the process-wide GroqVLMEngine, constructing it on first call.

    Uses an explicit ``is None`` check (rather than truthiness) so a
    falsy-but-valid engine object would not be re-instantiated on every call.
    """
    global _vlm
    if _vlm is None:
        _vlm = GroqVLMEngine()
    return _vlm
def get_ner():
    """Return the process-wide NEREngine, constructing it on first call.

    Uses an explicit ``is None`` check (rather than truthiness) so a
    falsy-but-valid engine object would not be re-instantiated on every call.
    """
    global _ner
    if _ner is None:
        _ner = NEREngine()
    return _ner
def process_image_pipeline(image_paths: List[str]) -> Dict[str, Any]:
    """Extract structured contact data from a batch of card images.

    For each image the Groq VLM is tried first; when it yields nothing the
    pipeline falls back to OCR followed by NER. Results from all images are
    merged into one accumulator, de-duplicated, and returned together with
    the raw text captured per image path under "extracted_text".
    """
    logging.info(f"Pipeline: Starting processing for {len(image_paths)} images.")
    vlm = get_vlm()
    ocr = get_ocr()
    ner = get_ner()
    aggregated: Dict[str, Any] = {
        "name": [],
        "contact_number": [],
        "Designation": [],
        "email": [],
        "Location": [],
        "Link": [],
        "Company": [],
        "extracted_text": {},
        "status_message": "Primary: Groq VLM",
    }
    raw_text_by_path: Dict[str, Any] = {}
    for path in image_paths:
        img_name = os.path.basename(path)
        # Primary path: structured extraction straight from the image.
        logging.info(f"Pipeline: Attempting VLM extraction for {img_name}")
        vlm_data = vlm.process(path)
        if vlm_data:
            merge_structured_data(aggregated, vlm_data)
            raw_text_by_path[path] = json.dumps(vlm_data)
            logging.info(f"Pipeline: VLM success for {img_name}")
            continue
        # Fallback path: OCR the image, then run NER over the raw text.
        logging.warning(f"Pipeline: VLM failed or skipped for {img_name}. Falling back to OCR+NER.")
        raw_text = ocr.extract_text(path)
        raw_text_by_path[path] = raw_text
        if not raw_text:
            logging.error(f"Pipeline: Both VLM and OCR failed for {img_name}")
            continue
        logging.info(f"Pipeline: OCR success for {img_name}, attempting NER.")
        ner_data = ner.extract_entities(raw_text)
        if ner_data:
            merge_structured_data(aggregated, ner_data)
            logging.info(f"Pipeline: NER success for {img_name}")
        else:
            logging.warning(f"Pipeline: NER failed to extract entities for {img_name}")
        aggregated["status_message"] = "Fallback: OCR+NER"
    aggregated["extracted_text"] = raw_text_by_path
    cleaned = cleanup_results(aggregated)
    logging.info(f"Pipeline: Completed. Extracted data for {sum(1 for v in cleaned.values() if isinstance(v, list) and v)} fields.")
    return cleaned
def merge_structured_data(main_data: Dict, new_data: Dict):
    """Merge entity values from ``new_data`` into the accumulator ``main_data``.

    Incoming keys are matched case-insensitively: first against a table of
    known aliases (e.g. "Contact" -> "contact_number", "Address" -> "Location"),
    then against ``main_data``'s own keys. List values are extended onto the
    target list; truthy scalars are appended. Unknown keys, and keys whose
    target in ``main_data`` is not a list, are skipped.

    Mutates ``main_data`` in place; returns None.
    """
    # Alias table keyed by LOWERCASED incoming key. The previous version used
    # key.capitalize() against a mixed-case table, which silently dropped
    # valid keys such as "Location" ("location" was never in main_data).
    aliases = {
        "name": "name",
        "contact": "contact_number",
        "contact_number": "contact_number",
        "phone": "contact_number",
        "designation": "Designation",
        "email": "email",
        "address": "Location",
        "location": "Location",
        "link": "Link",
        "company": "Company",
    }
    # Case-insensitive view of the accumulator's keys, for un-aliased fields.
    canonical_by_lower = {k.lower(): k for k in main_data}
    for key, val in new_data.items():
        folded = key.lower()
        canonical_key = aliases.get(folded) or canonical_by_lower.get(folded)
        # Guard against non-list targets (e.g. "extracted_text" is a dict):
        # extending those would raise AttributeError in the old code.
        if canonical_key is None or not isinstance(main_data.get(canonical_key), list):
            continue
        if isinstance(val, list):
            main_data[canonical_key].extend(val)
        elif val:
            main_data[canonical_key].append(val)
def cleanup_results(results: Dict) -> Dict:
    """De-duplicate every list field in ``results`` and drop placeholder values.

    Comparison is case-insensitive on the stripped string form of each item;
    the first occurrence's original casing is kept. Placeholder entries
    ("", "not found", "none", "null", "[]") are removed. Non-list values are
    left untouched. Mutates ``results`` in place and returns it.
    """
    placeholders = {"", "not found", "none", "null", "[]"}
    for field, entries in results.items():
        if not isinstance(entries, list):
            continue
        kept = []
        observed = set()
        for entry in entries:
            text = str(entry).strip()
            folded = text.lower()
            if folded in observed or folded in placeholders:
                continue
            kept.append(text)
            observed.add(folded)
        results[field] = kept
    return results
def extract_contact_details(text: str) -> Dict[str, List[str]]:
    """Regex safety net that pulls email addresses and phone numbers from text.

    Returns a dict with keys "emails" and "phone_numbers", each a list of the
    full matched substrings in order of appearance (empty lists when nothing
    matches).
    """
    email_regex = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
    # The country-code prefix must be a NON-capturing group: with a capturing
    # group, re.findall() returns only the group text (often "" or "+1 "),
    # not the whole phone number.
    phone_regex = re.compile(r'(?:\+?\d{1,3}[-.\s()]?)?\(?\d{3,5}\)?[-.\s()]?\d{3,5}[-.\s()]?\d{3,5}')
    return {
        "emails": email_regex.findall(text),
        "phone_numbers": phone_regex.findall(text)
    }