<h1 style="text-align:center;">Transformer using PyTorch: Sentiment Classification in Sentences of Text</h1>
<p style="text-align:center;">
Nazar Khan
<br>CVML Lab
<br>University of The Punjab
</p>

### **Introduction**
Transformers are a type of deep learning architecture introduced in the seminal paper *"Attention is All You Need"* (Vaswani et al., 2017). They have revolutionized natural language processing (NLP) and are the foundation for models like GPT, BERT, and T5. Transformers use a mechanism called **self-attention** to weigh the importance of different tokens (words) in a sequence, making them effective for sequence-to-sequence tasks.

Our focus will be on understanding the key concepts, step-by-step implementation, and practical applications.

---

### **Key Concepts**
1. **Self-Attention**: Computes pairwise relationships between all tokens in a sequence.
2. **Multi-Head Attention**: Runs several attention operations in parallel so that the model can attend to different parts of the sequence at once.
3. **Positional Encoding**: Injects information about token positions, since self-attention by itself is insensitive to token order.
4. **Feed-Forward Networks**: Apply the same transformation independently to each token.
5. **Layer Normalization**: Stabilizes training by normalizing the features of each token.
6. **Residual Connections**: Help train deep architectures by mitigating vanishing gradients.

---

### **Step-by-Step Implementation**

#### **1. Import Libraries**

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

---

#### **2. Define the Scaled Dot-Product Attention**
This is the core operation of self-attention.
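
For reference, the cell below follows the scaled dot-product attention of Vaswani et al. (2017). Writing $Q$, $K$ and $V$ for the query, key and value matrices and $d_k$ for the key dimension (`query.size(-1)` in the code), the operation is:

$$
\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\!\left(\frac{Q K^{\top}}{\sqrt{d_k}}\right) V
$$

Dividing by $\sqrt{d_k}$ keeps the dot products from growing with the dimension, which would otherwise push the softmax into regions with very small gradients.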

In [2]:
def scaled_dot_product_attention(query, key, value, mask=None):
    """
    Compute the attention weights and output.
    """
    # Calculate scores
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(query.size(-1))

    # Apply mask (optional)
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float('-inf'))

    # Compute softmax to get attention weights
    attention_weights = F.softmax(scores, dim=-1)

    # Multiply weights by values
    output = torch.matmul(attention_weights, value)
    return output, attention_weights
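
As a quick sanity check, the function can be called on a toy tensor. The sketch below re-states the function so that it runs on its own. The sizes and the causal mask are illustrative assumptions only (the encoder built later in this tutorial does not use a causal mask).

```python
import math
import torch
import torch.nn.functional as F

# Re-stated from the cell above so that this snippet runs on its own.
def scaled_dot_product_attention(query, key, value, mask=None):
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(query.size(-1))
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float("-inf"))
    attention_weights = F.softmax(scores, dim=-1)
    return torch.matmul(attention_weights, value), attention_weights

torch.manual_seed(0)
seq_len, dim = 4, 8                  # toy sizes, chosen only for illustration
x = torch.randn(1, seq_len, dim)     # (batch, seq_len, dim)

# Illustrative causal mask: position i may only attend to positions <= i.
causal_mask = torch.tril(torch.ones(seq_len, seq_len))

output, weights = scaled_dot_product_attention(x, x, x, mask=causal_mask)

print(output.shape)          # torch.Size([1, 4, 8])
print(weights.shape)         # torch.Size([1, 4, 4])
print(weights.sum(dim=-1))   # every row of attention weights sums to 1
```

Masked positions receive a score of `-inf`, so their attention weight is exactly zero after the softmax.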


---

#### **3. Implement Multi-Head Attention**

In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert embed_size % num_heads == 0, "Embed size must be divisible by num_heads"
        self.head_dim = embed_size // num_heads
        self.num_heads = num_heads

        self.query = nn.Linear(embed_size, embed_size)
        self.key = nn.Linear(embed_size, embed_size)
        self.value = nn.Linear(embed_size, embed_size)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, x, mask=None):
        batch_size = x.shape[0]
        query = self.query(x)
        key = self.key(x)
        value = self.value(x)

        # Reshape for multi-heads
        query = query.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        key = key.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        value = value.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

        # Perform scaled dot-product attention
        out, _ = scaled_dot_product_attention(query, key, value, mask)

        # Concatenate heads
        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.num_heads * self.head_dim)

        return self.fc_out(out)
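
The `view` and `transpose` calls in `forward` are easy to misread, so here is a small shape walk-through. It is only a sketch with made-up toy sizes: it shows that each head receives a contiguous slice of the feature dimension, and that the "concatenate heads" step exactly undoes the split.

```python
import torch

batch_size, seq_len, embed_size, num_heads = 2, 5, 8, 2   # toy sizes
head_dim = embed_size // num_heads

x = torch.arange(batch_size * seq_len * embed_size, dtype=torch.float32)
x = x.view(batch_size, seq_len, embed_size)

# Split: (batch, seq, embed) -> (batch, seq, heads, head_dim) -> (batch, heads, seq, head_dim)
heads = x.view(batch_size, -1, num_heads, head_dim).transpose(1, 2)
print(heads.shape)   # torch.Size([2, 2, 5, 4])

# Head 0 sees the first head_dim features of every token, head 1 the next head_dim.
print(torch.equal(heads[:, 0], x[..., :head_dim]))   # True
print(torch.equal(heads[:, 1], x[..., head_dim:]))   # True

# Merge: undo the transpose, then flatten the two head axes back into embed_size.
merged = heads.transpose(1, 2).contiguous().view(batch_size, -1, num_heads * head_dim)
print(torch.equal(merged, x))   # True
```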


---

#### **4. Add Positional Encoding**



In [4]:
class PositionalEncoding(nn.Module):
    def __init__(self, embed_size, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, embed_size)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * -(math.log(10000.0) / embed_size))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
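        # Register as a buffer so that model.to(device) moves it along with the parameters
        self.register_buffer("pe", pe.unsqueeze(0))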

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]
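
To see what the encoding looks like, the same formula can be evaluated by hand for a single position. The helper `sinusoidal_encoding` below is hypothetical (it is not part of the tutorial code), uses only the standard library, and assumes an even `embed_size`.

```python
import math

def sinusoidal_encoding(position, embed_size):
    """Return the encoding vector for one position as a plain Python list."""
    vector = []
    for i in range(0, embed_size, 2):
        # Same frequency term as `div_term` in PositionalEncoding.
        frequency = math.exp(-i * math.log(10000.0) / embed_size)
        vector.append(math.sin(position * frequency))  # even feature index
        vector.append(math.cos(position * frequency))  # odd feature index
    return vector

pe0 = sinusoidal_encoding(0, 4)
pe1 = sinusoidal_encoding(1, 4)
print(pe0)                          # [0.0, 1.0, 0.0, 1.0]
print([round(v, 4) for v in pe1])   # [0.8415, 0.5403, 0.01, 1.0]
```

The first pair of features oscillates quickly with position, while later pairs change more and more slowly, which gives every position a distinct pattern.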

---

#### **5. Define the Transformer Block**


In [5]:
class TransformerBlock(nn.Module):
    def __init__(self, embed_size, num_heads, ff_hidden_dim, dropout):
        super(TransformerBlock, self).__init__()
        self.attention = MultiHeadAttention(embed_size, num_heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)

        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, ff_hidden_dim),
            nn.ReLU(),
            nn.Linear(ff_hidden_dim, embed_size),
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attention = self.attention(x, mask)
        x = self.norm1(x + attention)
        forward = self.feed_forward(x)
        x = self.norm2(x + self.dropout(forward))
        return x

---

#### **6. Build a Transformer Encoder**

In [6]:
class TransformerEncoder(nn.Module):
    def __init__(self, embed_size, num_heads, ff_hidden_dim, num_layers, dropout, vocab_size, max_len):
        super(TransformerEncoder, self).__init__()
        self.embed_size = embed_size
        self.word_embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = PositionalEncoding(embed_size, max_len)
        self.layers = nn.ModuleList(
            [TransformerBlock(embed_size, num_heads, ff_hidden_dim, dropout) for _ in range(num_layers)]
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        x = self.word_embedding(x)
        x = self.position_embedding(x)
        x = self.dropout(x)

        for layer in self.layers:
            x = layer(x, mask)
        return x

---

#### **7. Application: Sentiment Classification**
After building the encoder, you can use it for tasks such as text classification, machine translation, or summarization. The **hands-on demonstration** below applies the transformer-based model that we have built from scratch to sentiment classification. The text dataset used is IMDB, loaded here from the extracted archive (it is also available via `torchtext`).


---

### **Hands-On Demonstration**
#### **Task**: Predict Sentiment on a Text Dataset
- Use the IMDB dataset. You can download it from <a href="http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz">here</a>.
- Preprocess text to numerical tokens.
- Train a transformer model for sentiment analysis.



#### **1. Import Required Libraries**

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
import os
from sklearn.model_selection import train_test_split
from collections import Counter
from itertools import chain
import numpy as np
import re

---
#### **2. Load and Preprocess IMDB Dataset**
The code below assumes the dataset has been extracted to the directory `aclImdb/`, with subdirectories `train/pos`, `train/neg`, `test/pos`, and `test/neg`.

In [10]:
def load_imdb_data(base_path):
    texts, labels = [], []
    for label, sentiment in enumerate(["neg", "pos"]):
        folder = os.path.join(base_path, sentiment)
        for file_name in os.listdir(folder):
            with open(os.path.join(folder, file_name), "r", encoding="utf-8") as f:
                texts.append(f.read().strip())
                labels.append(label)
    return texts, labels

# Load training and testing data
train_texts, train_labels = load_imdb_data("aclImdb/train")
test_texts, test_labels = load_imdb_data("aclImdb/test")

# Split training data into training and validation sets
train_texts, val_texts, train_labels, val_labels = train_test_split(
    train_texts, train_labels, test_size=0.2, random_state=42
)

---
#### **3. Tokenize and Build Vocabulary**

In [11]:
def clean_text(text):
    # Remove HTML tags (e.g. <br />) and convert to lowercase
    return re.sub(r"<.*?>", "", text).lower()

def tokenize(texts):
    return [clean_text(text).split() for text in texts]

train_tokens = tokenize(train_texts)
val_tokens = tokenize(val_texts)
test_tokens = tokenize(test_texts)

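# Build vocabulary from the 20000 most frequent training tokens.
# Ids start at 1; id 0 is shared by padding and out-of-vocabulary tokens.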
token_counts = Counter(chain.from_iterable(train_tokens))
vocab = {word: i + 1 for i, (word, _) in enumerate(token_counts.most_common(20000))}

# Convert tokens to numerical sequences
def tokens_to_ids(tokens, vocab):
    return [[vocab.get(token, 0) for token in text] for text in tokens]

train_sequences = tokens_to_ids(train_tokens, vocab)
val_sequences = tokens_to_ids(val_tokens, vocab)
test_sequences = tokens_to_ids(test_tokens, vocab)
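
The vocabulary logic is easier to follow on a tiny corpus. The sketch below uses made-up sentences and a vocabulary of only 3 words (instead of 20000) to show how ids are assigned by frequency and how unseen words fall back to id 0.

```python
from collections import Counter
from itertools import chain

# Made-up, already tokenized "reviews".
toy_tokens = [
    ["a", "great", "movie"],
    ["a", "dull", "movie"],
    ["great", "acting"],
]

counts = Counter(chain.from_iterable(toy_tokens))
# Keep only the 3 most frequent words; ids start at 1 as in the tutorial.
toy_vocab = {word: i + 1 for i, (word, _) in enumerate(counts.most_common(3))}
print(toy_vocab)   # {'a': 1, 'great': 2, 'movie': 3}

# Words outside the vocabulary fall back to id 0.
ids = [toy_vocab.get(token, 0) for token in ["a", "great", "plot"]]
print(ids)         # [1, 2, 0]
```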


---

#### **4. Create Data Loaders**

In [12]:
from torch.utils.data import DataLoader, Dataset

class IMDBDataset(Dataset):
    def __init__(self, sequences, labels, max_len=512):
        self.sequences = sequences
        self.labels = labels
        self.max_len = max_len

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        seq = self.sequences[idx][:self.max_len]
        padded_seq = torch.zeros(self.max_len, dtype=torch.long)
        padded_seq[:len(seq)] = torch.tensor(seq)
        label = self.labels[idx]
        return padded_seq, torch.tensor(label)

train_dataset = IMDBDataset(train_sequences, train_labels)
val_dataset = IMDBDataset(val_sequences, val_labels)
test_dataset = IMDBDataset(test_sequences, test_labels)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
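
Every review is padded with zeros up to `max_len`, but the training code in this tutorial never passes a `mask`, so padded positions take part in attention. One possible way to build a padding mask is sketched below. It assumes that id 0 marks padding (in this tutorial id 0 is also used for unknown words, so those would be masked too), and the token ids are made up.

```python
import torch

PAD_ID = 0  # assumption: id 0 marks padding, as produced by IMDBDataset

# Two toy sequences padded to length 6 (ids are made up).
batch = torch.tensor([
    [5, 9, 2, 0, 0, 0],
    [7, 3, 8, 4, 1, 0],
])

# (batch, seq_len) -> (batch, 1, 1, seq_len): broadcasts over heads and query positions.
padding_mask = (batch != PAD_ID).unsqueeze(1).unsqueeze(2)
print(padding_mask.shape)   # torch.Size([2, 1, 1, 6])

# Attention scores for 4 heads have shape (batch, heads, seq_len, seq_len).
scores = torch.zeros(2, 4, 6, 6)
masked = scores.masked_fill(padding_mask == 0, float("-inf"))
weights = torch.softmax(masked, dim=-1)
print(weights[0, 0, 0])     # 1/3 on the three real tokens, 0 on the padded ones
```

Such a mask could then be passed through the existing `mask` argument, for example `model(texts, mask=padding_mask)`. Note that a sequence consisting only of padding would produce NaN weights, because every score in its rows would be `-inf`.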

---

#### **5. Define the Transformer-Based Model**

In [13]:
class TransformerClassifier(nn.Module):
    def __init__(self, embed_size, num_heads, ff_hidden_dim, num_layers, dropout, vocab_size, max_len, num_classes=2):
        super(TransformerClassifier, self).__init__()
        self.encoder = TransformerEncoder(
            embed_size, num_heads, ff_hidden_dim, num_layers, dropout, vocab_size, max_len
        )
        self.fc = nn.Linear(embed_size, num_classes)

    def forward(self, x, mask=None):
        encoded = self.encoder(x, mask)
        # Take the mean across tokens
        pooled = torch.mean(encoded, dim=1)
        return self.fc(pooled)
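
`torch.mean(encoded, dim=1)` averages over all `max_len` positions, including padding. A possible alternative is to average over real tokens only. The helper `masked_mean` below is hypothetical (not part of the tutorial code) and assumes padding is marked by id 0.

```python
import torch

def masked_mean(encoded, token_ids, pad_id=0):
    """Average encoder outputs over non-padding positions only (hypothetical helper)."""
    keep = (token_ids != pad_id).unsqueeze(-1).to(encoded.dtype)  # (batch, seq_len, 1)
    summed = (encoded * keep).sum(dim=1)                          # (batch, embed_size)
    counts = keep.sum(dim=1).clamp(min=1.0)                       # avoid division by zero
    return summed / counts

# One toy sequence: two real tokens followed by two padded positions.
token_ids = torch.tensor([[4, 7, 0, 0]])
encoded = torch.tensor([[[1.0, 1.0], [3.0, 5.0], [100.0, 100.0], [100.0, 100.0]]])

plain = torch.mean(encoded, dim=1)
pooled = masked_mean(encoded, token_ids)
print(plain)    # tensor([[51.0000, 51.5000]])
print(pooled)   # tensor([[2., 3.]])
```

In this toy case the padded positions dominate the plain mean, while the masked mean reflects only the two real tokens.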


---

#### **6. Training and Evaluation Functions**

In [14]:
def train_model(model, train_loader, test_loader, num_epochs=5, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for texts, labels in train_loader:
            texts, labels = texts.to(device), labels.to(device)
            outputs = model(texts)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader)}")

        evaluate_model(model, test_loader)

def evaluate_model(model, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for texts, labels in test_loader:
            texts, labels = texts.to(device), labels.to(device)
            outputs = model(texts)
            predictions = torch.argmax(outputs, dim=1)
            correct += (predictions == labels).sum().item()
            total += labels.size(0)

    print(f"Test Accuracy: {correct / total * 100:.2f}%")
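
Note that `train_model` reports accuracy on the test set after every epoch, while the `val_loader` created earlier is not used. If you prefer to monitor a validation set and keep the test set for a final check, a helper that returns the accuracy instead of printing it may be convenient. The function `compute_accuracy` below is a hypothetical sketch, and the toy model and loader merely stand in for the real `model` and `val_loader`.

```python
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

def compute_accuracy(model, loader, device):
    """Return accuracy in [0, 1] instead of printing it (hypothetical helper)."""
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            predictions = torch.argmax(model(inputs), dim=1)
            correct += (predictions == labels).sum().item()
            total += labels.size(0)
    return correct / total

# Toy stand-ins: an identity "classifier" and four labelled points.
device = torch.device("cpu")
toy_model = nn.Linear(2, 2)
with torch.no_grad():
    toy_model.weight.copy_(torch.eye(2))
    toy_model.bias.zero_()
inputs = torch.tensor([[1.0, 0.0], [0.0, 1.0], [1.0, 0.0], [0.0, 1.0]])
labels = torch.tensor([0, 1, 1, 1])
toy_loader = DataLoader(TensorDataset(inputs, labels), batch_size=2)

accuracy = compute_accuracy(toy_model, toy_loader, device)
print(accuracy)   # 0.75
```

With the real objects, the call would look like `compute_accuracy(model, val_loader, device)` at the end of each epoch.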

---

#### **7. Initialize, Train and Evaluate the Model**

In [16]:
# Model Hyperparameters
embed_size = 128
num_heads = 4
ff_hidden_dim = 256
num_layers = 2
dropout = 0.1
vocab_size = len(vocab) + 1
max_len = 512

model = TransformerClassifier(embed_size, num_heads, ff_hidden_dim, num_layers, dropout, vocab_size, max_len)

# Train the Model
train_model(model, train_loader, test_loader, num_epochs=5, lr=0.001)

Epoch 1, Loss: 0.5563821067333221
Test Accuracy: 79.79%
Epoch 2, Loss: 0.39982056922912595
Test Accuracy: 81.18%
Epoch 3, Loss: 0.31622911648750307
Test Accuracy: 84.70%
Epoch 4, Loss: 0.2586984635293484
Test Accuracy: 84.72%
Epoch 5, Loss: 0.21333774099349975
Test Accuracy: 84.52%


---

#### **8. Saving and Loading a Trained Model**

In [37]:
# Function to save the trained model
def save_model(model, file_path):
    torch.save(model.state_dict(), file_path)
    print(f"Model saved to {file_path}")

# Function to load the trained model
def load_model(model, file_path, device='cpu'):
    model.load_state_dict(torch.load(file_path, map_location=device))
    model.to(device)
    print(f"Model loaded from {file_path}")
    return model

# Save the trained model
save_model(model, "sentiment_model.pth")

# Create a new instance of the model and load the saved weights
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
new_model = TransformerClassifier(embed_size, num_heads, ff_hidden_dim, num_layers, dropout, vocab_size, max_len)
new_model = load_model(new_model, "sentiment_model.pth", device)

# Verify the model works as expected
new_model.eval()
sample_input = torch.tensor([[vocab[word] for word in ["this", "movie", "was", "amazing"]]], device=device)
output = new_model(sample_input)
prediction = torch.argmax(output, dim=1)
print(f"Prediction: {'Positive' if prediction == 1 else 'Negative'}")

Model saved to sentiment_model.pth
Model loaded from sentiment_model.pth
tensor([1])
Prediction: Positive
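
The verification above indexes the vocabulary directly with `vocab[word]`, which raises a `KeyError` for any word that was not among the most frequent training tokens. A more forgiving way to encode arbitrary text is sketched below. The helper `encode_text` is hypothetical, it repeats the cleaning done by `clean_text`, and the toy vocabulary is made up.

```python
import re

def encode_text(text, vocab, unk_id=0):
    """Map raw text to ids with the same cleaning as clean_text; unknown words get unk_id."""
    cleaned = re.sub(r"<.*?>", "", text).lower()
    return [vocab.get(token, unk_id) for token in cleaned.split()]

toy_vocab = {"this": 1, "movie": 2, "was": 3, "amazing": 4}   # made-up ids

known = encode_text("This movie was AMAZING", toy_vocab)
unknown = encode_text("This movie was <br />stupendous", toy_vocab)
print(known)     # [1, 2, 3, 4]
print(unknown)   # [1, 2, 3, 0]
```

The resulting list can be wrapped as `torch.tensor([ids], device=device)` before it is passed to the model.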




---

#### **9. Display Random Test Samples, Predictions, and Ground Truth**

In [38]:
import random

def display_random_predictions(model, test_dataset, vocab, num_samples=20):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()

    # Select random indices from the test dataset
    random_indices = random.sample(range(len(test_dataset)), num_samples)

    # Extract samples and ground truth labels
    selected_samples = [test_dataset[i] for i in random_indices]
    sequences = torch.stack([sample[0] for sample in selected_samples]).to(device)
    ground_truths = [sample[1].item() for sample in selected_samples]

    # Convert numerical sequences back to text
    idx_to_word = {idx: word for word, idx in vocab.items()}
    input_texts = []
    for sample in selected_samples:
        words = [idx_to_word[idx.item()] for idx in sample[0] if idx.item() in idx_to_word]
        input_texts.append(" ".join(words))

    # Pass selected samples through the model
    with torch.no_grad():
        outputs = model(sequences)
        predictions = torch.argmax(outputs, dim=1).tolist()

    # Display results
    for i in range(num_samples):
        print(f"Sample {i+1}:")
        print(f"Input: {input_texts[i]}")
        print(f"Prediction: {'Positive' if predictions[i] == 1 else 'Negative'}")
        print(f"Ground Truth: {'Positive' if ground_truths[i] == 1 else 'Negative'}")
        print("-" * 80)

# Call the function
display_random_predictions(new_model, test_dataset, vocab, num_samples=20)

Sample 1:
Input: actually there was nothing funny about this monstrosity at this movie was a complete the in this movie almost made me want to i think that the people responsible for this movie took advantage of their viewing audience. they took a relatively decent series of movies (i did say decent, not and totally trashed it by trying to put money in their the making of was a way for hollywood to make up for this crappy flick. the worst part about it is that either nobody in 1979 realized the asinine events of the movie (such as door popping off at some high or shooting a flair gun out the window at 2 to avoid a nuclear were they totally unrealistic or they just didn't i think that it is the latter of the two. the writers and director of this if you want to call it that, really tried to suck the airport dry with this crap!
Prediction: Negative
Ground Truth: Negative
--------------------------------------------------------------------------------
Sample 2:
Input: ok, i'm italian but t

---

### **Outcome**
- We have trained a transformer-based sentiment classifier on the IMDB dataset.
- The `evaluate_model` function computes and displays the test accuracy after each epoch.

This example provides an end-to-end pipeline for understanding and applying transformers to real-world tasks. You can extend this by experimenting with different datasets and configurations.

---

### **Conclusion**
This tutorial introduced the core components of transformers. As next steps, you can:
1. Experiment with different architectures (e.g., decoder-based transformers like GPT).
2. Apply transformers to real-world tasks using libraries like Hugging Face Transformers.