#!/usr/bin/env python3
"""
Test script to verify DotsVLMForCausalLM model loading
"""
import os
import json
import shutil
import traceback

import torch
from transformers import AutoConfig, AutoProcessor

# Local modules shipped alongside this script. Absolute imports are used
# because this file is executed directly (relative imports would fail).
from configuration_dots_vlm import DotsVLMConfig, DotsVLMProcessor
from modeling_dots_vlm import DotsVLMForCausalLM
def create_debug_model_path(original_path, debug_layers=3):
"""
创建一个debug版本的模型路径,只包含指定层数的配置和权重
Args:
original_path: 原始模型路径
debug_layers: debug模式下使用的层数
Returns:
debug模型的临时路径
"""
debug_path = original_path + "_debug_temp"
    # If the debug directory already exists, remove it first (currently disabled)
# if os.path.exists(debug_path):
# shutil.rmtree(debug_path)
    # Create the debug directory
os.makedirs(debug_path, exist_ok=True)
    # Copy the basic configuration files
files_to_copy = [
'tokenizer_config.json',
'tokenizer.json',
'special_tokens_map.json',
'preprocessor_config.json',
'modeling_deepseek.py',
'configuration_deepseek.py'
]
for file in files_to_copy:
src = os.path.join(original_path, file)
dst = os.path.join(debug_path, file)
if os.path.exists(src):
if not os.path.exists(dst):
print(f"📁 Copying basefile {file}... from {src} to {dst}")
shutil.copy2(src, dst)
    # # Copy the 've' directory (vision encoder)
# ve_src = os.path.join(original_path, 've')
# ve_dst = os.path.join(debug_path, 've')
# if os.path.exists(ve_src):
# shutil.copytree(ve_src, ve_dst)
    # Modify config.json: reduce the number of layers
config_src = os.path.join(original_path, 'config.json')
config_dst = os.path.join(debug_path, 'config.json')
with open(config_src, 'r') as f:
config = json.load(f)
    # Override with the debug layer count
original_layers = config['num_hidden_layers']
config['num_hidden_layers'] = debug_layers
print(f"🔧 DEBUG: Reducing num_hidden_layers from {original_layers} to {debug_layers}")
with open(config_dst, 'w') as f:
json.dump(config, f, indent=2)
    # Read the original safetensors index
index_src = os.path.join(original_path, 'model.safetensors.index.json')
with open(index_src, 'r') as f:
index_data = json.load(f)
original_weight_map = index_data['weight_map']
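    # The index maps each tensor name to the shard file that stores it, e.g.
    # (illustrative entry): {"weight_map": {"model.embed_tokens.weight":
    #                                       "model-00001-of-00316.safetensors", ...}}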
    # Determine which safetensor files are needed.
    # First 8 files: contain the embedding layer and the first few layers
front_files = [f"model-{i:05d}-of-00316.safetensors" for i in range(1, 9)]
    # Layer 2's weights live in model-00056, so include that file explicitly
layer2_files = ["model-00056-of-00316.safetensors"]
    # Last 2 files: contain the final output layer and the vision_tower
back_files = ["model-00315-of-00316.safetensors", "model-00316-of-00316.safetensors"]
    # File containing model.norm
norm_files = ["model-00314-of-00316.safetensors"]
needed_files = set(front_files + layer2_files + back_files + norm_files)
print(f"🔧 DEBUG: Will load {len(needed_files)} safetensor files instead of 316:")
for f in sorted(needed_files):
print(f" - {f}")
    # Filter the weight map, keeping only the needed layers and base components
new_weight_map = {}
    # Keep the embedding and output layers
for key, file in original_weight_map.items():
        # Keep base components
if any(key.startswith(prefix) for prefix in [
'model.embed_tokens',
'model.norm',
'lm_head'
]):
if file in needed_files:
new_weight_map[key] = file
        # Keep vision_tower weights (these live in model-00315 and model-00316)
elif key.startswith('vision_tower.'):
if file in needed_files:
new_weight_map[key] = file
        # Keep only the first debug_layers layers
elif key.startswith('model.layers.'):
            # Extract the layer number
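            # e.g. (illustrative key) "model.layers.2.mlp.up_proj.weight" -> layer 2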
layer_parts = key.split('.')
if len(layer_parts) >= 3 and layer_parts[2].isdigit():
layer_num = int(layer_parts[2])
if layer_num < debug_layers and file in needed_files:
new_weight_map[key] = file
print(f"🔧 DEBUG: Filtered weight map from {len(original_weight_map)} to {len(new_weight_map)} entries")
    # Copy the needed safetensor files
copied_files = set()
for file in new_weight_map.values():
if file not in copied_files:
src_file = os.path.join(original_path, file)
dst_file = os.path.join(debug_path, file)
if os.path.exists(src_file):
if not os.path.exists(dst_file):
print(f"📁 Copying Safetensor {file}... from {src_file} to {dst_file}")
shutil.copy2(src_file, dst_file)
copied_files.add(file)
else:
print(f"⚠️ File not found: {src_file}")
    # Create the new index file
new_index_data = {
"metadata": index_data.get("metadata", {}),
"weight_map": new_weight_map
}
index_dst = os.path.join(debug_path, 'model.safetensors.index.json')
with open(index_dst, 'w') as f:
json.dump(new_index_data, f, indent=2)
print(f"✅ DEBUG: Created debug model at {debug_path}")
return debug_path
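# Example usage (hypothetical path):
#   debug_path = create_debug_model_path("/path/to/full/model", debug_layers=3)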
def test_model_loading():
"""Test loading the model from pretrained weights"""
# Path to your model weights
model_path = "."
# Check if DEBUG mode is enabled
debug_minimal_layers = os.getenv('DEBUG_MINIMAL_LAYERS', '0') == '1'
if debug_minimal_layers:
print("🔧 DEBUG MINIMAL LAYERS: Using only 3 layers and minimal safetensors")
        # Build the debug version of the model
model_path = create_debug_model_path(model_path, debug_layers=3)
print("Loading model configuration...")
try:
config = AutoConfig.from_pretrained(model_path)
print(f"✓ Config loaded successfully: {config.__class__.__name__}")
print(f" Model type: {config.model_type}")
print(f" Architecture: {config.architectures}")
print(f" Number of hidden layers: {config.num_hidden_layers}")
# Check if quantization config exists and is fp8
if hasattr(config, 'quantization_config') and config.quantization_config is not None:
quant_config = config.quantization_config
if isinstance(quant_config, dict) and quant_config.get('quant_method') == 'fp8':
print(" Detected FP8 quantization configuration")
print(f" Format: {quant_config.get('fmt', 'unknown')}")
print(f" Weight block size: {quant_config.get('weight_block_size', 'unknown')}")
print(f" Activation scheme: {quant_config.get('activation_scheme', 'unknown')}")
except Exception as e:
print(f"✗ Error loading config: {e}")
return False
print("\nTesting processor loading...")
try:
# Test if processor can be loaded
try:
processor = AutoProcessor.from_pretrained(model_path)
print(f"✓ AutoProcessor loaded successfully: {processor.__class__.__name__}")
except Exception as e:
print(f"⚠️ AutoProcessor failed, trying direct import: {e}")
# Fallback to direct processor creation
processor = DotsVLMProcessor.from_pretrained(model_path)
print(f"✓ Direct processor loaded successfully: {processor.__class__.__name__}")
# Check processor attributes
if hasattr(processor, 'image_token'):
print(f" Image token: {processor.image_token}")
if hasattr(processor, 'tokenizer'):
print(f" Tokenizer type: {processor.tokenizer.__class__.__name__}")
if hasattr(processor, 'image_processor'):
print(f" Image processor type: {processor.image_processor.__class__.__name__}")
except Exception as e:
print(f"⚠️ Processor loading failed (this is OK for now): {e}")
processor = None
print("\nLoading model with auto-fixing vision_tower quantization...")
try:
        # Use the custom model class directly so vision_tower quantization is handled automatically
model = DotsVLMForCausalLM.from_pretrained(
model_path,
config=config,
torch_dtype="auto",
device_map="auto",
trust_remote_code=True
)
print(f"✓ Model loaded successfully: {model.__class__.__name__}")
print(f" Number of parameters: {model.num_parameters():,}")
# Check vision_tower dtype
if hasattr(model, 'vision_tower') and model.vision_tower is not None:
            vision_sample_param = next(model.vision_tower.parameters())
print(f" Vision tower dtype: {vision_sample_param.dtype}")
except Exception as e:
traceback.print_exc()
print(f"✗ Error loading model: {e}")
return False
# print("\nLoading vision weights...")
# try:
# vision_weights_path = f"{model_path}/ve/visual.pt"
# vision_state_dict = torch.load(vision_weights_path, map_location='cpu', weights_only=False)
# # Extract vision_encoder weights and remove vision_tower prefix
# vision_encoder_weights = vision_state_dict['vision_encoder']
# cleaned_weights = {}
# for key, value in vision_encoder_weights.items():
# if key.startswith('vision_tower.'):
# new_key = key[len('vision_tower.'):] # Remove 'vision_tower.' prefix
# cleaned_weights[new_key] = value
# else:
# cleaned_weights[key] = value
# # Convert weights to match the vision_tower dtype
# target_dtype = list(model.vision_tower.parameters())[0].dtype
# for key, value in cleaned_weights.items():
# if value.dtype != target_dtype and value.dtype in [torch.float32, torch.float16, torch.bfloat16]:
# cleaned_weights[key] = value.to(target_dtype)
# # Use assign=True to avoid meta tensor copying warnings
# model.vision_tower.load_state_dict(cleaned_weights, assign=True, strict=False)
# print("✓ Vision weights loaded successfully")
# print(f" Vision tower final dtype: {list(model.vision_tower.parameters())[0].dtype}")
# except Exception as e:
# print(f"✗ Error loading vision weights: {e}")
# return False
print("\nTesting model forward pass...")
try:
# Full model forward pass
batch_size = 1
seq_len = 10
input_ids = torch.randint(0, 1000, (batch_size, seq_len))
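        # Dummy token ids in [0, 1000); any ids below vocab_size suffice for a smoke test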
        # Move inputs to the same device as the model
        device = next(model.parameters()).device
        input_ids = input_ids.to(device)
with torch.no_grad():
outputs = model(input_ids=input_ids)
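            # logits are expected to have shape (batch_size, seq_len, vocab_size)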
print(f"✓ Forward pass successful")
print(f" Output shape: {outputs.logits.shape}")
print(f" Output dtype: {outputs.logits.dtype}")
except Exception as e:
print(f"✗ Error in forward pass: {e}")
traceback.print_exc()
return False
    # Clean up the temporary debug directory
    if debug_minimal_layers and model_path.endswith("_debug_temp"):
        try:
            # shutil.rmtree(model_path)  # deletion disabled so the debug dir can be inspected
            print(f"🧹 Temporary debug directory left in place: {model_path}")
        except Exception as e:
            print(f"⚠️ Failed to clean up debug directory: {e}")
        print("\n🎉 DEBUG MINIMAL LAYERS: 3-layer model test completed!")
        print("💡 To test with the full 61-layer model, run without DEBUG_MINIMAL_LAYERS=1")
    else:
        print("\n🎉 All tests passed! Model is working correctly.")
return True
if __name__ == "__main__":
test_model_loading()