Spaces:
Running
Running
| import os | |
| import random | |
| import shutil | |
| import zipfile | |
| import logging | |
| import asyncio | |
| import aiohttp | |
| import time | |
| from typing import List, Dict, Tuple, Optional, Any | |
| import gradio as gr | |
| import librosa | |
| import edge_tts | |
| from pydub import AudioSegment | |
| from audio_separator.separator import Separator | |
| from infer_rvc_python import BaseLoader | |
| # Attempt to import HuggingFace spaces | |
| try: | |
| import spaces | |
| SPACES_STATUS = True | |
| except ImportError: | |
| SPACES_STATUS = False | |
| # Custom modules | |
| import tts_voice | |
| import model_handler | |
| # --- Configuration & Constants --- | |
# Directory that receives generated speech, downloaded archives and
# extracted model files.
TEMP_DIR = "temp"
# Filename prefix for downloaded model archives.
MODEL_PREFIX = "model"
# Largest archive accepted by the downloader, in bytes (500 MB).
MAX_FILE_SIZE = 500 * 1_000_000
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"

# Create the working directory at import time; an existing one is fine.
os.makedirs(TEMP_DIR, exist_ok=True)

# Configure root logging once and expose a module-level logger.
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
| # --- Global State --- | |
# (display name, checkpoint file) pairs for the selectable UVR5 models.
_UVR_5_CHECKPOINTS = (
    ("BS-Roformer-Viperx-1297", "model_bs_roformer_ep_317_sdr_12.9755.ckpt"),
    ("MDX23C-InstVoc HQ 2", "MDX23C-8KFFT-InstVoc_HQ_2.ckpt"),
    ("Kim Vocal 2", "Kim_Vocal_2.onnx"),
    ("5_HP-Karaoke", "5_HP-Karaoke-UVR.pth"),
    ("UVR-DeNoise by FoxJoy", "UVR-DeNoise.pth"),
    ("UVR-DeEcho-DeReverb by FoxJoy", "UVR-DeEcho-DeReverb.pth"),
)
UVR_5_MODELS = [
    {"model_name": display_name, "checkpoint": checkpoint_file}
    for display_name, checkpoint_file in _UVR_5_CHECKPOINTS
]

# Registry of RVC voice models; the download and upload handlers append to it.
MODELS: List[Dict[str, str]] = [
    {
        "model": "model.pth",
        "index": "model.index",
        "model_name": "Test Model",
    },
]

# Substrings rejected in download URLs and model names.
BAD_WORDS = [
    'puttana',
    'whore',
    'badword3',
    'badword4',
]

# Build the separator and the RVC converter eagerly at import time.
# If either constructor fails, both globals are set to None so the handlers
# can report "not initialized" instead of crashing on first use.
try:
    separator = Separator()
    _cpu_only = not SPACES_STATUS
    converter = BaseLoader(only_cpu=_cpu_only, hubert_path=None, rmvpe_path=None)
except Exception as e:
    logger.error(f"Failed to initialize separator or converter: {e}")
    separator = converter = None
| # --- Helper Classes --- | |
class BadWordError(Exception):
    """Raised when a download URL or model name contains a restricted word."""
| # --- Core Functions --- | |
async def text_to_speech_edge(text: str, language_code: str) -> str:
    """Synthesize *text* with Edge TTS and return the path of the saved MP3.

    Args:
        text: Text to speak; must contain at least one non-whitespace
            character.
        language_code: Key into ``tts_voice.tts_order_voice``. An unknown key
            falls back to the first voice in that mapping.

    Returns:
        Path of the generated ``.mp3`` file inside ``TEMP_DIR``.

    Raises:
        ValueError: If ``text`` is empty or whitespace only.
        RuntimeError: If no voice is configured, or Edge TTS finished without
            producing a file.
    """
    if not text or not text.strip():
        raise ValueError("Text input cannot be empty")

    # Local import: uuid is not among this module's top-level imports.
    import uuid

    # The voice table lives in the imported tts_voice module; a bare
    # `tts_order_voice` name is not defined in this file.
    voices = tts_voice.tts_order_voice
    if language_code in voices:
        voice = voices[language_code]
    elif voices:
        voice = next(iter(voices.values()))
    else:
        raise RuntimeError("No TTS voices are configured")

    communicate = edge_tts.Communicate(text, voice)
    # A UUID keeps concurrent requests from overwriting each other's output,
    # which a 4-digit random suffix cannot guarantee.
    temp_path = os.path.join(TEMP_DIR, f"tts_{uuid.uuid4().hex}.mp3")
    try:
        await communicate.save(temp_path)
        if not os.path.exists(temp_path):
            raise RuntimeError("TTS failed to generate audio file")
        return temp_path
    except Exception as e:
        logger.error(f"TTS Error: {e}")
        raise
async def download_from_url(url: str, name: str, progress: gr.Progress = gr.Progress()) -> List[str]:
    """Download a zipped model from Hugging Face, unpack it and register it.

    Args:
        url: HTTPS URL on ``huggingface.co``; ``/blob/`` is rewritten to
            ``/resolve/`` to obtain the direct download link.
        name: Display name for the model. If it is already taken, a numeric
            suffix (``_1``, ``_2``, ...) is appended.
        progress: Gradio progress tracker used to report download and
            extraction status.

    Returns:
        ``[status message, path of the .pth file, path of the .index file]``;
        the index path is ``""`` when the archive contains none.

    Raises:
        ValueError: Invalid URL or name, non-200 response, file larger than
            ``MAX_FILE_SIZE``, invalid zip, or no ``.pth`` file in the zip.
        BadWordError: ``url`` or ``name`` contains an entry of ``BAD_WORDS``.
    """
    # Local imports: neither name is among this module's top-level imports.
    import uuid
    from urllib.parse import urlparse

    archive_path = None
    extract_dir = None
    registered = False
    try:
        # Compare the parsed host instead of a string prefix: a prefix test
        # also accepts "https://huggingface.com/..." and
        # "https://huggingface.co@other.host/...".
        parsed = urlparse(url or "")
        if parsed.scheme != "https" or parsed.hostname != "huggingface.co":
            raise ValueError("URL must be from Hugging Face")
        if not name or not name.strip():
            raise ValueError("Model name cannot be empty")

        url_lower = url.lower()
        name_lower = name.lower()
        if any(word in url_lower or word in name_lower for word in BAD_WORDS):
            raise BadWordError("Input contains restricted words")

        # Resolve URL to direct download link
        download_url = url.replace("/blob/", "/resolve/")

        # A UUID stem gives every download its own archive and extraction
        # directory, so files from an earlier download can never be picked up.
        stem = f"{MODEL_PREFIX}{uuid.uuid4().hex}"
        archive_path = os.path.join(TEMP_DIR, f"{stem}.zip")
        extract_dir = os.path.join(TEMP_DIR, stem)
        size_error = f"File size exceeds {MAX_FILE_SIZE / 1_000_000} MB limit"

        progress(0, desc="Starting download...")
        async with aiohttp.ClientSession() as session:
            async with session.get(download_url) as response:
                if response.status != 200:
                    raise ValueError(f"Failed to download file. Status: {response.status}")
                total = int(response.headers.get('content-length', 0))
                if total > MAX_FILE_SIZE:
                    raise ValueError(size_error)
                current = 0
                with open(archive_path, "wb") as f:
                    async for data in response.content.iter_chunked(4096):
                        current += len(data)
                        # content-length may be missing or wrong, so the limit
                        # is also enforced on the bytes actually received.
                        if current > MAX_FILE_SIZE:
                            raise ValueError(size_error)
                        f.write(data)
                        if total > 0:
                            progress(min(current / total, 1.0), desc="Downloading model...")

        progress(1.0, desc="Extracting files...")
        try:
            with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)
        except zipfile.BadZipFile as err:
            raise ValueError("Downloaded file is not a valid zip file") from err

        # Collect model and index files anywhere inside the extracted tree.
        pth_files = []
        index_files = []
        for root, _, files in os.walk(extract_dir):
            for file in files:
                if file.endswith(".pth"):
                    pth_files.append(os.path.join(root, file))
                elif file.endswith(".index"):
                    index_files.append(os.path.join(root, file))
        if not pth_files:
            raise ValueError("No .pth model file found in the zip archive")

        # Use the first found files
        pth_file = pth_files[0]
        index_file = index_files[0] if index_files else ""

        # Pick a display name that is not registered yet.
        clean_name = name.strip()
        taken = {m['model_name'] for m in MODELS}
        final_name = clean_name
        counter = 1
        while final_name in taken:
            final_name = f"{clean_name}_{counter}"
            counter += 1

        MODELS.append({
            "model": pth_file,
            "index": index_file,
            "model_name": final_name
        })
        registered = True
        logger.info(f"Successfully loaded model: {final_name}")
        return [f"Downloaded as {final_name}", pth_file, index_file]
    except Exception:
        logger.exception("Error in download_from_url")
        raise
    finally:
        # The archive is never needed after this call, whatever the outcome.
        if archive_path is not None and os.path.exists(archive_path):
            os.remove(archive_path)
        # Keep extracted files only when the model was actually registered.
        if not registered and extract_dir is not None:
            shutil.rmtree(extract_dir, ignore_errors=True)
def inf_handler(audio_path: str, model_name: str) -> Tuple[str, str]:
    """Run UVR5 source separation on one audio file.

    The checkpoint is looked up in ``UVR_5_MODELS`` by display name; when the
    name is unknown the separator's default model is loaded instead.

    Args:
        audio_path: Path of the audio file to separate.
        model_name: Display name of the UVR5 model to use.

    Returns:
        The first two output paths reported by ``separator.separate``, in
        that order (the UI wires them to "Vocals" and "Instrumental").

    Raises:
        ValueError: If ``audio_path`` is empty.
        RuntimeError: If the separator is unavailable or fewer than two
            output files are produced.
    """
    if not audio_path:
        raise ValueError("Audio input is missing")
    if separator is None:
        raise RuntimeError("Audio Separator is not initialized")

    checkpoint = next(
        (entry["checkpoint"] for entry in UVR_5_MODELS if entry["model_name"] == model_name),
        None,
    )
    if checkpoint is None:
        logger.warning("Model not found, loading default UVR model")
        separator.load_model()
    else:
        logger.info(f"Loading UVR Model: {checkpoint}")
        separator.load_model(checkpoint)

    try:
        stems = separator.separate(audio_path)
        # Two outputs are required, one per audio player in the UI.
        if len(stems) < 2:
            raise RuntimeError("Separation did not return expected number of files")
        first_stem, second_stem = stems[0], stems[1]
        return first_stem, second_stem
    except Exception as err:
        logger.error(f"Separation failed: {err}")
        raise err
def run(
    model_name: str,
    audio_input: str,
    pitch_alg: str,
    pitch_lvl: int,
    index_inf: float,
    r_m_f: int,
    e_r: float,
    c_b_p: float
) -> str:
    """Convert audio with the selected RVC model and return the output path.

    Args:
        model_name: Display name of an entry in ``MODELS``.
        audio_input: Path of the input audio (a list of paths is passed
            through unchanged).
        pitch_alg: Forwarded to the converter as ``pitch_algo``.
        pitch_lvl: Forwarded as ``pitch_lvl``.
        index_inf: Forwarded as ``index_influence``.
        r_m_f: Forwarded as ``respiration_median_filtering``.
        e_r: Forwarded as ``envelope_ratio``.
        c_b_p: Forwarded as ``consonant_breath_protection``.

    Returns:
        The first path returned by the converter.

    Raises:
        ValueError: If no audio was given or the model file cannot be found.
        RuntimeError: If the converter is unavailable or returns nothing.
    """
    if not audio_input:
        raise ValueError("Please upload an audio file")
    if converter is None:
        raise RuntimeError("RVC Converter is not initialized")

    if isinstance(audio_input, str):
        audio_files = [audio_input]
    else:
        audio_files = audio_input

    # Resolve the registry entry for the requested model, if any.
    entry = next((m for m in MODELS if m["model_name"] == model_name), None)
    file_m = entry["model"] if entry is not None else ""
    file_index = entry["index"] if entry is not None else ""
    if not (file_m and os.path.exists(file_m)):
        raise ValueError("Model file not found or invalid")

    # Each request gets its own configuration tag on the shared converter.
    random_tag = f"USER_{random.randint(10000000, 99999999)}"
    logger.info(f"Running inference: Model={file_m}, Tag={random_tag}")

    try:
        converter.apply_conf(
            tag=random_tag,
            file_model=file_m,
            pitch_algo=pitch_alg,
            pitch_lvl=pitch_lvl,
            file_index=file_index,
            index_influence=index_inf,
            respiration_median_filtering=r_m_f,
            envelope_ratio=e_r,
            consonant_breath_protection=c_b_p,
            # 44100 for inputs ending in ".mp3", 0 for everything else.
            resample_sr=44100 if audio_files[0].endswith('.mp3') else 0,
        )
        # Short pause kept from the original flow before running conversion.
        time.sleep(0.1)
        output_paths = converter(
            audio_files,
            random_tag,
            overwrite=False,
            parallel_workers=8
        )
        if not output_paths:
            raise RuntimeError("Conversion returned no results")
        return output_paths[0]
    except Exception as err:
        logger.error(f"Inference error: {err}")
        raise err
def upload_model(index_file, pth_file, model_name: str) -> str:
    """Register a manually uploaded model in ``MODELS``.

    Args:
        index_file: Uploaded ``.index`` file, either a path string or an
            object exposing the path as ``.name``.
        pth_file: Uploaded ``.pth`` file, in the same form as ``index_file``.
        model_name: Display name for the model. If it is already taken, a
            numeric suffix (``_1``, ``_2``, ...) is appended, matching what
            ``download_from_url`` does.

    Returns:
        A status message; it includes the final name when a suffix was added.

    Raises:
        ValueError: If either file is missing or the name is empty.
    """
    if not index_file or not pth_file:
        raise ValueError("Both index and model files are required")
    if not model_name or not model_name.strip():
        raise ValueError("Model name cannot be empty")

    # gr.File may hand over a plain path string or a temp-file wrapper with a
    # .name attribute, depending on Gradio version and component type.
    pth_path = getattr(pth_file, "name", pth_file)
    index_path = getattr(index_file, "name", index_file)

    # Lookups return the first entry with a given name, so a duplicate name
    # would make the new model unreachable.
    requested_name = model_name.strip()
    taken = {m["model_name"] for m in MODELS}
    final_name = requested_name
    counter = 1
    while final_name in taken:
        final_name = f"{requested_name}_{counter}"
        counter += 1

    MODELS.append({
        "model": pth_path,
        "index": index_path,
        "model_name": final_name
    })
    if final_name == requested_name:
        return "Model uploaded successfully!"
    return f"Model uploaded successfully as {final_name}!"
def json_to_markdown_table(json_data: Dict[str, Any]) -> str:
    """Render a mapping as a two-column ("Key", "Value") Markdown table.

    Each item becomes one row, in the mapping's iteration order; keys and
    values are formatted with their default string form.
    """
    header = "| Key | Value |\n| --- | --- |\n"
    rows = [f"| {key} | {value} |\n" for key, value in json_data.items()]
    return header + "".join(rows)
def get_model_info(name: str) -> str:
    """Return a Markdown table describing the registered model called *name*.

    Returns ``"Model not found"`` when no entry of ``MODELS`` has that name,
    and ``"Error reading model metadata."`` when reading or formatting the
    metadata fails (the failure is logged).
    """
    entry = next((m for m in MODELS if m["model_name"] == name), None)
    if entry is None:
        return "Model not found"

    try:
        meta = model_handler.model_info(entry["model"])
        summary = {
            "Model Name": entry["model_name"],
            "Model Config": meta.get('config', 'N/A'),
            "Epochs Trained": meta.get('epochs', 'N/A'),
            "Sample Rate": meta.get('sr', 'N/A'),
            "Pitch Guidance": meta.get('f0', 'N/A'),
            "Model Precision": meta.get('size', 'N/A'),
        }
        return json_to_markdown_table(summary)
    except Exception as err:
        logger.error(f"Failed to get model info: {err}")
        return "Error reading model metadata."
def refresh_models():
    """Return a Dropdown whose choices are the currently registered model names."""
    names = [entry["model_name"] for entry in MODELS]
    return gr.Dropdown(choices=names)
| # --- UI Construction --- | |
def _synthesize_speech(txt, lang):
    """Synchronous click handler that runs the async TTS coroutine."""
    return asyncio.run(text_to_speech_edge(txt, lang))


def _download_model(url, name, progress=gr.Progress()):
    """Synchronous click handler that runs the async model download.

    Gradio attaches a progress tracker only to a handler that declares a
    ``gr.Progress()`` default itself, so the tracker is accepted here and
    forwarded to ``download_from_url``.
    """
    return asyncio.run(download_from_url(url, name, progress))


# Voice keys shown in the Text-to-Speech dropdown; the first one is preselected.
_tts_choices = list(tts_voice.tts_order_voice.keys())

with gr.Blocks(title="Ilaria RVC 💖", theme="NeoPy/Soft") as app:
    gr.Label("Ilaria RVC 💖")
    gr.Markdown("Support the project by donating on [Ko-Fi](https://ko-fi.com/ilariaowo)")
    gr.Markdown("Maintained by BF667")

    # --- Tab 1: voice conversion ---
    with gr.Tab("Inference"):
        with gr.Row():
            with gr.Column(scale=3):
                models_dropdown = gr.Dropdown(
                    label="Select Model",
                    choices=[m["model_name"] for m in MODELS],
                    value=MODELS[0]["model_name"] if MODELS else None
                )
            with gr.Column(scale=1, min_width=150):
                refresh_button = gr.Button("Refresh Models", variant="secondary")
                # Re-reads MODELS so downloaded or uploaded models show up.
                refresh_button.click(refresh_models, outputs=models_dropdown)

        sound_gui = gr.Audio(label="Input Audio", type="filepath")

        with gr.Accordion("Text-to-Speech", open=False):
            text_tts = gr.Textbox(label="Text Input", placeholder="Enter text to convert to speech", lines=3)
            dropdown_tts = gr.Dropdown(label="Language and Voice", choices=_tts_choices, value=_tts_choices[0])
            button_tts = gr.Button("Generate Speech", variant="primary")
            # The generated speech is placed in the input audio component.
            button_tts.click(
                fn=_synthesize_speech,
                inputs=[text_tts, dropdown_tts],
                outputs=sound_gui
            )

        with gr.Accordion("Conversion Settings", open=False):
            pitch_algo_conf = gr.Radio(
                choices=["pm", "harvest", "crepe", "rmvpe", "rmvpe+"],
                value="rmvpe",
                label="Pitch Algorithm",
                info="Select the algorithm for pitch detection"
            )
            with gr.Row():
                pitch_lvl_conf = gr.Slider(label="Pitch Level", minimum=-24, maximum=24, step=1, value=0, info="Negative for male, positive for female")
                index_inf_conf = gr.Slider(minimum=0, maximum=1, value=0.75, label="Index Influence", info="Controls accent application")
            with gr.Row():
                respiration_filter_conf = gr.Slider(minimum=0, maximum=7, value=3, step=1, label="Respiration Median Filtering")
                envelope_ratio_conf = gr.Slider(minimum=0, maximum=1, value=0.25, label="Envelope Ratio")
                consonant_protec_conf = gr.Slider(minimum=0, maximum=0.5, value=0.5, label="Consonant Breath Protection")

        with gr.Row():
            button_conf = gr.Button("Convert Audio", variant="primary", size="lg")
            output_conf = gr.Audio(type="filepath", label="Converted Audio")

        # Input order must match the positional parameters of run().
        button_conf.click(
            fn=run,
            inputs=[models_dropdown, sound_gui, pitch_algo_conf, pitch_lvl_conf, index_inf_conf, respiration_filter_conf, envelope_ratio_conf, consonant_protec_conf],
            outputs=output_conf
        )

    # --- Tab 2: adding models ---
    with gr.Tab("Model Loader"):
        with gr.Accordion("Download Model", open=False):
            gr.Markdown("Download a model from Hugging Face (RVC model, max 500 MB)")
            model_url = gr.Textbox(label="Hugging Face Model URL", placeholder="https://huggingface.co/username/model")
            model_name = gr.Textbox(label="Model Name", placeholder="Enter a unique model name")
            download_button = gr.Button("Download Model", variant="primary")
            status = gr.Textbox(label="Status", interactive=False)
            model_pth = gr.Textbox(label="Model .pth File", interactive=False)
            index_pth = gr.Textbox(label="Index .index File", interactive=False)
            download_button.click(
                fn=_download_model,
                inputs=[model_url, model_name],
                outputs=[status, model_pth, index_pth]
            )

        with gr.Accordion("Upload Model", open=False):
            index_file_upload = gr.File(label="Index File (.index)")
            pth_file_upload = gr.File(label="Model File (.pth)")
            model_name_upload = gr.Textbox(label="Model Name", placeholder="Enter a unique model name")
            upload_button = gr.Button("Upload Model", variant="primary")
            upload_status = gr.Textbox(label="Status", interactive=False)
            upload_button.click(
                upload_model,
                [index_file_upload, pth_file_upload, model_name_upload],
                upload_status
            )

    # --- Tab 3: UVR5 stem separation ---
    with gr.Tab("Vocal Separator"):
        gr.Markdown("Separate vocals and instruments using UVR models (CPU only)")
        uvr5_audio_file = gr.Audio(label="Input Audio", type="filepath")
        with gr.Row():
            uvr5_model = gr.Dropdown(label="UVR Model", choices=[m["model_name"] for m in UVR_5_MODELS])
            uvr5_button = gr.Button("Separate", variant="primary")
        with gr.Row():
            uvr5_output_voc = gr.Audio(label="Vocals", type="filepath")
            uvr5_output_inst = gr.Audio(label="Instrumental", type="filepath")
        uvr5_button.click(
            inf_handler,
            [uvr5_audio_file, uvr5_model],
            [uvr5_output_voc, uvr5_output_inst]
        )
if __name__ == "__main__":
    # Turn on the request queue, then start the server with a share link.
    app.queue()
    app.launch(share=True)