| | |
| |
|
| | from diffusers import AutoencoderKL, DDPMScheduler, DDIMScheduler |
| | from transformers import CLIPTextModel, CLIPTokenizer |
| | from omegaconf import OmegaConf |
| | from huggingface_hub import hf_hub_download, try_to_load_from_cache |
| |
|
| | import os |
| | import json |
| | import base64 |
| |
|
| | from safetensors import safe_open |
| |
|
| | from diffusers.utils.import_utils import is_xformers_available |
| | from typing import Any |
| | import torch |
| | import imageio |
| | import torchvision |
| | import numpy as np |
| | from einops import rearrange |
| |
|
| | from animatediff.models.unet import UNet3DConditionModel |
| | from animatediff.pipelines.pipeline_animation import AnimationPipeline |
| | from animatediff.utils.util import save_videos_grid |
| | from animatediff.utils.util import load_weights |
| | from animatediff.utils.convert_from_ckpt import convert_ldm_unet_checkpoint, convert_ldm_clip_checkpoint, convert_ldm_vae_checkpoint |
| | from animatediff.utils.convert_lora_safetensor_to_diffusers import convert_lora |
| |
|
# Name of the motion-module variant to load. It is interpolated into the Hub
# file paths `models/Motion_Module/<current_model>/inv_latents/ddim_latent-1.pt`
# and `models/Motion_Module/<current_model>/mm.pth` when the handler is built.
current_model = "backup"
| |
|
class EndpointHandler():
    """Inference endpoint handler that renders a text prompt to an animated GIF.

    On construction the handler assembles an ``AnimationPipeline`` (tokenizer,
    text encoder, VAE, 3D UNet, DDIM scheduler), loads the motion-module and
    DreamBooth weights on top of it, and moves everything to CUDA.  Each call
    runs the pipeline once and returns the GIF as base64 inside a JSON string.
    """

    # Hub repository that hosts the configs, UNet weights, inverse latents,
    # motion module and DreamBooth checkpoint fetched via `hf_hub_download`.
    _REPO_ID = "bluestarburst/AnimateDiff-SceneFusion"

    # Always appended to the caller's negative prompt.  The text is sent to the
    # model verbatim, so it must not be "tidied up".
    _NEGATIVE_PROMPT_SUFFIX = ",easynegative,bad_construction,bad_structure,bad_wail,bad_windows,blurry,cloned_window,cropped,deformed,disfigured,error,extra_windows,extra_chimney,extra_door,extra_structure,extra_frame,fewer_digits,fused_structure,gross_proportions,jpeg_artifacts,long_roof,low_quality,structure_limbs,missing_windows,missing_doors,missing_roofs,mutated_structure,mutation,normal_quality,out_of_frame,owres,poorly_drawn_structure,poorly_drawn_house,signature,text,too_many_windows,ugly,username,uta,watermark,worst_quality"

    def __init__(self, model_path: str = "bluestarburst/AnimateDiff-SceneFusion"):
        """Build the animation pipeline and load all weights onto the GPU.

        Args:
            model_path: Location passed to ``from_pretrained`` for the
                tokenizer, text encoder and VAE (each under
                ``models/StableDiffusion/``).  All other artefacts are always
                downloaded from the ``_REPO_ID`` Hub repository.

        Raises:
            RuntimeError: If xformers is not available, or if the motion-module
                checkpoint contains keys the UNet does not recognise.
        """
        inference_config_path = hf_hub_download(repo_id=self._REPO_ID, filename="configs/inference/inference-v3.yaml")
        print(inference_config_path)
        inference_config = OmegaConf.load(inference_config_path)

        tokenizer = CLIPTokenizer.from_pretrained(model_path, subfolder="models/StableDiffusion/tokenizer")
        text_encoder = CLIPTextModel.from_pretrained(model_path, subfolder="models/StableDiffusion/text_encoder")
        vae = AutoencoderKL.from_pretrained(model_path, subfolder="models/StableDiffusion/vae")

        unet_model_path = hf_hub_download(repo_id=self._REPO_ID, filename="models/StableDiffusion/unet/diffusion_pytorch_model.bin")
        unet_config_path = hf_hub_download(repo_id=self._REPO_ID, filename="models/StableDiffusion/unet/config.json")
        print(unet_model_path)

        unet = UNet3DConditionModel.from_pretrained_2d(
            pretrained_model_path=unet_model_path,
            unet_additional_kwargs=OmegaConf.to_container(inference_config.unet_additional_kwargs),
            config_path=unet_config_path,
        )

        # Pre-computed latents that every request starts from (passed as
        # `latents=` to the pipeline in __call__).
        # NOTE(review): torch.load unpickles the file; acceptable only because
        # the source repository is fixed and trusted.
        inv_latent_path = hf_hub_download(repo_id=self._REPO_ID, filename=f"models/Motion_Module/{current_model}/inv_latents/ddim_latent-1.pt")
        self.latents = torch.load(inv_latent_path).to(torch.float)
        print(self.latents.shape, self.latents.dtype)

        torch.backends.cuda.enable_flash_sdp(True)
        torch.backends.cuda.enable_math_sdp(True)

        # An explicit raise (rather than `assert False`) so the check survives
        # `python -O` and tells the operator what is missing.
        if not is_xformers_available():
            raise RuntimeError("xformers is required for memory-efficient attention but is not available")
        unet.enable_xformers_memory_efficient_attention()

        self.pipeline = AnimationPipeline(
            vae=vae, text_encoder=text_encoder, tokenizer=tokenizer, unet=unet,
            scheduler=DDIMScheduler(**OmegaConf.to_container(inference_config.noise_scheduler_kwargs.DDIMScheduler)),
        ).to("cuda")

        motion_module_path = hf_hub_download(repo_id=self._REPO_ID, filename=f"models/Motion_Module/{current_model}/mm.pth")
        dreambooth_path = hf_hub_download(repo_id=self._REPO_ID, filename="models/DreamBooth_LoRA/toonyou_beta3.safetensors")

        self._load_motion_module(motion_module_path)
        self._load_dreambooth(dreambooth_path)

        self.pipeline.to("cuda")

    def _load_motion_module(self, checkpoint_path):
        """Load motion-module weights from *checkpoint_path* into the pipeline's UNet."""
        motion_module_state_dict = torch.load(checkpoint_path, map_location="cpu")
        # strict=False: the checkpoint only covers part of the UNet, so missing
        # keys are expected; keys the UNet does not know are an error.
        _missing, unexpected = self.pipeline.unet.load_state_dict(motion_module_state_dict, strict=False)
        if unexpected:
            raise RuntimeError(f"motion module checkpoint has unexpected keys: {list(unexpected)}")

    def _load_dreambooth(self, checkpoint_path):
        """Apply a DreamBooth / LoRA checkpoint to the pipeline.

        ``.ckpt`` files are loaded straight into the UNet.  ``.safetensors``
        files are either a full checkpoint (converted and loaded into the VAE
        and UNet) or, when every key contains ``"lora"``, LoRA deltas merged
        via ``convert_lora``.  Any other path, including an empty one, is
        ignored.
        """
        if not checkpoint_path:
            return

        if checkpoint_path.endswith(".ckpt"):
            self.pipeline.unet.load_state_dict(torch.load(checkpoint_path))
            return

        if not checkpoint_path.endswith(".safetensors"):
            return

        with safe_open(checkpoint_path, framework="pt", device="cpu") as f:
            state_dict = {key: f.get_tensor(key) for key in f.keys()}

        if all("lora" in key for key in state_dict):
            # LoRA-only file: no separate base checkpoint is configured for
            # this handler, so merge the deltas into the base weights the
            # pipeline already holds instead of opening a non-existent file.
            self.pipeline = convert_lora(self.pipeline, state_dict)
            return

        converted_vae_checkpoint = convert_ldm_vae_checkpoint(state_dict, self.pipeline.vae.config)
        self.pipeline.vae.load_state_dict(converted_vae_checkpoint)

        converted_unet_checkpoint = convert_ldm_unet_checkpoint(state_dict, self.pipeline.unet.config)
        self.pipeline.unet.load_state_dict(converted_unet_checkpoint, strict=False)

    @classmethod
    def _full_negative_prompt(cls, negative_prompt):
        """Return the caller's negative prompt (``None`` treated as empty) plus the fixed suffix."""
        return (negative_prompt or "") + cls._NEGATIVE_PROMPT_SUFFIX

    @staticmethod
    def _videos_to_frames(vids, n_rows=6):
        """Turn a ``b c t h w`` video tensor into one uint8 HWC grid image per time step."""
        frames = []
        for frame_batch in rearrange(vids, "b c t h w -> t b c h w"):
            grid = torchvision.utils.make_grid(frame_batch, nrow=n_rows)
            grid = grid.transpose(0, 1).transpose(1, 2).squeeze(-1)
            # No rescale is applied before the 0-255 conversion.
            # NOTE(review): this assumes the pipeline returns values in [0, 1] - confirm.
            frames.append((grid * 255).numpy().astype(np.uint8))
        return frames

    @staticmethod
    def _frames_to_gif(frames, fps=1):
        """Encode *frames* as a GIF and return the file's bytes."""
        import tempfile

        # A per-call temporary directory keeps requests from overwriting each
        # other's file and does not require a writable working directory.
        with tempfile.TemporaryDirectory() as tmp_dir:
            path = os.path.join(tmp_dir, "output.gif")
            imageio.mimsave(path, frames, fps=fps)
            with open(path, mode="rb") as file:
                return file.read()

    @staticmethod
    def _encode_gif(gif_bytes):
        """Wrap GIF bytes in the JSON response: ``{"filename", "content"}`` with base64 content."""
        json_data = {
            "filename": "output.gif",
            "content": base64.b64encode(gif_bytes).decode("utf-8"),
        }
        return json.dumps(json_data)

    def __call__(self, data: Any):
        """Run inference for one request.

        Args:
            data: Request mapping.  The optional keys ``prompt``,
                ``negative_prompt``, ``steps`` (default 25) and
                ``guidance_scale`` (default 12.5) are popped from it.

        Returns:
            A JSON string with ``filename`` (``"output.gif"``) and ``content``
            (the generated GIF, base64-encoded).
        """
        prompt = data.pop("prompt", "")
        negative_prompt = self._full_negative_prompt(data.pop("negative_prompt", ""))
        steps = data.pop("steps", 25)
        guidance_scale = data.pop("guidance_scale", 12.5)

        print(f"current seed: {torch.initial_seed()}")
        print(f"sampling {prompt} ...")
        vids = self.pipeline(
            prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=steps,
            guidance_scale=guidance_scale,
            width=256,
            height=256,
            video_length=5,
            latents=self.latents,
        ).videos

        frames = self._videos_to_frames(vids, n_rows=6)
        gif_bytes = self._frames_to_gif(frames, fps=1)
        return self._encode_gif(gif_bytes)
| | |
| |
|
| | |
| | |
| |
|
| |
|
| | |
| |
|