Here, I will implement proximal policy optimization from scratch, line by line. We will use numpy for matrix operations, but there are no black boxes other than that.
No reinforcement learning background is needed.
Intended for readers who are familiar with Python, numpy, and the basics of neural networks (forward passes, backward passes, and gradient descent).
Reinforcement learning has a different setup compared to typical deep learning tasks, where you have a neat dataset with inputs and labelled outputs.
We don't exactly have a dataset here. We have an environment: a simulation, e.g. a video game like Dota or Flappy Bird. The agent is our AI. It can have one or more neural networks inside it, at least in the sub-field of deep reinforcement learning. The agent is supposed to learn some kind of task within this simulation, like winning the game.
The agent learns by playing around in the simulation and collecting some data. Then it trains its internal neural nets with that data, all on its own.
This makes it more challenging, but also a lot more exciting as you are not explicitly training the neural network for every single task within the simulation. You just reward it for achieving high level objectives, and the agent discovers and learns all the smaller tasks on its own.
There are a bunch of algorithms for training the agent in such setups, and the current most popular choice is an algorithm called 'Proximal Policy Optimisation' or PPO, which is what we will be implementing here.
If you are attempting any RL experiment, PPO is pretty much your go-to algorithm.
Personally, I got interested in PPO a few years back, after OpenAI trained a small neural network using PPO to play Dota. For those of you who don't know, Dota is a fairly complex game, with way too many possible strategies, choices, and bits of in-game knowledge you need to pick up before you can even start playing.
Plus, it's a team game, where 5 players in each team need to coordinate and consider each other's strengths and weaknesses in order to make any meaningful progress in the game.
OpenAI then set up a match between 5 copies of their neural network and a team called OG, which is the only team that has managed to win consecutive world championships in this game. OG was also at the peak of their career at the time.
These neural networks not only managed to beat them, but they did it with very unconventional strategies and by a large margin, which should tell you why RL is useful.
Training agents with RL could lead to quite miraculous and superhuman levels of intelligence on certain environments, but training using RL is brittle and prone to instabilities.
PPO is the algorithm with the best reputation for staying stable even in complex environments, while also delivering strong agent performance.
Current RL research is blocked more by the absence of fast and complex environments to experiment with than by the algorithms themselves. If PPO is good enough for Dota, it will probably work for your case as well.
A simple environment is just two functions: `reset`, which starts a new episode and returns the first observations, and `step`, which takes an action, advances the simulation by one tick, and returns the new observations, a reward, and a done flag.
Let's say the game of Tetris is your environment. The state then consists of every block on the screen. The ones that have fallen down. The one that is currently falling down. The next block in the queue.
The actions are one of a handful of moves: shift the falling piece left, shift it right, rotate it, or do nothing. So the action variable is just an integer with 4 valid values: [0, 1, 2, 3].
The step function will update the state, changing the position or rotation of the falling piece. The position will be updated based on both the action and gravity. Then, if the piece has landed, it will clear the horizontal lines wherever possible.
What about the reward?
The reward for this setup could be programmed in a couple of different ways. One way could be to give a reward of +1 for every line that gets cleared, and 0 whenever no line gets cleared. Another way could be to reward based on the increase or decrease in the height of the settled blocks. The way rewards are programmed in an environment can make the agent learn very different things.
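To make this two-function interface concrete before we move on, here is a tiny toy environment I made up purely for illustration (a 1-D walk to a goal, not Tetris and not the cart pole we will actually use). The point is just the shape of `reset` and `step`:

import numpy as np

def reset(state):
    # start every episode at position 0
    state['pos'] = 0
    state['steps'] = 0
    return np.array([state['pos']], dtype=np.float32)

def step(state, action):
    # action 0 moves left, action 1 moves right
    state['pos'] += 1 if action == 1 else -1
    state['steps'] += 1
    reward = 1.0 if state['pos'] == 5 else 0.0        # reward reaching the goal at +5
    done = state['pos'] == 5 or state['steps'] >= 50  # episode ends at the goal or after 50 steps
    return np.array([state['pos']], dtype=np.float32), reward, done

Here `state` is just a dict that the two functions share; the cart pole environment in the appendix wraps the same idea in a small class instead.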
For this article, I am going to create a clone of a popular environment called cart pole, where the agent needs to learn to balance a pole on a movable cart.
The full code is pretty short (about 100 lines) and is available in the appendix.
The reason it's in the appendix is because the environment can and will be switched with something else for most experiments. The environment code is completely separate from the algorithm and its implementation that I will be writing about here.
The main points you need to know about this environment are:
- The observation is 4 numbers: the cart's position, the cart's velocity, the pole's angle, and the pole's angular velocity.
- There are 2 discrete actions: push the cart left (0) or push it right (1).
- The reward is +1 for every time step the pole stays up.
- The episode ends (`done == True`) when the pole tilts more than about 12 degrees or the cart moves more than 2.4 units away from the center.
So here is a loop that shows how the agent interacts with the environment.
observations = reset(state)
while True:
    chosen_action = choose_action(agent, observations)
    new_observations, reward, done = step(state, chosen_action)
    observations = new_observations
    if done:
        break
`done` is True when the simulation or game is over.
The agent in PPO has 2 neural networks inside it.
One is called the actor. It is also known as the policy. It takes the observations from the environment as input, and returns the probabilities for each of the valid actions as output. This is very similar to a typical classification task, except the categories here are the actions.
The other one is called the critic. This one also takes the observations as input, and returns a single number as output. This number is called the critic value. This one is similar to a network you would train in a regression task. In fact, we are going to compute some target values, so that we can use them together with the outputs of this network to compute a loss and train the critic network.
The critic network looks at the observations, and estimates the total rewards the agent should be able to get in this environment in the future. The intuition behind this is something I'll go deeper into in the section about training.
The `choose_action` written in the above loop, simply means choosing an action based on the probabilities provided by the actor network.
Initially, the parameters of these networks are initialised randomly, so the agent ends up choosing random actions. In this implementation, both the actor and the critic are small two-layer feed-forward networks; here is the forward pass, shared by both, in plain numpy:
def ffn(inp, params, grad=False):
    w1_out = np.dot(inp, params['w1']) + params['b1']
    relu_out = np.maximum(w1_out, 0)
    logits = np.dot(relu_out, params['w2']) + params['b2']
    if grad:  # everything needed to calculate gradients in the backward pass
        return logits, {'inp': inp, 'w1_out': w1_out, 'relu_out': relu_out}
    return logits
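Two helpers used throughout the article, `softmax` and `choose_action`, are not defined in the snippets above. Here is a minimal sketch of what they can look like (the gist's versions may differ slightly; I'm passing the actor's parameters directly where the earlier loop passes an `agent`):

def softmax(logits):
    # subtract the row-wise max for numerical stability, then normalise
    shifted = logits - np.max(logits, axis=-1, keepdims=True)
    exps = np.exp(shifted)
    return exps / np.sum(exps, axis=-1, keepdims=True)

def choose_action(actor_params, observations):
    # run the actor and sample one action from its output probabilities
    logits = ffn(observations.reshape(1, -1), actor_params, grad=False)
    probs = softmax(logits)[0]
    action = np.random.choice(len(probs), p=probs)
    return action, probs[action]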
For training, we let the agent play around in the environment. It chooses its (somewhat random) actions based on the probabilities predicted by the actor, and the environment executes those actions and gives back new observations.
How long does the agent play around for?
This actually is one of the choices you can make. You can let it operate for exactly N steps, which means letting the loop we defined above run for N iterations. In this case, if we get `done == True` before N steps, we just reset the environment and keep collecting.
That approach requires some additional logic later on, so I'll be going with something simpler: collecting data only until the simulation or game finishes, which means until we get `done == True` inside the loop. (In the final code, each chunk of collected data is also capped at `T_horizon` steps, but a chunk never crosses an episode boundary, so no extra reset logic is needed.)
In each iteration of the loop, we store the following values to prepare training data later on: the observations, the chosen action, the reward, the new observations, the probability of the chosen action, and the done flag.
So now our loop looks like this:
observations = reset(state)
data = []
while True:
    chosen_action, probability = choose_action(agent, observations)
    new_observations, reward, done = step(state, chosen_action)
    data.append((
        observations, chosen_action, reward,
        new_observations, probability, done
    ))
    observations = new_observations
    if done:
        break
Once this loop breaks, we are left with some recorded data.
We will take this recorded data and turn it into batches. For our cart pole environment, the number of recorded points will not go beyond a few hundred, due to the nature of the environment.
So we can easily accommodate all of this data in a single batch on most laptops, and on setups like Google Colab, which I recommend for running this code.
So our batch making function will look like this:
def make_batch(data):
    obs, acts, rews, newobs, probs, dones = zip(*[
        (s, [a], [r], s_prime, [prob_a], [0 if done else 1])
        for s, a, r, s_prime, prob_a, done in data
    ])
    obs, acts, rews, newobs, probs, dones = map(
        np.array,
        (obs, acts, rews, newobs, probs, dones)
    )
    return obs, acts, rews, newobs, dones, probs
All I'm doing here is separating each item from our recorded tuples into its own list, converting those lists into numpy arrays, and turning the `done` flags into a 0/1 mask (0 when the episode ended at that step, 1 otherwise).
Now we do some computations using these batches. We are going to calculate the following values.
1. Target values for the critic:
There is a simple formula for this, which can be written in a couple of lines of code like this:
critic_outputs = critic(new_obs, critic_params, grad=False)
targets = rewards + gamma * critic_outputs * dones
Let me explain. `rewards` and `dones` are numpy arrays of shape (N,1). We got these from the `make_batch` function above. `gamma` is a hyperparameter that we set to 0.98. `critic_outputs` is also a numpy array of shape (N,1). We get these by feeding the `new_observations` batch into the critic network.
Now, the `dones` array is going to have all 1s except the last value, which is going to be 0. This 0 was recorded during the last iteration of our agent-environment loop.
The N in the shapes mentioned above refers to the N time steps that were recorded in the environment.
So for time step t, we can take a look at `rewards[t,0]` and that would give you the reward that was recorded at the t-th iteration.
`critic_outputs[t,0]` however, would give you the critic value at time step t+1, because we are using the `new_observations` recorded during that iteration and not the observations given to the actor.
As I mentioned earlier, the critic's output represents the estimate of total rewards the agent should be able to get, based on the input observations.
Now the critic value at time step (t+1) represents that, and we multiply that with gamma, which is slightly less than 1 and then we add the actual rewards we received at time step t.
The reason we multiply the total rewards estimated in the next time step with gamma (0.98) is to put slightly more emphasis on the actual rewards we received immediately compared to the rewards we estimate to receive later on.
Because the last value in the `dones` array is 0, the target for the last time step simply equals the actual reward we had recorded.
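For example, with gamma = 0.98 and some made-up numbers: if the reward recorded at step t is 1 and the critic estimates a total of 40 future reward from the next observation, the target for step t is 1 + 0.98 * 40 = 40.2. For the very last step of the episode, the 0 in `dones` wipes out the critic term, so the target is just the recorded reward of 1.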
Note that `grad` is False while calling the neural network function. This is because we will not be doing a backward pass for the forward passes used in these computations. These are just values we are computing in order to calculate the targets for our training.
2. The second value we need to calculate is called the advantages. This is also going to be of shape (N,1), so 1 value for each of the time steps in the batch. This one is calculated in reverse order.
delta = targets - critic(obs, critic_params, grad=False)
advantage_lst = []
advantage = 0.0
for delta_t in delta[::-1]:
    advantage = gamma * lmbda * advantage + delta_t[0]
    advantage_lst.append([advantage])
advantage_lst.reverse()
advantage = np.array(advantage_lst, dtype=np.float32)
Let's go through this line by line. `delta` is simply the difference between the targets we just calculated and the critic's output for the observations of that time step (and not the next time step, like we used during the calculation of targets).
Then, we start by setting the advantage for the last time step to the delta of that time step.
For the second last time step, the advantage is delta[N-2] + delta[N-1] * gamma * lambda.
lambda is another hyperparameter that is set to the value 0.95.
For the third last time step, the advantage is delta[N-3] + delta[N-2] * gamma * lambda + delta[N-1] * (gamma * lambda)^2.
For time step t, the advantage is delta[t] + delta[t+1] * (gamma * lambda) + delta[t+2] * (gamma * lambda)^2 + ...
The advantage for a particular time step tells you the cumulative effect of the chosen actions over time, putting more emphasis on the immediate time steps than on the future ones by discounting the future ones with gamma and lambda. The "effect" we are talking about here is just the difference between the rewards predicted by the critic and the target values for the critic based on the actual rewards we received.
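To see this recursion in action, here is the same loop run on three made-up delta values (the numbers are mine, purely for illustration):

import numpy as np

gamma, lmbda = 0.98, 0.95                 # gamma * lmbda = 0.931
delta = np.array([[0.5], [-0.2], [0.1]])  # made-up deltas for 3 time steps

advantage_lst = []
advantage = 0.0
for delta_t in delta[::-1]:
    advantage = gamma * lmbda * advantage + delta_t[0]
    advantage_lst.append([advantage])
advantage_lst.reverse()
print(np.array(advantage_lst))
# approximately [[0.4005], [-0.1069], [0.1]]
# the last advantage is just the last delta,
# the one before it is -0.2 + 0.931 * 0.1, and so on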
Now for calculating the critic loss, we can simply use a squared-error loss function, as we just want our critic to predict as close as possible to the targets we just calculated.
Squared-error (MSE-style) loss should be familiar to a lot of you. Here is the code for it, and its gradient, in plain numpy:
critic_loss = 0.5 * (critic_vals - targets) ** 2
critic_vals_grad = (critic_vals - targets)
All we are doing here, is calculating the loss value as per the formula, and doing the backward operation to calculate the gradients for the predictions.
Now, let's do a forward pass for the actor network.
logits, hs_actor = actor(obs, actor_params, grad=True)
probs = softmax(logits)
logprobs = np.log(probs)
# keep this as a column of shape (N,1) so it lines up with prob_a and advantage
logpi_a = logprobs[np.arange(probs.shape[0]), a.flatten()].reshape(-1, 1)
ratio = np.exp(logpi_a - np.log(prob_a))
surr1 = ratio * advantage
surr2 = np.clip(ratio, 1-eps_clip, 1+eps_clip) * advantage
actor_loss = -np.minimum(surr1, surr2)
The process may seem long and complex. But let me try and give you some intuition behind it.
We have two main terms here: the ratio and the advantage. The ratio at time step t is the probability the current actor (the one undergoing training) assigns to the chosen action, divided by the probability that was recorded for that action during our agent-environment loop. (In the code we compute it as the exponential of the difference of log probabilities, which is the same thing.)
This ratio represents the deviation from the old actor (the one just before this training started) and clipping it basically prevents large updates in the parameters of the actor network in a single training step.
The advantage at time step t, as explained earlier, represents the accumulated difference between the rewards we actually received from the actions the actor chose and the rewards the critic predicted, with future time steps discounted so that the immediate ones carry more weight.
If the advantage is positive, meaning the chosen action was good, the loss value would be lower. If the advantage is negative, meaning the chosen action was bad, the loss value would be higher.
We are going to calculate the gradients and update the parameters in order to make this loss value lower, which means making the actor choose good actions more often.
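As a quick made-up example of why the clipping matters: say eps_clip is 0.1 (the exact value is a hyperparameter), the advantage at some time step is +2, and the new actor assigns the chosen action 1.3 times its old probability, so the ratio is 1.3. Then surr1 = 1.3 * 2 = 2.6, but surr2 = clip(1.3, 0.9, 1.1) * 2 = 2.2, and the minimum keeps 2.2. The actor gets no extra credit for pushing the ratio beyond 1.1 in a single update, which is exactly the "proximal" part of PPO.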
Backward pass without using libraries like pytorch may seem daunting, but it's really not. There are only a handful of operations that we have to go through.
First, let's define the backward function for our feed-forward network.
def ffn_backward(out_grad, hidden_states, params):
    relu_out_grad = np.dot(out_grad, params['w2'].T)
    b2_grad = np.sum(out_grad, axis=0, keepdims=False)
    w2_grad = np.dot(hidden_states['relu_out'].T, out_grad)
    w1_out_grad = relu_out_grad * (hidden_states['w1_out'] > 0)
    w1_grad = np.dot(hidden_states['inp'].T, w1_out_grad)
    b1_grad = np.sum(w1_out_grad, axis=0, keepdims=False)
    return {'w1': w1_grad, 'w2': w2_grad, 'b1': b1_grad, 'b2': b2_grad}
This function simply takes the gradient of the outputs, and the hidden states that were returned by the forward function when we passed `grad = True`, to go backward on the matmul and relu operations.
It finally returns a dict which contains the gradients for each parameter of the neural network.
Now we need to calculate the gradients of the outputs of actor and critic.
The gradient of the critic outputs is the `critic_vals_grad` we already computed above alongside the squared-error loss.
To calculate the gradients of actor outputs, we do the following:
surr1_grad = np.zeros_like(surr1)
surr2_grad = np.zeros_like(surr2)
surr1_grad += np.where(surr1 <= surr2, -1, 0)
surr2_grad += np.where(surr1 >= surr2, -1, 0)
ratio_grad = np.zeros_like(ratio)
ratio_grad += surr1_grad * advantage
clip_grad = surr2_grad * advantage
ratio_grad += np.where(np.clip(ratio, 1-eps_clip, 1+eps_clip) == ratio, clip_grad, 0)
diff_grad = ratio * ratio_grad
logpi_a_grad = diff_grad
logprobs_grad = np.zeros_like(logprobs)
for i in range(logprobs_grad.shape[0]):
    logprobs_grad[i, a[i, 0]] += logpi_a_grad[i, 0]
softmax_output = probs
logits_grad = logprobs_grad - softmax_output * np.sum(logprobs_grad, axis=1, keepdims=True)
Going through this line by line.
The first 4 lines simply go backward on the min operation. The gradient of the loss with respect to itself is 1.0, and the minus sign in `actor_loss` is why the `np.where` calls write -1 into whichever of `surr1`/`surr2` was the minimum.
The next 3 lines go backward on the multiplication operation.
The next line then goes backward on the clipping operation: gradient only flows through where clipping left the ratio unchanged. The `diff_grad = ratio * ratio_grad` line after that goes backward through the exponential (the derivative of exp is exp itself, which is just `ratio`), giving us the gradient for the log probability of the chosen action.
The for loop you see simply assigns gradient values to the log probabilities of the chosen actions.
The gradients for all other actions remain 0.
Finally, the last 2 lines go backward on the combined log-softmax operation, which is much simpler than going backward through the log and the softmax separately.
Now, we can pass the `logits_grad` and `critic_vals_grad` into the `ffn_backward` function to get the gradients of the parameters of these neural nets.
actor_grads = ffn_backward(logits_grad, hs_actor, actor_params)
critic_grads = ffn_backward(critic_vals_grad, hs_critic, critic_params)
Now we could update the parameters by simply multiplying the gradients by a learning rate and subtracting that from the parameters. But we'll use the Adam optimiser for better results.
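For reference, that plain update would look like this (using the same `learning_rate` hyperparameter that Adam uses below):

# plain gradient descent: nudge each parameter against its gradient
for key in params:
    params[key] -= learning_rate * grads[key]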
The Adam optimiser is also quite simple to implement. We just need two functions.
Explaining how Adam works, and why it is better than plain stochastic gradient descent, is not in the scope of this article, so I'll just write the code for it below. You can refer to the links given after that if you want to dig deeper.
def init_adam(params, lr=0.001, beta1=0.9, beta2=0.999, eps=1e-8):
    adam_dict = {
        'lr': lr,
        'beta1': beta1,
        'beta2': beta2,
        'eps': eps,
        'state': {},
        'state_step': 0
    }
    for key, val in params.items():
        adam_dict['state'][key] = {
            'm': np.zeros_like(val),  # First moment vector
            'v': np.zeros_like(val)   # Second moment vector
        }
    return adam_dict
def adam_step(grads, params, adam_dict):
    adam_dict['state_step'] += 1
    state_step = adam_dict['state_step']
    lr = adam_dict['lr']
    beta1 = adam_dict['beta1']
    beta2 = adam_dict['beta2']
    eps = adam_dict['eps']
    state = adam_dict['state']
    for key in params:
        grad = grads[key]
        param = params[key]
        m = state[key]['m']
        v = state[key]['v']
        m = beta1 * m + (1 - beta1) * grad
        v = beta2 * v + (1 - beta2) * (grad ** 2)
        m_hat = m / (1 - beta1 ** state_step)
        v_hat = v / (1 - beta2 ** state_step)
        param -= lr * m_hat / (np.sqrt(v_hat) + eps)
        state[key]['m'] = m
        state[key]['v'] = v
Further reading for the Adam optimiser:
Now we have everything needed to define a main function that uses all the code we've written so far to train an agent.
def main():
    env = CartPoleEnv()
    actor_params, critic_params = init_model()
    opta = init_adam(actor_params, lr=learning_rate)
    optc = init_adam(critic_params, lr=learning_rate)
    data = []
    score = 0.0
    print_interval = 20
    running_reward = 0
    for n_epi in range(5000):
        s = env.reset()
        done = False
        episode_reward = 0
        while not done:
            for t in range(T_horizon):
                obs = s.reshape(1, -1)
                logits = ffn(obs, actor_params, grad=False)
                prob = softmax(logits)[0]
                a = np.random.choice(num_actions, p=prob)
                s_prime, r, done, info = env.step(a)
                data.append((s, a, r/100.0, s_prime, prob[a], done))
                s = s_prime
                score += r
                episode_reward += r
                if done:
                    break
            train(actor_params, critic_params, data, opta, optc)
            data = []
        running_reward = 0.05 * episode_reward + (1 - 0.05) * running_reward
        if n_epi % 100 == 0:
            template = "running reward: {:.2f} at episode {}"
            print(template.format(running_reward, n_epi))
        if running_reward > 195:  # Condition to consider the task solved
            print("solved at episode {}!".format(n_epi))
            break
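One function used in `main` but not shown anywhere in the article is `init_model`, which just creates the weight dictionaries for the two networks. Here is a minimal sketch, assuming a hidden layer of 64 units (the size in the gist may differ); the cart pole observation has 4 numbers and there are 2 actions:

def init_model(obs_dim=4, hidden_dim=64, n_actions=2):
    def init_layer(n_in, n_out):
        # small random weights, zero biases
        return np.random.randn(n_in, n_out) * 0.1, np.zeros(n_out)

    w1, b1 = init_layer(obs_dim, hidden_dim)
    w2, b2 = init_layer(hidden_dim, n_actions)  # actor: one logit per action
    actor_params = {'w1': w1, 'b1': b1, 'w2': w2, 'b2': b2}

    w1, b1 = init_layer(obs_dim, hidden_dim)
    w2, b2 = init_layer(hidden_dim, 1)          # critic: a single value
    critic_params = {'w1': w1, 'b1': b1, 'w2': w2, 'b2': b2}
    return actor_params, critic_params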
Everything that we talked about till now - the agent interacting with the environment, collecting data, computing advantages, computing losses, calculating gradients, and updating the parameters - all of this is considered one episode of training.
We repeat these episodes many many times, and the agent starts learning.
For the cart pole environment, the training is considered complete when the running reward is more than 195, which basically means that the agent has been able to balance the pole for a long time for a bunch of episodes.
The `train` function used in the code above is something we have coded in pieces, but not defined as a whole yet. So here is the code for it, combining the snippets I explained earlier.
def train(actor_params, critic_params, data, opta, optc):
    obs, acts, rews, newobs, dones, probs = make_batch(data)
    for i in range(K_epoch):
        targets = rews + gamma * ffn(newobs, critic_params, grad=False) * dones
        delta = targets - ffn(obs, critic_params, grad=False)
        advantage_lst = []
        advantage = 0.0
        for delta_t in delta[::-1]:
            advantage = gamma * lmbda * advantage + delta_t[0]
            advantage_lst.append([advantage])
        advantage_lst.reverse()
        advantage = np.array(advantage_lst, dtype=np.float32)

        logits, hs_actor = ffn(obs, actor_params, grad=True)
        actor_probs = softmax(logits)
        logprobs = np.log(actor_probs)
        # keep this as a column of shape (N,1) so it lines up with probs and advantage
        logpi_a = logprobs[np.arange(actor_probs.shape[0]), acts.flatten()].reshape(-1, 1)
        ratio = np.exp(logpi_a - np.log(probs))
        surr1 = ratio * advantage
        surr2 = np.clip(ratio, 1-eps_clip, 1+eps_clip) * advantage
        critic_vals, hs_critic = ffn(obs, critic_params, grad=True)
        actor_loss = -np.minimum(surr1, surr2)
        critic_loss = 0.5 * (critic_vals - targets) ** 2

        # backward
        surr1_grad = np.zeros_like(surr1)
        surr2_grad = np.zeros_like(surr2)
        surr1_grad += np.where(surr1 <= surr2, -1, 0)
        surr2_grad += np.where(surr1 >= surr2, -1, 0)
        ratio_grad = np.zeros_like(ratio)
        ratio_grad += surr1_grad * advantage
        clip_grad = surr2_grad * advantage
        ratio_grad += np.where(np.clip(ratio, 1-eps_clip, 1+eps_clip) == ratio, clip_grad, 0)
        diff_grad = ratio * ratio_grad
        logpi_a_grad = diff_grad
        logprobs_grad = np.zeros_like(logprobs)
        for i in range(logprobs_grad.shape[0]):
            logprobs_grad[i, acts[i, 0]] += logpi_a_grad[i, 0]
        softmax_output = actor_probs
        logits_grad = logprobs_grad - softmax_output * np.sum(logprobs_grad, axis=1, keepdims=True)
        actor_grads = ffn_backward(logits_grad, hs_actor, actor_params)
        adam_step(actor_grads, actor_params, opta)

        critic_vals_grad = np.zeros_like(critic_vals)
        critic_vals_grad += (critic_vals - targets)
        critic_grads = ffn_backward(critic_vals_grad, hs_critic, critic_params)
        adam_step(critic_grads, critic_params, optc)
All of the code combined in a single file can be found in this gist.
Go through it, run it in a colab notebook, tweak it, try other environments, and see how it all comes together!
That's it! Thank you for reading this far. If you have any questions, you can find me on X/Twitter. If you'd like to buy me a coffee to support my work, here's the link.
Note on hyperparameters:
We have a bunch of hyperparameters over here:
gamma, lambda, the learning rate for Adam, T_horizon, K_epoch, and eps_clip. Good values for this setup with the cart pole env are already known; you can find them in the gist.
However, RL algorithms, including PPO, are quite sensitive to these values. So for your environment and setup, you might feel like your code is not working, when it might just be one hyperparameter that is slightly off.
It's a good idea to do some sort of sweep over different values to find the one that works in your case.
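For orientation, here is the kind of hyperparameter block the code above expects. gamma and lambda are the values mentioned earlier in the article; the rest are typical choices for small environments and not necessarily the exact values in the gist:

# example values only -- check the gist for the ones actually used
learning_rate = 0.0005   # Adam learning rate
gamma = 0.98             # discount factor, used in targets and advantages
lmbda = 0.95             # lambda, used in the advantage calculation
eps_clip = 0.1           # clipping range for the ratio
K_epoch = 3              # passes over each batch of collected data
T_horizon = 20           # max steps collected before each call to train()
num_actions = 2          # cart pole: push left or push right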
Code for the cartpole environment
import numpy as np
import math

class CartPoleEnv:
    def __init__(self):
        # Environment parameters
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = self.masscart + self.masspole
        self.length = 0.5  # Half the pole's length
        self.polemass_length = self.masspole * self.length
        self.force_mag = 10.0
        self.tau = 0.02  # Time step (seconds)
        self.kinematics_integrator = "euler"

        # Termination thresholds
        self.theta_threshold_radians = 12 * 2 * math.pi / 360  # ±12° in radians
        self.x_threshold = 2.4  # Cart position threshold (±2.4 meters)

        # Observation space bounds
        high = np.array(
            [
                self.x_threshold * 2,              # Cart position
                np.finfo(np.float32).max,          # Cart velocity
                self.theta_threshold_radians * 2,  # Pole angle
                np.finfo(np.float32).max,          # Pole angular velocity
            ],
            dtype=np.float32,
        )
        self.observation_space = np.array([-high, high])

        # Action space
        self.action_space = 2  # Discrete actions: 0 (left) or 1 (right)

        # Initialize state
        self.state = None
        self.steps_beyond_terminated = None

    def reset(self):
        # Initialize state with random values in the range (-0.05, 0.05)
        self.state = np.random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps_beyond_terminated = None
        return np.array(self.state, dtype=np.float32)

    def step(self, action):
        # Ensure the action is valid
        assert action in [0, 1], f"Invalid action: {action}"

        # Unpack the state
        x, x_dot, theta, theta_dot = self.state

        # Apply force
        force = self.force_mag if action == 1 else -self.force_mag

        # Compute dynamics
        costheta = math.cos(theta)
        sintheta = math.sin(theta)

        # Temporary variable for pole dynamics
        temp = (
            force + self.polemass_length * theta_dot**2 * sintheta
        ) / self.total_mass

        # Compute angular acceleration
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta**2 / self.total_mass)
        )

        # Compute cart acceleration
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        # Update state using Euler integration
        if self.kinematics_integrator == "euler":
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # Semi-implicit Euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot

        # Update the state
        self.state = np.array([x, x_dot, theta, theta_dot])

        # Check termination conditions
        terminated = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )

        # Assign reward
        if not terminated:
            reward = 1.0
        elif self.steps_beyond_terminated is None:
            # Pole just fell!
            self.steps_beyond_terminated = 0
            reward = 1.0
        else:
            if self.steps_beyond_terminated == 0:
                print(
                    "Warning: You are calling 'step()' even though this "
                    "environment has already returned terminated = True. You "
                    "should always call 'reset()' once you receive 'terminated = "
                    "True' -- any further steps are undefined behavior."
                )
            self.steps_beyond_terminated += 1
            reward = 0.0

        return np.array(self.state, dtype=np.float32), reward, terminated, {}