| | import gradio as gr |
| | from huggingface_hub import HfApi, snapshot_download |
| | from huggingface_hub.utils import HfHubHTTPError |
| | import os |
| | import uuid |
| | import shutil |
| | import tempfile |
| | import pandas as pd |
| | import io |
| | |
| |
|
def get_hf_api(token):
    """Create an HfApi client for the given token.

    A missing or empty token is normalised to ``None`` before being handed
    to ``HfApi``, so callers may pass ``""`` straight from a textbox.
    """
    api_token = token or None
    return HfApi(token=api_token)
| |
|
| | |
def is_tabular(filepath):
    """Tell whether *filepath* ends in a supported tabular extension.

    The extension check is case-insensitive. Falsy paths (``None``, ``""``)
    are never tabular.
    """
    if filepath:
        _, suffix = os.path.splitext(filepath)
        return suffix.lower() in {'.csv', '.tsv', '.json', '.jsonl', '.parquet'}
    return False
| |
|
def load_tabular_content(token, repo_id, repo_type, filepath):
    """Download a tabular file from the Hub and load it into a DataFrame.

    Args:
        token: Hugging Face token (may be empty/None).
        repo_id: Repository identifier the file lives in.
        repo_type: Repository type passed through to ``hf_hub_download``.
        filepath: Path of the file inside the repository; its extension
            selects the pandas reader.

    Returns:
        The parsed ``pandas.DataFrame``; ``None`` when the extension is not
        one of csv/tsv/parquet/json/jsonl; or a one-column ``"Error"``
        DataFrame carrying the exception text when download or parsing fails.
    """
    try:
        api = get_hf_api(token)
        local_path = api.hf_hub_download(
            repo_id=repo_id, repo_type=repo_type, filename=filepath, token=token
        )
        ext = os.path.splitext(filepath)[1].lower()

        if ext == '.csv':
            return pd.read_csv(local_path)
        if ext == '.tsv':
            return pd.read_csv(local_path, sep='\t')
        if ext == '.parquet':
            return pd.read_parquet(local_path)
        if ext == '.jsonl':
            return pd.read_json(local_path, lines=True)
        if ext == '.json':
            try:
                return pd.read_json(local_path)
            except ValueError:
                # Some ".json" files are really JSON Lines; pandas reports
                # that as a ValueError, so retry in line-delimited mode.
                # (Narrowed from a bare ``except`` so KeyboardInterrupt and
                # SystemExit are no longer swallowed.)
                return pd.read_json(local_path, lines=True)
        return None
    except Exception as e:
        # Surface the failure inside the Dataframe widget instead of raising.
        return pd.DataFrame({"Error": [str(e)]})
def _write_button_updates(enabled):
    """Return four interactivity updates: manage, archive, delete, commit buttons."""
    return tuple(gr.update(interactive=enabled) for _ in range(4))


def handle_token_change(token):
    """React to a change of the token textbox.

    Fetches the user info for the token, toggles the write-action buttons and
    auto-fills the author fields of both tabs.

    Returns:
        An 8-tuple matching the ``outputs`` wired to ``hf_token.change``:
        (token state, manage_files_btn, archive_repo_btn, delete_repo_btn,
        commit_btn, author_input, download_author_input, whoami_output).
        Every branch returns all eight slots.
    """
    if not token:
        # No token: every write action is disabled (archiving needs a token
        # too), author fields are cleared and the user panel is hidden.
        return (
            None,
            *_write_button_updates(False),
            gr.update(value=""),
            gr.update(value=""),
            gr.update(value=None, visible=False),
        )

    try:
        api = get_hf_api(token)
        user_info = api.whoami()
    except HfHubHTTPError as e:
        gr.Warning(f"Invalid Token: {e}. You can only perform read-only actions.")
        # Keep the author fields untouched (empty update) but still return a
        # value for each of them so the tuple lines up with the outputs.
        return (
            token,
            *_write_button_updates(False),
            gr.update(),
            gr.update(),
            gr.update(value=None, visible=False),
        )

    username = user_info.get('name')
    return (
        token,
        *_write_button_updates(True),
        gr.update(value=username),
        gr.update(value=username),
        gr.update(value=user_info, visible=True),
    )
| |
|
def list_repos_backend(token, author, repo_type):
    """Fetch the repository ids owned by *author*.

    Args:
        token: Hugging Face token (may be empty/None).
        author: Username or organization whose repositories are listed.
        repo_type: Singular repo kind; it is interpolated into the HfApi
            method name ``list_<repo_type>s``.

    Returns:
        A list of repository ids; empty when no author was given or the Hub
        request failed.
    """
    if not author:
        gr.Info("Please enter an author (username or organization).")
        return []
    try:
        api = get_hf_api(token)
        list_fn = getattr(api, f"list_{repo_type}s")
        repo_ids = [repo.id for repo in list_fn(author=author)]
    except HfHubHTTPError as e:
        # gr.Error is an exception class: building it without raising shows
        # nothing. gr.Warning displays the message and lets us still return.
        gr.Warning(f"Could not list repositories: {e}")
        return []
    gr.Info(f"Found {len(repo_ids)} {repo_type}s for '{author}'.")
    return repo_ids
| |
|
def list_repos_for_management(token, author, repo_type):
    """Refresh the Manage-tab dropdown and collapse both side panels.

    Returns a 4-tuple: the repo type (stored in state), a dropdown update
    carrying the fetched ids with no selection, and two ``visible=False``
    updates for the action and editor panels.
    """
    found = list_repos_backend(token, author, repo_type)
    dropdown_update = gr.update(choices=found, value=None)
    hide_action_panel = gr.update(visible=False)
    hide_editor_panel = gr.update(visible=False)
    return repo_type, dropdown_update, hide_action_panel, hide_editor_panel
| |
|
def list_repos_for_download(token, author, repo_type):
    """Refresh the Bulk-Download dropdown.

    Returns the repo type (stored in state) together with a dropdown update
    carrying the fetched ids and a cleared selection.
    """
    found = list_repos_backend(token, author, repo_type)
    dropdown_update = gr.update(choices=found, value=None)
    return repo_type, dropdown_update
| |
|
def on_manage_repo_select(repo_id):
    """Reveal the action panel once a repository is picked in the Manage tab.

    Returns the panel visibility update and echoes *repo_id* so it can be
    stored in state.
    """
    has_selection = bool(repo_id)
    panel_update = gr.update(visible=has_selection)
    return panel_update, repo_id
| |
|
def delete_repo(token, repo_id, repo_type):
    """Delete the selected repository.

    Args:
        token: Write-enabled Hugging Face token.
        repo_id: Id of the repository to delete.
        repo_type: Repository type passed to ``HfApi.delete_repo``.

    Returns:
        A 3-tuple (dropdown value, action-panel update, editor-panel update).
        On success the selection is cleared and both panels are hidden; on
        any failure the selection is kept and the action panel stays visible.
    """
    if not token:
        # gr.Error is an exception class and does nothing unless raised;
        # gr.Warning shows the message while keeping the UI state returned.
        gr.Warning("A write-enabled Hugging Face token is required to delete a repository.")
        return repo_id, gr.update(visible=True), gr.update(visible=False)
    if not repo_id:
        gr.Warning("No repository selected.")
        return repo_id, gr.update(visible=True), gr.update(visible=False)
    try:
        api = get_hf_api(token)
        api.delete_repo(repo_id=repo_id, repo_type=repo_type)
    except HfHubHTTPError as e:
        gr.Warning(f"Failed to delete repository: {e}")
        return repo_id, gr.update(visible=True), gr.update(visible=False)
    gr.Info(f"Successfully deleted '{repo_id}'.")
    return None, gr.update(visible=False), gr.update(visible=False)
| |
|
def archive_repo(token, repo_id, archive_repo, manage_repo_type_state):
    """Move the selected repository under an archive namespace.

    The target id is ``<archive_repo>/<repo name>``, where the repo name is
    the last path segment of *repo_id*.

    Args:
        token: Write-enabled Hugging Face token.
        repo_id: Id of the repository to move.
        archive_repo: Namespace (user or organization) to move it into.
        manage_repo_type_state: Repository type passed to ``HfApi.move_repo``.

    Returns:
        A 3-tuple (dropdown value, action-panel update, editor-panel update).
        On success the selection is cleared and both panels are hidden; on
        any failure the selection is kept and the action panel stays visible.
    """
    if not token:
        # gr.Error is an exception class and does nothing unless raised;
        # gr.Warning shows the message while keeping the UI state returned.
        gr.Warning("A write-enabled Hugging Face token is required to archive a repository.")
        return repo_id, gr.update(visible=True), gr.update(visible=False)

    archive_namespace = (archive_repo or "").strip().strip("/")
    if not repo_id or not archive_namespace:
        gr.Warning("Select a repository and enter an archive namespace first.")
        return repo_id, gr.update(visible=True), gr.update(visible=False)

    # rsplit copes with ids that carry no "owner/" prefix, where indexing
    # split("/")[1] would raise IndexError.
    repo_name = repo_id.rsplit("/", 1)[-1]
    archive_space = f'{archive_namespace}/{repo_name}'
    try:
        api = get_hf_api(token)
        print(f'moving from {repo_id} to {archive_space}')
        api.move_repo(from_id=repo_id, to_id=archive_space, repo_type=manage_repo_type_state)
    except HfHubHTTPError as e:
        gr.Warning(f"Failed to archive repository: {e}")
        print(e)
        return repo_id, gr.update(visible=True), gr.update(visible=False)
    gr.Info(f"Successfully moved to '{archive_space}'.")
    return None, gr.update(visible=False), gr.update(visible=False)
| | |
| |
|
def show_files_and_load_first(token, repo_id, repo_type):
    """List the repository's files and pre-load the first one.

    Files whose path starts with ``.`` are left out of the listing. The first
    remaining file is opened in the Dataframe editor when it is tabular,
    otherwise in the code editor.

    Returns:
        A 4-tuple of updates for (editor panel, file selector, code editor,
        dataframe editor).
    """
    if not repo_id:
        return (gr.update(visible=False), gr.update(choices=[], value=None),
                gr.update(visible=False), gr.update(visible=False))
    try:
        api = get_hf_api(token)
        repo_files = api.list_repo_files(repo_id=repo_id, repo_type=repo_type)
        filtered_files = [f for f in repo_files if not f.startswith('.')]

        if not filtered_files:
            return (gr.update(visible=True), gr.update(choices=[], value=None),
                    gr.update(value="## Empty Repo", visible=True), gr.update(visible=False))

        first_file = filtered_files[0]
        selector_update = gr.update(choices=filtered_files, value=first_file)

        if is_tabular(first_file):
            df = load_tabular_content(token, repo_id, repo_type, first_file)
            return (gr.update(visible=True), selector_update,
                    gr.update(visible=False), gr.update(value=df, visible=True))

        content, lang = load_file_content_backend(token, repo_id, repo_type, first_file)
        return (gr.update(visible=True), selector_update,
                gr.update(value=content, language=lang, visible=True), gr.update(visible=False))
    except Exception as e:
        # gr.Error is an exception class and shows nothing unless raised;
        # gr.Warning surfaces the message and still lets us reset the panel.
        gr.Warning(f"Error: {e}")
        return (gr.update(visible=False), gr.update(choices=[], value=None),
                gr.update(visible=True), gr.update(visible=False))
| |
|
def load_file_content_backend(token, repo_id, repo_type, filepath):
    """Fetch a file from the Hub as text and pick an editor language for it.

    Returns a ``(text, language)`` pair. Without a file path a markdown
    placeholder is returned; if downloading or decoding fails, the error
    message is returned as the text with language ``'python'``.
    """
    if not filepath:
        return "## Select a file to view.", 'markdown'

    known_languages = {'py': 'python', 'js': 'javascript', 'md': 'markdown'}
    try:
        client = get_hf_api(token)
        downloaded = client.hf_hub_download(
            repo_id=repo_id, repo_type=repo_type, filename=filepath, token=token
        )
        with open(downloaded, 'r', encoding='utf-8') as handle:
            text = handle.read()

        suffix = os.path.splitext(filepath)[1].lstrip('.').lower()
        # Extensions outside the map fall back to 'python'.
        return text, known_languages.get(suffix, 'python')
    except Exception as e:
        return f"Error loading file: {e}", 'python'
| |
|
def load_file_content_for_editor(token, repo_id, repo_type, filepath):
    """Route the selected file to the code editor or the Dataframe editor.

    Returns a pair of updates (code editor, dataframe editor); exactly one of
    them is made visible and given the loaded content.
    """
    if not is_tabular(filepath):
        text, lang = load_file_content_backend(token, repo_id, repo_type, filepath)
        show_code = gr.update(value=text, language=lang, visible=True)
        return show_code, gr.update(visible=False)

    frame = load_tabular_content(token, repo_id, repo_type, filepath)
    return gr.update(visible=False), gr.update(value=frame, visible=True)
| |
|
def commit_file(token, repo_id, repo_type, filepath, code_content, df_content, commit_message):
    """Commit the edited file back to the repository.

    Tabular files (by extension) are serialised from *df_content* in their
    own format; every other file is uploaded as the UTF-8 bytes of
    *code_content*.

    Args:
        token: Write-enabled Hugging Face token.
        repo_id: Target repository id.
        repo_type: Repository type passed to ``HfApi.upload_file``.
        filepath: Path of the file inside the repository.
        code_content: Text from the code editor.
        df_content: DataFrame from the Dataframe editor.
        commit_message: Commit message; when blank, ``None`` is passed so the
            library's default message is used.

    Returns:
        None. Outcome is reported through Gradio toasts.
    """
    # gr.Error is an exception class and shows nothing unless raised, so the
    # user-facing failures below go through gr.Warning instead.
    if not token:
        gr.Warning("Write-token required.")
        return
    if not filepath:
        gr.Warning("No file selected.")
        return

    try:
        api = get_hf_api(token)

        if is_tabular(filepath):
            buffer = io.BytesIO()
            ext = os.path.splitext(filepath)[1].lower()
            if ext == '.csv':
                df_content.to_csv(buffer, index=False)
            elif ext == '.tsv':
                df_content.to_csv(buffer, sep='\t', index=False)
            elif ext == '.parquet':
                df_content.to_parquet(buffer, index=False)
            elif ext == '.json':
                df_content.to_json(buffer, orient='records')
            elif ext == '.jsonl':
                df_content.to_json(buffer, orient='records', lines=True)
            data_to_upload = buffer.getvalue()
        else:
            data_to_upload = bytes(code_content, 'utf-8')

        # An untouched textbox yields "", which the Hub client rejects as an
        # empty commit message; None lets it fall back to its default.
        message = (commit_message or "").strip() or None
        api.upload_file(
            path_or_fileobj=data_to_upload,
            path_in_repo=filepath,
            repo_id=repo_id,
            repo_type=repo_type,
            commit_message=message,
        )
        gr.Info(f"Successfully committed '{filepath}'!")
    except Exception as e:
        gr.Warning(f"Failed to commit: {e}")
| |
|
| | |
| |
|
def download_repos_as_zip(token, selected_repo_ids, repo_type, progress=gr.Progress()):
    """Download the selected repositories and bundle them into one ZIP.

    Each repository is snapshotted into its own folder (``/`` in the id is
    replaced by ``__``) inside a temporary directory, which is zipped and
    then removed. A repository that fails to download is reported and
    skipped; the remaining ones are still archived.

    Args:
        token: Hugging Face token (may be empty/None).
        selected_repo_ids: Ids of the repositories to download.
        repo_type: Repository type shared by all selected ids.
        progress: Gradio progress tracker.

    Returns:
        A File-component update pointing at the created ZIP, or a hidden,
        empty update when nothing was selected or no repo type is known.
    """
    if not selected_repo_ids:
        gr.Warning("No repositories selected for download.")
        return gr.update(value=None, visible=False)
    if not repo_type:
        gr.Warning("Please list a repository type (Spaces, etc.) before downloading.")
        return gr.update(value=None, visible=False)

    download_root_dir = tempfile.mkdtemp()
    try:
        total_repos = len(selected_repo_ids)
        for i, repo_id in enumerate(selected_repo_ids):
            progress(i / total_repos, desc=f"Downloading {repo_id} ({i+1}/{total_repos})")
            try:
                folder_name = repo_id.replace("/", "__")
                snapshot_download(
                    repo_id=repo_id,
                    repo_type=repo_type,
                    local_dir=os.path.join(download_root_dir, folder_name),
                    token=token,
                    local_dir_use_symlinks=False,
                    resume_download=True,
                )
            except Exception as e:
                # gr.Error is an exception class and shows nothing unless
                # raised; gr.Warning reports the failure and lets the loop
                # continue with the remaining repositories.
                gr.Warning(f"Failed to download {repo_id}: {e}")

        progress(0.95, desc="All items downloaded. Creating ZIP file...")
        zip_base_name = os.path.join(
            tempfile.gettempdir(), f"hf_{repo_type}s_archive_{uuid.uuid4().hex}"
        )
        zip_path = shutil.make_archive(zip_base_name, 'zip', download_root_dir)
        progress(1, desc="Download ready!")
        gr.Info("ZIP file created successfully!")
        return gr.update(value=zip_path, visible=True)
    finally:
        # The ZIP lives outside this directory, so the staging area can go.
        shutil.rmtree(download_root_dir, ignore_errors=True)
| |
|
| | |
# NOTE(review): the container nesting below was reconstructed from a paste
# that had lost its indentation — confirm the layout against the running app.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Hugging Face Hub Toolkit") as demo:

    # Cross-event state: validated token, repo type last listed per tab, and
    # the repo currently selected in the Manage tab.
    hf_token_state = gr.State(None)
    manage_repo_type_state = gr.State(None)
    download_repo_type_state = gr.State(None)
    selected_repo = gr.State(None)

    gr.Markdown("# Hugging Face Hub Toolkit")
    with gr.Sidebar():
        hf_token = gr.Textbox(label="Hugging Face API Token", type="password", placeholder="hf_...")
        whoami_output = gr.JSON(label="Authenticated User", visible=False)

    with gr.Tabs():
        with gr.TabItem("Manage Repositories"):

            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### 1. Select a Repository")
                    author_input = gr.Textbox(label="Author (Username or Org)")
                    with gr.Row():
                        manage_buttons = [
                            gr.Button(f"List {label}")
                            for label in ["Spaces", "Models", "Datasets"]
                        ]
                    manage_repo_dropdown = gr.Dropdown(label="Select a Repository", interactive=True)
                with gr.Column(scale=2):
                    with gr.Column(visible=False) as action_panel:
                        gr.Markdown("### 2. Choose an Action")
                        manage_files_btn = gr.Button("Manage Files", interactive=False)
                        delete_repo_btn = gr.Button("Delete This Repo", variant="stop", interactive=False)
                        archive_repo_name = gr.Textbox(
                            label="Archive",
                            placeholder="my_archive, will append /repo_name",
                        )
                        archive_repo_btn = gr.Button("Send to Archive", variant="secondary", interactive=False)
                    with gr.Column(visible=False) as editor_panel:
                        gr.Markdown("### 3. Edit Files")
                        file_selector = gr.Dropdown(label="Select File", interactive=True)
                        code_editor = gr.Code(label="File Content", interactive=True)
                        dataframe_editor = gr.Dataframe(
                            label="Dataset Editor", interactive=True, visible=False, wrap=True
                        )
                        commit_message_input = gr.Textbox(
                            label="Commit Message", placeholder="e.g., Update README.md"
                        )
                        commit_btn = gr.Button("Commit Changes", variant="primary", interactive=False)

            # One "List ..." button per repo kind; the kind travels as a
            # constant State input. Order matches the button labels above.
            repo_types = ["space", "model", "dataset"]
            for list_btn, kind in zip(manage_buttons, repo_types):
                list_btn.click(
                    fn=list_repos_for_management,
                    inputs=[hf_token_state, author_input, gr.State(kind)],
                    outputs=[manage_repo_type_state, manage_repo_dropdown, action_panel, editor_panel],
                )

        with gr.TabItem("Bulk Download (ZIP)"):

            gr.Markdown("## Download Multiple Repositories as a ZIP")
            download_author_input = gr.Textbox(label="Author (Username or Org)")
            with gr.Row():
                download_buttons = [
                    gr.Button(f"List {label}")
                    for label in ["Spaces", "Models", "Datasets"]
                ]
            download_repo_dropdown = gr.Dropdown(label="Select Repositories", multiselect=True, interactive=True)
            download_btn = gr.Button("Download Selected as ZIP", variant="primary")
            download_output_file = gr.File(label="Your Downloaded ZIP File", visible=False)

            for list_btn, kind in zip(download_buttons, repo_types):
                list_btn.click(
                    fn=list_repos_for_download,
                    inputs=[hf_token_state, download_author_input, gr.State(kind)],
                    outputs=[download_repo_type_state, download_repo_dropdown],
                )

    # --- Event wiring -----------------------------------------------------

    # The handler must return one value per entry of this list, in order.
    hf_token.change(
        fn=handle_token_change,
        inputs=hf_token,
        outputs=[
            hf_token_state,
            manage_files_btn,
            archive_repo_btn,
            delete_repo_btn,
            commit_btn,
            author_input,
            download_author_input,
            whoami_output,
        ],
    )

    manage_repo_dropdown.select(
        fn=on_manage_repo_select,
        inputs=manage_repo_dropdown,
        outputs=[action_panel, selected_repo],
    )

    manage_files_btn.click(
        fn=show_files_and_load_first,
        inputs=[hf_token_state, manage_repo_dropdown, manage_repo_type_state],
        outputs=[editor_panel, file_selector, code_editor, dataframe_editor],
    )

    archive_repo_btn.click(
        fn=archive_repo,
        inputs=[hf_token_state, selected_repo, archive_repo_name, manage_repo_type_state],
        outputs=[manage_repo_dropdown, action_panel, editor_panel],
    )

    # NOTE(review): the js hook returns a bare boolean — confirm that the
    # Python handler still receives its three inputs and that cancelling the
    # dialog really aborts the deletion.
    delete_repo_btn.click(
        fn=delete_repo,
        inputs=[hf_token_state, selected_repo, manage_repo_type_state],
        outputs=[manage_repo_dropdown, action_panel, editor_panel],
        js="() => confirm('Are you sure you want to permanently delete this repository?')",
    )

    file_selector.change(
        fn=load_file_content_for_editor,
        inputs=[hf_token_state, manage_repo_dropdown, manage_repo_type_state, file_selector],
        outputs=[code_editor, dataframe_editor],
    )

    commit_btn.click(
        fn=commit_file,
        inputs=[
            hf_token_state,
            manage_repo_dropdown,
            manage_repo_type_state,
            file_selector,
            code_editor,
            dataframe_editor,
            commit_message_input,
        ],
    )

    download_btn.click(
        fn=download_repos_as_zip,
        inputs=[hf_token_state, download_repo_dropdown, download_repo_type_state],
        outputs=[download_output_file],
    )


if __name__ == "__main__":
    demo.launch(debug=True)