"""
Basic inference example for the Isaac Sim Robotics Qwen model.

This script demonstrates how to load the fine-tuned model and use it
to answer Isaac Sim robotics queries.
"""

import argparse
import sys

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer


def load_model(model_path, device="auto", load_in_8bit=False):
    """
    Load the Isaac Sim Robotics Qwen model.

    Args:
        model_path (str): Path to the model (local directory or Hugging Face Hub ID)
        device (str): Device map for the model ("auto", "cpu", "cuda")
        load_in_8bit (bool): Whether to use 8-bit quantization

    Returns:
        tuple: (model, tokenizer)
    """
    print(f"Loading model from: {model_path}")

    tokenizer = AutoTokenizer.from_pretrained(model_path)

    # Qwen tokenizers may not define a pad token; fall back to the EOS token.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    if load_in_8bit:
        try:
            model = AutoModelForCausalLM.from_pretrained(
                model_path,
                load_in_8bit=True,
                device_map=device,
                torch_dtype=torch.float16,
            )
        except ImportError:
            print("8-bit quantization unavailable (install bitsandbytes); falling back to fp16.")
            model = AutoModelForCausalLM.from_pretrained(
                model_path,
                device_map=device,
                torch_dtype=torch.float16,
            )
    else:
        model = AutoModelForCausalLM.from_pretrained(
            model_path,
            device_map=device,
            torch_dtype=torch.float16,
        )

    print("Model loaded successfully!")
    return model, tokenizer
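
# Note: recent transformers releases prefer configuring quantization through
# BitsAndBytesConfig rather than the load_in_8bit kwarg used above. A minimal
# sketch, assuming bitsandbytes is installed (untested here):
#
#   from transformers import BitsAndBytesConfig
#
#   model = AutoModelForCausalLM.from_pretrained(
#       model_path,
#       quantization_config=BitsAndBytesConfig(load_in_8bit=True),
#       device_map=device,
#   )
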
def generate_response(model, tokenizer, query, max_length=1024, temperature=0.7):
    """
    Generate a response from the model.

    Args:
        model: The loaded model
        tokenizer: The loaded tokenizer
        query (str): The input query
        max_length (int): Maximum total length (prompt + response) in tokens
        temperature (float): Sampling temperature

    Returns:
        str: Generated response
    """
    # Format the query with the ChatML template used by Qwen instruct models.
    formatted_query = f"<|im_start|>user\n{query}<|im_end|>\n<|im_start|>assistant\n"

    inputs = tokenizer(formatted_query, return_tensors="pt")

    # Move the inputs to the same device as the model.
    device = next(model.parameters()).device
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_length=max_length,
            temperature=temperature,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )

    # Decode only the newly generated tokens. Decoding the full sequence with
    # skip_special_tokens=True would strip the <|im_start|> markers, so
    # splitting on them afterwards would silently fail.
    new_tokens = outputs[0][inputs["input_ids"].shape[1]:]
    response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

    return response
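
# Note: Qwen2.5 tokenizers ship with a chat template, so the manual ChatML
# string above can usually be replaced by apply_chat_template. A minimal
# sketch (untested here):
#
#   text = tokenizer.apply_chat_template(
#       [{"role": "user", "content": query}],
#       tokenize=False,
#       add_generation_prompt=True,
#   )
#   inputs = tokenizer(text, return_tensors="pt")
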
def main():
    parser = argparse.ArgumentParser(description="Isaac Sim Robotics Qwen Inference")
    parser.add_argument(
        "--model_path",
        type=str,
        default="TomBombadyl/Qwen2.5-Coder-7B-Instruct-Omni1.1",
        help="Path to model (local or HuggingFace hub)"
    )
    parser.add_argument(
        "--device",
        type=str,
        default="auto",
        choices=["auto", "cpu", "cuda"],
        help="Device to use for inference"
    )
    parser.add_argument(
        "--load_8bit",
        action="store_true",
        help="Use 8-bit quantization to reduce memory usage"
    )
    parser.add_argument(
        "--max_length",
        type=int,
        default=1024,
        help="Maximum total length (prompt + response) in tokens"
    )
    parser.add_argument(
        "--temperature",
        type=float,
        default=0.7,
        help="Sampling temperature"
    )
    parser.add_argument(
        "--query",
        type=str,
        help="Query to ask (if omitted, the script runs in interactive mode)"
    )

    args = parser.parse_args()

    try:
        model, tokenizer = load_model(
            args.model_path,
            device=args.device,
            load_in_8bit=args.load_8bit
        )

        if args.query:
            # Single-query mode: answer once and exit.
            response = generate_response(
                model, tokenizer, args.query, args.max_length, args.temperature
            )
            print(f"\nQuery: {args.query}")
            print(f"Response:\n{response}")
        else:
            # Interactive mode: read queries from stdin until the user quits.
            print("\n=== Isaac Sim Robotics Qwen Interactive Mode ===")
            print("Type 'quit' to exit")
            print("Example queries:")
            print("- How do I create a differential drive robot in Isaac Sim?")
            print("- How do I add a depth camera to my robot?")
            print("- What physics parameters should I use for a manipulator?")
            print()

            while True:
                try:
                    query = input("Enter your Isaac Sim question: ").strip()
                    if query.lower() in ("quit", "exit", "q"):
                        break
                    if not query:
                        continue

                    print("Generating response...")
                    response = generate_response(
                        model, tokenizer, query, args.max_length, args.temperature
                    )
                    print(f"\nResponse:\n{response}\n")

                except KeyboardInterrupt:
                    print("\nExiting...")
                    break
                except Exception as e:
                    print(f"Error generating response: {e}")

    except Exception as e:
        print(f"Error loading model: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()