We wanted to build a camera rail system that would take any normal-looking, boring "thing" and make a highly dramatic, super cool, intense video of it.
Our group documentation is here: Group Documentations for Machine WEEK CBA 2023
So you have now read the CBA team doc and learned how we made the thing - but what did I do?
After extensively testing the ESP32CAM, we realized that the video was grainy and, worse, that frames were read in too slowly: whenever the rails moved, the video came out sheared and skewed. The only fix was to move slower and drop the resolution, which made the resulting video look poor.
So we decided to switch to a Raspberry Pi connected to an HD USB webcam. Since we already had way too many cables, the Pi was helpful because we didn't need a computer plugged into each one.
What I did was:
Here is the code for that, since it's so small (also thanks to Lancelot Blanchard for adding some stuff to this):
import asyncio
import os
import random
import re
import shlex
import subprocess

from aiohttp import web


def get_audio_devices():
    """Return the output of `arecord -l`, or None if the command fails."""
    try:
        result = subprocess.run(['arecord', '-l'], capture_output=True, text=True, check=True)
        return result.stdout
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(f"Error: {e}")
        return None


def find_audio_device_id(device_name):
    """Return the ALSA card number (as a string) for device_name, or None."""
    audio_output = get_audio_devices()
    if audio_output:
        # Use a regular expression to find the card number based on the device name
        pattern = re.compile(r'card (\d+): .* \[.*' + re.escape(device_name) + r'\]')
        match = pattern.search(audio_output)
        if match:
            return match.group(1)
    return None


# Set this to the device number of the plugged-in webcam
video_device_num = 0
audio_device_num = find_audio_device_id("HD Pro Webcam C920")
print(f"Found Audio Device {audio_device_num}")

# /dev/shm is a RAM-backed tmpfs, so recordings do not survive a reboot
video_recordings_dir = "/dev/shm"


async def start_record(request):
    # Read the query parameters
    duration = request.query.get('duration', 10)  # Default duration is 10 seconds
    session_id = request.query.get('session_id', random.randint(999, 9999))

    try:
        duration = int(duration)
    except ValueError:
        return web.Response(status=400, text="Invalid duration parameter. Please provide an integer.")

    try:
        session_id = int(session_id)
    except ValueError:
        return web.Response(status=400, text="Invalid session id parameter. Please provide an integer.")

    # Start the ffmpeg subprocess asynchronously
    async def run_ffmpeg():
        video_file_name = f"{session_id}_pi_video.mkv"
        video_full_path = os.path.join(video_recordings_dir, video_file_name)
        video_record_time = duration
        # Video-only alternative at 720p:
        #video_record_command = f"ffmpeg -y -f v4l2 -framerate 30 -video_size 1280x720 -input_format mjpeg -i /dev/video{video_device_num} -t {video_record_time} -c copy {video_full_path}"
        video_record_command = f"ffmpeg -y -thread_queue_size 4096 -f alsa -i plughw:{audio_device_num},0 -f v4l2 -framerate 25 -video_size 1920x1080 -input_format mjpeg -i /dev/video{video_device_num} -t {video_record_time} -c copy {video_full_path}"
        cmd = shlex.split(video_record_command)
        proc = await asyncio.create_subprocess_exec(*cmd)
        await proc.communicate()

    # Fire and forget: respond right away while ffmpeg records in the background
    asyncio.ensure_future(run_ffmpeg())
    return web.Response(text=f"Recording for {duration} seconds...")


async def download_video(request):
    print("Download video request")
    session_id = request.query.get('session_id')
    if not session_id:
        return web.Response(status=400, text="Please provide a 'session_id' parameter.")

    # Only accept integer ids, matching start_record; this also keeps
    # path components such as "../" out of the file name
    try:
        session_id = int(session_id)
    except ValueError:
        return web.Response(status=400, text="Invalid session id parameter. Please provide an integer.")

    print(f"-- Download video request ID: {session_id}")
    video_file_name = f"{session_id}_pi_video.mkv"
    video_path = os.path.join(video_recordings_dir, video_file_name)
    if not os.path.isfile(video_path):
        return web.Response(status=404, text=f"Video for session_id {session_id} not found.")

    response = web.FileResponse(video_path)
    # Suggest the bare file name to the client, not the full server-side path
    response.headers['Content-Disposition'] = f'attachment; filename="{video_file_name}"'
    return response


async def coucou(request):
    # Simple "are you there?" endpoint
    return web.Response(text="Coucou")


app = web.Application()
app.router.add_get('/start_record', start_record)
app.router.add_get('/download_video', download_video)
app.router.add_get('/coucou', coucou)

if __name__ == '__main__':
    web.run_app(app)
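To see what the device lookup is matching, here is the same regular expression applied to a made-up sample of `arecord -l` output. The sample text and the helper name `find_card_number` are assumptions for illustration; the real output depends on what is plugged into the Pi.

```python
import re

# Hypothetical sample of `arecord -l` output; real output depends on the hardware.
SAMPLE_ARECORD_OUTPUT = """\
**** List of CAPTURE Hardware Devices ****
card 1: C920 [HD Pro Webcam C920], device 0: USB Audio [USB Audio]
  Subdevices: 1/1
  Subdevice #0: subdevice #0
"""


def find_card_number(arecord_output, device_name):
    """Return the ALSA card number (as a string) for device_name, or None."""
    pattern = re.compile(r'card (\d+): .* \[.*' + re.escape(device_name) + r'\]')
    match = pattern.search(arecord_output)
    return match.group(1) if match else None


card = find_card_number(SAMPLE_ARECORD_OUTPUT, "HD Pro Webcam C920")
print(card)                # 1
print(f"plughw:{card},0")  # plughw:1,0, the ALSA input name passed to ffmpeg
```

If the webcam is not found, the lookup returns None and the ffmpeg command ends up with `plughw:None,0`, so it is worth checking the "Found Audio Device" line printed at startup.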
The frontend, in all its glory, controlling 3 steppers, 6 servos, 21 LEDs, and 4 RPi webcams at the same time:
Chance Li and I built the frontend control panel that controlled the servos, steppers, LEDs, and cameras of the system.
This frontend used Modular Things to connect to and control components. We added many buttons and sliders to control the system dynamically.
I added code to do "naive tracking" of an object: specify an object position, and the servos track that location as they slide along the track.
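A minimal sketch of what this kind of tracking could look like, written in Python rather than taken from the frontend itself. It assumes a straight track along the x axis, a pan servo that reads 90 degrees when the camera looks straight out from the track, and positions in the same unit (say millimetres). The function name `pan_angle_deg`, the geometry, and the servo range are all assumptions, and the sign of the offset depends on how the servo is mounted.

```python
import math


def pan_angle_deg(carriage_x, target_x, target_y,
                  center_deg=90.0, min_deg=0.0, max_deg=180.0):
    """Servo angle that points a camera at (target_x, target_y).

    The camera sits on the track at (carriage_x, 0); target_y is the
    perpendicular distance from the track to the object.
    """
    # Angle between the track's perpendicular and the line to the target.
    offset = math.degrees(math.atan2(target_x - carriage_x, target_y))
    # Clamp to the servo's assumed mechanical range.
    return max(min_deg, min(max_deg, center_deg + offset))


# As the carriage slides from x=0 to x=500, the servo sweeps to stay on target.
for x in (0, 250, 500):
    print(x, round(pan_angle_deg(x, target_x=250, target_y=250), 1))
# 0 135.0
# 250 90.0
# 500 45.0
```

Recomputing this angle every time the stepper position changes is enough for the camera to stay on the object, which is why it is "naive": no image processing is involved, only geometry.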
I also added code to ping all the RPis to start and download all of the videos from them.
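The same idea can be sketched in Python using only the standard library. This is not the frontend code, just an illustration of hitting the `/start_record` and `/download_video` endpoints on several Pis at once. The hostnames in `PI_HOSTS` and the output file names are hypothetical; port 8080 is the default used by aiohttp's `web.run_app`.

```python
import shutil
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor

# Hypothetical addresses; replace with the real hostnames or IPs of the Pis.
PI_HOSTS = ["pi-cam-1.local", "pi-cam-2.local"]
PORT = 8080  # default port of aiohttp's web.run_app


def build_url(host, endpoint, **params):
    """Build a request URL for one Pi, e.g. for 'start_record'."""
    query = urllib.parse.urlencode(params)
    return f"http://{host}:{PORT}/{endpoint}?{query}"


def start_all(session_id, duration, hosts=PI_HOSTS):
    """Ask every Pi to start recording; requests are sent in parallel."""
    def start(host):
        url = build_url(host, "start_record", session_id=session_id, duration=duration)
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.read().decode()

    with ThreadPoolExecutor(max_workers=max(1, len(hosts))) as pool:
        return list(pool.map(start, hosts))


def download_all(session_id, hosts=PI_HOSTS):
    """Fetch the recording for session_id from every Pi; returns local file names."""
    def download(host):
        url = build_url(host, "download_video", session_id=session_id)
        out_name = f"{host}_{session_id}.mkv"
        with urllib.request.urlopen(url, timeout=60) as resp, open(out_name, "wb") as f:
            shutil.copyfileobj(resp, f)
        return out_name

    with ThreadPoolExecutor(max_workers=max(1, len(hosts))) as pool:
        return list(pool.map(download, hosts))


# Example (needs the Pis to be reachable on the network):
#   start_all(session_id=1234, duration=10)
#   ... wait at least `duration` seconds for ffmpeg to finish ...
#   download_all(session_id=1234)
```

Sending the start requests in parallel matters here: with the same `session_id` on every Pi, the clips start at nearly the same moment and can be matched up afterwards by file name.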
Here is the frontend.