Update app_januspro.py

Change visual and device types (issues in MacOS and Windows 11 Version)
This commit is contained in:
William S 2025-01-29 18:48:35 -03:00 committed by GitHub
parent a74a59f8a9
commit e944b2ea94
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,245 +1,263 @@
import gradio as gr import gradio as gr
import torch import torch
import inspect
from transformers import AutoConfig, AutoModelForCausalLM from transformers import AutoConfig, AutoModelForCausalLM
from janus.models import MultiModalityCausalLM, VLChatProcessor from janus.models import MultiModalityCausalLM, VLChatProcessor
from janus.utils.io import load_pil_images
from PIL import Image from PIL import Image
import numpy as np import numpy as np
import os import logging
import time
# import spaces # Import spaces for ZeroGPU compatibility
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load model and processor # Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
logger.info(f"Using device: {device}")
def load_processor(model_path):
"""Load and configure the VLChatProcessor with proper parameter filtering"""
# Get valid initialization parameters
init_params = inspect.getfullargspec(VLChatProcessor.__init__).args
init_params.remove('self')
# Load model config to find processor parameters
model_config = AutoConfig.from_pretrained(model_path)
processor_config = getattr(model_config, 'processor_config', {})
# Filter valid parameters
valid_config = {k: v for k, v in processor_config.items() if k in init_params}
return VLChatProcessor.from_pretrained(
model_path,
**valid_config,
legacy=False,
use_fast=True
)
def load_model():
"""Load the model with proper configuration and device management"""
model_path = "deepseek-ai/Janus-Pro-7B" model_path = "deepseek-ai/Janus-Pro-7B"
config = AutoConfig.from_pretrained(model_path)
language_config = config.language_config
language_config._attn_implementation = 'eager'
vl_gpt = AutoModelForCausalLM.from_pretrained(model_path,
language_config=language_config,
trust_remote_code=True)
if torch.cuda.is_available():
vl_gpt = vl_gpt.to(torch.bfloat16).cuda()
else:
vl_gpt = vl_gpt.to(torch.float16)
vl_chat_processor = VLChatProcessor.from_pretrained(model_path) # Load model config
config = AutoConfig.from_pretrained(model_path)
config.language_config._attn_implementation = 'eager' if device.type == 'cpu' else 'flash_attention_2'
# Load model with mixed precision
torch_dtype = torch_dtype=torch.bfloat16 if torch.cuda.is_available() else torch.float16
vl_gpt = AutoModelForCausalLM.from_pretrained(
model_path,
config=config,
trust_remote_code=True,
torch_dtype=torch_dtype,
device_map="auto" if device.type != 'cpu' else None
)
# Load processor and tokenizer
vl_chat_processor = load_processor(model_path)
tokenizer = vl_chat_processor.tokenizer tokenizer = vl_chat_processor.tokenizer
cuda_device = 'cuda' if torch.cuda.is_available() else 'cpu'
if device.type == 'cuda':
vl_gpt = vl_gpt.to(device)
return vl_gpt, vl_chat_processor, tokenizer
try:
vl_gpt, vl_chat_processor, tokenizer = load_model()
except Exception as e:
logger.error(f"Failed to initialize model: {str(e)}")
raise
@torch.inference_mode() @torch.inference_mode()
# @spaces.GPU(duration=120) def multimodal_understanding(image, question, seed=42, top_p=0.95, temperature=0.1, max_new_tokens=1024):
# Multimodal Understanding function """Handle multimodal understanding requests"""
def multimodal_understanding(image, question, seed, top_p, temperature): try:
# Clear CUDA cache before generating # Input processing
torch.cuda.empty_cache() conversation = [{
# set seed
torch.manual_seed(seed)
np.random.seed(seed)
torch.cuda.manual_seed(seed)
conversation = [
{
"role": "<|User|>", "role": "<|User|>",
"content": f"<image_placeholder>\n{question}", "content": f"<image_placeholder>\n{question}",
"images": [image], "images": [image]
}, }, {"role": "<|Assistant|>", "content": ""}]
{"role": "<|Assistant|>", "content": ""},
]
pil_images = [Image.fromarray(image)] # Process images and text
pil_images = [Image.fromarray(image).convert('RGB')]
prepare_inputs = vl_chat_processor( prepare_inputs = vl_chat_processor(
conversations=conversation, images=pil_images, force_batchify=True conversations=conversation,
).to(cuda_device, dtype=torch.bfloat16 if torch.cuda.is_available() else torch.float16) images=pil_images,
force_batchify=True
).to(device, dtype=vl_gpt.dtype)
inputs_embeds = vl_gpt.prepare_inputs_embeds(**prepare_inputs)
# Generate response
outputs = vl_gpt.language_model.generate( outputs = vl_gpt.language_model.generate(
inputs_embeds=inputs_embeds, inputs_embeds=vl_gpt.prepare_inputs_embeds(**prepare_inputs),
attention_mask=prepare_inputs.attention_mask, attention_mask=prepare_inputs.attention_mask,
pad_token_id=tokenizer.eos_token_id, pad_token_id=tokenizer.eos_token_id,
bos_token_id=tokenizer.bos_token_id, bos_token_id=tokenizer.bos_token_id,
eos_token_id=tokenizer.eos_token_id, eos_token_id=tokenizer.eos_token_id,
max_new_tokens=512, max_new_tokens=max_new_tokens,
do_sample=False if temperature == 0 else True, do_sample=temperature > 0,
use_cache=True, temperature=temperature if temperature > 0 else None,
temperature=temperature, top_p=top_p if temperature > 0 else None,
top_p=top_p, use_cache=True
) )
answer = tokenizer.decode(outputs[0].cpu().tolist(), skip_special_tokens=True) return tokenizer.decode(outputs[0], skip_special_tokens=True)
return answer
except Exception as e:
logger.error(f"Understanding error: {str(e)}")
return f"Error processing request: {str(e)}"
def generate(input_ids, @torch.inference_mode()
width, def generate_image(prompt, seed=12345, guidance=5.0, temperature=1.0, parallel_size=4):
height, """Handle image generation requests"""
temperature: float = 1, try:
parallel_size: int = 5, # Text processing
cfg_weight: float = 5, messages = [{'role': '<|User|>', 'content': prompt}, {'role': '<|Assistant|>', 'content': ''}]
image_token_num_per_image: int = 576, text = vl_chat_processor.apply_sft_template_for_multi_turn_prompts(
patch_size: int = 16): conversations=messages,
# Clear CUDA cache before generating sft_format=vl_chat_processor.sft_format,
torch.cuda.empty_cache() system_prompt=''
) + vl_chat_processor.image_start_tag
tokens = torch.zeros((parallel_size * 2, len(input_ids)), dtype=torch.int).to(cuda_device) # Generate image tokens
for i in range(parallel_size * 2): input_ids = torch.LongTensor(tokenizer.encode(text)).to(device)
tokens[i, :] = input_ids generated_tokens, patches = generate(
if i % 2 != 0: input_ids=input_ids,
tokens[i, 1:-1] = vl_chat_processor.pad_id width=384,
height=384,
cfg_weight=guidance,
parallel_size=parallel_size,
temperature=temperature
)
# Process output images
images = unpack(patches, 384, 384, parallel_size)
return [Image.fromarray(img).resize((768, 768), Image.Resampling.LANCZOS) for img in images]
except Exception as e:
logger.error(f"Generation error: {str(e)}")
return []
def generate(input_ids, width, height, **kwargs):
"""Core image generation function"""
try:
parallel_size = kwargs.get('parallel_size', 4)
image_token_num_per_image = 576
# Initialize tokens
tokens = torch.stack([input_ids] * (parallel_size * 2), dim=0)
generated = torch.zeros((parallel_size, image_token_num_per_image),
dtype=torch.int, device=device)
inputs_embeds = vl_gpt.language_model.get_input_embeddings()(tokens) inputs_embeds = vl_gpt.language_model.get_input_embeddings()(tokens)
generated_tokens = torch.zeros((parallel_size, image_token_num_per_image), dtype=torch.int).to(cuda_device)
pkv = None pkv = None
for i in range(image_token_num_per_image): for i in range(image_token_num_per_image):
with torch.no_grad(): outputs = vl_gpt.language_model.model(
outputs = vl_gpt.language_model.model(inputs_embeds=inputs_embeds, inputs_embeds=inputs_embeds,
use_cache=True, past_key_values=pkv,
past_key_values=pkv) use_cache=True
)
pkv = outputs.past_key_values pkv = outputs.past_key_values
hidden_states = outputs.last_hidden_state logits = vl_gpt.gen_head(outputs.last_hidden_state[:, -1, :])
logits = vl_gpt.gen_head(hidden_states[:, -1, :])
logit_cond = logits[0::2, :]
logit_uncond = logits[1::2, :]
logits = logit_uncond + cfg_weight * (logit_cond - logit_uncond)
probs = torch.softmax(logits / temperature, dim=-1)
next_token = torch.multinomial(probs, num_samples=1)
generated_tokens[:, i] = next_token.squeeze(dim=-1)
next_token = torch.cat([next_token.unsqueeze(dim=1), next_token.unsqueeze(dim=1)], dim=1).view(-1)
img_embeds = vl_gpt.prepare_gen_img_embeds(next_token) # Classifier-free guidance
inputs_embeds = img_embeds.unsqueeze(dim=1) logit_cond, logit_uncond = logits[0::2], logits[1::2]
logits = logit_uncond + kwargs['cfg_weight'] * (logit_cond - logit_uncond)
# Sampling
probs = torch.softmax(logits / kwargs['temperature'], dim=-1)
next_token = torch.multinomial(probs, 1)
generated[:, i] = next_token.squeeze()
# Prepare next input
inputs_embeds = vl_gpt.prepare_gen_img_embeds(
next_token.repeat(1, 2).view(-1)
).unsqueeze(1)
patches = vl_gpt.gen_vision_model.decode_code(generated_tokens.to(dtype=torch.int), # Decode patches
shape=[parallel_size, 8, width // patch_size, height // patch_size]) return generated, vl_gpt.gen_vision_model.decode_code(
generated.to(torch.int),
return generated_tokens.to(dtype=torch.int), patches shape=[parallel_size, 8, width//16, height//16]
def unpack(dec, width, height, parallel_size=5):
dec = dec.to(torch.float32).cpu().numpy().transpose(0, 2, 3, 1)
dec = np.clip((dec + 1) / 2 * 255, 0, 255)
visual_img = np.zeros((parallel_size, width, height, 3), dtype=np.uint8)
visual_img[:, :, :] = dec
return visual_img
@torch.inference_mode()
# @spaces.GPU(duration=120) # Specify a duration to avoid timeout
def generate_image(prompt,
seed=None,
guidance=5,
t2i_temperature=1.0):
# Clear CUDA cache and avoid tracking gradients
torch.cuda.empty_cache()
# Set the seed for reproducible results
if seed is not None:
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
np.random.seed(seed)
width = 384
height = 384
parallel_size = 5
with torch.no_grad():
messages = [{'role': '<|User|>', 'content': prompt},
{'role': '<|Assistant|>', 'content': ''}]
text = vl_chat_processor.apply_sft_template_for_multi_turn_prompts(conversations=messages,
sft_format=vl_chat_processor.sft_format,
system_prompt='')
text = text + vl_chat_processor.image_start_tag
input_ids = torch.LongTensor(tokenizer.encode(text))
output, patches = generate(input_ids,
width // 16 * 16,
height // 16 * 16,
cfg_weight=guidance,
parallel_size=parallel_size,
temperature=t2i_temperature)
images = unpack(patches,
width // 16 * 16,
height // 16 * 16,
parallel_size=parallel_size)
return [Image.fromarray(images[i]).resize((768, 768), Image.LANCZOS) for i in range(parallel_size)]
# Gradio interface
with gr.Blocks() as demo:
gr.Markdown(value="# Multimodal Understanding")
with gr.Row():
image_input = gr.Image()
with gr.Column():
question_input = gr.Textbox(label="Question")
und_seed_input = gr.Number(label="Seed", precision=0, value=42)
top_p = gr.Slider(minimum=0, maximum=1, value=0.95, step=0.05, label="top_p")
temperature = gr.Slider(minimum=0, maximum=1, value=0.1, step=0.05, label="temperature")
understanding_button = gr.Button("Chat")
understanding_output = gr.Textbox(label="Response")
examples_inpainting = gr.Examples(
label="Multimodal Understanding examples",
examples=[
[
"explain this meme",
"images/doge.png",
],
[
"Convert the formula into latex code.",
"images/equation.png",
],
],
inputs=[question_input, image_input],
) )
except Exception as e:
logger.error(f"Generate core error: {str(e)}")
raise
gr.Markdown(value="# Text-to-Image Generation") def unpack(dec, width, height, parallel_size):
"""Convert model output to images"""
try:
dec = dec.float().cpu().numpy().transpose(0, 2, 3, 1)
dec = np.clip((dec + 1) * 127.5, 0, 255).astype(np.uint8)
return [dec[i] for i in range(parallel_size)]
except Exception as e:
logger.error(f"Unpack error: {str(e)}")
return [np.zeros((height, width, 3), dtype=np.uint8)] * parallel_size
# Gradio Interface
with gr.Blocks(title="Janus Pro 7B", theme=gr.themes.Soft()) as demo:
gr.Markdown("## 🖼️ Janus Pro 7B - Multimodal AI Assistant")
with gr.Tab("Image Understanding"):
with gr.Row(): with gr.Row():
cfg_weight_input = gr.Slider(minimum=1, maximum=10, value=5, step=0.5, label="CFG Weight") with gr.Column():
t2i_temperature = gr.Slider(minimum=0, maximum=1, value=1.0, step=0.05, label="temperature") image_input = gr.Image(label="Upload Image", type="numpy")
# examples_und = gr.Examples(
prompt_input = gr.Textbox(label="Prompt. (Prompt in more detail can help produce better images!)") # examples=[
seed_input = gr.Number(label="Seed (Optional)", precision=0, value=12345) # ["explain this meme", "images/doge.png"],
# ["Convert the formula into latex code", "images/equation.png"]
generation_button = gr.Button("Generate Images") # ],
# inputs=[gr.Textbox(), image_input], # Use component references
image_output = gr.Gallery(label="Generated Images", columns=2, rows=2, height=300) # label="Example Queries"
# )
with gr.Column():
question_input = gr.Textbox(label="Question", placeholder="Ask about the image...")
with gr.Accordion("Advanced Settings", open=False):
und_seed = gr.Number(42, label="Seed", precision=0)
top_p = gr.Slider(0, 1, 0.95, label="Top-p Sampling")
temperature = gr.Slider(0, 1, 0.1, label="Temperature")
max_tokens = gr.Slider(128, 2048, 1024, step=128, label="Max Tokens")
understanding_button = gr.Button("Analyze", variant="primary")
understanding_output = gr.Textbox(label="Response", interactive=False)
with gr.Tab("Image Generation"):
with gr.Row():
with gr.Column():
prompt_input = gr.Textbox(label="Prompt", placeholder="Describe your image...", lines=3)
examples_t2i = gr.Examples( examples_t2i = gr.Examples(
label="Text to image generation examples.",
examples=[ examples=[
"Master shifu racoon wearing drip attire as a street gangster.", "Master shifu raccoon wearing streetwear",
"The face of a beautiful girl", "Astronaut in a jungle, detailed 8k rendering"
"Astronaut in a jungle, cold color palette, muted colors, detailed, 8k",
"A glass of red wine on a reflective surface.",
"A cute and adorable baby fox with big brown eyes, autumn leaves in the background enchanting,immortal,fluffy, shiny mane,Petals,fairyism,unreal engine 5 and Octane Render,highly detailed, photorealistic, cinematic, natural colors.",
"The image features an intricately designed eye set against a circular backdrop adorned with ornate swirl patterns that evoke both realism and surrealism. At the center of attention is a strikingly vivid blue iris surrounded by delicate veins radiating outward from the pupil to create depth and intensity. The eyelashes are long and dark, casting subtle shadows on the skin around them which appears smooth yet slightly textured as if aged or weathered over time.\n\nAbove the eye, there's a stone-like structure resembling part of classical architecture, adding layers of mystery and timeless elegance to the composition. This architectural element contrasts sharply but harmoniously with the organic curves surrounding it. Below the eye lies another decorative motif reminiscent of baroque artistry, further enhancing the overall sense of eternity encapsulated within each meticulously crafted detail. \n\nOverall, the atmosphere exudes a mysterious aura intertwined seamlessly with elements suggesting timelessness, achieved through the juxtaposition of realistic textures and surreal artistic flourishes. Each component\u2014from the intricate designs framing the eye to the ancient-looking stone piece above\u2014contributes uniquely towards creating a visually captivating tableau imbued with enigmatic allure.",
], ],
inputs=prompt_input, inputs=prompt_input,
label="Example Prompts"
) )
with gr.Accordion("Advanced Settings", open=False):
cfg_weight = gr.Slider(1, 10, 5.0, label="CFG Weight")
t2i_temp = gr.Slider(0, 2, 1.0, label="Temperature")
seed_input = gr.Number(12345, label="Seed", precision=0)
parallel_size = gr.Slider(1, 8, 4, step=1, label="Batch Size")
generation_button = gr.Button("Generate", variant="primary")
with gr.Column():
image_output = gr.Gallery(label="Generated Images", columns=2, height=600)
# Event handlers
understanding_button.click( understanding_button.click(
multimodal_understanding, multimodal_understanding,
inputs=[image_input, question_input, und_seed_input, top_p, temperature], inputs=[image_input, question_input, und_seed, top_p, temperature, max_tokens],
outputs=understanding_output outputs=understanding_output
) )
generation_button.click( generation_button.click(
fn=generate_image, generate_image,
inputs=[prompt_input, seed_input, cfg_weight_input, t2i_temperature], inputs=[prompt_input, seed_input, cfg_weight, t2i_temp, parallel_size],
outputs=image_output outputs=image_output
) )
demo.launch(share=True) if __name__ == "__main__":
# demo.queue(concurrency_count=1, max_size=10).launch(server_name="0.0.0.0", server_port=37906, root_path="/path") demo.queue(concurrency_count=2).launch(
server_name="127.0.0.1",
server_port=7920,
share=False
)