"""Flask server for a small camera-driven robot.

Receives frames (from a webcam page or an ESP32 camera), runs a YOLO
detector on them, converts detections to ground-plane coordinates via a
homography, and returns differential-drive commands.

NOTE(review): this file was recovered from a whitespace-mangled paste.
The inline HTML pages were partially lost and have been rebuilt as
minimal placeholders that preserve only the visible text — confirm them
against the original templates.
"""

import argparse
import base64
import io
import math
import os
import signal
import sys
from datetime import datetime
from multiprocessing import Process, Queue

import cv2
import numpy as np
import requests
from flask import Flask, jsonify, request
from PIL import Image

from drive_controller import DriveController
from model.detector import Detector


def task(queue: Queue):
    """Background worker: print every item placed on *queue*.

    Blocks on ``Queue.get()`` rather than busy-polling ``queue.empty()``
    (the original spin loop burned a full CPU core while idle).
    """
    while True:
        item = queue.get()  # blocks until an item is available
        print(f"Processing item: {item}")


app = Flask(__name__)

# TODO: adjust these calibration points as needed
wheel_base = 0.45  # meters (45 cm)
drive_controller = DriveController(
    wheel_base=wheel_base,
    wheel_circumference=0.1,
    use_homography=True,
)

# Lazy-loaded detector to avoid heavy startup cost on import
_detector = None


def get_detector():
    """Return the process-wide Detector, constructing it on first use."""
    global _detector
    if _detector is None:
        # Adjust yolo_dir/from_tensor_rt as needed for your environment
        _detector = Detector(threshold=0.5)
    return _detector


def _image_from_request():
    """Extract a PIL RGB image from the current request, or ``None``.

    Accepts either a multipart upload under the form key ``image`` or
    raw JPEG bytes in the request body.  Shared by every image endpoint
    (the original repeated this parsing block four times).
    """
    if 'image' in request.files:
        f = request.files['image']
        return Image.open(f.stream).convert('RGB')
    raw = request.get_data()
    if raw:
        try:
            return Image.open(io.BytesIO(raw)).convert('RGB')
        except Exception:
            # body was not a decodable image
            return None
    return None


@app.route("/")
def hello_world():
    # NOTE(review): the original page markup was lost in the paste;
    # only the visible text is preserved here.
    return "<html><body><h1>Hello, World!</h1></body></html>"


@app.route("/manual", methods=["GET", "POST"])
def manual():
    """Very small web UI for sending manual drive commands.

    GET returns the control page; POST accepts a command as JSON
    (``{"cmd": ...}``), a form field, or the raw request body.
    """
    if request.method == "GET":
        # NOTE(review): original control-page HTML (buttons + JS POSTing
        # to this endpoint) was lost; placeholder keeps the visible text.
        html = (
            "<html><head><title>Manual Control</title></head><body>"
            "<h1>Manual Control</h1>"
            "<div id='status'>Status: idle</div>"
            "</body></html>"
        )
        return html

    data = request.get_json(silent=True)  # dict or None, never raises
    if data and 'cmd' in data:
        cmd = data['cmd']
    else:
        # fall back to form fields or the raw body
        cmd = (request.form.get('cmd')
               or request.values.get('cmd')
               or request.get_data(as_text=True))
    if not cmd:
        return jsonify({'status': 'error', 'message': 'no cmd provided'}), 400

    # dispatch to the motor controller
    resp = drive_controller.manual_drive(cmd, speed=2)
    print('Manual cmd:', cmd)
    print('Drive response:', resp)
    return jsonify({'status': 'ok', 'cmd': cmd})


@app.route("/_data", methods=["GET", "POST"])
def get_data():
    """Receive a frame from the ESP32 (or other device).

    If the POST contains an image (multipart form ``image`` or raw JPEG
    bytes), run the detector and compute the approximate ground-plane
    (x, y) for each detection (bottom-center of the bbox).

    Returns JSON ``{"status": "ok", "detections": [drive_command, ...]}``.
    """
    if request.method == "GET":
        # simple readiness/status probe
        return jsonify({
            "status": "ready",
            "message": "Send POST (image/jpeg or form 'image') to this endpoint",
        }), 200

    img = _image_from_request()
    if img is None:
        return jsonify({
            "status": "error",
            "message": "image field has no valid image in POST body",
        }), 400

    detector = get_detector()
    # detector.predict accepts PIL Image or numpy array
    results = detector.predict(img, silent=True)
    predictions = results.get('predictions', [])

    out = []
    for (x1, y1, x2, y2), label in predictions:
        # bottom-center pixel is taken as the ground contact point
        u = (x1 + x2) / 2.0
        v = float(y2)
        ground = drive_controller.pixel_to_ground(u, v)
        print("Ground coords:", ground, "for bbox:", (x1, y1, x2, y2))
        if ground is None:
            print("Could not compute ground coords, skipping detection")
            continue
        drive_command = drive_controller.controller(
            ground[0], ground[1],
            center_coords=detector.bbox_center((x1, y1, x2, y2)),
            image_size=results["image_size"],
        )
        out.append(drive_command)

    for item in out:
        print("Drive command:", item)
        if item is None:
            return jsonify({
                "status": "error",
                "message": "Could not compute drive command",
            }), 500

    return jsonify({"status": "ok", "detections": out}), 200


@app.route('/detect_image', methods=['POST', 'GET'])
def detect_image():
    """POST an image (multipart ``image`` or raw JPEG bytes).

    Returns an HTML page embedding the annotated JPEG.  Useful for
    debugging and viewing detections in a browser or via curl.
    """
    if request.method == 'GET':
        # minimal upload form (original markup lost in the paste)
        return (
            "<html><body>"
            "<h1>Upload image for detection</h1>"
            "<form method='POST' enctype='multipart/form-data'>"
            "<input type='file' name='image'>"
            "<input type='submit' value='Detect'>"
            "</form>"
            "</body></html>"
        )

    img = _image_from_request()
    if img is None:
        # debugging fallback: use a bundled test image when present
        img_path = os.path.join(os.path.dirname(__file__), "test.jpg")
        try:
            img = Image.open(img_path)
        except OSError:
            img = None
    if img is None:
        return jsonify({
            "status": "error",
            "message": "image field has no valid image in POST body",
        }), 400

    detector = get_detector()
    results = detector.predict(img, silent=True)
    predictions = results.get('predictions', [])
    original_image = results.get('original_image', None)
    out = detector.draw_box(original_image, predictions, draw_all=True)
    if out is None:
        return jsonify({'status': 'error', 'message': 'failed to draw boxes'}), 500

    # Optionally downscale the annotated image to keep the response small
    MAX_WIDTH = 640
    try:
        img_w, img_h = out.size
    except Exception:
        # PIL Image should have .size, but fall back if needed
        img_w = getattr(out, 'width', MAX_WIDTH)
        img_h = getattr(out, 'height', MAX_WIDTH)
    if img_w > MAX_WIDTH:
        # preserve aspect ratio
        out = out.copy()
        new_h = int(MAX_WIDTH * img_h / img_w)
        out.thumbnail((MAX_WIDTH, new_h), Image.LANCZOS)

    img_byte_arr = io.BytesIO()
    # save with reasonable JPEG quality to further reduce size
    out.save(img_byte_arr, format='JPEG', quality=80)
    b64 = base64.b64encode(img_byte_arr.getvalue()).decode()
    html = (
        "<html><body>"
        "<h1>Detection result</h1>"
        f"<img src='data:image/jpeg;base64,{b64}'>"
        "</body></html>"
    )
    return html


# Latest annotated frame, shared between the upload endpoints and the
# web display endpoints (single-process server state).
latest_frame = {
    'image': None,        # base64-encoded annotated JPEG, or None
    'detections': [],     # list of detection dicts (see compute_drive_commands)
    'timestamp': 0,       # datetime of last frame, 0 before first frame
}


@app.route('/webcam_stream')
def webcam_stream():
    """Web page showing the live annotated feed and drive commands.

    Can display frames coming from either the browser webcam or the
    ESP32 (via /get_latest_frame polling).
    """
    # NOTE(review): the original page contained JS that captured webcam
    # frames, POSTed them to /process_webcam_frame, and polled
    # /get_latest_frame in ESP32 mode; that script was lost in the
    # paste.  Placeholder keeps the visible text only — restore from VCS.
    html = (
        "<html><head><title>Live Detection Stream</title></head><body>"
        "<h1>&#127909; Live Detection Stream</h1>"
        "<div>Source: Webcam</div>"
        "<div>Mode: Initializing...</div>"
        "<div>Detection feed</div>"
        "<div>FPS: 0</div>"
        "<div>Drive Commands:</div>"
        "<div>Waiting for detections...</div>"
        "</body></html>"
    )
    return html


@app.route('/process_webcam_frame', methods=['POST'])
def process_webcam_frame():
    """Process one webcam frame: detect, annotate, compute drive commands.

    Returns JSON::

        { status: 'ok',
          annotated_image: base64_encoded_jpeg,
          detections: [{ bbox, label, ground, drive_command }, ...] }
    """
    img = _image_from_request()
    if img is None:
        return jsonify({"status": "error", "message": "No valid image in request"}), 400

    detector = get_detector()
    results = detector.predict(img, silent=True)
    predictions = results.get('predictions', [])
    original_image = results.get('original_image', img)

    # Draw bounding boxes on the image and compute the drive commands
    img_base64 = get_annotated_image_base64(original_image, predictions, detector)
    out = compute_drive_commands(predictions, detector, results)

    # Store the latest frame globally for ESP32 mode
    latest_frame['image'] = img_base64
    latest_frame['detections'] = out
    latest_frame['timestamp'] = datetime.now()

    return jsonify({
        "status": "ok",
        "annotated_image": img_base64,
        "detections": out,
    }), 200


def get_annotated_image_base64(image, predictions, detector):
    """Draw *predictions* on *image* and return a base64 JPEG string.

    Handles detector output that is either a PIL Image or a numpy array
    (cv2-style arrays are assumed BGR and converted to RGB).
    """
    annotated_img = detector.draw_box(image, predictions, draw_all=True)
    if annotated_img is None:
        annotated_img = image

    # If detector.draw_box returned a numpy array (likely BGR from cv2),
    # convert it to a PIL image.
    if isinstance(annotated_img, np.ndarray):
        arr = annotated_img
        if arr.dtype != np.uint8:
            arr = arr.astype(np.uint8)
        if arr.ndim == 2:
            # grayscale
            annotated_img = Image.fromarray(arr, mode="L")
        elif arr.ndim == 3 and arr.shape[2] == 3:
            # assume BGR -> RGB
            arr = arr[:, :, ::-1]
            annotated_img = Image.fromarray(arr, mode="RGB")
        elif arr.ndim == 3 and arr.shape[2] == 4:
            # assume BGRA -> RGBA
            arr = arr[:, :, [2, 1, 0, 3]]
            annotated_img = Image.fromarray(arr, mode="RGBA")
        else:
            # fallback, best-effort
            annotated_img = Image.fromarray(arr)

    # at this point annotated_img should be a PIL.Image
    img_byte_arr = io.BytesIO()
    # normalize to RGB for JPEG if needed
    if hasattr(annotated_img, "mode") and annotated_img.mode != "RGB":
        annotated_img = annotated_img.convert("RGB")
    annotated_img.save(img_byte_arr, format='JPEG', quality=85)
    return base64.b64encode(img_byte_arr.getvalue()).decode('utf-8')


def compute_drive_commands(predictions, detector, results):
    """Build detection dicts (bbox, label, ground point, drive command).

    Detections whose bottom-center pixel cannot be mapped to the ground
    plane are skipped.
    """
    out = []
    for (x1, y1, x2, y2), label in predictions:
        u = (x1 + x2) / 2.0
        v = float(y2)
        ground = drive_controller.pixel_to_ground(u, v)
        if ground is None:
            continue
        drive_command = drive_controller.controller(
            ground[0], ground[1],
            bbox=(x1, y1, x2, y2),
            center_coords=detector.bbox_center((x1, y1, x2, y2)),
            image_size=results.get("image_size"),
        )
        out.append({
            'bbox': [float(x1), float(y1), float(x2), float(y2)],
            'label': label,
            'ground': [float(ground[0]), float(ground[1])],
            'drive_command': drive_command,
        })
    return out


@app.route('/esp32_upload', methods=['POST'])
def esp32_upload():
    """Endpoint for the ESP32 to upload frames.

    - Processes the image
    - Stores the annotated result for web display
    - Returns only drive commands (no image) to the ESP32; a stop
      command is returned when nothing is detected.
    """
    img = _image_from_request()
    if img is None:
        return jsonify({"status": "error", "message": "No valid image in request"}), 400

    detector = get_detector()
    results = detector.predict(img, silent=True)
    predictions = results.get('predictions', [])
    original_image = results.get('original_image', img)

    # Annotated image is for the web display only — never sent to the ESP32
    img_base64 = get_annotated_image_base64(original_image, predictions, detector)
    # Compute drive commands once (the original recomputed them twice)
    out = compute_drive_commands(predictions, detector, results)

    latest_frame['image'] = img_base64
    latest_frame['detections'] = out
    latest_frame['timestamp'] = datetime.now()

    if out:
        return jsonify({"status": "ok", "detections": out}), 200
    # nothing detected: tell the robot to stop
    return jsonify({
        "status": "ok",
        "detections": [{"v_left": 0.0, "v_right": 0.0, "stop": True}],
    }), 200


@app.route('/get_latest_frame', methods=['GET'])
def get_latest_frame():
    """Return the latest processed frame (web display in ESP32 mode)."""
    if latest_frame['image'] is None:
        return jsonify({"status": "waiting", "message": "No frames received yet"}), 200
    return jsonify({
        "status": "ok",
        "annotated_image": latest_frame['image'],
        "detections": latest_frame['detections'],
        "timestamp": latest_frame['timestamp'],
    }), 200


if __name__ == "__main__":
    queue = Queue()
    # daemon=True so the worker cannot outlive the server process
    # (the original commented that intent but never set the flag,
    # then hung on process.join() after app.run returned)
    process = Process(target=task, args=(queue,), daemon=True)
    # Example queue message (replace with real work)
    queue.put("hello world")
    process.start()
    print('Background process started (pid=%s)' % getattr(process, 'pid', 'unknown'))

    arg_parser = argparse.ArgumentParser(description="YOLO Flask App")
    arg_parser.add_argument('--host', type=str, default='0.0.0.0')
    # argparse `type=bool` is broken: bool("False") is True, so the
    # original flag could never be disabled from the CLI
    arg_parser.add_argument(
        '--homography',
        required=False,
        default=True,
        type=lambda s: str(s).strip().lower() not in ('0', 'false', 'no', 'off'),
        help='enable homography-based pixel-to-ground mapping',
    )
    args = arg_parser.parse_args()
    drive_controller.use_homography = args.homography

    try:
        # Listen on all interfaces (default --host) so devices on the
        # network can reach the server.  use_reloader=False: the debug
        # reloader re-executes this module and would start a second
        # background worker.
        app.run(host=args.host, port=80, debug=True, use_reloader=False)
    except KeyboardInterrupt:
        print('\nKeyboardInterrupt received, shutting down...')
    finally:
        # attempt graceful shutdown of the background process
        if process.is_alive():
            print('Terminating background process...')
            process.terminate()
            process.join(timeout=2)
        print('Shutdown complete')


'''
Several easy ways to make a dummy POST to this Flask app for testing:

Quick curl — send a JPEG file (multipart/form-data), simulating the
ESP32 uploading an image under the form key "image":

    curl -v -X POST -F "image=@/path/to/test.jpg" http://localhost:5000/_data

To hit the debug image-returning endpoint and save the annotated JPEG:

    curl -X POST -F "image=@/path/to/test.jpg" http://localhost:5000/detect_image --output annotated.jpg
    open annotated.jpg   # macOS; or use an image viewer

If your device posts raw JPEG in the request body (no multipart):

    curl -X POST --data-binary "@/path/to/test.jpg" -H "Content-Type: image/jpeg" http://localhost:5000/_data

For testing the non-image code path, POST JSON or form data instead.

Python `requests` — multipart is the recommended way to script tests;
it sends a real image exactly like the ESP32 would.  Flask's test
client (unit-test style) calls the app without starting the server,
which is fast for automated tests.

Quick debugging tips:
- If an endpoint returns JSON, check it for "detections" or error messages.
- For /detect_image, verify the response Content-Type before saving.
- If labels are not visible, increase the font size or check the PIL
  font fallback (system font is used when available).
- On a 500, check the Flask console logs for the traceback.
'''