#!/usr/bin/env python3
"""
Basic inference example for Isaac Sim Robotics Qwen model.

This script demonstrates how to load and use the fine-tuned model
for Isaac Sim robotics queries.
"""

import argparse
import sys

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer


def load_model(model_path, device="auto", load_in_8bit=False):
    """
    Load the Isaac Sim Robotics Qwen model.

    Args:
        model_path (str): Path to the model (local or HuggingFace hub)
        device (str): Device to load model on ("auto", "cpu", "cuda")
        load_in_8bit (bool): Whether to use 8-bit quantization

    Returns:
        tuple: (model, tokenizer)
    """
    print(f"Loading model from: {model_path}")

    # Load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_path)

    # Set pad token if not present
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # Load model
    if load_in_8bit:
        try:
            model = AutoModelForCausalLM.from_pretrained(
                model_path,
                load_in_8bit=True,
                device_map=device,
                torch_dtype=torch.float16
            )
        except ImportError:
            print("8-bit quantization not available. Install bitsandbytes.")
            model = AutoModelForCausalLM.from_pretrained(
                model_path,
                device_map=device,
                torch_dtype=torch.float16
            )
    else:
        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            device_map=device,
            torch_dtype=torch.float16
        )

    print("Model loaded successfully!")
    return model, tokenizer


def generate_response(model, tokenizer, query, max_length=1024, temperature=0.7):
    """
    Generate a response using the model.

    Args:
        model: The loaded model
        tokenizer: The loaded tokenizer
        query (str): The input query
        max_length (int): Maximum number of newly generated tokens
        temperature (float): Sampling temperature

    Returns:
        str: Generated response
    """
    # Format query using the ChatML template expected by Qwen2.5-Coder
    formatted_query = f"<|im_start|>user\n{query}<|im_end|>\n<|im_start|>assistant\n"

    # Tokenize input
    inputs = tokenizer(formatted_query, return_tensors="pt")

    # Move inputs to the same device as the model
    device = next(model.parameters()).device
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Generate response; max_new_tokens limits only the generated portion,
    # matching the documented meaning of max_length ("length of the response")
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=max_length,
            temperature=temperature,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id
        )

    # Decode only the newly generated tokens, skipping the prompt.
    # (With skip_special_tokens=True the ChatML markers are stripped, so
    # splitting the full decoded text on "<|im_start|>assistant" would never match.)
    input_length = inputs["input_ids"].shape[1]
    response = tokenizer.decode(outputs[0][input_length:], skip_special_tokens=True)

    return response.strip()


def main():
    parser = argparse.ArgumentParser(description="Isaac Sim Robotics Qwen Inference")
    parser.add_argument(
        "--model_path",
        type=str,
        default="TomBombadyl/Qwen2.5-Coder-7B-Instruct-Omni1.1",
        help="Path to model (local or HuggingFace hub)"
    )
    parser.add_argument(
        "--device",
        type=str,
        default="auto",
        choices=["auto", "cpu", "cuda"],
        help="Device to use for inference"
    )
    parser.add_argument(
        "--load_8bit",
        action="store_true",
        help="Use 8-bit quantization to reduce memory usage"
    )
    parser.add_argument(
        "--max_length",
        type=int,
        default=1024,
        help="Maximum length of generated response"
    )
    parser.add_argument(
        "--temperature",
        type=float,
        default=0.7,
        help="Sampling temperature"
    )
    parser.add_argument(
        "--query",
        type=str,
        help="Query to ask (if not provided, will use interactive mode)"
    )

    args = parser.parse_args()

    try:
        # Load model
        model, tokenizer = load_model(
            args.model_path,
            device=args.device,
            load_in_8bit=args.load_8bit
        )

        if args.query:
            # Single query mode
            response = generate_response(
                model,
                tokenizer,
                args.query,
                args.max_length,
                args.temperature
            )
            print(f"\nQuery: {args.query}")
            print(f"Response:\n{response}")
        else:
            # Interactive mode
            print("\n=== Isaac Sim Robotics Qwen Interactive Mode ===")
            print("Type 'quit' to exit")
            print("Example queries:")
            print("- How do I create a differential drive robot in Isaac Sim?")
            print("- How to add a depth camera to my robot?")
            print("- What physics parameters should I use for a manipulator?")
            print()

            while True:
                try:
                    query = input("Enter your Isaac Sim question: ").strip()

                    if query.lower() in ['quit', 'exit', 'q']:
                        break

                    if not query:
                        continue

                    print("Generating response...")
                    response = generate_response(
                        model,
                        tokenizer,
                        query,
                        args.max_length,
                        args.temperature
                    )
                    print(f"\nResponse:\n{response}\n")

                except KeyboardInterrupt:
                    print("\nExiting...")
                    break
                except Exception as e:
                    print(f"Error generating response: {e}")

    except Exception as e:
        print(f"Error loading model: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
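
# Example invocations (a minimal sketch, assuming this file is saved as
# basic_inference.py; the script name is an assumption, adjust it to your checkout):
#
#   # Single-query mode against the default HuggingFace model
#   python basic_inference.py --query "How do I create a differential drive robot in Isaac Sim?"
#
#   # Interactive mode on GPU with 8-bit quantization to reduce memory usage
#   python basic_inference.py --device cuda --load_8bit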