#!/usr/bin/env python3
"""
Test script to verify DotsVLMForCausalLM model loading
"""

import os
import json
import shutil
import traceback

import torch
from transformers import AutoConfig, AutoProcessor

from .configuration_dots_vlm import DotsVLMConfig, DotsVLMProcessor
from .modeling_dots_vlm import DotsVLMForCausalLM


def create_debug_model_path(original_path, debug_layers=3):
    """
    Create a debug copy of the model directory that contains only the config
    and weights for the requested number of layers.

    Args:
        original_path: path to the original model
        debug_layers: number of layers to keep in debug mode

    Returns:
        Temporary path of the debug model.
    """
    debug_path = original_path + "_debug_temp"

    # If the debug directory already exists, remove it first
    # if os.path.exists(debug_path):
    #     shutil.rmtree(debug_path)

    # Create the debug directory
    os.makedirs(debug_path, exist_ok=True)

    # Copy the basic configuration files
    files_to_copy = [
        'tokenizer_config.json',
        'tokenizer.json',
        'special_tokens_map.json',
        'preprocessor_config.json',
        'modeling_deepseek.py',
        'configuration_deepseek.py',
    ]

    for file in files_to_copy:
        src = os.path.join(original_path, file)
        dst = os.path.join(debug_path, file)
        if os.path.exists(src) and not os.path.exists(dst):
            print(f"📁 Copying base file {file}... from {src} to {dst}")
            shutil.copy2(src, dst)

    # # Copy the ve directory (vision encoder)
    # ve_src = os.path.join(original_path, 've')
    # ve_dst = os.path.join(debug_path, 've')
    # if os.path.exists(ve_src):
    #     shutil.copytree(ve_src, ve_dst)

    # Modify config.json: reduce the number of layers
    config_src = os.path.join(original_path, 'config.json')
    config_dst = os.path.join(debug_path, 'config.json')

    with open(config_src, 'r') as f:
        config = json.load(f)

    # Switch to the debug layer count
    original_layers = config['num_hidden_layers']
    config['num_hidden_layers'] = debug_layers

    print(f"🔧 DEBUG: Reducing num_hidden_layers from {original_layers} to {debug_layers}")

    with open(config_dst, 'w') as f:
        json.dump(config, f, indent=2)

    # Read the original safetensors index
    index_src = os.path.join(original_path, 'model.safetensors.index.json')
    with open(index_src, 'r') as f:
        index_data = json.load(f)

    original_weight_map = index_data['weight_map']

    # Determine which safetensors files are needed:
    # the first 8 files contain the embedding layer and the first few layers
    front_files = [f"model-{i:05d}-of-00316.safetensors" for i in range(1, 9)]
    # layer 2 weights live in model-00056, so it must be included explicitly
    layer2_files = ["model-00056-of-00316.safetensors"]
    # the last 2 files contain the final output layer and the vision_tower
    back_files = ["model-00315-of-00316.safetensors", "model-00316-of-00316.safetensors"]
    # file containing model.norm
    norm_files = ["model-00314-of-00316.safetensors"]

    needed_files = set(front_files + layer2_files + back_files + norm_files)

    print(f"🔧 DEBUG: Will load {len(needed_files)} safetensor files instead of 316:")
    for name in sorted(needed_files):
        print(f"  - {name}")

    # Filter the weight map, keeping only the needed layers and base components
    new_weight_map = {}

    for key, file in original_weight_map.items():
        # Keep base components (embeddings, final norm, output head)
        if any(key.startswith(prefix) for prefix in [
            'model.embed_tokens', 'model.norm', 'lm_head'
        ]):
            if file in needed_files:
                new_weight_map[key] = file
        # Keep vision_tower weights (these live in model-00315 and model-00316)
        elif key.startswith('vision_tower.'):
            if file in needed_files:
                new_weight_map[key] = file
        # Keep only the first debug_layers decoder layers
        elif key.startswith('model.layers.'):
            # Extract the layer index
            layer_parts = key.split('.')
            if len(layer_parts) >= 3 and layer_parts[2].isdigit():
                layer_num = int(layer_parts[2])
                if layer_num < debug_layers and file in needed_files:
                    new_weight_map[key] = file

    print(f"🔧 DEBUG: Filtered weight map from {len(original_weight_map)} to {len(new_weight_map)} entries")

    # Copy the required safetensors files
    copied_files = set()
    for file in new_weight_map.values():
        if file in copied_files:
            continue
        src_file = os.path.join(original_path, file)
        dst_file = os.path.join(debug_path, file)
        if os.path.exists(src_file):
            if not os.path.exists(dst_file):
                print(f"📁 Copying safetensor {file}... from {src_file} to {dst_file}")
                shutil.copy2(src_file, dst_file)
            copied_files.add(file)
        else:
            print(f"⚠️ File not found: {src_file}")

    # Create the new index file
    new_index_data = {
        "metadata": index_data.get("metadata", {}),
        "weight_map": new_weight_map,
    }

    index_dst = os.path.join(debug_path, 'model.safetensors.index.json')
    with open(index_dst, 'w') as f:
        json.dump(new_index_data, f, indent=2)

    print(f"✅ DEBUG: Created debug model at {debug_path}")
    return debug_path


def test_model_loading():
    """Test loading the model from pretrained weights."""
    # Path to your model weights
    model_path = "."

    # Check if DEBUG mode is enabled
    debug_minimal_layers = os.getenv('DEBUG_MINIMAL_LAYERS', '0') == '1'

    if debug_minimal_layers:
        print("🔧 DEBUG MINIMAL LAYERS: Using only 3 layers and minimal safetensors")
        # Create the debug version of the model
        model_path = create_debug_model_path(model_path, debug_layers=3)

    print("Loading model configuration...")
    try:
        config = AutoConfig.from_pretrained(model_path)
        print(f"✓ Config loaded successfully: {config.__class__.__name__}")
        print(f"  Model type: {config.model_type}")
        print(f"  Architecture: {config.architectures}")
        print(f"  Number of hidden layers: {config.num_hidden_layers}")

        # Check if a quantization config exists and is FP8
        if hasattr(config, 'quantization_config') and config.quantization_config is not None:
            quant_config = config.quantization_config
            if isinstance(quant_config, dict) and quant_config.get('quant_method') == 'fp8':
                print("  Detected FP8 quantization configuration")
                print(f"  Format: {quant_config.get('fmt', 'unknown')}")
                print(f"  Weight block size: {quant_config.get('weight_block_size', 'unknown')}")
                print(f"  Activation scheme: {quant_config.get('activation_scheme', 'unknown')}")
    except Exception as e:
        print(f"✗ Error loading config: {e}")
        return False

    print("\nTesting processor loading...")
    try:
        # Test whether the processor can be loaded via AutoProcessor
        try:
            processor = AutoProcessor.from_pretrained(model_path)
            print(f"✓ AutoProcessor loaded successfully: {processor.__class__.__name__}")
        except Exception as e:
            print(f"⚠️ AutoProcessor failed, trying direct import: {e}")
            # Fall back to direct processor creation
            processor = DotsVLMProcessor.from_pretrained(model_path)
            print(f"✓ Direct processor loaded successfully: {processor.__class__.__name__}")

        # Check processor attributes
        if hasattr(processor, 'image_token'):
            print(f"  Image token: {processor.image_token}")
        if hasattr(processor, 'tokenizer'):
            print(f"  Tokenizer type: {processor.tokenizer.__class__.__name__}")
        if hasattr(processor, 'image_processor'):
            print(f"  Image processor type: {processor.image_processor.__class__.__name__}")
    except Exception as e:
        print(f"⚠️ Processor loading failed (this is OK for now): {e}")
        processor = None

    print("\nLoading model with auto-fixing vision_tower quantization...")
    try:
        # Use the custom model class directly
        model = DotsVLMForCausalLM.from_pretrained(
            model_path,
            config=config,
            torch_dtype="auto",
            device_map="auto",
            trust_remote_code=True,
        )
        print(f"✓ Model loaded successfully: {model.__class__.__name__}")
        print(f"  Number of parameters: {model.num_parameters():,}")

        # Check vision_tower dtype
        if hasattr(model, 'vision_tower') and model.vision_tower is not None:
            vision_sample_param = list(model.vision_tower.parameters())[0]
            print(f"  Vision tower dtype: {vision_sample_param.dtype}")
    except Exception as e:
        traceback.print_exc()
        print(f"✗ Error loading model: {e}")
        return False

    # print("\nLoading vision weights...")
    # try:
    #     vision_weights_path = f"{model_path}/ve/visual.pt"
    #     vision_state_dict = torch.load(vision_weights_path, map_location='cpu', weights_only=False)
    #
    #     # Extract vision_encoder weights and remove the vision_tower prefix
    #     vision_encoder_weights = vision_state_dict['vision_encoder']
    #     cleaned_weights = {}
    #     for key, value in vision_encoder_weights.items():
    #         if key.startswith('vision_tower.'):
    #             new_key = key[len('vision_tower.'):]  # Remove 'vision_tower.' prefix
    #             cleaned_weights[new_key] = value
    #         else:
    #             cleaned_weights[key] = value
    #
    #     # Convert weights to match the vision_tower dtype
    #     target_dtype = list(model.vision_tower.parameters())[0].dtype
    #     for key, value in cleaned_weights.items():
    #         if value.dtype != target_dtype and value.dtype in [torch.float32, torch.float16, torch.bfloat16]:
    #             cleaned_weights[key] = value.to(target_dtype)
    #
    #     # Use assign=True to avoid meta tensor copying warnings
    #     model.vision_tower.load_state_dict(cleaned_weights, assign=True, strict=False)
    #     print("✓ Vision weights loaded successfully")
    #     print(f"  Vision tower final dtype: {list(model.vision_tower.parameters())[0].dtype}")
    # except Exception as e:
    #     print(f"✗ Error loading vision weights: {e}")
    #     return False

    print("\nTesting model forward pass...")
    try:
        # Full model forward pass on random token ids
        batch_size = 1
        seq_len = 10
        input_ids = torch.randint(0, 1000, (batch_size, seq_len))

        # Move inputs to the same device as the model
        if hasattr(model, 'device'):
            device = next(model.parameters()).device
            input_ids = input_ids.to(device)

        with torch.no_grad():
            outputs = model(input_ids=input_ids)

        print("✓ Forward pass successful")
        print(f"  Output shape: {outputs.logits.shape}")
        print(f"  Output dtype: {outputs.logits.dtype}")
    except Exception as e:
        print(f"✗ Error in forward pass: {e}")
        traceback.print_exc()
        return False

    # Clean up the temporary debug directory
    if debug_minimal_layers and model_path.endswith("_debug_temp"):
        try:
            # shutil.rmtree(model_path)
            print(f"🧹 Cleaned up temporary debug directory: {model_path}")
        except Exception as e:
            print(f"⚠️ Failed to clean up debug directory: {e}")
    elif debug_minimal_layers:
        print("\n🎉 DEBUG MINIMAL LAYERS: 3-layer model test completed!")
        print("💡 To test with full 61-layer model, run without DEBUG_MINIMAL_LAYERS=1")
    else:
        print("\n🎉 All tests passed! Model is working correctly.")

    return True


if __name__ == "__main__":
    test_model_loading()
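
# Usage sketch (illustrative; the filename "test_dots_vlm_loading.py" below is an
# assumption, substitute the actual name of this script):
#
#   # quick smoke test against an auto-generated 3-layer debug copy of the checkpoint
#   DEBUG_MINIMAL_LAYERS=1 python test_dots_vlm_loading.py
#
#   # full test against the complete 61-layer checkpoint
#   python test_dots_vlm_loading.py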