import torch
import torch.nn as nn


class NeighborExchange(nn.Module):
    """Mix each expert's output with the mean output of its grid neighbours.

    Experts are laid out on a ``mesh_grid_size[0] x mesh_grid_size[1]`` grid in
    row-major order (expert ``n`` sits at row ``n // grid_y``, column
    ``n % grid_y``).  For every expert the outputs of its up/down/left/right
    neighbours are averaged, passed through one shared linear projection, and
    added to the expert's own output.

    The config object must provide ``mesh_grid_size`` (indexable, at least two
    entries), ``hidden_size`` and ``neighbor_exchange_enabled``.

    All four directions are weighted equally; there are no learned
    per-direction weights.
    """

    # The annotation is a string so that defining this class does not require
    # ``MeshConfig`` to be resolvable at import time.
    def __init__(self, config: "MeshConfig"):
        super().__init__()
        self.config = config
        self.num_experts_x = config.mesh_grid_size[0]
        self.num_experts_y = config.mesh_grid_size[1]
        self.num_experts = self.num_experts_x * self.num_experts_y

        # Shared projection applied to the averaged neighbour features before
        # they are added to each expert's own output.
        self.exchange_projection = nn.Linear(config.hidden_size, config.hidden_size)

    @staticmethod
    def _neighbor_counts(grid_x, grid_y):
        """Return a nested list with the number of in-grid 4-neighbours per cell.

        Cells without any neighbour (only possible on a 1x1 grid) report 1 so
        the count can be used as a divisor; their neighbour sum is zero anyway.
        """
        return [
            [
                max((i > 0) + (i < grid_x - 1) + (j > 0) + (j < grid_y - 1), 1)
                for j in range(grid_y)
            ]
            for i in range(grid_x)
        ]

    def forward(self, expert_outputs, expert_indices=None):
        """Add projected neighbour averages to every expert's output.

        Args:
            expert_outputs: Tensor of shape
                ``(batch_size, seq_length, num_experts, hidden_size)`` where
                ``num_experts`` equals ``grid_x * grid_y``.
            expert_indices: Accepted for interface compatibility; not used by
                the exchange.

        Returns:
            ``expert_outputs`` itself when ``config.neighbor_exchange_enabled``
            is falsy; otherwise a new tensor with the shape of
            ``expert_outputs`` equal to
            ``expert_outputs + exchange_projection(mean of neighbours)``.
            An expert with no neighbours receives ``exchange_projection(0)``,
            i.e. just the projection bias.
        """
        if not self.config.neighbor_exchange_enabled:
            return expert_outputs

        batch_size, seq_length, num_experts, hidden_size = expert_outputs.shape
        grid_x, grid_y = self.num_experts_x, self.num_experts_y

        # (batch, seq, grid_x, grid_y, hidden)
        grid = expert_outputs.reshape(
            batch_size, seq_length, grid_x, grid_y, hidden_size
        )

        # Zero-pad one cell around both grid axes so that every shifted slice
        # below has the full grid shape; out-of-grid neighbours contribute 0.
        # Pad spec is (last dim, ..., third-to-last dim): hidden untouched,
        # grid_y padded by 1 on each side, grid_x padded by 1 on each side.
        padded = nn.functional.pad(grid, (0, 0, 1, 1, 1, 1))
        neighbor_sum = (
            padded[:, :, :-2, 1:-1]     # up:    grid[i - 1, j]
            + padded[:, :, 2:, 1:-1]    # down:  grid[i + 1, j]
            + padded[:, :, 1:-1, :-2]   # left:  grid[i, j - 1]
            + padded[:, :, 1:-1, 2:]    # right: grid[i, j + 1]
        )

        # Divide by the number of real neighbours to turn the sum into a mean.
        counts = torch.tensor(
            self._neighbor_counts(grid_x, grid_y),
            dtype=neighbor_sum.dtype,
            device=neighbor_sum.device,
        ).unsqueeze(-1)  # (grid_x, grid_y, 1), broadcasts over batch/seq/hidden
        neighbor_mean = neighbor_sum / counts

        # One batched projection over the whole grid instead of one call per
        # expert; nn.Linear acts on the last dimension only.
        projected = self.exchange_projection(neighbor_mean)
        projected = projected.reshape(
            batch_size, seq_length, num_experts, hidden_size
        )

        # Additive combination of own output and neighbour information.
        return expert_outputs + projected