"""
macOS install:
    pip3 install torch==2.1.1 torchvision torchaudio transformers==4.48.0 accelerate==0.28.0
These exact versions are required; later versions have a numerical-instability
bug on MPS chips.

Interactive model evaluation script for pretraining experiments.

Automatically discovers and loads all models with /hf subdirectories.
"""

import os
import warnings

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

warnings.filterwarnings("ignore")
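
# Optional substring filter for discovered model directories, e.g.
# MODEL_NAME_FILTER = "sft" (illustrative) to load only SFT checkpoints;
# None loads every model found.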
MODEL_NAME_FILTER = None


class ModelEvaluator:
    def __init__(self):
        self.models = {}
        self.tokenizers = {}
        self.pipelines = {}
        self.model_names = []

    def discover_models(self):
        """Discover all models with /hf subdirectories; return the list of directories."""
        print("🔍 Discovering models with /hf subdirectories...")

        hf_dirs = []
        for item in os.listdir('.'):
            if os.path.isdir(item) and os.path.exists(os.path.join(item, 'hf')):
                if MODEL_NAME_FILTER is None or MODEL_NAME_FILTER in item:
                    hf_dirs.append(item)

        if not hf_dirs:
            print("❌ No models with /hf subdirectories found!")
            # Return an empty list (not False) so the return type is consistent;
            # callers' truthiness checks still work.
            return []

        print(f"✅ Found {len(hf_dirs)} models:")
        for model_dir in hf_dirs:
            print(f"  - {model_dir}")
        return hf_dirs

    def load_model(self, model_dir):
        """Load a single model and its tokenizer."""
        try:
            hf_path = os.path.join(model_dir, 'hf')
            print(f"🔄 Loading {model_dir}...")

            tokenizer = AutoTokenizer.from_pretrained(hf_path)
            # Pretraining checkpoints often ship without a pad token; reuse EOS
            # so generation with padding does not fail.
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token

            # Load directly in float16; device placement is done explicitly below.
            model = AutoModelForCausalLM.from_pretrained(
                hf_path,
                device_map=None,
                torch_dtype=torch.float16,
                trust_remote_code=True
            )
            if torch.cuda.is_available():
                model.to("cuda:0")
            elif torch.backends.mps.is_available():
                model.to("mps")
            # Otherwise the model stays on CPU.

            # Chat/SFT and base models share the same text-generation pipeline;
            # they differ only in how prompts are formatted at generation time
            # (see generate_response). device_map/torch_dtype are omitted here
            # because the model is already instantiated, cast, and placed above.
            pipe = pipeline(
                "text-generation",
                model=model,
                tokenizer=tokenizer
            )
            if "chat" in model_dir.lower() or "sft" in model_dir.lower():
                print("  🔄 Using chat-formatted prompts for chat/SFT model")
            else:
                print("  🔄 Using plain text-generation prompts")

            self.models[model_dir] = model
            self.tokenizers[model_dir] = tokenizer
            self.pipelines[model_dir] = pipe
            self.model_names.append(model_dir)

            print(f"  ✅ {model_dir} loaded successfully")
            return True

        except Exception as e:
            print(f"  ❌ Failed to load {model_dir}: {str(e)}")
            return False

    def load_all_models(self):
        """Load all discovered models."""
        hf_dirs = self.discover_models()
        if not hf_dirs:
            return False

        print("\n🚀 Loading models...")
        successful_loads = 0

        for model_dir in hf_dirs:
            if self.load_model(model_dir):
                successful_loads += 1

        print(f"\n📊 Loaded {successful_loads}/{len(hf_dirs)} models successfully")
        return successful_loads > 0

    def generate_response(self, model_name, prompt, max_new_tokens=256):
        """Generate a response to `prompt` from the named model."""
        try:
            pipe = self.pipelines[model_name]

            if "chat" in model_name.lower() or "sft" in model_name.lower():
                # Chat/SFT models get chat-formatted input; the pipeline applies
                # the tokenizer's chat template automatically.
                chat_input = [{"role": "user", "content": prompt}]
                outputs = pipe(
                    chat_input,
                    max_new_tokens=max_new_tokens,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    repetition_penalty=1.1,
                    pad_token_id=self.tokenizers[model_name].eos_token_id
                )

                if outputs and len(outputs) > 0:
                    # With chat input, generated_text is the whole conversation
                    # (a list of role/content dicts) including the new reply.
                    conversation = outputs[0]['generated_text']
                    if isinstance(conversation, list) and len(conversation) > 1:
                        # Return the most recent assistant message.
                        for message in reversed(conversation):
                            if message.get('role') == 'assistant':
                                return message.get('content', 'No response generated')
                        return conversation[-1].get('content', 'No response generated')
                    else:
                        return str(conversation)
                else:
                    return "No response generated"
            else:
                # Base models get the raw prompt; return only the completion.
                outputs = pipe(
                    prompt,
                    max_new_tokens=max_new_tokens,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    pad_token_id=self.tokenizers[model_name].eos_token_id,
                    return_full_text=False
                )

                return outputs[0]['generated_text']

        except Exception as e:
            return f"❌ Generation failed: {str(e)}"
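
    # For reference, the chat branch above expects pipeline output shaped
    # roughly like this (illustrative; actual text depends on the model):
    #   [{'generated_text': [
    #       {'role': 'user', 'content': 'Hello'},
    #       {'role': 'assistant', 'content': '...model reply...'}]}]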

    def evaluate_prompt(self, prompt):
        """Evaluate a prompt across all loaded models."""
        print(f"\n🎯 Evaluating prompt: '{prompt}'")
        print("=" * 80)

        for model_name in self.model_names:
            print(f"\n🤖 {model_name}:")
            print("-" * 40)

            response = self.generate_response(model_name, prompt)
            print(response)

        print("\n" + "=" * 80)

    def print_help(self):
        """Print the interactive-mode command reference."""
        print("\n🎮 Interactive Evaluation Mode")
        print("Commands:")
        print("  - Type your prompt to evaluate all models")
        print("  - Type 'quit' or 'exit' to end")
        print("  - Type 'help' for this message")
        print("  - Type 'models' to list loaded models")
        print("  - Type 'clear' to clear screen")
        print("\n💡 Note: Models with 'chat' or 'sft' in their name are prompted in")
        print("   chat format; other models receive the raw prompt.")

    def interactive_loop(self):
        """Main interactive evaluation loop."""
        self.print_help()

        while True:
            try:
                user_input = input("\n💬 Enter prompt (or command): ").strip()

                if not user_input:
                    continue

                if user_input.lower() in ['quit', 'exit', 'q']:
                    print("👋 Goodbye!")
                    break

                elif user_input.lower() == 'help':
                    self.print_help()

                elif user_input.lower() == 'models':
                    print(f"\n📋 Loaded models ({len(self.model_names)}):")
                    for i, model_name in enumerate(self.model_names, 1):
                        print(f"  {i}. {model_name}")

                elif user_input.lower() == 'clear':
                    os.system('clear' if os.name == 'posix' else 'cls')

                else:
                    self.evaluate_prompt(user_input)

            except KeyboardInterrupt:
                print("\n\n👋 Goodbye!")
                break
            except Exception as e:
                print(f"❌ Error: {str(e)}")


def main():
    print("🚀 Model Evaluation Script")
    print("=" * 50)

    evaluator = ModelEvaluator()

    if not evaluator.load_all_models():
        print("❌ No models could be loaded. Exiting.")
        return

    evaluator.interactive_loop()


if __name__ == "__main__":
    main()
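
# Programmatic use (illustrative sketch; this script itself only runs
# interactively via main() above):
#
#   evaluator = ModelEvaluator()
#   if evaluator.load_all_models():
#       reply = evaluator.generate_response(evaluator.model_names[0], "Hello!")
#       print(reply)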