Teach a convolutional neural network to play Slither.io with deep deterministic policy gradients.

We start by setting up a reinforcement learning framework for Slither.io, a fun twist on the classic snake game. In this online game, the player controls a snake and tries to collect food while avoiding running into other snakes; when a snake dies, its body decomposes into food, so players often try to kill their opponents by blocking their path.

We used Electron to build a desktop app that runs the Slither.io client and listens on localhost, allowing us to run multiple games simultaneously and interact with them from a Python client. Because this task is highly stochastic, we believe that running simulations in parallel will be key to training an RL agent successfully. Next, we implement the Deep Deterministic Policy Gradient (DDPG) method described by Lillicrap et al. [1]
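
How the Python side talks to the Electron app depends on the transport you expose. As a rough illustration (not the actual protocol), the sketch below assumes the app speaks newline-delimited JSON over a localhost TCP port; the port number and the get_state/action message names are made up for the example.

import json
import socket

class SlitherClient:
    """Illustrative client; the port and message format are assumptions."""

    def __init__(self, host='127.0.0.1', port=8765):
        self.sock = socket.create_connection((host, port))
        self.buf = self.sock.makefile('rw')

    def _send(self, message):
        self.buf.write(json.dumps(message) + '\n')
        self.buf.flush()

    def get_state(self):
        # Ask the app for the current game state (e.g. the current frame).
        self._send({'cmd': 'get_state'})
        return json.loads(self.buf.readline())

    def send_action(self, angle, boost=False):
        # Steer the snake and optionally boost.
        self._send({'cmd': 'action', 'angle': angle, 'boost': boost})

Running several Electron instances on different ports would then let one Python process drive many games at once.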

Actor-critic methods
import numpy as np
import tensorflow as tf
from keras import backend as K
from keras.models import *
from keras.layers import *

TAU = 0.001    # soft target-update rate
GAMMA = 0.7    # discount factor for future rewards

class DDPG:
    def __init__(self, state_dim, action_dim):
        sess = tf.Session()
        self.actor = Actor(sess, state_dim, action_dim)
        self.critic = Critic(sess, state_dim, action_dim)

    def fit(self, states, actions, rewards, nstates, over):
        # Update the actor by following the critic's action-gradients.
        gradients = self.critic.get_gradients(states, actions)
        self.actor._train(states, gradients)

        # Bellman targets: r + GAMMA * Q'(s', mu'(s')), or just r for terminal states.
        targets = self.critic.get_rewards(states, actions)
        nrewards = self.critic.get_rewards(nstates, self.actor.get_actions(nstates))
        for i in range(states.shape[0]):
            if over[i]: targets[i] = rewards[i]
            else: targets[i] = rewards[i] + GAMMA * nrewards[i]
        self.critic._train(states, actions, targets)

        # Let the target networks slowly track the online networks.
        self.actor._update()
        self.critic._update()

    def get_actions(self, states):
        return self.actor.get_actions(states)
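
As a quick check that the pieces fit together (this snippet is illustrative and not part of the project code), the class can be driven with random batches once the Actor and Critic defined below are in place; the batch size and shapes are arbitrary.

import numpy as np

ddpg = DDPG(state_dim=4, action_dim=1)

states  = np.random.rand(8, 4)
nstates = np.random.rand(8, 4)
actions = ddpg.get_actions(states)          # shape (8, 1)
rewards = np.random.rand(8)
over    = np.array([False] * 7 + [True])

ddpg.fit(states, actions, rewards, nstates, over)   # one training step on fake data
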
Building the actor
class Actor:

    def __init__(self, sess, state_dim, action_dim):
        self.sess = sess
        K.set_session(sess)

        def make_model():
            state = Input(shape=[state_dim])
            action = Dense(16, activation='relu')(state)
            action = Dense(action_dim)(action)
            model = Model(input=state, output=action)
            parameter = model.trainable_weights
            return model, state, action, parameter

        # Two copies of the network: the online model is trained directly, while
        # the target model tracks it via soft updates (see _update below).
        self.target_model, _, _, _ = make_model()
        model, state, action, parameter = make_model()
        self.model = model
        self.state = state

        # The actor is trained by backpropagating the critic's action-gradients
        # through the policy; the minus sign turns gradient descent into ascent on Q.
        optimizer = tf.train.AdamOptimizer()
        action_gradient = tf.placeholder(tf.float32, [None, action_dim])
        parameter_gradient = tf.gradients(action, parameter, -action_gradient)
        optimize = optimizer.apply_gradients(zip(parameter_gradient, parameter))
        self.optimize = optimize
        self.action_gradient = action_gradient

        sess.run(tf.global_variables_initializer())

    def _train(self, states, gradients):
        self.sess.run(self.optimize, feed_dict={
            self.state: states,
            self.action_gradient: gradients
        })

    def _update(self):
        # Soft update: move the target network a small step (TAU) toward the online one.
        weights = self.model.get_weights()
        target_weights = self.target_model.get_weights()
        for i in range(len(weights)):
            target_weights[i] = TAU * weights[i] + (1 - TAU)* target_weights[i]
        self.target_model.set_weights(target_weights)

    def get_actions(self, states):
        return self.target_model.predict(states)

Building the critic
class Critic:

    def __init__(self, sess, state_dim, action_dim):
        self.sess = sess
        K.set_session(sess)

        def make_model():
            # The critic maps a (state, action) pair to a scalar Q-value estimate.
            state = Input(shape=[state_dim])
            action = Input(shape=[action_dim])
            reward = merge([state, action], mode='concat')
            reward = Dense(16, activation='relu')(reward)
            reward = Dense(1)(reward)
            model = Model(input=[state, action], output=reward)
            model.compile(loss='mse', optimizer='adam')
            return model, state, action

        self.target_model, _, _ = make_model()
        model, state, action = make_model()
        self.model = model
        self.state = state
        self.action = action

        # Gradient of the predicted Q value with respect to the action input,
        # evaluated on the online model; this is the signal used to train the actor.
        self.action_gradient = tf.gradients(model.output, action)
        self.sess.run(tf.global_variables_initializer())

    def _train(self, states, actions, targets):
        self.model.fit([states, actions], targets, nb_epoch=1, verbose=False)

    def _update(self):
        weights = self.model.get_weights()
        target_weights = self.target_model.get_weights()
        for i in range(len(weights)):
            target_weights[i] = TAU * weights[i] + (1 - TAU) * target_weights[i]
        self.target_model.set_weights(target_weights)

    def get_rewards(self, states, actions):
        # Q-value estimates from the target critic.
        return self.target_model.predict([states, actions])

    def get_gradients(self, states, actions):
        # Evaluate dQ/da on the online critic for a batch of (state, action) pairs.
        return self.sess.run(self.action_gradient, feed_dict={
            self.state: states,
            self.action: actions
        })[0]
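
As an aside (not part of the project code), one way to sanity check get_gradients is to compare it against a central finite difference of the online critic's prediction; since the network is piecewise linear, the two should agree closely. The snippet below assumes a fresh session and arbitrary shapes.

import numpy as np
import tensorflow as tf

critic = Critic(tf.Session(), state_dim=4, action_dim=1)
s = np.random.rand(1, 4)
a = np.random.rand(1, 1)

grad = critic.get_gradients(s, a)                      # analytic dQ/da
eps = 1e-3
fd = (critic.model.predict([s, a + eps]) -
      critic.model.predict([s, a - eps])) / (2 * eps)  # central difference
print(grad, fd)
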
A quick sanity check

Before pointing the agent at Slither.io, we sanity check the implementation on a toy game: the agent starts at a random position on a line and is rewarded for moving onto a randomly placed target within ten steps. An optional noise flag adds a small perturbation to the observed state.
from random import random

class Game:
    def __init__(self, noise=False):
        self.steps = 0
        self.noise = noise
        self.target = random()
        self.location = random()

    def _noise(self):
        if not self.noise:
            return 0.0
        return (random() - 0.5) / 100.0

    def over(self):
        return self.steps >= 10

    def get_state(self):
        # Four (optionally noisy) copies of the signed distance to the target,
        # two with each sign.
        state = [0, 0, 0, 0]
        state[0] = self.target - self.location + self._noise()
        state[1] = self.target - self.location + self._noise()
        state[2] = self.location - self.target + self._noise()
        state[3] = self.location - self.target + self._noise()
        return state

    def get_score(self):
        # Reward is the negative distance to the target; 0 is the best possible score.
        return -abs(self.target - self.location)

    def do_action(self, action):
        # Clip the action to [-1, 1] and move by that amount.
        if action < -1.0: action = -1.0
        if action > 1.0: action = 1.0
        self.location += action
        self.steps += 1
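
To get a feel for the toy task, here is a quick hand-written rollout (not part of the training code) that steps halfway toward the target each turn; assuming the class above is saved as game.py, as the script below expects, the printed score should approach zero.

from game import Game

game = Game()
while not game.over():
    state = game.get_state()          # state[0] is (target - location)
    game.do_action(0.5 * state[0])    # move halfway toward the target
    print(game.get_score())           # negative distance, approaching 0.0

The script below then trains the DDPG agent by playing 100 of these games in parallel per match.
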
import numpy as np
from tqdm import tqdm
from ddpg import DDPG
from game import Game
from random import random, sample

state_dim, action_dim = 4, 1
ddpg = DDPG(state_dim, action_dim)


history = []    # replay buffer of (state, action, reward, next_state, done) tuples
nb_matches = 64
for match in range(nb_matches):
    total_reward = 0.0

    # Play 100 games in parallel, acting on all of them at every step.
    games = [Game() for _ in range(100)]
    while len(games) > 0:
        states = [game.get_state() for game in games]
        actions = ddpg.get_actions(np.array(states))
        for i in range(len(games)):
            # Exploration: with a probability that decays linearly from 1.0
            # (floored at 0.1), replace the policy's action with a random one.
            if random() < max(0.1, 1.0 - match * 2.0 / nb_matches):
                actions[i,0] = random() - 0.5
            games[i].do_action(actions[i,0])
            reward = games[i].get_score()
            nstate = games[i].get_state()
            history.append((states[i], actions[i], reward, nstate, games[i].over()))
            total_reward += reward
        games = list(filter(lambda g: not g.over(), games))
    total_reward /= 100.0

    # Train on the whole replay buffer and report the average reward per game.
    S, A, R, NS, O = map(np.array, zip(*history))
    ddpg.fit(S, A, R, NS, O)
    print(total_reward)

    # Keep the replay buffer from growing without bound.
    if len(history) > 100000:
        history = sample(history, 10000)

  1. Lillicrap et al., Continuous control with deep reinforcement learning. https://arxiv.org/abs/1509.02971