# Install:
# pip install torch transformers datasets accelerate safetensors packaging

import math
import inspect
import torch
import torch.nn as nn
from typing import Optional, Tuple

from datasets import Dataset
from transformers import (
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
    PreTrainedModel,
    PretrainedConfig,
)
import transformers


# =============================
# 0) Helper: make TrainingArguments backward/forward compatible
# =============================
def build_training_arguments(**base_kwargs) -> TrainingArguments:
    """Create TrainingArguments that work on both old and new transformers.

    We introspect the __init__ signature to only pass supported kwargs.
    Also adds legacy fallbacks (e.g., evaluate_during_training) when needed.
    """
    sig = inspect.signature(TrainingArguments.__init__)
    supported = set(sig.parameters.keys())

    # New API keys we'd like to use if available
    desired = {
        "output_dir": "./mini_custom_transformer_safetensors",
        "overwrite_output_dir": True,
        "per_device_train_batch_size": 2,
        "per_device_eval_batch_size": 2,
        "num_train_epochs": 5,
        "logging_steps": 5,
        "learning_rate": 5e-4,  # a bit higher since training from scratch
        "weight_decay": 0.01,
        "fp16": torch.cuda.is_available(),
        "save_total_limit": 2,
        "report_to": None,
        "optim": "adamw_torch",
        # Prefer these if the installed transformers supports them
        "evaluation_strategy": "epoch",
        "save_strategy": "epoch",
        "save_safetensors": True,
    }
    desired.update(base_kwargs)

    # Filter to only supported keys
    filtered = {k: v for k, v in desired.items() if k in supported}

    # If evaluation_strategy not supported, try legacy flag
    if "evaluation_strategy" not in supported:
        # old flag name (very old versions)
        if "evaluate_during_training" in supported:
            filtered["evaluate_during_training"] = True

    # If save_safetensors not supported, just drop it (will save .bin by default)
    # "save_strategy" will also be dropped automatically if not supported

    print(f"[INFO] transformers={transformers.__version__} "
          f"TrainingArguments accepts: {sorted(supported)} "
          f"We will pass: {sorted(filtered.keys())}")

    return TrainingArguments(**filtered)


# =============================
# 1) Minimal GPT-style Transformer (from scratch) compatible with HF Trainer
# =============================
class SimpleGPTConfig(PretrainedConfig):
    model_type = "simple_gpt"

    def __init__(
        self,
        vocab_size: int = 50257,  # Default GPT-2 vocab size
        n_embd: int = 128,
        n_head: int = 4,
        n_layer: int = 2,
        max_position_embeddings: int = 128,
        dropout: float = 0.1,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.n_embd = n_embd
        self.n_head = n_head
        self.n_layer = n_layer
        self.max_position_embeddings = max_position_embeddings
        self.dropout = dropout


class CausalSelfAttention(nn.Module):
    def __init__(self, config: SimpleGPTConfig):
        super().__init__()
        assert config.n_embd % config.n_head == 0
        self.n_head = config.n_head
        self.head_dim = config.n_embd // config.n_head
        self.qkv = nn.Linear(config.n_embd, 3 * config.n_embd)
        self.proj = nn.Linear(config.n_embd, config.n_embd)
        self.attn_drop = nn.Dropout(config.dropout)
        self.resid_drop = nn.Dropout(config.dropout)

    def forward(self, x: torch.Tensor, attn_mask: Optional[torch.Tensor] = None):
        B, T, C = x.size()
        qkv = self.qkv(x)  # (B, T, 3C)
        q, k, v = qkv.split(C, dim=2)

        # shape into heads
        q = q.view(B, T, self.n_head, self.head_dim).transpose(1, 2)  # (B, nh, T, hs)
        k = k.view(B, T, self.n_head, self.head_dim).transpose(1, 2)
        v = v.view(B, T, self.n_head, self.head_dim).transpose(1, 2)
        # scaled dot-product attention
        att = (q @ k.transpose(-2, -1)) / (self.head_dim ** 0.5)  # (B, nh, T, T)

        # Causal mask
        causal_mask = torch.tril(torch.ones(T, T, device=x.device, dtype=torch.bool))
        att = att.masked_fill(~causal_mask, float('-inf'))

        # Padding mask (1=keep, 0=pad)
        if attn_mask is not None:
            key_mask = (attn_mask > 0).unsqueeze(1).unsqueeze(2)  # (B,1,1,T)
            att = att.masked_fill(~key_mask, float('-inf'))

        att = torch.softmax(att, dim=-1)
        att = self.attn_drop(att)

        y = att @ v  # (B, nh, T, hs)
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        y = self.resid_drop(self.proj(y))
        return y


class FeedForward(nn.Module):
    def __init__(self, config: SimpleGPTConfig):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(config.n_embd, 4 * config.n_embd),
            nn.GELU(),
            nn.Linear(4 * config.n_embd, config.n_embd),
            nn.Dropout(config.dropout),
        )

    def forward(self, x):
        return self.net(x)


class Block(nn.Module):
    def __init__(self, config: SimpleGPTConfig):
        super().__init__()
        self.ln1 = nn.LayerNorm(config.n_embd)
        self.attn = CausalSelfAttention(config)
        self.ln2 = nn.LayerNorm(config.n_embd)
        self.ff = FeedForward(config)

    def forward(self, x, attn_mask=None):
        x = x + self.attn(self.ln1(x), attn_mask)
        x = x + self.ff(self.ln2(x))
        return x


class SimpleGPTLMHeadModel(PreTrainedModel):
    config_class = SimpleGPTConfig
    _tied_weights_keys = ["lm_head.weight"]
    _dynamic_tied_weights_keys = ["lm_head.weight"]

    def __init__(self, config: SimpleGPTConfig):
        super().__init__(config)
        self.wte = nn.Embedding(config.vocab_size, config.n_embd)  # token embeddings
        self.wpe = nn.Embedding(config.max_position_embeddings, config.n_embd)  # positional embeddings
        self.drop = nn.Dropout(config.dropout)
        self.h = nn.ModuleList([Block(config) for _ in range(config.n_layer)])
        self.ln_f = nn.LayerNorm(config.n_embd)
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)

        # Tie weights
        self.lm_head.weight = self.wte.weight

        self.post_init()

    def get_input_embeddings(self):
        return self.wte

    def set_input_embeddings(self, value):
        self.wte = value

    def get_output_embeddings(self):
        return self.lm_head

    def set_output_embeddings(self, new_embeddings):
        self.lm_head = new_embeddings

    def tie_weights(self):
        """Tie the weights between the input embeddings and the output embeddings."""
        self.lm_head.weight = self.wte.weight

    def forward(
        self,
        input_ids: torch.LongTensor,
        attention_mask: Optional[torch.Tensor] = None,
        labels: Optional[torch.LongTensor] = None,
        **kwargs,
    ) -> dict:
        B, T = input_ids.shape
        device = input_ids.device

        pos = torch.arange(0, T, device=device).unsqueeze(0)  # (1, T)

        tok = self.wte(input_ids)  # (B, T, C)
        pos = self.wpe(pos)        # (1, T, C)
        x = self.drop(tok + pos)

        for block in self.h:
            x = block(x, attn_mask=attention_mask)

        x = self.ln_f(x)
        logits = self.lm_head(x)  # (B, T, vocab)

        loss = None
        if labels is not None:
            # Shift for causal LM loss
            shift_logits = logits[:, :-1, :].contiguous()
            shift_labels = labels[:, 1:].contiguous()
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)),
                            shift_labels.view(-1))

        return {"loss": loss, "logits": logits}
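
# =============================
# (Optional) Quick shape sanity check (editor's sketch, not part of the original flow)
# =============================
# A minimal, self-contained check added for illustration: it builds a throwaway
# config, runs one forward pass on random token ids, and prints the output shape
# and loss so you can confirm the blocks above wire together before launching real
# training. The names _check_cfg / _check_model / _check_ids are placeholders
# introduced here, not library API.
_check_cfg = SimpleGPTConfig(vocab_size=100, n_embd=32, n_head=2, n_layer=1,
                             max_position_embeddings=16, dropout=0.0)
_check_model = SimpleGPTLMHeadModel(_check_cfg)
_check_ids = torch.randint(0, _check_cfg.vocab_size, (2, 16))  # (batch=2, seq_len=16)
_check_out = _check_model(input_ids=_check_ids, labels=_check_ids)
print("[CHECK] logits shape:", tuple(_check_out["logits"].shape))  # expect (2, 16, 100)
print("[CHECK] loss:", float(_check_out["loss"]))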
GPT.", "Deep learning is fun!", "Python is my favorite programming language.", ] train_data = Dataset.from_dict({"text": train_texts}) test_data = Dataset.from_dict({"text": test_texts}) # ============================= # 3) Tokenizer (use GPT2 tokenizer for convenience) # ============================= print("[INFO] Loading tokenizer...") TOKENIZER_NAME = "distilgpt2" # any GPT2-compatible tokenizer works tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_NAME) # GPT2 has no pad_token by default; use eos as pad if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token max_len = 64 def tokenize(batch): return tokenizer(batch["text"], truncation=True, padding="max_length", max_length=max_len) train_data = train_data.map(tokenize, batched=True) test_data = test_data.map(tokenize, batched=True) cols = ["input_ids", "attention_mask"] train_data.set_format("torch", columns=cols) test_data.set_format("torch", columns=cols) # ============================= # 4) Build our custom model # =============================# ============================= # 4) Build our custom model # ============================= print("[INFO] Building SimpleGPT model...") config = SimpleGPTConfig( vocab_size=tokenizer.vocab_size, # ✅ auto-read from tokenizer n_embd=128, n_head=4, n_layer=2, max_position_embeddings=max_len, dropout=0.1, ) model = SimpleGPTLMHeadModel(config) # ============================= # 5) Trainer bits (version-robust TrainingArguments) # ============================= collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False) args = build_training_arguments() trainer = Trainer( model=model, args=args, data_collator=collator, train_dataset=train_data, eval_dataset=test_data, ) # ============================= # 6) Train & Save (safetensors if supported) # ============================= print("[INFO] Training model...") trainer.train() print("[INFO] Saving model...") trainer.save_model("./mini_custom_transformer_safetensors") # creates model.safetensors on new versions print("[SUCCESS] Done! You trained a tiny GPT-like model with a custom Transformer block and saved it safely.")