import base64
from flask import Flask, request, jsonify
import io
import math
from PIL import Image
import numpy as np
import cv2
import requests
from model.detector import Detector
from drive_controller import DriveController
import argparse
from datetime import datetime
import signal
import sys
import os
from multiprocessing import Process, Queue
def task(queue: Queue):
    """Background worker: drain items from *queue* and log them.

    Runs until it receives a ``None`` sentinel, which lets the parent
    process shut the worker down cleanly.  Uses a blocking ``get()``
    instead of the original ``while not queue.empty()`` busy-wait,
    which spun a full CPU core while idle.

    Args:
        queue: a multiprocessing.Queue (or any object with a blocking
            ``get()``) carrying work items.
    """
    while True:
        item = queue.get()  # blocks until an item is available
        if item is None:
            # sentinel value: stop the worker
            break
        print(f"Processing item: {item}")
# Flask application instance; routes are registered below.
app = Flask(__name__)

# TODO : adjust these calibration points as needed
wheel_base = 0.45  # meters 45 cm
# wheel_circumference=0.1 — presumably meters; TODO confirm units against DriveController
drive_controller = DriveController(wheel_base=wheel_base, wheel_circumference=0.1, use_homography=True)

# Lazy-loaded detector to avoid heavy startup cost on import
# (populated on first call to get_detector())
_detector = None
def get_detector():
    """Return the process-wide Detector, constructing it on first use.

    Lazy initialization keeps module import cheap; the heavy model
    load happens only when the first request needs it.
    """
    global _detector
    if _detector is not None:
        return _detector
    # Adjust yolo_dir/from_tensor_rt as needed for your environment
    _detector = Detector(threshold=0.5)
    return _detector
@app.route("/")
def hello_world():
    """Health-check endpoint: confirms the server is reachable."""
    # NOTE(review): the original return statement was a bare string split
    # across lines (invalid Python — markup apparently stripped from this
    # file); restored as a single literal.
    return "Hello, World!"
@app.route("/manual", methods=["GET", "POST"])
def manual():
    """Tiny manual-control endpoint.

    GET  -> serves a minimal control page (buttons POST back here).
    POST -> accepts a command via JSON {"cmd": ...}, a form field, or
            the raw request body, and forwards it to the drive
            controller.

    Returns JSON {'status': 'ok', 'cmd': cmd} on success, 400 when no
    command could be extracted.
    """
    if request.method == "GET":
        # NOTE(review): page markup appears truncated in this source;
        # served verbatim to avoid changing behavior.
        html = """
Manual Control
Manual Control
Status: idle
"""
        return html
    # POST path.  (The original printed "Manual control page served"
    # here, which was misleading — this branch handles commands, not
    # page serving — so that print was removed.)
    data = request.get_json(silent=True)  # returns dict or None, doesn't raise
    if data and 'cmd' in data:
        cmd = data['cmd']
    else:
        # fallback to form fields or raw body
        cmd = request.form.get('cmd') or request.values.get('cmd') or request.get_data(as_text=True)
    if not cmd:
        return jsonify({'status':'error', 'message':'no cmd provided'}), 400
    resp = drive_controller.manual_drive(cmd, speed=2)
    # log what was dispatched to the motor controller
    print('Manual cmd:', cmd)
    print('Drive response:', resp)
    return jsonify({'status':'ok', 'cmd': cmd})
@app.route("/_data", methods=["GET", "POST"])
def get_data():
    """
    Endpoint to receive data from the ESP32 camera or other device.

    POST: expects an image (multipart form 'image' or raw JPEG bytes).
    Runs the detector, projects each detection's bottom-center pixel to
    the ground plane, and computes a drive command per detection.

    Returns JSON: { status: "ok", detections: [drive_command, ...] }
    where each entry is whatever drive_controller.controller() produces.
    (The original docstring promised bbox/label/ground/distance fields
    that this endpoint never actually returned.)

    GET: returns a simple readiness/status message.
    """
    if request.method != "POST":
        # GET: simple readiness probe
        return jsonify({"status": "ready", "message": "Send POST (image/jpeg or form 'image') to this endpoint"}), 200
    # Prefer uploaded file under key 'image'; fall back to raw body bytes.
    img = None
    if 'image' in request.files:
        f = request.files['image']
        img = Image.open(f.stream).convert('RGB')
    else:
        raw = request.get_data()
        if raw:
            try:
                img = Image.open(io.BytesIO(raw)).convert('RGB')
            except Exception:
                img = None
    if img is None:
        return jsonify({"status": "error", "message": "image field has no valid image in POST body"}), 400
    # Run detector (predict accepts PIL Image or numpy array)
    detector = get_detector()
    results = detector.predict(img, silent=True)
    predictions = results.get('predictions', [])
    out = []
    for (x1, y1, x2, y2), label in predictions:
        # bottom-center of the bbox approximates the ground contact point
        u = (x1 + x2) / 2.0
        v = float(y2)
        ground = drive_controller.pixel_to_ground(u, v)
        print("Ground coords:", ground, "for bbox:", (x1, y1, x2, y2))
        if ground is None:
            print("Could not compute ground coords, skipping detection")
            continue
        drive_command = drive_controller.controller(ground[0], ground[1], center_coords=detector.bbox_center((x1, y1, x2, y2)), image_size=results["image_size"])
        print("Drive command:", drive_command)
        if drive_command is None:
            # fail fast instead of scanning the finished list afterwards
            return jsonify({"status": "error", "message": "Could not compute drive command"}), 500
        out.append(drive_command)
    return jsonify({"status": "ok", "detections": out}), 200
@app.route('/detect_image', methods=['POST', 'GET'])
def detect_image():
    """POST an image (multipart 'image' or raw JPEG bytes); returns an HTML
    page embedding the annotated result as a base64 JPEG.  GET serves a
    small upload form.  Useful for debugging detections in a browser or
    via curl.
    """
    if request.method == 'GET':
        # NOTE(review): the original upload-form markup was mangled into
        # invalid Python; reconstructed as a minimal multipart form.
        return (
            '<html><body><h3>Upload image for detection</h3>'
            '<form method="POST" enctype="multipart/form-data">'
            '<input type="file" name="image"/>'
            '<input type="submit" value="Detect"/>'
            '</form></body></html>'
        )
    img = None
    if 'image' in request.files:
        f = request.files['image']
        img = Image.open(f.stream).convert('RGB')
    else:
        raw = request.get_data()
        if raw:
            try:
                img = Image.open(io.BytesIO(raw)).convert('RGB')
            except Exception:
                img = None
    if img is None:
        # Debug fallback: bundled test image next to this file.
        # (os is already imported at module scope; the redundant
        # function-local `import os` was removed.)
        img_path = f"{os.path.dirname(__file__)}/test.jpg"
        img = Image.open(img_path)
    detector = get_detector()
    results = detector.predict(img, silent=True)
    predictions = results.get('predictions', [])
    original_image = results.get('original_image', None)
    out = detector.draw_box(original_image, predictions, draw_all=True)
    if out is None:
        return jsonify({'status': 'error', 'message': 'failed to draw boxes'}), 500
    # Optionally downscale the annotated image to keep the response small
    MAX_WIDTH = 640
    try:
        img_w, img_h = out.size
    except Exception:
        # PIL Image should have .size, but fall back if needed
        img_w, img_h = getattr(out, 'width', MAX_WIDTH), getattr(out, 'height', MAX_WIDTH)
    if img_w > MAX_WIDTH:
        # preserve aspect ratio; copy so the detector's image stays intact
        out = out.copy()
        new_h = int(MAX_WIDTH * img_h / img_w)
        out.thumbnail((MAX_WIDTH, new_h), Image.LANCZOS)
    img_byte_arr = io.BytesIO()
    # save with reasonable JPEG quality to further reduce size
    out.save(img_byte_arr, format='JPEG', quality=80)
    b64 = base64.b64encode(img_byte_arr.getvalue()).decode()
    # NOTE(review): the original result page was a syntactically invalid
    # multi-line f-string (markup stripped); reconstructed.
    html = (
        '<html><body><h3>Detection result</h3>'
        f'<img src="data:image/jpeg;base64,{b64}"/>'
        '</body></html>'
    )
    return html
# Global variable to store the latest annotated frame.
# Shared state: written by /process_webcam_frame and /esp32_upload,
# read by /get_latest_frame (ESP32 viewing mode).
latest_frame = {
    'image': None,       # base64-encoded annotated JPEG, or None before first frame
    'detections': [],    # list of per-detection dicts from compute_drive_commands()
    'timestamp': 0       # datetime of last update (0 until first frame arrives)
}
@app.route('/webcam_stream')
def webcam_stream():
    """
    Display a web page with live feed showing bounding boxes (drawn server-side) and velocity commands.
    Can receive frames from either webcam or ESP32.
    """
    # NOTE(review): the page markup below appears truncated (HTML tags
    # apparently stripped from this source); served verbatim.
    html = """
Live Detection Stream
🎥 Live Detection Stream
Source:Webcam Mode:Initializing...
FPS: 0
Drive Commands:
Waiting for detections...
"""
    return html
@app.route('/process_webcam_frame', methods=['POST'])
def process_webcam_frame():
    """
    Process a single webcam frame: run detection, draw bounding boxes,
    and compute drive commands.

    Returns JSON: {
        status: 'ok',
        annotated_image: base64_encoded_jpeg,
        detections: [{
            bbox: [x1, y1, x2, y2],
            label: str,
            ground: [x, y],
            drive_command: {...}
        }, ...]
    }
    """
    # Accept either a multipart upload under 'image' or raw JPEG bytes.
    img = None
    upload = request.files.get('image')
    if upload is not None:
        img = Image.open(upload.stream).convert('RGB')
    else:
        raw = request.get_data()
        if raw:
            try:
                img = Image.open(io.BytesIO(raw)).convert('RGB')
            except Exception:
                pass
    if img is None:
        return jsonify({"status": "error", "message": "No valid image in request"}), 400
    # Run detection on the decoded frame.
    detector = get_detector()
    results = detector.predict(img, silent=True)
    predictions = results.get('predictions', [])
    original_image = results.get('original_image', img)
    # Annotate the frame and derive drive commands via the shared helpers.
    img_base64 = get_annotated_image_base64(original_image, predictions, detector)
    out = compute_drive_commands(predictions, detector, results)
    # Cache globally so /get_latest_frame can serve ESP32-mode viewers.
    latest_frame['image'] = img_base64
    latest_frame['detections'] = out
    latest_frame['timestamp'] = datetime.now()
    return jsonify({
        "status": "ok",
        "annotated_image": img_base64,
        "detections": out
    }), 200
def _ndarray_to_pil(arr):
    """Best-effort conversion of a (likely BGR, cv2-style) array to PIL."""
    if arr.dtype != np.uint8:
        arr = arr.astype(np.uint8)  # PIL expects 8-bit channels
    if arr.ndim == 2:
        return Image.fromarray(arr, mode="L")  # grayscale
    if arr.ndim == 3 and arr.shape[2] == 3:
        return Image.fromarray(arr[:, :, ::-1], mode="RGB")  # assume BGR -> RGB
    if arr.ndim == 3 and arr.shape[2] == 4:
        return Image.fromarray(arr[:, :, [2, 1, 0, 3]], mode="RGBA")  # assume BGRA -> RGBA
    return Image.fromarray(arr)  # fallback, best-effort

# helper function to get annotated image as base64
def get_annotated_image_base64(image, predictions, detector):
    """Render *predictions* onto *image* and return a base64 JPEG string.

    Falls back to the raw input image when the detector fails to draw.
    Numpy outputs from the detector (cv2-style, assumed BGR/BGRA) are
    converted to PIL first; everything is normalized to RGB so JPEG
    encoding always succeeds.
    """
    annotated = detector.draw_box(image, predictions, draw_all=True)
    if annotated is None:
        annotated = image
    if isinstance(annotated, np.ndarray):
        annotated = _ndarray_to_pil(annotated)
    # normalize to RGB for JPEG if needed
    if hasattr(annotated, "mode") and annotated.mode != "RGB":
        annotated = annotated.convert("RGB")
    buf = io.BytesIO()
    annotated.save(buf, format='JPEG', quality=85)
    return base64.b64encode(buf.getvalue()).decode('utf-8')
# helper function to compute drive commands from predictions
def compute_drive_commands(predictions, detector, results):
    """Project each detection to the ground plane and build drive commands.

    Detections whose ground projection cannot be computed are skipped.
    Returns a list of dicts: {bbox, label, ground, drive_command}.
    """
    commands = []
    for bbox, label in predictions:
        x1, y1, x2, y2 = bbox
        # bottom-center of the bbox approximates the ground contact point
        ground = drive_controller.pixel_to_ground((x1 + x2) / 2.0, float(y2))
        if ground is None:
            continue
        cmd = drive_controller.controller(
            ground[0],
            ground[1],
            bbox=(x1, y1, x2, y2),
            center_coords=detector.bbox_center((x1, y1, x2, y2)),
            image_size=results.get("image_size")
        )
        commands.append({
            'bbox': [float(x1), float(y1), float(x2), float(y2)],
            'label': label,
            'ground': [float(ground[0]), float(ground[1])],
            'drive_command': cmd,
        })
    return commands
@app.route('/esp32_upload', methods=['POST'])
def esp32_upload():
    """
    Endpoint for ESP32 to upload frames.
    - Processes the image
    - Stores annotated result for web display
    - Returns only drive commands (no image) to ESP32
    """
    img = None
    if 'image' in request.files:
        f = request.files['image']
        img = Image.open(f.stream).convert('RGB')
    else:
        raw = request.get_data()
        if raw:
            try:
                img = Image.open(io.BytesIO(raw)).convert('RGB')
            except Exception:
                pass
    if img is None:
        return jsonify({"status": "error", "message": "No valid image in request"}), 400
    # Run detector
    detector = get_detector()
    results = detector.predict(img, silent=True)
    predictions = results.get('predictions', [])
    original_image = results.get('original_image', img)
    # Draw bounding boxes (for web display only)
    img_base64 = get_annotated_image_base64(original_image, predictions, detector)
    # Compute drive commands once (the original called this helper twice
    # per frame, doubling the projection/controller work for no benefit).
    out = compute_drive_commands(predictions, detector, results)
    # Store the latest frame for web display
    latest_frame['image'] = img_base64
    latest_frame['detections'] = out
    latest_frame['timestamp'] = datetime.now()
    # Return only drive commands to the ESP32 (no image).  When nothing
    # is detected, send an explicit stop so the robot halts.
    if out:
        return jsonify({"status": "ok", "detections": out}), 200
    else:
        return jsonify({"status": "ok", "detections": [{"v_left": 0.0, "v_right": 0.0, "stop": True}]}), 200
@app.route('/get_latest_frame', methods=['GET'])
def get_latest_frame():
    """
    Get the latest processed frame (for web display when using ESP32 mode).
    """
    image = latest_frame['image']
    if image is None:
        return jsonify({"status": "waiting", "message": "No frames received yet"}), 200
    payload = {
        "status": "ok",
        "annotated_image": image,
        "detections": latest_frame['detections'],
        "timestamp": latest_frame['timestamp'],
    }
    return jsonify(payload), 200
if __name__ == "__main__":
    # Background worker that consumes items placed on the queue.
    queue = Queue()
    # daemon=True makes the worker die with the parent, as the original
    # comment claimed but never actually configured.
    process = Process(target=task, args=(queue,), daemon=True)
    # Example queue message (you can remove or replace this with real work)
    queue.put("hello world")
    process.start()
    print('Background process started (pid=%s)' % getattr(process, 'pid', 'unknown'))

    arg_parser = argparse.ArgumentParser(description="YOLO Flask App")
    # Listen on all interfaces by default so devices on the network can reach it.
    arg_parser.add_argument('--host', type=str, default='0.0.0.0')
    # NOTE: the original used type=bool, which is broken in argparse
    # (bool("False") is True — any non-empty string parses as True).
    arg_parser.add_argument('--homography',
                            type=lambda s: s.strip().lower() in ('1', 'true', 'yes', 'y'),
                            required=False, default=True,
                            help='use homography for pixel->ground projection (true/false)')
    args = arg_parser.parse_args()
    drive_controller.use_homography = args.homography

    try:
        # Honor --host (the original parsed it but hard-coded 0.0.0.0).
        # use_reloader=False: the debug reloader re-executes this module
        # and would spawn a duplicate worker process.
        app.run(host=args.host, port=80, debug=True, use_reloader=False)
    finally:
        # Graceful shutdown of the background worker on exit/Ctrl-C.
        if process.is_alive():
            print('Terminating background process...')
            process.terminate()
            process.join(timeout=2)
'''
Here are several easy ways to make a dummy POST to your Flask app for testing:
Quick curl — send a JPEG file (multipart/form-data)
Use this to simulate the ESP32 uploading an image file under the form key image:
curl -v -X POST -F "image=@/path/to/test.jpg" http://localhost:80/_data
To hit the debug image-returning endpoint and save the returned annotated JPEG:
curl — send raw JPEG bytes (body)
curl -X POST -F "image=@/path/to/test.jpg" http://localhost:80/detect_image --output annotated.jpg
open annotated.jpg # macOS; or use an image viewer
If your device posts raw JPEG in the request body (no multipart):
curl -X POST --data-binary "@/path/to/test.jpg" -H "Content-Type: image/jpeg" http://localhost:80/_data
curl — send JSON or form data (no image)
For testing the non-image code path (JSON or form data):
JSON:
form:
Python requests — multipart (recommended for scripted tests)
Sends a real image like your ESP32 would:
To call /detect_image and save the returned JPEG:
Use Flask's test client (unit-test style, runs inside the Flask app)
Great if you want to call the app without starting the server (fast for automated tests).
Add a small test (run from the repo):
Quick debugging tips
If your endpoint returns JSON, check the JSON for "detections" or error messages.
For /detect_image, verify response header Content-Type is image/jpeg before saving.
If labels are not visible, increase font size or check the PIL font fallback (I use system font if available).
If the server returns a 500, check the Flask console logs where you started the app for tracebacks.
If you'd like, I can:
Add a small test script file to the repo (Python) that runs a few of these tests automatically.
Add a simple web page that lets you upload an image in the browser and view the annotated result.
Which would you prefer?
'''