Week 13

Machine Building

Intro - Dramaticotron >9000

We wanted to make a camera rails system that would take any normal looking, boring "thing" and make a highly dramatic, super cool, intense video of it.

Our group documentation is here: Group Documentations for Machine WEEK CBA 2023

My Role - Servers and Frontends

So you have now read the CBA team doc and learned how we made the thing - but what did I do?

Video Server

After testing extensively the ESP32CAM, we realized that the video/picture was grainy and worse - when the rails moved, the read-in of values was too slow, so we got this shearing/skew of the video. The only solution was moving slower and a lower resolution, which made the resulting video not too nice.

So we decided to switch to a Raspberry Pi connected to an HD USB webcam. Since we already had way too many cables, the PI was helpful so that we didn't need 1 compter plugged into every Pi.

What I did was:

Here is the code for that, since it's so small (also thanks to Lancelot Blanchard for adding some stuff to this):

                    
                    import subprocess
import os
import sys
import asyncio
import aiohttp
import shlex
import random

from aiohttp import web
import subprocess
import re

def get_audio_devices():
    try:
        result = subprocess.run(['arecord', '-l'], capture_output=True, text=True, check=True)
        return result.stdout
    except subprocess.CalledProcessError as e:
        print(f"Error: {e}")
        return None

def find_audio_device_id(device_name):
    audio_output = get_audio_devices()

    if audio_output:
        # Use regular expression to find the audio device ID based on the device name
        pattern = re.compile(r'card (\d+): .* \[.*' + re.escape(device_name) + r'\]')
        match = pattern.search(audio_output)

        if match:
            return match.group(1)

    return None

#set this to the device num of the plugged in webcam
video_device_num = 0
audio_device_num = find_audio_device_id("HD Pro Webcam C920")
print(f"Found Audio Device {audio_device_num}")
video_recordings_dir = "/dev/shm"

async def start_record(request):
    #get args
    duration = request.query.get('duration', 10)  # Default duration is 10 seconds
    session_id = request.query.get('session_id', random.randint(999,9999))
    try:
        duration = int(duration)
    except ValueError:
        return web.Response(text="Invalid duration parameter. Please provide an integer.")
    try:
        session_id = int(session_id)
    except ValueError:
        return web.Response(text="Invalid session id parameter. Please provide an integer.")

    # Start the ffmpeg subprocess asynchronously
    async def run_ffmpeg():
        video_file_name = f"{session_id}_pi_video.mkv"
        video_full_path = os.path.join(video_recordings_dir, video_file_name)
        video_record_time = duration
        #video_record_command = f"ffmpeg -y -f v4l2 -framerate 30 -video_size 1280x720 -input_format mjpeg -i /dev/video{video_device_num} -t {video_record_time} -c copy {video_full_path}"
        video_record_command = f"ffmpeg -y -thread_queue_size 4096 -f alsa -i plughw:{audio_device_num},0 -f v4l2 -framerate 25 -video_size 1920x1080 -input_format mjpeg -i /dev/video{video_device_num} -t {video_record_time} -c copy {video_full_path}"
        cmd = shlex.split(video_record_command)
        proc = await asyncio.create_subprocess_exec(*cmd)
        await proc.communicate()

    asyncio.ensure_future(run_ffmpeg())

    return web.Response(text=f"Recording for {duration} seconds...")

async def download_video(request):
    print("Download video request")
    session_id = request.query.get('session_id')
    if not session_id:
        return web.Response(text="Please provide a 'session_id' parameter.")
    print(f"-- Download video request ID: {session_id}")

    video_file_name = f"{session_id}_pi_video.mkv"
    video_path = os.path.join(video_recordings_dir, video_file_name)

    if not os.path.isfile(video_path):
        return web.Response(text=f"Video for session_id {session_id} not found.")

    response = web.FileResponse(video_path)
    response.headers['Content-Disposition'] = f'attachment; filename="{video_path}"'
    return response

async def coucou(request):
    return web.Response(text="Coucou")

app = web.Application()
app.router.add_get('/start_record', start_record)
app.router.add_get('/download_video', download_video)
app.router.add_get('/coucou', coucou)

if __name__ == '__main__':
    web.run_app(app)


                    
                    
Frontend Control Panel

The frontend, in all its glory, controlling 3 steppers, 6 servos, 21 LEDs, and 4 RPi webcams at the same time:

Chance Li and I made did the frontend control panel that controlled the servos, steppers, LEDs, and cameras of the system

This frontend used Modular Things to connect to and control components. We added many buttons and sliders to control the system dynamically.

I added code to do a "naive tracking" of object - specify an object postiion, and the servos would track that location as they slide along the track.

I also added code to ping all the RPis to start and download all of the videos from them.

Here is the frontend.

I added code to do a "naive tracking" of object - specify an object postiion, and the servos would track that location as they slide along the track.