#!/usr/bin/env python3
"""
MAC OS X INSTALL: pip3 install torch==2.1.1 torchvision torchaudio transformers==4.48.0 accelerate==0.28.0 (You must use these versions, higher version have some numerical instability bug on MPS chips)
Interactive model evaluation script for pretraining experiments.
Automatically discovers and loads all models with /hf subdirectories.
"""
import os
import warnings

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# Suppress warnings for cleaner output
warnings.filterwarnings("ignore")
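
# Optional substring filter over discovered model directories; set it to
# restrict which models are loaded, e.g. (illustrative value):
#   MODEL_NAME_FILTER = "sft"  # only load directories whose name contains "sft"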
MODEL_NAME_FILTER = None


class ModelEvaluator:
    def __init__(self):
        self.models = {}
        self.tokenizers = {}
        self.pipelines = {}
        self.model_names = []

    def discover_models(self):
        """Discover all model directories that contain an /hf subdirectory."""
        print("🔍 Discovering models with /hf subdirectories...")
        hf_dirs = []
        for item in os.listdir('.'):
            if os.path.isdir(item) and os.path.exists(os.path.join(item, 'hf')):
                if MODEL_NAME_FILTER is None or MODEL_NAME_FILTER in item:
                    hf_dirs.append(item)
        if not hf_dirs:
            print("❌ No models with /hf subdirectories found!")
            return []
        print(f"✅ Found {len(hf_dirs)} models:")
        for model_dir in hf_dirs:
            print(f"  - {model_dir}")
        return hf_dirs

    def load_model(self, model_dir):
        """Load a single model and its tokenizer."""
        try:
            hf_path = os.path.join(model_dir, 'hf')
            print(f"🔄 Loading {model_dir}...")
            # Load tokenizer; fall back to the EOS token for padding if none is set
            tokenizer = AutoTokenizer.from_pretrained(hf_path)
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token
            # Load model in float16, then place it on the best available device
            model = AutoModelForCausalLM.from_pretrained(
                hf_path,
                device_map=None,
                torch_dtype=torch.float16,
                trust_remote_code=True
            )
            if torch.cuda.is_available():
                model.to("cuda:0")
            elif torch.backends.mps.is_available():
                model.to("mps")
            # If neither CUDA nor MPS is available, the model stays on CPU
            # Create a text-generation pipeline. Chat/SFT models use the same
            # pipeline type; the difference is applied at generation time, where
            # they receive chat-formatted message lists instead of raw prompts.
            # (device_map is omitted: the model is already placed on a device.)
            pipe = pipeline(
                "text-generation",
                model=model,
                tokenizer=tokenizer,
                torch_dtype=torch.float16
            )
            if "chat" in model_dir.lower() or "sft" in model_dir.lower():
                print("  🔄 Chat/SFT model: prompts will be sent in chat format")
            else:
                print("  🔄 Base model: prompts will be sent as plain text")
            self.models[model_dir] = model
            self.tokenizers[model_dir] = tokenizer
            self.pipelines[model_dir] = pipe
            self.model_names.append(model_dir)
            print(f"  ✅ {model_dir} loaded successfully")
            return True
        except Exception as e:
            print(f"  ❌ Failed to load {model_dir}: {str(e)}")
            return False

    def load_all_models(self):
        """Load all discovered models."""
        hf_dirs = self.discover_models()
        if not hf_dirs:
            return False
        print("\n🚀 Loading models...")
        successful_loads = 0
        for model_dir in hf_dirs:
            if self.load_model(model_dir):
                successful_loads += 1
        print(f"\n📊 Loaded {successful_loads}/{len(hf_dirs)} models successfully")
        return successful_loads > 0

    def generate_response(self, model_name, prompt, max_new_tokens=256):
        """Generate a response from a specific model."""
        try:
            pipe = self.pipelines[model_name]
            if "chat" in model_name.lower() or "sft" in model_name.lower():
                # Chat/SFT models: wrap the prompt in a chat-format message list
                chat_input = [{"role": "user", "content": prompt}]
                outputs = pipe(
                    chat_input,
                    max_new_tokens=max_new_tokens,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    repetition_penalty=1.1,
                    pad_token_id=self.tokenizers[model_name].eos_token_id
                )
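                # With chat-format input, the pipeline returns the whole
                # conversation rather than a flat string, e.g. (illustrative):
                #   [{"generated_text": [
                #       {"role": "user", "content": "Hi"},
                #       {"role": "assistant", "content": "Hello! ..."}]}]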
                # Extract just the assistant's reply from the conversation
                if outputs and len(outputs) > 0:
                    conversation = outputs[0]['generated_text']
                    if isinstance(conversation, list) and len(conversation) > 1:
                        # Find the last assistant message
                        for message in reversed(conversation):
                            if message.get('role') == 'assistant':
                                return message.get('content', 'No response generated')
                        # No assistant message found; fall back to the last message
                        return conversation[-1].get('content', 'No response generated')
                    else:
                        return str(conversation)
                else:
                    return "No response generated"
            else:
                # Base models: plain text generation on the raw prompt
                outputs = pipe(
                    prompt,
                    max_new_tokens=max_new_tokens,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    pad_token_id=self.tokenizers[model_name].eos_token_id,
                    return_full_text=False
                )
                return outputs[0]['generated_text']
        except Exception as e:
            return f"❌ Generation failed: {str(e)}"

    def evaluate_prompt(self, prompt):
        """Evaluate a prompt across all loaded models."""
        print(f"\n🎯 Evaluating prompt: '{prompt}'")
        print("=" * 80)
        for model_name in self.model_names:
            print(f"\n🤖 {model_name}:")
            print("-" * 40)
            response = self.generate_response(model_name, prompt)
            print(response)
            print("\n" + "=" * 80)

    def print_help(self):
        """Print the command reference for interactive mode."""
        print("\n🎮 Interactive Evaluation Mode")
        print("Commands:")
        print("  - Type your prompt to evaluate all models")
        print("  - Type 'quit' or 'exit' to end")
        print("  - Type 'help' for this message")
        print("  - Type 'models' to list loaded models")
        print("  - Type 'clear' to clear screen")
        print("\n💡 Note: Models with 'chat' or 'sft' in their name receive")
        print("   chat-formatted input; other models receive plain text prompts.")

    def interactive_loop(self):
        """Main interactive evaluation loop."""
        self.print_help()
        while True:
            try:
                user_input = input("\n💬 Enter prompt (or command): ").strip()
                if not user_input:
                    continue
                if user_input.lower() in ['quit', 'exit', 'q']:
                    print("👋 Goodbye!")
                    break
                elif user_input.lower() == 'help':
                    self.print_help()
                elif user_input.lower() == 'models':
                    print(f"\n📋 Loaded models ({len(self.model_names)}):")
                    for i, model_name in enumerate(self.model_names, 1):
                        print(f"  {i}. {model_name}")
                elif user_input.lower() == 'clear':
                    os.system('clear' if os.name == 'posix' else 'cls')
                else:
                    self.evaluate_prompt(user_input)
            except KeyboardInterrupt:
                print("\n\n👋 Goodbye!")
                break
            except Exception as e:
                print(f"❌ Error: {str(e)}")


def main():
    print("🚀 Model Evaluation Script")
    print("=" * 50)
    evaluator = ModelEvaluator()
    # Load all models before entering the interactive loop
    if not evaluator.load_all_models():
        print("❌ No models could be loaded. Exiting.")
        return
    evaluator.interactive_loop()


if __name__ == "__main__":
    main()
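
# Programmatic usage sketch, for driving the evaluator from another script
# instead of the interactive loop (the prompt string below is illustrative):
#
#   evaluator = ModelEvaluator()
#   if evaluator.load_all_models():
#       evaluator.evaluate_prompt("Explain attention in one sentence.")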