#!/usr/bin/env python3 import os import time import json import hashlib import textwrap import subprocess from pathlib import Path from datetime import datetime from enum import Enum, auto # --- config you may want to tweak --- AUDIO_DEVICE = "default" # e.g. "hw:1,0" AUDIO_SECONDS = 5 PRINTER_DEVICE = "/dev/usb/lp0" LCD_I2C_ADDRESS = 0x27 LCD_COLS = 20 LCD_ROWS = 4 MAX_RECEIPT_WIDTH = 32 MODEL_TRANSCRIBE = "gpt-4o-mini-transcribe" MODEL_TEXT = "gpt-4o-mini" # --- LCD --- from RPLCD.i2c import CharLCD class LCD: def __init__(self): self.lcd = CharLCD( i2c_expander="PCF8574", address=LCD_I2C_ADDRESS, cols=LCD_COLS, rows=LCD_ROWS, ) self.clear() def clear(self): self.lcd.clear() def show(self, *lines): self.clear() for i, line in enumerate(lines[:LCD_ROWS]): self.lcd.cursor_pos = (i, 0) self.lcd.write_string(line[:LCD_COLS]) def idle(self): self.show(" TAP CARD ", " FOR KNOWLEDGE ", "", "") def listening(self): self.show("SPEAK NOW", f"{AUDIO_SECONDS} SECONDS...", "", "") def processing(self): self.show("PROCESSING...", "", "", "") def printing(self): self.show("PRINTING", "", "", "") def error(self, msg="ERROR"): self.show(msg[:LCD_COLS], "", "", "") # --- RFID --- import RPi.GPIO as GPIO from mfrc522 import SimpleMFRC522 class RFID: def __init__(self): self.reader = SimpleMFRC522() def wait_for_card(self): try: while True: uid, _ = self.reader.read() if uid: return str(uid) finally: GPIO.cleanup() # --- audio recording --- class Recorder: def __init__(self, device=AUDIO_DEVICE, seconds=AUDIO_SECONDS): self.device = device self.seconds = seconds def record(self, path: Path): cmd = [ "arecord", "-D", self.device, "-f", "S16_LE", "-r", "16000", "-c", "1", "-d", str(self.seconds), "-t", "wav", str(path), ] subprocess.run(cmd, check=True) return path # --- printer (raw ESC/POS) --- class Printer: def __init__(self, device=PRINTER_DEVICE): self.device = device def write(self, data: bytes): with open(self.device, "wb", buffering=0) as f: f.write(data) def text(self, s: str): self.write(s.encode("ascii", errors="ignore")) def cut(self, feed_lines=6): self.text("\n" * feed_lines) self.write(b"\x1dV\x00") def code39(self, data: str): self.text("\nBARCODE\n") payload = f"*{data}*" # GS k m d...NUL (m=4 for CODE39 old style) self.write(b"\x1d\x6b\x04" + payload.encode("ascii") + b"\x00") self.text("\n\n") # --- receipt formatting --- def wrap(text, width=MAX_RECEIPT_WIDTH): return "\n".join(textwrap.wrap(text, width=width, replace_whitespace=False)) def center(text, width=MAX_RECEIPT_WIDTH): text = text[:width] pad = max(0, (width - len(text)) // 2) return " " * pad + text def ticket_id(uid: str) -> str: h = hashlib.sha1(uid.encode("utf-8")).hexdigest()[:8].upper() return f"KVM-{h}" def build_receipt(uid, question, answer, annex=None): now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") tid = ticket_id(uid) lines = [] lines.append("=" * MAX_RECEIPT_WIDTH) lines.append(center("KNOWLEDGE RECEIPT")) lines.append(center("DEPT OF INFORMATION CONTROL")) lines.append("-" * MAX_RECEIPT_WIDTH) lines.append(f"TIME: {now}") lines.append(f"UID: {uid}") lines.append(f"CASE: {tid}") lines.append("-" * MAX_RECEIPT_WIDTH) lines.append("Q:") lines.append(wrap(question)) lines.append("") lines.append("A (OFFICIAL SUMMARY):") lines.append(wrap(answer)) if annex: lines.append("") lines.append("ANNEX A (COMPUTED):") lines.append(wrap(annex)) lines.append("-" * MAX_RECEIPT_WIDTH) lines.append(center("UNAUTHORISED COPIES")) lines.append(center("ARE ENCOURAGED")) lines.append("=" * MAX_RECEIPT_WIDTH) lines.append("") return "\n".join(lines) # --- OpenAI client --- from openai import OpenAI client = OpenAI() def transcribe(path: Path) -> str: with open(path, "rb") as f: r = client.audio.transcriptions.create( model=MODEL_TRANSCRIBE, file=f, ) return r.text.strip() def ask_llm(question: str) -> dict: system_prompt = ( "You are the core of a bureaucratic Knowledge Vending Machine. " "Return JSON with keys 'answer' and 'code'. " "'answer' is plain ASCII, <=100 words, neutral, official. " "'code' is either null or Python defining run() -> str, " "no imports, no I/O, just pure computation." ) resp = client.responses.create( model=MODEL_TEXT, input=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": question}, ], response_format={"type": "json_object"}, max_output_tokens=256, ) data = json.loads(resp.output_text) if "answer" not in data: data["answer"] = resp.output_text if "code" not in data: data["code"] = None return data # --- tiny sandbox --- class SandboxError(Exception): pass class Sandbox: banned = ["import ", "open(", "exec(", "eval(", "os.", "subprocess.", "__"] def run(self, code: str | None) -> str | None: if not code: return None for b in self.banned: if b in code: raise SandboxError(f"banned token {b}") loc = {} glb = {"__builtins__": {"len": len, "min": min, "max": max, "sum": sum, "round": round}} exec(code, glb, loc) if "run" not in loc: raise SandboxError("no run()") out = loc["run"]() if out is None: return None return str(out) # --- state machine --- class State(Enum): IDLE = auto() LISTEN = auto() PROCESS = auto() PRINT = auto() class Machine: def __init__(self): self.lcd = LCD() self.rfid = RFID() self.rec = Recorder() self.prn = Printer() self.sbx = Sandbox() self.state = State.IDLE def loop(self): while True: try: self.run_once() except KeyboardInterrupt: break except Exception as e: self.lcd.error("FATAL") self.prn.text(f"\nERROR: {e}\n") self.prn.cut() time.sleep(2) def run_once(self): # IDLE self.state = State.IDLE self.lcd.idle() uid = self.rfid.wait_for_card() # LISTEN self.state = State.LISTEN self.lcd.listening() audio = Path("/tmp/kvm_question.wav") self.rec.record(audio) # PROCESS self.state = State.PROCESS self.lcd.processing() question = transcribe(audio) data = ask_llm(question) ans = str(data.get("answer", "")).strip() code = data.get("code") annex = None if code: try: annex = self.sbx.run(code) except Exception as e: annex = f"Code error: {e}" # PRINT self.state = State.PRINT self.lcd.printing() receipt = build_receipt(uid, question, ans, annex) self.prn.text(receipt) # simple, stable barcode self.prn.code39(uid[-8:] if len(uid) > 8 else uid) self.prn.cut() time.sleep(1) def main(): m = Machine() m.loop() if __name__ == "__main__": main()