{ "cells": [ { "cell_type": "markdown", "id": "1fd58348-44b0-48f0-a1ff-e9a0729514dc", "metadata": {}, "source": [ "\n", "\n", "
\n", "\n", " \n", "
\n", " \"Department\n", "
\n", "\n", " \n", "
\n", "

Deep Reinforcement Learning (CS-866)

\n", "

Department of Computer Science

\n", "

University of the Punjab

\n", "

\n", "

CartPole with REINFORCE (Policy Gradient)

\n", "

Instructor: Nazar Khan

\n", "
\n", "\n", " \n", " \n", "\n", " \n", "
\n", "
\n", "\n", "#### Goal\n", "- We will train a neural network that outputs probabilities of actions that can be applied to a cart to balance a pole attached to it.\n", "- The network becomes more likely to repeat actions that led to good outcomes." ] }, { "cell_type": "markdown", "id": "2110f905-78e4-48c9-8fda-f7850a85a020", "metadata": {}, "source": [ "#### Imports" ] }, { "cell_type": "code", "execution_count": 2, "id": "4f302943-0521-4595-8f43-a497a81b6dea", "metadata": {}, "outputs": [], "source": [ "# We import Gymnasium to create RL environments like CartPole\n", "import gymnasium as gym\n", "\n", "# Torch is the PyTorch library for building and training neural networks\n", "import torch\n", "\n", "# nn gives us building blocks for neural networks (layers, activations, etc.)\n", "import torch.nn as nn\n", "\n", "# optim gives us optimization algorithms like Adam to adjust network weights\n", "import torch.optim as optim\n", "\n", "# numpy is a numerical library (we'll use it a tiny bit)\n", "import numpy as np" ] }, { "cell_type": "markdown", "id": "4ec7316a-9902-4f47-85c1-c3d380fb9c6f", "metadata": {}, "source": [ "#### Define the Neural Network that will represent the policy" ] }, { "cell_type": "code", "execution_count": 3, "id": "69250d7a-13e5-4297-916f-94f7b6975ffc", "metadata": {}, "outputs": [], "source": [ "class PolicyNetwork(nn.Module): \n", " # This class defines our neural network (our policy function π(a|s; θ))\n", " # It takes in the state and outputs probabilities for each action.\n", "\n", " def __init__(self, state_dim, action_dim):\n", " # state_dim = number of numbers that describe the state (CartPole has 4)\n", " # action_dim = number of possible actions (CartPole has 2: left or right)\n", " super().__init__()\n", "\n", " # nn.Sequential lets us stack layers in order, like a list\n", " self.net = nn.Sequential(\n", " nn.Linear(state_dim, 128), # first layer: input vector mapped to 128 neurons\n", " nn.ReLU(), # activation function: adds non-linearity\n", " nn.Linear(128, action_dim), # second layer: 128 neurons mapped to number of actions\n", " nn.Softmax(dim=-1) # convert numbers into probabilities\n", " )\n", "\n", " def forward(self, x):\n", " # forward defines how input flows through the network\n", " return self.net(x)" ] }, { "cell_type": "markdown", "id": "55308479-eebc-4066-bb73-361acdef9a0a", "metadata": {}, "source": [ "#### Implementation of the REINFORCE algorithm for learning neural network parameters of optimal policy" ] }, { "cell_type": "code", "execution_count": 4, "id": "fa29aff0-2cf8-4b49-a013-d961fb532441", "metadata": {}, "outputs": [], "source": [ "def reinforce(env_name='CartPole-v1', gamma=0.99, lr=1e-3, episodes=500):\n", "\n", " # Create the environment\n", " env = gym.make(env_name)\n", "\n", " # Get size of state and action spaces from environment\n", " state_dim = env.observation_space.shape[0] # e.g., 4 for CartPole\n", " action_dim = env.action_space.n # e.g., 2 actions\n", "\n", " # Create the neural network policy\n", " policy = PolicyNetwork(state_dim, action_dim)\n", "\n", " # Adam optimizer will adjust neural network weights based on gradients\n", " optimizer = optim.Adam(policy.parameters(), lr=lr)\n", "\n", " # Keep track of total reward each episode to see learning progress\n", " returns_history = []\n", "\n", " # Loop over episodes of training\n", " for episode in range(episodes):\n", "\n", " # Reset environment at start of episode and get initial state\n", " state, _ = env.reset()\n", "\n", " # Lists to store log-probabilities and rewards for this episode\n", " log_probs = [] \n", " rewards = [] \n", "\n", " done = False # episode is not finished yet\n", "\n", " # Generate an episode\n", " while not done:\n", " \n", " # Convert state list/array to PyTorch tensor (NEEDED for network input)\n", " state_tensor = torch.tensor(state, dtype=torch.float32)\n", "\n", " # Forward pass: get action probabilities from policy network\n", " action_probs = policy(state_tensor)\n", "\n", " # Turn probabilities into a \"distribution\" (randomness)\n", " dist = torch.distributions.Categorical(action_probs)\n", "\n", " # Sample an action according to probabilities\n", " action = dist.sample()\n", "\n", " # Save log(probability(action_taken)) for learning update later\n", " log_probs.append(dist.log_prob(action))\n", "\n", " # Take action in environment and observe next state and reward\n", " state, reward, done, truncated, _ = env.step(action.item())\n", "\n", " # Save reward to compute return G later\n", " rewards.append(reward)\n", "\n", " # Episode finished. Now compute returns (discounted reward sums)\n", "\n", " returns = []\n", " G = 0 # return accumulator\n", "\n", " # Compute returns G_t for each time step t, working backwards\n", " for r in reversed(rewards):\n", " G = r + gamma * G # Bellman return formula\n", " returns.insert(0, G)\n", "\n", " # Convert to PyTorch tensor so gradients flow properly\n", " returns = torch.tensor(returns, dtype=torch.float32)\n", "\n", " # Normalize returns. This helps stable training (optional but recommended)\n", " returns = (returns - returns.mean()) / (returns.std() + 1e-9)\n", "\n", " # Compute loss = −Σ log(pi(action|state)) * G_t\n", " # (negative because we want gradient ASCENT, but optimizer does DESCENT)\n", " loss = 0\n", " for log_p, Gt in zip(log_probs, returns):\n", " loss += -log_p * Gt\n", "\n", " # Backpropagation step\n", " optimizer.zero_grad() # clear old gradients\n", " loss.backward() # compute gradients\n", " optimizer.step() # update neural network weights\n", "\n", " # Store total reward for this episode for plotting later\n", " returns_history.append(sum(rewards))\n", "\n", " # Print progress occasionally\n", " if episode % 20 == 0:\n", " print(f\"Episode {episode:4d} | Return = {sum(rewards):.2f}\")\n", "\n", " env.close()\n", " return policy, returns_history\n" ] }, { "cell_type": "markdown", "id": "823fa588-7708-4fe0-8d8d-14be4bcc0a4f", "metadata": {}, "source": [ "#### Train the policy network" ] }, { "cell_type": "code", "execution_count": 5, "id": "24f2c143-7909-47fa-a47b-7e7378ba51fe", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Episode 0 | Return = 8.00\n", "Episode 20 | Return = 12.00\n", "Episode 40 | Return = 17.00\n", "Episode 60 | Return = 51.00\n", "Episode 80 | Return = 28.00\n", "Episode 100 | Return = 29.00\n", "Episode 120 | Return = 40.00\n", "Episode 140 | Return = 66.00\n", "Episode 160 | Return = 53.00\n", "Episode 180 | Return = 87.00\n", "Episode 200 | Return = 32.00\n", "Episode 220 | Return = 152.00\n", "Episode 240 | Return = 118.00\n", "Episode 260 | Return = 85.00\n", "Episode 280 | Return = 87.00\n", "Episode 300 | Return = 262.00\n", "Episode 320 | Return = 133.00\n", "Episode 340 | Return = 341.00\n", "Episode 360 | Return = 448.00\n", "Episode 380 | Return = 193.00\n", "Episode 400 | Return = 340.00\n", "Episode 420 | Return = 346.00\n", "Episode 440 | Return = 496.00\n", "Episode 460 | Return = 183.00\n", "Episode 480 | Return = 516.00\n" ] } ], "source": [ "policy, history = reinforce()" ] }, { "cell_type": "markdown", "id": "73d05033-88ee-44cb-a0e0-6f61668d48b6", "metadata": {}, "source": [ "#### Plot the learning curve" ] }, { "cell_type": "code", "execution_count": 6, "id": "dabd7a3a-3b0a-4a3c-90d7-d0a11e1d8930", "metadata": {}, "outputs": [ { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ "import matplotlib.pyplot as plt\n", "\n", "plt.plot(history)\n", "plt.xlabel(\"Episode\")\n", "plt.ylabel(\"Return\")\n", "plt.title(\"REINFORCE Learning Curve (CartPole)\")\n", "plt.show()\n" ] }, { "cell_type": "markdown", "id": "261266e4-0826-4835-8e42-bbf507515d14", "metadata": {}, "source": [ "#### Run 5 episodes either until failure or until 500 steps (whichever comes earlier)" ] }, { "cell_type": "code", "execution_count": 7, "id": "f880fdf7-052e-4fb2-af34-9ccfd8e9c9c2", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Saved REINFORCE_cartpole_episodes.gif\n" ] } ], "source": [ "import imageio\n", "import cv2\n", "from IPython.display import Image\n", "\n", "def record_episodes(num_episodes=5, filename=\"episodes.gif\"):\n", " env = gym.make(\"CartPole-v1\", render_mode=\"rgb_array\")\n", " frames = []\n", " for episode in range(num_episodes):\n", " state, _ = env.reset()\n", " \n", " done = False\n", " step = 0\n", " \n", " while not done:\n", " step += 1\n", " \n", " # Convert state to tensor\n", " state_t = torch.tensor(state, dtype=torch.float32)\n", " probs = policy(state_t)\n", " action = torch.argmax(probs).item()\n", " \n", " # Act\n", " state, reward, terminated, truncated, _ = env.step(action)\n", " done = terminated or truncated\n", " \n", " # Get frame\n", " frame = env.render()\n", " \n", " ## frame is in RGB format but OpenCV expects it in BGR format\n", " #frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)\n", " \n", " # Draw step number on frame\n", " frame = cv2.putText(\n", " frame, \n", " f\"Episode: {episode+1}, Step: {step}\", \n", " (10, 30), \n", " cv2.FONT_HERSHEY_SIMPLEX, \n", " 1, (255, 0, 0), 2\n", " )\n", " \n", " #cv2.imshow(\"CartPole\", frame)\n", " #cv2.waitKey(5) # Adjust speed (ms)\n", " frames.append(frame)\n", " \n", " env.close()\n", " imageio.mimsave(filename, frames, fps=100)\n", " print(f\"Saved {filename}\")\n", " #cv2.destroyAllWindows()\n", "\n", "filename = \"REINFORCE_cartpole_episodes.gif\"\n", "record_episodes(num_episodes=5, filename=filename)" ] }, { "cell_type": "code", "execution_count": 15, "id": "fd3eba4f-b770-4d3b-9eae-b1fb54fb202c", "metadata": {}, "outputs": [], "source": [ "#Image(filename)" ] }, { "cell_type": "markdown", "id": "782d925e-d19d-42e0-bb93-9024a823a3ac", "metadata": {}, "source": [ "
\n", " \"REINFORCE_cartpole_episodes\"\n", "
" ] }, { "cell_type": "markdown", "id": "453382d1-df60-4af7-99cc-02d4bf6b8ddc", "metadata": {}, "source": [ "#### GIFs" ] }, { "cell_type": "code", "execution_count": 9, "id": "f7d62da6-6fb9-4d84-9bf0-03af33bacf5c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Saved trained_cartpole.gif\n" ] } ], "source": [ "import imageio\n", "import cv2\n", "\n", "def record_trained_gif(policy, filename=\"trained_cartpole.gif\", max_steps=500):\n", " env = gym.make(\"CartPole-v1\", render_mode=\"rgb_array\")\n", " state, _ = env.reset()\n", "\n", " frames = []\n", " step = 0\n", " done = False\n", "\n", " while not done and step < max_steps:\n", " step += 1\n", "\n", " # Select greedy action from policy\n", " state_t = torch.tensor(state, dtype=torch.float32)\n", " probs = policy(state_t)\n", " action = torch.argmax(probs).item()\n", "\n", " state, r, terminated, truncated, _ = env.step(action)\n", " done = terminated or truncated\n", "\n", " frame = env.render()\n", " frame = cv2.putText(frame.copy(),\n", " f\"Step {step}\", (10, 30),\n", " cv2.FONT_HERSHEY_SIMPLEX, 1, (255,0,0), 2)\n", "\n", " frames.append(frame)\n", "\n", " env.close()\n", " imageio.mimsave(filename, frames, fps=30)\n", " print(f\"Saved {filename}\")\n", "\n", "\n", "record_trained_gif(policy)\n" ] }, { "cell_type": "markdown", "id": "87ada559-25e5-4754-a10f-b640b9fa0aa4", "metadata": {}, "source": [ "\"trained_cartpole\"" ] }, { "cell_type": "markdown", "id": "e765c573-2ef7-4426-b065-b98b8d81de70", "metadata": {}, "source": [ "#### Random vs Trained Policy" ] }, { "cell_type": "code", "execution_count": 14, "id": "c2f75462-d297-40fa-9281-6cce43d5ad3c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Saved comparison.gif\n" ] } ], "source": [ "from PIL import Image\n", "\n", "def combine_frames(f1, f2):\n", " return np.hstack([f1, f2])\n", "\n", "def record_side_by_side(policy, filename=\"comparison.gif\", max_steps=500):\n", " env1 = gym.make(\"CartPole-v1\", render_mode=\"rgb_array\") # random\n", " env2 = gym.make(\"CartPole-v1\", render_mode=\"rgb_array\") # trained\n", "\n", " s1, _ = env1.reset()\n", " s2, _ = env2.reset()\n", "\n", " frames = []\n", " for step in range(max_steps):\n", "\n", " # --- Random Agent ---\n", " a1 = env1.action_space.sample()\n", " s1, _, d1, t1, _ = env1.step(a1)\n", " frame_random = env1.render()\n", " cv2.putText(frame_random, \"Random\", (10,30),\n", " cv2.FONT_HERSHEY_SIMPLEX, 1, (255,0,0), 2)\n", "\n", " # --- Trained Agent ---\n", " probs = policy(torch.tensor(s2, dtype=torch.float32))\n", " a2 = torch.argmax(probs).item()\n", " s2, _, d2, t2, _ = env2.step(a2)\n", " frame_rl = env2.render()\n", " cv2.putText(frame_rl, \"Trained\", (10,30),\n", " cv2.FONT_HERSHEY_SIMPLEX, 1, (255,0,0), 2)\n", "\n", " frame = combine_frames(frame_random, frame_rl)\n", " frames.append(frame)\n", "\n", " if d1 or t1 or d2 or t2:\n", " break\n", "\n", " env1.close(); env2.close()\n", " imageio.mimsave(filename, frames, fps=5)\n", " print(f\"Saved {filename}\")\n", "\n", "record_side_by_side(policy)\n" ] }, { "cell_type": "markdown", "id": "1539e373-3af2-458b-bb21-726b41758f52", "metadata": {}, "source": [ "\"trained_cartpole\"" ] }, { "cell_type": "markdown", "id": "13c17795-bb19-40f2-be54-b7fae67fcd3e", "metadata": {}, "source": [ "#### Live Policy Probability Plot\n", "\n", "Shows how policy output evolves during one run (probs for LEFT vs RIGHT at every step)" ] }, { "cell_type": "code", "execution_count": 11, "id": "75d26fd7-11a5-4da5-b623-30d60d0996ba", "metadata": {}, "outputs": [ { "data": { "image/png": "", "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import matplotlib.pyplot as plt\n", "from IPython.display import clear_output, display\n", "\n", "def run_with_live_probs(policy, episodes=1, max_steps=500):\n", " env = gym.make(\"CartPole-v1\", render_mode=\"rgb_array\")\n", "\n", " for ep in range(episodes):\n", " state,_ = env.reset()\n", " probs_list = []\n", "\n", " for step in range(max_steps):\n", " state_t = torch.tensor(state, dtype=torch.float32)\n", " probs = policy(state_t).detach().numpy()\n", " probs_list.append(probs)\n", "\n", " action = np.argmax(probs)\n", " state, _, terminated, truncated, _ = env.step(action)\n", "\n", " # live plot\n", " clear_output(wait=True)\n", " plt.figure(figsize=(6,4))\n", " arr = np.array(probs_list)\n", " plt.plot(arr[:,0], label=\"LEFT\")\n", " plt.plot(arr[:,1], label=\"RIGHT\")\n", " plt.ylim(0,1)\n", " plt.title(f\"CartPole Policy Probabilities (Step {step})\")\n", " plt.legend()\n", " plt.grid()\n", " display(plt.gcf())\n", " plt.close()\n", "\n", " if terminated or truncated:\n", " break\n", "\n", " env.close()\n", "\n", "run_with_live_probs(policy)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "65f44c36-6374-4073-8aea-397eb6610e6f", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.0" } }, "nbformat": 4, "nbformat_minor": 5 }