Week 14: Wildcard

HTMAA 2024 - Jonathan Cohen

Group Assignment

CMU sketch

We first did a safety training for using the UR15 robot arms. We learned about how to start, stop and manually control them. They have a useful python API that we will use for interfacing with them. ( Python Link )

Group Assignment

For this week's assignment, I teamed up with Marcello, Michelle, and Hyejun to build the best flappy bird player in the world with the UR robot arm. We built on the code from Marcello's earlier servo flappy bird project ( AI Flappy Bird )

CMU sketch

The computer is running two python scripts in parallel and commands are sent over a tcp socket to the robot arm.

CMU sketch

Flappy bird physics are very simple, the bird velocity is defined by impacts and gravity. The bird is always falling and when the keyboard is pressed, that velocity is simply made positive (equivalent to a perfectly elastic collision) and then begins falling again.

My Contributions

I helped seperate out the python programs into two so that the gamplay would not extremely slow. Marcello and I tuned the velocity, pipe generation, gravity, first pipe distance and other parameters to make it easier for the robot to train.

Final Results

CMU sketch

We achieved a high score of 8 which is...okay but the game velocity was super fast and very hard to play as a human. We spend a lot of time trying to get the robot actuation and gameplay working at a normal speed so we are happy with this. The robot needs more training time. Code below.

Flappy Code

    
#  edited by Michelle, Jonny, Marcello, and Hyejun
# (c) Marcello Tania 17/04/21
#
# This work may be reproduced, modified, distributed,
# performed, and displayed for any purpose. Copyright is
# retained and must be preserved. The work is provided
# as is; no warranty is provided, and users accept all 
# liability.
#
import time   
import pygame, sys, random
import numpy
# scipy.special for the sigmoid function expit() 
import scipy.special
import socket

# Define the serial port and baud rate.
# Ensure the 'COM#' corresponds to what was seen in the Windows Device Manager
# Note: All robot commands have been replaced with socket sends to the robot server.

# Connect to robot server
HOST = '127.0.0.1'  # or IP where robot_server.py is running
PORT = 50007
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))

def send_robot_command(cmd):
    s.sendall(cmd.encode('utf-8'))

# Send MOVE_HOME at the start
send_robot_command("MOVE_HOME")

PI = 3.1415926535
#keyboard = Controller()

# neural network class for the brain of the birds
class Individual():
        
    # initialise the neural network
    def __init__(self, inputnodes, hiddennodes, outputnodes,fitness):
        # set number of nodes in each input, hidden, output layer
        self.inodes = inputnodes
        self.hnodes = hiddennodes
        self.onodes = outputnodes
        
        self.fitness = fitness
        

        # link weight matrices, wih and who
        # weights inside the arrays are w_i_j, where link is from node i to node j in the next layer
        # w11 w21
        # w12 w22 etc
        '''
        self.wih = numpy.random.normal(0.0, pow(self.hnodes,-0.5), (self.hnodes, self.inodes))
        self.who = numpy.random.normal(0.0, pow(self.onodes,-0.5), (self.onodes, self.hnodes))
        
        '''
        
        self.wih = numpy.array([[-3.34829867,  0.61441159,  2.50556884, -0.37947868],
 [ 0.3240327,   2.35160679, -1.94486032, -2.1700158 ],
 [-2.62540542,  2.33445864, -0.46812661, -2.2635345 ],
 [-1.00241636,  1.89882317, -2.77465566, -1.34619994],
 [-0.20231266,  2.49082877,  1.08143091,  0.53047555],
 [ 0.67497593,  1.52985698, -3.86579115, -0.20542114],
 [ 2.7336978,   2.26497664,  1.96146316, -0.64662931]])
        self.who = numpy.array([[-4.33668794, -1.30929065,  0.05054322,  0.16018827,  2.50858315, -0.42650511, -1.36117085]])
        
        '''
        self.wih = numpy.array([[-3.24829867,  1.06441159,  2.35556884, -0.17947868],
                                [ 0.6740327,   2.15160679, -1.54486032, -1.4700158 ],
                                [-2.72540542,  2.18445864, -0.81812661, -2.3135345 ],
                                [-0.85241636,  2.39882317, -2.52465566, -1.19619994],
                                [-0.05231266,  2.19082877,  0.88143091,  0.23047555],
                                [ 0.57497593,  1.57985698, -4.36579115,  0.19457886],
                                [ 2.6336978,   2.51497664,  1.51146316, -0.69662931]])

        self.who = numpy.array([[-4.38668794, -1.20929065,  0.40054322,  0.16018827,  2.95858315, -0.42650511, -1.01117085]])

        '''
        
   

        # activation function is the sigmoid function
        self.activation_function = lambda x: scipy.special.expit(x)
        
        pass
    
      # query the neural network
    def query(self, inputs_list):
        # convert inputs list to 2d array
        inputs = numpy.array(inputs_list, ndmin=2).T
        # calculate signals into hidden layer 
        hidden_inputs = numpy.dot(self.wih, inputs)
        # calculate the signals emerging from hidden layer 
        hidden_outputs = self.activation_function(hidden_inputs)
        # calculate signals into final output layer 
        final_inputs = numpy.dot(self.who, hidden_outputs) 
        # calculate the signals emerging from final output layer
        final_outputs = self.activation_function(final_inputs)

        return final_outputs

class Block(pygame.sprite.Sprite):
    """
    Block class to take the surface and put a rectangle around it 
    and put it on the screen
    """
    def __init__(self,path,x_pos,y_pos):
        super().__init__()
        self.image = pygame.image.load(path).convert()
        self.image = pygame.transform.scale2x(self.image)
        self.rect = self.image.get_rect(center = (x_pos,y_pos))

class Floor(Block):
    """
    Floor class representing the floor of the game
    """
    VEL = 1
    def __init__(self,path,x_pos,y_pos):
        super().__init__(path,x_pos,y_pos)
        self.x_pos = x_pos
        self.y_pos = y_pos
        
    def move(self):
        """
        Move floor so it looks like its scrolling
        :param speed: the velocity of the floor
        :return: None
        """
        self.x_pos -= self.VEL
        
        if self.x_pos <= -576:
            self.x_pos = 576
        
    def draw(self, screen):
        """
        Draw the floor. This is two imgaes that move together.
        :param screen: the pygame surface or window
        :return: None
        """
        screen.blit(self.image, (self.x_pos,self.y_pos))
        
class Bird(Block):
    """
    Bird class representing the flappy bird
    """
    GRAVITY = 0.1
    VEL = 9
    
    def __init__(self,path,x_pos,y_pos):
        """
        Initialize the object
        :param x_pos: starting x pos (int)
        :param y_pos: starting y pos (int)
        :return: None
        """
        super().__init__(path,x_pos,y_pos)
        self.x_pos = x_pos
        self.y_pos = y_pos
        self.bird_movement = 0
        self.score = 0
        self.high_score = 0
        
    def jump(self):
        """
        make the bird jump
        :return: None
        """
        self.bird_movement = 0
        self.bird_movement -= self.VEL
        
        
        
    def move(self):
        """
        Make the bird fall and jump
        :param gravity: gravity velocity
        :return: None
        """
        self.bird_movement += self.GRAVITY
        
    
    def check_collision(self, pipes):
        """
        Check the bird if it collides vertically or with the pipes
        :param pipes: list of the pipes
        :return: True = collision detected
        :return: False = no collision
        """
        for pipe in pipes:
            if self.rect.colliderect(pipe):
                return False
        if self.rect.top <= -100 or self.rect.bottom >= 900:
            return False
        return True
        
    def draw(self, screen):
        """
        Draw the bird
        :param win: the pygame surface or window
        :return: None
        """
        self.rect.y += self.bird_movement
        screen.blit(self.image, (self.rect.x,self.rect.y))
        
        
    def pos_y(self):
        """
        Bird postition
        :return: y position of bird
        """
        return self.rect.y
        
    def pipe_score_check(self, pipes):
        """
        Add score if bird pass through the pipes
        :return: None
        """
        if pipes:
            for pipe in pipes:
                if 95 < pipe.centerx < 105:
                    self.score += 0.5
    
    def update_score(self):
        """
        Check for new high score
        :return: high score
        """
        if self.score > self.high_score:
            self.high_score = self.score
        return self.high_score
                    
    def score_display(self):
        """
        Display score and highscore
        :return: none
        """
        score_surface = game_font.render(f'Score: {int(self.score)}',True,(255,255,255))
        score_rect = score_surface.get_rect(topleft = (20,50))
        screen.blit(score_surface, score_rect)
        
        score_surface = game_font.render(f'High score: {int(self.update_score())}',True,(255,255,255))
        score_rect = score_surface.get_rect(topleft = (20,100))
        screen.blit(score_surface, score_rect)
        
        
class Pipe():
    """
    Pipe class representing the pipes
    """
    VEL = 10
    GAP = 500
    HEIGHT = [490,550,600]
    X_INIT = 700
    def __init__(self,path):
        """
        Initialize the object
        :return: None
        """
        self.image = pygame.image.load(path).convert()
        self.image = pygame.transform.scale2x(self.image)
        #self.height = self.image.get_height()
    
    def create_pipe(self):
        """
        Create a list of top and bottom pipes
        :return: tupple of top and bottom pipes
        """
        random_pipe_pos = random.choice(self.HEIGHT)
        bottom_pipe = self.image.get_rect(midtop = (self.X_INIT,random_pipe_pos))
        top_pipe =  self.image.get_rect(midbottom = (self.X_INIT,random_pipe_pos - self.GAP))
        return bottom_pipe,top_pipe
        
    def move(self, pipes):
        """
        Move all the pipes
        :return: visible pipes list
        """
        for pipe in pipes:
            pipe.centerx -= self.VEL
        visible_pipes = [pipe for pipe in pipes if pipe.right> -50]
        return visible_pipes

    
    def draw(self, screen, pipes):
        """
        Draw the pipe.
        :param screen: the pygame surface or window
        :return: None
        """
        for pipe in pipes:
            if pipe.bottom >= 1024:
                screen.blit(self.image, pipe)
            else:
                flip_pipe = pygame.transform.flip(self.image,False,True)
                screen.blit(flip_pipe, pipe)
                
    def pos_x(self, pipes):
        # only take the pipe in front of the bird and shown on the screen
        visible_pipes = [pipe for pipe in pipes if pipe.centerx > 100 and pipe.right < 550]
        # only take the bottom pipe because top and bottom pipes x positions are the same
        bottom_pipes = [pipe for pipe in visible_pipes if pipe.bottom >= 1024]
        for pipe in bottom_pipes:
            x = pipe.centerx
            return x
        
    def pos_y_bottom(self,pipes):
        # To do: Find the clossest pipe
        visible_pipes = [pipe for pipe in pipes if pipe.centerx > 100 and pipe.right < 550]
        # only take the bottom pipe because top and bottom pipes x positions are the same
        bottom_pipes = [pipe for pipe in visible_pipes if pipe.bottom >= 1024]
        for pipe in bottom_pipes:
            y = pipe.top
            return y
        
    def pos_y_top(self,pipes):
        # To do: Find the clossest pipe
        visible_pipes = [pipe for pipe in pipes if pipe.centerx > 100 and pipe.right < 550]
        # only take the bottom pipe because top and bottom pipes x positions are the same
        bottom_pipes = [pipe for pipe in visible_pipes if pipe.bottom >= 1024]
        for pipe in bottom_pipes:
            y = pipe.top + self.GAP
            return y

def end_game():
    send_robot_command("MOVE_DOWN")
    print('Generation\tBest fitness')
    print('------------------------------------')

    for i in range(1,gen+1):
        print('{}\t{}'.format(i,best_fitness[i]))

    print('The best so far is {}:'.format(round(best_so_far.fitness,5)))
    print('The best so far who {}:'.format(best_so_far.who))
    print('The best so far wih {}:'.format(best_so_far.wih))

    pygame.quit()
    sys.exit()
                

# General Setup
pygame.init()
clock = pygame.time.Clock()
game_font = pygame.font.Font('04B_19.ttf',40)
        
# Main Window
screen = pygame.display.set_mode((576,1024))
bg_surface = pygame.image.load('assets/background-day.png').convert()
bg_surface = pygame.transform.scale2x(bg_surface)
        
# Game Objects
floor_surface1 = Floor('assets/base.png',0,900)
floor_surface2 = Floor('assets/base.png',576,900)
bird_surface = Bird('assets/bluebird-midflap.png',100,512)
pipe_surface = Pipe('assets/pipe-green.png')
pipe_list = []
SPAWNPIPE = pygame.USEREVENT
pygame.time.set_timer(SPAWNPIPE,800)

# Global Variable
game_active = True


# number of input, hidden and output nodes 
input_nodes = 4
hidden_nodes = 7
output_nodes = 1


POP_SIZE = 10 # defining population size
NUM_GEN = 300

X_BIAS = 0.8
MUT_RATE = 0.3
STEP_SIZE = 0.05

person = [None] * POP_SIZE
offspring = [None] * POP_SIZE

best_fitness = [None] * (NUM_GEN+1)

flag_dead = False
indiv = 0
gen = 1

# initialize best so far
best_so_far = Individual(input_nodes,hidden_nodes,output_nodes,0)


# Create first population
print('Geration : 0, HELLO WORLD!')
print('Indiv\twho\tFitness')
print('------------------------------------------------')

for i in range(POP_SIZE):
    person[i] = Individual(input_nodes,hidden_nodes,output_nodes,0)
    offspring[i] = person[i]
    print('{}\t{}\t{}'.format(i,
                              person[i].who,
                              'UNKNOWN'))

for x in range(1000):
    send_robot_command("MOVE_DOWN")
    

while True:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            end_game()

        if event.type == pygame.KEYDOWN:
            if event.key == pygame.K_SPACE and game_active:
                bird_surface.jump()
            if event.key == pygame.K_SPACE and game_active == False:
                game_active = True
                bird_surface.rect.center = (100,512)
                bird_surface.bird_movement = 0
                pipe_list.clear()
                bird_surface.score = 0

                for x in range(1000):
                    send_robot_command("MOVE_DOWN")

                flag_dead = False

        if event.type == SPAWNPIPE:
            pipe_list.extend(pipe_surface.create_pipe())
               
    # Background
    screen.blit(bg_surface, (0,0))
    
    if game_active:
        
        # Bird
        bird_surface.draw(screen)
        bird_surface.move()

        game_active = bird_surface.check_collision(pipe_list)

        # Pipe
        pipe_list = pipe_surface.move(pipe_list)
        pipe_surface.draw(screen,pipe_list)
        
        bird_surface.pipe_score_check(pipe_list)
        bird_surface.score_display()
        
        # Data input
        bird_y = bird_surface.pos_y()
        pipes_x = pipe_surface.pos_x(pipe_list)
        pipe_y_bottom = pipe_surface.pos_y_bottom(pipe_list)
        pipe_y_top = pipe_surface.pos_y_top(pipe_list)
        
        person[indiv].fitness += 0.01
        

        # Keep it neutral when the pipes has not shown on the screen
        if pipes_x == None:
            pipes_x = 500
            pipe_y_bottom = 600
            pipe_y_top = 900
            
        
        data_inputs = numpy.array([bird_y, pipes_x, pipe_y_bottom, pipe_y_top])
        # Bird think using the Artificial Neural Network
        data_output = person[indiv].query(data_inputs)
        
    
        
        if data_output >= 0.5:
            #send H to microcontroler
            send_robot_command("MOVE_UP")
            #bird_surface.jump()
        else:
            send_robot_command("MOVE_DOWN")

        
        
        # Show Bird ID on screen
        indiv_surface = game_font.render(f'Bird ID: {indiv}',True,(255,255,255))
        screen.blit(indiv_surface, (20,10))
        
        # Show generation on screen
        gen_surface = game_font.render(f'Generation: {gen}',True,(255,255,255))
        screen.blit(gen_surface, (20,150))
            

            
    else:
        # Print the performance after the player is death
        if flag_dead == False:
            flag_dead = True


            send_robot_command("MOVE_UP")

            if indiv < POP_SIZE-1:
                game_active = True
                bird_surface.rect.center = (100,512)
                bird_surface.bird_movement = 0
                pipe_list.clear()   
                bird_surface.score = 0
                #person[indiv].fitness = 0

                flag_dead = False
                indiv += 1
                
                
            else:                                
                
                    
                # select parents and generating offspring phenotype
                for indiv in range(POP_SIZE):

                    #Mom
                    i1 = random.randrange(POP_SIZE) # choose parent 
                    i2 = random.randrange(POP_SIZE) # choose parent 
                    i3 = random.randrange(POP_SIZE) # choose parent 

                    #Tournament
                    if person[i1].fitness >= person[i2].fitness:
                        mom = i1
                    else:
                        mom = i2
                    if person[i3].fitness >= person[mom].fitness:
                        mom = i3

                    #Dad
                    i1 = random.randrange(POP_SIZE) # choose parent 
                    i2 = random.randrange(POP_SIZE) # choose parent 
                    i3 = random.randrange(POP_SIZE) # choose parent 

                    #Tournament
                    if person[i1].fitness >= person[i2].fitness:
                        dad = i1
                    else:
                        dad = i2
                    if person[i3].fitness >= person[dad].fitness:
                        dad = i3
                    
                    #Crossover
                    
                    # Crossover for who
                    for i in range(hidden_nodes):
                        if random.random()< X_BIAS:
                            offspring[indiv].who[0,i] = person[mom].who[0][i]
                        else:
                            offspring[indiv].who[0,i] = person[dad].who[0][i]

                    # Crossover for wih
                    for i in range(input_nodes):
                        for ii in range(hidden_nodes):
                            if random.random()< X_BIAS:
                                offspring[indiv].wih[ii,i] = person[mom].wih[ii][i]
                            else:
                                offspring[indiv].wih[ii,i] = person[dad].wih[ii][i]
                    
                    #Mutation
    
                    # Mutation for who
                    for i in range(hidden_nodes):
                        if random.random()< MUT_RATE:
                            r = (random.randint(0, 1))%2 *2-1 # create a number either -1 or 1 (sign)
                            offspring[indiv].who[0,i] += r*STEP_SIZE

                    # Mutation for wih
                    for i in range(input_nodes):
                        for ii in range(hidden_nodes):
                            if random.random()< MUT_RATE:
                                r = (random.randint(0, 1))%2 *2-1 # create a number either -1 or 1 (sign)
                                offspring[indiv].wih[ii,i] += r*STEP_SIZE
                
                # update statistical analysis
                best_fitness[gen] = person[0].fitness
                
                print('Generation : {}'.format(gen))
                print('Indiv\twho\tFitness')
                print('------------------------------------------------')
                for i in range(POP_SIZE):
                    print('{}\t{}\t{}'.format(i,
                                              person[i].who,
                                              round(person[i].fitness,5)))
                    #update statistical analysis
                    if person[i].fitness >= best_fitness[gen]:
                        best_indiv = i
                        best_fitness[gen] = person[i].fitness

                
                
                if best_fitness[gen] > best_so_far.fitness:
                    best_so_far.who = person[best_indiv].who.copy()
                    best_so_far.wih = person[best_indiv].wih.copy()
                    best_so_far.fitness = person[best_indiv].fitness
        
                                                    
                print('The best fitness is {}'.format(round(best_fitness[gen],5)))
                print('The best so far is {}:'.format(round(best_so_far.fitness,5)))
                                
                
                print('Offspring :')
                print('Indiv\twho\tFitness')
                print('------------------------------------------------')
                for i in range(POP_SIZE):
                    print('{}\t{}\t{}'.format(i,
                                              person[i].who,
                                              'UNKNOWN'))

                # Restart for having a new generation
                if gen < NUM_GEN:
                    gen += 1
                    game_active = True
                    bird_surface.rect.center = (100,512)
                    bird_surface.bird_movement = 0
                    pipe_list.clear()
                    bird_surface.score = 0
                    indiv = 0 # restart for having a new generation
                    flag_dead = False
                    
                    # Next generation parents are replaced by the offspring
                    for i in range(POP_SIZE):
                        person[i] = offspring[i]
                        #restart fitness
                        person[i].fitness = 0
                        #TO DO with best so far

                    
                else :
                    end_game()
                    
                    
                
    # Floors
    floor_surface1.draw(screen)
    floor_surface2.draw(screen)
    floor_surface1.move()
    floor_surface2.move()
    
    pygame.display.update()
    clock.tick(120)

    

Robot Controller Script

    
import socket
import time
import urx
import numpy as np

PI = 3.1415926535

# Robot parameters (same as original)
acc_cmd = 1
vel_cmd = 0.3

# Pre-defined joint angles for home position
j_root_deg = [0, -90, 135, 225, -90, 90]

def deg2rad(deg):
    return [d * PI / 180 for d in deg]

def translateEndEffectorGlobal(robot, p_des, p_root):
    # p_des is the offset from p_root
    p_cmd = robot.getl()
    for i in range(len(p_des)):
        p_cmd[i] = p_des[i] + p_root[i]
    p_cmd[3:6] = p_root[3:6]
    print("Moving to global position: ", p_cmd)
    robot.movel(p_cmd, acc=acc_cmd, vel=vel_cmd)

def main():
    # Connect to robot
    print("Connecting to robot...")
    robot = urx.Robot("192.168.1.52")
    robot.set_tcp((0, 0, 0.1, 0, 0, 0))
    robot.set_payload(2, (0, 0, 0.1))

    # Move robot to home and store p_root
    j_root = deg2rad(j_root_deg)
    print("Moving robot to home...")
    robot.movej(j_root, acc=acc_cmd, vel=vel_cmd)
    time.sleep(0.5)
    p_root = robot.getl()
    print("Robot home position (p_root):", p_root)

    # Define commands
    # MOVE_HOME: Go back to j_root and set p_root
    # MOVE_UP: Move to [0.09, 0.16, -0.13] relative to p_root
    # MOVE_DOWN: Move to [0.09, 0.16, -0.194] relative to p_root
    # You can adjust these as needed.

    HOST = '127.0.0.1'  # or robot server IP if needed
    PORT = 50007
    print("Starting robot server on {}:{}".format(HOST, PORT))
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((HOST, PORT))
        s.listen(1)
        print("Robot server listening...")
        conn, addr = s.accept()
        with conn:
            print('Connected by', addr)
            while True:
                data = conn.recv(1024)
                if not data:
                    break
                command = data.decode('utf-8').strip()

                if command == "MOVE_HOME":
                    # Move robot to home position
                    robot.movej(j_root, acc=acc_cmd, vel=vel_cmd)
                    time.sleep(0.5)
                    p_root = robot.getl()
                    print("Moved to home, updated p_root:", p_root)

                elif command == "MOVE_UP":
                    translateEndEffectorGlobal(robot, [0.09, 0.16, -0.13], p_root)

                elif command == "MOVE_DOWN":
                    translateEndEffectorGlobal(robot, [0.09, 0.16, -0.194], p_root)

                # You can add more commands if needed.
                # For now, just acknowledge
                conn.sendall(b"OK")

    # Close robot connection when done
    robot.close()
    print("Robot server closed.")

if __name__ == "__main__":
    main()