import math import cv2 import numpy as np from typing import Optional, Tuple, List import logging from motor.motor import MotorController _LOGGER = logging.getLogger(__name__) class DriveController: def __init__(self, wheel_base: float, wheel_circumference: float, use_homography: bool = True): """ Args: wheel_base: Distance between left and right drive wheels (meters) wheel_circumference: Circumference of drive wheels (meters) meters_per_inch: Conversion factor pts_image_plane: Homography calibration points (image) pts_ground_plane: Homography calibration points (ground, inches) """ self.wheel_base = float(wheel_base) self.wheel_radius = 0.097/2 # meters self.stop_distance = 0.1 # meters self.seq_num = 0 self.direction_map = { "forward": [-1, 1], "backward": [1, -1], "cw": [1, 1], "ccw": [-1, -1], } self.motor_controller = MotorController() self.motor_controller.start_motor_demo() # Roller ball distance is stored but not used in differential drive control # (it only affects physical stability, not control calculations) # Default homography points pts_image_plane = [[329.0, 188.0], [521.0, 223.0], [93.0, 269.0], [265.0, 192.0], [47.0, 228.0]] pts_ground_plane = [[69.0, 0.0], [37.5, -21.0], [25.0, 18.0], [70.75, 13.0], [38.0, 30.0]] self._meters_per_inch = 0.0254 self._homography = self._compute_homography(np.array(pts_image_plane, dtype=np.float32), np.array(pts_ground_plane, dtype=np.float32) * self._meters_per_inch) def _compute_homography(self, img_pts: np.ndarray, ground_pts: np.ndarray) -> np.ndarray: h, status = cv2.findHomography(img_pts, ground_pts) if h is None: _LOGGER.warning("Homography computation failed; using identity") return np.eye(3, dtype=np.float64) return h def pixel_to_ground(self, u: float, v: float) -> Optional[Tuple[float, float]]: """Transform image pixel (u,v) to ground-plane (x,y) in meters.""" p = np.array([[u], [v], [1.0]], dtype=np.float64) xyw = self._homography.dot(p) if abs(xyw[2, 0]) < 1e-9: return None xy = xyw / xyw[2, 0] return float(xy[0, 0]), float(xy[1, 0]) def get_relative_position(self, bbox: List[float]) -> Optional[Tuple[float, float]]: """Return ground-plane x,y (meters) for bottom-center of bbox. bbox: [x1,y1,x2,y2] """ x1, y1, x2, y2 = bbox u = (x1 + x2) / 2.0 v = float(y2) return self.pixel_to_ground(u, v) # speed is 2 to 32 microsteps def manual_drive(self, direction: str, speed: int =2) -> dict: """Compute left/right wheel velocities for manual driving commands. Args: direction: One of "forward", "backward", "cw", "ccw" speed: Linear speed in m/s for forward/backward, angular speed in rad/s for cw/ccw Returns: dict: {v_left, v_right, stop} """ if direction not in self.direction_map: if direction == "stop": self.motor_controller.stop_all_motors() return { "status": 200, "stop": True } elif direction == "start": self.motor_controller.start_motor_demo() return { "status": 200, "stop": False } else: self.motor_controller.stop_all_motors() return { "status": 400, "message": "Unknown direction", "stop": True } if direction == "forward": self.motor_controller.forward(speed) return { "status": 200, "stop": False, "direction": "forward", "speed": speed } elif direction == "backward": self.motor_controller.backward(speed) return { "status": 200, "stop": False, "direction": "backward", "speed": speed } elif direction == "cw": self.motor_controller.cw(speed) return { "status": 200, "stop": False, "direction": "cw", "speed": speed } elif direction == "ccw": self.motor_controller.ccw(speed) return { "status": 200, "stop": False, "direction": "ccw", "speed": speed } else: self.motor_controller.stop_all_motors() return { "status": 400, "message": "Unknown direction", "stop": True } def controller(self, x: float, y: float, bbox, lookahead_gain: float = 1.0, center_coords: Optional[Tuple[float, float]] = None, image_size: Optional[Tuple[int, int]] = None) -> dict: """ Args: center_coords: (u, v) pixel coords of bbox center image_size: (width, height) of image CHANGED such that it doesn't take into account distance, just angle from center. """ L = math.hypot(x, y) bbox_w = bbox[2] - bbox[0] # x2 - x1 bbox_h = bbox[3] - bbox[1] # y2 - y1 bbox_w_ratio = bbox_w / image_size[0] if image_size else 0 bbox_h_ratio = bbox_h / image_size[1] if image_size else 0 threshold_w = 0.6 # Example threshold for width ratio if bbox_w_ratio > threshold_w: return {"v_left": 0.0, "v_right": 0.0, "stop": True, "distance_m": float(L)} if L < self.stop_distance: return {"v_left": 0.0, "v_right": 0.0, "stop": True, "distance_m": float(L)} # constant base speed (ignore distance) v_linear = 0.15 # m/s, adjust as needed # angular velocity from pixel offset if center_coords and image_size: u_center, v_center = center_coords img_width, img_height = image_size # normalized offset: -1 (left edge) to +1 (right edge) offset = (u_center - img_width/2) / (img_width/2) # proportional control: turn rate proportional to offset k_turn = 1.5 # tuning parameter, adjust for responsiveness omega = -k_turn * offset # negative bc right offset → turn right (positive omega) else: # fallback to ground coords if no pixel info omega = (2.0 * y) / (L * L) * v_linear # differential drive half_base = self.wheel_base / 2.0 v_left = v_linear - (omega * half_base) v_right = v_linear + (omega * half_base) # hard clamp max_wheel_speed = 0.2 v_left = max(-max_wheel_speed, min(max_wheel_speed, v_left)) v_right = max(-max_wheel_speed, min(max_wheel_speed, v_right)) return { "v_left": float(v_left), "v_right": float(v_right), "stop": False, "distance_m": float(L) } def velocities_to_wheel_speeds(self, v_left: float, v_right: float) -> Tuple[float, float]: """Convert linear wheel velocities (m/s) to angular wheel speeds (rad/s). Useful if your motor controller expects radians/second. """ omega_left = v_left / self.wheel_radius omega_right = v_right / self.wheel_radius return omega_left, omega_right # IDK ABOUT THIS THIS IS PURE CHATGPT class StepperTranslator: """Translate linear wheel velocities (m/s) to discrete step deltas for steppers. The translator keeps fractional remainder between calls so that over time the integer step stream matches the desired continuous velocity (no systematic drift). Use `step_deltas(v_left, v_right, dt)` each loop iteration and send the returned integer deltas to your motor interface. If your device only accepts small per-packet deltas (e.g. max 32), use `chunk_pair` to split them. Example: trans = StepperTranslator(microsteps_per_rev=6400, wheel_circ=0.3049) while True: dl, dr = trans.step_deltas(v_left, v_right, dt=0.01) # optionally split into sub-deltas and send each to device left_chunks, right_chunks = StepperTranslator.chunk_pair(dl, dr, max_delta=32) for a, b in zip(left_chunks, right_chunks): device.write(pack(a,b)) sleep(dt) """ def __init__(self, microsteps_per_rev: int, wheel_circumference: float): self.microsteps_per_rev = int(microsteps_per_rev) self.wheel_circumference = float(wheel_circumference) # steps per meter (linear) self.steps_per_meter = self.microsteps_per_rev / self.wheel_circumference # fractional accumulators for left and right wheels self._acc_l = 0.0 self._acc_r = 0.0 def vel_to_steps_per_sec(self, v: float) -> float: """Return (signed) steps per second for linear velocity v (m/s).""" return float(v) * self.steps_per_meter def step_deltas(self, v_left: float, v_right: float, dt: float = 0.01) -> Tuple[int, int]: """Compute integer step deltas to send for a timestep dt. Returns (steps_left, steps_right) as integers. Internal fractional remainder is preserved for the next call. """ sps_l = self.vel_to_steps_per_sec(v_left) sps_r = self.vel_to_steps_per_sec(v_right) steps_f_l = sps_l * dt + self._acc_l steps_f_r = sps_r * dt + self._acc_r # Round to nearest integer; keep fractional remainder steps_l = int(round(steps_f_l)) steps_r = int(round(steps_f_r)) self._acc_l = steps_f_l - steps_l self._acc_r = steps_f_r - steps_r return steps_l, steps_r @staticmethod def chunk_pair(steps_l: int, steps_r: int, max_delta: int = 32): """Yield lists of chunked deltas (signed) for left and right up to max_delta. Returns two lists (left_chunks, right_chunks) of equal length so they can be iterated in parallel when sending packets that take both deltas together. """ if max_delta <= 0: raise ValueError("max_delta must be positive") # Helper to create chunk list for a signed integer def chunks(x: int): out = [] remaining = abs(x) sign = -1 if x < 0 else 1 while remaining > 0: this = min(remaining, max_delta) out.append(sign * this) remaining -= this if not out: out = [0] return out left_chunks = chunks(steps_l) right_chunks = chunks(steps_r) # pad shorter list with zeros so zipping is safe max_len = max(len(left_chunks), len(right_chunks)) left_chunks += [0] * (max_len - len(left_chunks)) right_chunks += [0] * (max_len - len(right_chunks)) return left_chunks, right_chunks