|
|
import os |
|
|
import re |
|
|
import time |
|
|
import regex |
|
|
import requests |
|
|
from tqdm import tqdm |
|
|
from typing import Union, Any, List, Set |
|
|
|
|
|
from ..core.logging import logger |
|
|
|
|
|
def make_parent_folder(path: str):
    """Ensure that the directory which would contain ``path`` exists.

    Nothing happens when ``path`` has no directory component or when that
    directory is already present; otherwise the directory (including any
    missing intermediate directories) is created and the creation is logged.

    Args:
        path (str): The file path whose parent folder should exist.
    """
    parent = os.path.dirname(path)

    # A bare file name has no parent component to create.
    if not parent:
        return
    if os.path.exists(parent):
        return

    logger.info(f"creating folder {parent} ...")
    # exist_ok guards against another process creating it in the meantime.
    os.makedirs(parent, exist_ok=True)
|
|
|
|
|
def safe_remove(data: Union[List[Any], Set[Any]], remove_value: Any):
    """Remove ``remove_value`` from ``data`` in place, ignoring a missing value.

    Args:
        data (Union[List[Any], Set[Any]]): The list or set to modify.
        remove_value (Any): The element to remove. For a list, only the first
            occurrence is removed.
    """
    try:
        data.remove(remove_value)
    except (ValueError, KeyError):
        # list.remove raises ValueError and set.remove raises KeyError when
        # the value is absent; both mean "nothing to remove" here.
        pass
|
|
|
|
|
def generate_dynamic_class_name(base_name: str) -> str:
    """Build a CamelCase-style class name from an arbitrary string.

    Every character that is not an ASCII letter, a digit or whitespace is
    treated as a word separator. Each resulting word is capitalized (first
    character upper-cased, the rest lower-cased) and the words are
    concatenated.

    Args:
        base_name (str): The text to derive the class name from.

    Returns:
        str: The generated name, or ``'DefaultClassName'`` when ``base_name``
        contains no letters or digits.
    """
    sanitized = re.sub(r'[^a-zA-Z0-9\s]', ' ', base_name.strip())
    words = [word.capitalize() for word in sanitized.split()]

    if not words:
        return 'DefaultClassName'
    return ''.join(words)
|
|
|
|
|
def normalize_text(s: str) -> str:
    """Normalize a string for comparison.

    The steps are applied in this order: lower-case the text, replace
    underscores with spaces, drop the standalone articles "a", "an" and
    "the", then collapse every run of whitespace into a single space and
    trim the ends.

    Args:
        s (str): The text to normalize.

    Returns:
        str: The normalized text.
    """
    lowered = s.lower()
    # Underscores are replaced before the article pass: "_" counts as a word
    # character, so "a_the" only exposes its articles once it is split.
    spaced = lowered.replace("_", " ")
    without_articles = regex.sub(r'\b(a|an|the)\b', ' ', spaced)
    return ' '.join(without_articles.split())
|
|
|
|
|
|
|
|
def download_file(url: str, save_file: str, max_retries=3, timeout=10):
    """Download ``url`` to ``save_file`` with resume support and retries.

    If ``save_file`` already exists, its current size is used as the resume
    offset and only the remaining bytes are requested with an HTTP ``Range``
    header. Connection errors and timeouts are retried up to ``max_retries``
    times, sleeping 5 seconds between attempts.

    Args:
        url (str): The URL to download.
        save_file (str): Destination path; its parent folder is created if
            it does not exist.
        max_retries (int): Maximum number of download attempts.
        timeout (int): Timeout in seconds passed to each HTTP request.

    Raises:
        ValueError: If any error other than a connection error or timeout
            occurs (this includes HTTP error statuses).
        RuntimeError: If all attempts are used up without a successful
            download.
    """
    make_parent_folder(save_file)
    for attempt in range(max_retries):
        try:
            resume_byte_pos = os.path.getsize(save_file) if os.path.exists(save_file) else 0

            # HEAD tells us the full size. Follow redirects like the GET below
            # does, and bound the wait so a dead server cannot hang the call.
            response_head = requests.head(url=url, allow_redirects=True, timeout=timeout)
            total_size = int(response_head.headers.get("content-length", 0))

            # Only a non-empty local file can count as "already downloaded".
            # Without this guard a missing Content-Length (total_size == 0)
            # made the function return without downloading anything.
            # When the size is unknown, an existing non-empty file is still
            # treated as complete because it cannot be verified.
            if resume_byte_pos and resume_byte_pos >= total_size:
                logger.info("File already downloaded completely.")
                return

            headers = {'Range': f'bytes={resume_byte_pos}-'} if resume_byte_pos else {}
            with requests.get(url=url, stream=True, headers=headers, timeout=timeout) as response:
                response.raise_for_status()

                # 206 means the server honoured the Range header. Any other
                # success status carries the whole file, so start over rather
                # than appending a full copy to the partial file.
                resuming = bool(resume_byte_pos) and response.status_code == 206
                if not resuming:
                    resume_byte_pos = 0
                mode = 'ab' if resuming else 'wb'

                with open(save_file, mode) as file:
                    # total=None lets tqdm show a plain counter when the size is unknown.
                    with tqdm(
                        total=total_size or None,
                        unit="iB",
                        unit_scale=True,
                        initial=resume_byte_pos,
                    ) as progress_bar:
                        for chunk_data in response.iter_content(chunk_size=1024):
                            if chunk_data:
                                size = file.write(chunk_data)
                                progress_bar.update(size)

            # total_size is the size of the complete file, so the resume
            # offset must not be added to the expected size.
            if os.path.getsize(save_file) >= total_size:
                logger.info("Download completed successfully.")
                break

            logger.warning("File size mismatch, retrying...")
            time.sleep(5)
        except (requests.ConnectionError, requests.Timeout) as e:
            logger.warning(f"Download error: {e}. Retrying ({attempt+1}/{max_retries})...")
            time.sleep(5)
        except Exception as e:
            error_message = f"Unexpected error: {e}"
            logger.error(error_message)
            raise ValueError(error_message) from e
    else:
        # Reached only when the loop finished without a successful `break`.
        error_message = "Exceeded maximum retries. Download failed."
        logger.error(error_message)
        raise RuntimeError(error_message)
|
|
|