"""Flask service that stores 1-bit-per-pixel images for a small display.

Images are uploaded (or rendered from text), packed to a raw 1bpp buffer and
written to ``IMAGES_DIR`` as ``<name>.bin``. One image at a time is marked as
"current"; its name and SHA-256 hash are persisted in dot-files inside
``IMAGES_DIR`` so that several server workers share the same state.
"""

import hashlib
import hmac
import io
import os
import time
from functools import wraps

import requests
from flask import Flask, jsonify, request, send_file, send_from_directory
from PIL import Image, ImageDraw, ImageFont
from requests.exceptions import HTTPError, RequestException

app = Flask(__name__, static_folder='static')

# API Key authentication
API_KEY = os.environ.get("HTMAA_API_KEY")
if not API_KEY:
    raise ValueError("HTMAA_API_KEY environment variable must be set")

# Storage for images and current selection
IMAGES_DIR = "images"
CURRENT_IMAGE_FILE = os.path.join(IMAGES_DIR, ".current")
CURRENT_HASH_FILE = os.path.join(IMAGES_DIR, ".current_hash")
CALLBACKS_DIR = "callbacks"
os.makedirs(IMAGES_DIR, exist_ok=True)

# Edge length (in pixels) of the square images this service produces.
IMAGE_SIZE_SQ = 200


def _is_safe_name(name):
    """Return True if `name` can be used as a single file-name component.

    Image names and callback identifiers come from clients and end up inside
    file paths, so anything that could escape the target directory (path
    separators, NUL bytes, '.' / '..') is rejected.
    """
    if not isinstance(name, str) or not name or name in (".", ".."):
        return False
    return not any(ch in name for ch in ("/", "\\", "\0"))


def _image_path(name):
    """Return the path of the packed ``.bin`` file for image `name`."""
    return os.path.join(IMAGES_DIR, f"{name}.bin")


def _read_stripped(path):
    """Return the stripped contents of `path`, or None if missing or empty."""
    try:
        with open(path, "r") as f:
            return f.read().strip() or None
    except FileNotFoundError:
        return None


def _json_object():
    """Return the request body as a dict, or None if it is not a JSON object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else None


def get_current_image_name():
    """Read the current image name from file."""
    return _read_stripped(CURRENT_IMAGE_FILE)


def get_current_image_hash():
    """Read the hash of the current image."""
    return _read_stripped(CURRENT_HASH_FILE)


def compute_image_hash(image_path):
    """Compute SHA256 hash (hex digest) of an image file."""
    sha256_hash = hashlib.sha256()
    with open(image_path, "rb") as f:
        # Read in chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: f.read(65536), b""):
            sha256_hash.update(chunk)
    return sha256_hash.hexdigest()


def set_current_image_name(name):
    """Write the current image name and hash to file.

    A falsy `name` clears the selection. The hash file is emptied when there
    is no selection or when the selected image's ``.bin`` file does not exist.
    """
    with open(CURRENT_IMAGE_FILE, "w") as f:
        f.write(name if name else "")

    image_hash = ""
    if name:
        image_path = _image_path(name)
        if os.path.exists(image_path):
            image_hash = compute_image_hash(image_path)
    with open(CURRENT_HASH_FILE, "w") as f:
        f.write(image_hash)


def require_api_key(f):
    """Decorator to require API key authentication via the X-API-Key header."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Let CORS preflight through without auth
        if request.method == 'OPTIONS':
            return '', 200

        auth_header = request.headers.get('X-API-Key')
        # compare_digest avoids leaking the key through timing differences;
        # encoding to bytes keeps it working for non-ASCII header values.
        if not auth_header or not hmac.compare_digest(
            auth_header.encode("utf-8"), API_KEY.encode("utf-8")
        ):
            return jsonify(ok=False, error="Invalid or missing API key"), 401
        return f(*args, **kwargs)
    return decorated_function


def pack_1bpp(im, invert=False, msb_first=True, rotate_180=True):
    """
    Convert PIL image to 1-bit packed binary format.

    Each row is packed separately; when the width is not a multiple of 8 the
    last byte of every row is padded with zero bits.

    Returns (bytes, width, height).
    """
    # Convert to 1-bit (black and white)
    im = im.convert("1")
    if rotate_180:
        im = im.transpose(Image.ROTATE_180)

    w, h = im.size
    pixels = list(im.getdata())

    # Pack pixels into bytes
    buf = bytearray()
    for y in range(h):
        row = pixels[y * w:(y + 1) * w]
        for x in range(0, w, 8):
            byte = 0
            for bit, pixel in enumerate(row[x:x + 8]):
                # In mode '1', 0 is black, 255 is white
                bit_value = 0 if pixel == 0 else 1
                if invert:
                    bit_value = 1 - bit_value
                shift = (7 - bit) if msb_first else bit
                byte |= bit_value << shift
            buf.append(byte)

    return bytes(buf), w, h


@app.get("/healthz")
def health():
    """Liveness probe; requires no authentication."""
    return jsonify(ok=True)


@app.get("/")
def index():
    """Serve the main HTML viewer page."""
    return send_from_directory(app.static_folder, 'index.html')


@app.get("/current")
@require_api_key
def get_current():
    """
    Get the current image name and hash.
    Returns: {'ok': true, 'name': 'image_name', 'hash': 'abc123...'}
    """
    current_name = get_current_image_name()
    if current_name is None:
        return jsonify(ok=True, name=None, hash=None)
    return jsonify(ok=True, name=current_name, hash=get_current_image_hash())


@app.get("/images")
@require_api_key
def images_list():
    """
    List all available image names/keys.
    """
    try:
        files = os.listdir(IMAGES_DIR)
    except OSError as e:
        return jsonify(ok=False, error=str(e)), 500
    # Strip .bin extension to get image names
    image_names = [f[:-4] for f in files if f.endswith('.bin')]
    return jsonify(ok=True, images=image_names)


@app.post("/current")
@require_api_key
def set_current():
    """
    Set the current image by name.
    Expected JSON: {'name': 'image_name'}
    """
    data = _json_object()
    if not data or 'name' not in data:
        return jsonify(ok=False, error="No name provided"), 400

    image_name = data['name']
    if not _is_safe_name(image_name):
        return jsonify(ok=False, error="Invalid image name"), 400

    if not os.path.exists(_image_path(image_name)):
        return jsonify(ok=False, error=f"Image '{image_name}' not found"), 404

    set_current_image_name(image_name)
    return jsonify(ok=True, current=image_name)


@app.get("/image")
@require_api_key
def image_get():
    """
    Get the current image binary file.

    Query param 'hash' (optional): Client's last known hash.
    If provided and matches current hash, returns JSON indicating no update.
    Otherwise returns the image binary, with the current hash (if any) in the
    'X-Image-Hash' response header.
    """
    client_hash = request.args.get('hash')

    current_image_name = get_current_image_name()
    if current_image_name is None:
        return jsonify(ok=False, error="No current image set"), 404

    # If client provided hash, check if update is needed
    current_hash = get_current_image_hash()
    if client_hash is not None and client_hash == current_hash:
        return jsonify(ok=True, updated=False, message="No update needed")

    image_path = _image_path(current_image_name)
    if not os.path.exists(image_path):
        return jsonify(ok=False, error=f"Current image '{current_image_name}' not found"), 404

    # Send the image with hash in response header
    response = send_file(
        image_path,
        mimetype='application/octet-stream',
        as_attachment=True,
        download_name=f"{current_image_name}.bin",
    )
    if current_hash:
        response.headers['X-Image-Hash'] = current_hash
    return response


# The '<name>' URL variable is what feeds the view's `name` argument.
@app.get("/image/<name>")
@require_api_key
def image_get_by_name(name):
    """
    Get a specific image binary file by name.
    Returns the raw .bin file.
    """
    if not _is_safe_name(name):
        return jsonify(ok=False, error="Invalid image name"), 400

    image_path = _image_path(name)
    if not os.path.exists(image_path):
        return jsonify(ok=False, error=f"Image '{name}' not found"), 404

    return send_file(image_path, mimetype='application/octet-stream')


@app.delete("/image/<name>")
@require_api_key
def image_delete(name):
    """
    Delete an image by name.

    If the image is the current one, the current selection is cleared first.
    """
    if not _is_safe_name(name):
        return jsonify(ok=False, error="Invalid image name"), 400

    image_path = _image_path(name)
    if not os.path.exists(image_path):
        return jsonify(ok=False, error=f"Image '{name}' not found"), 404

    try:
        # If this is the current image, clear the current selection
        if get_current_image_name() == name:
            set_current_image_name(None)
        os.remove(image_path)
    except OSError as e:
        return jsonify(ok=False, error=str(e)), 500
    return jsonify(ok=True, deleted=name)


@app.post("/image")
@require_api_key
def image_post():
    """
    Upload an image with a name. Processes it to 1bpp format and saves to disk.
    Expected form data: 'image' (file), 'name' (string)
    """
    if 'image' not in request.files:
        return jsonify(ok=False, error="No image file provided"), 400
    if 'name' not in request.form:
        return jsonify(ok=False, error="No image name provided"), 400

    image_file = request.files['image']
    image_name = request.form['name']

    if not image_file:
        return jsonify(ok=False, error="Empty image file"), 400
    if not _is_safe_name(image_name):
        return jsonify(ok=False, error="Invalid image name"), 400

    # Broad catch on purpose: this is the request boundary and Pillow raises
    # a variety of exception types for malformed uploads.
    try:
        # Open and process the image
        im = Image.open(image_file)

        # Check size and resize if needed
        if im.size != (IMAGE_SIZE_SQ, IMAGE_SIZE_SQ):
            print(f"Warning: Image '{image_name}' is {im.size[0]}x{im.size[1]}, resizing to 200x200")
            im = im.resize((IMAGE_SIZE_SQ, IMAGE_SIZE_SQ))

        # Pack to 1bpp format
        buf, w, h = pack_1bpp(im, invert=False, msb_first=True)

        # Sanity check: rows are packed separately, each padded to whole bytes
        expected = ((w + 7) // 8) * h
        if len(buf) != expected:
            print(f"Warning: got {len(buf)} bytes, expected {expected}")

        # Save to disk
        with open(_image_path(image_name), "wb") as f:
            f.write(buf)

        # Overwriting the current image changes its content, so refresh the
        # stored hash; otherwise clients polling by hash never see the update.
        if get_current_image_name() == image_name:
            set_current_image_name(image_name)

        return jsonify(ok=True, name=image_name, width=w, height=h, size=len(buf))

    except Exception as e:
        return jsonify(ok=False, error=str(e)), 500


SPLEEN_FONT_PATH = os.environ.get("SPLEEN_FONT_PATH", "/usr/share/fonts/opentype/spleen/spleen-8x16.otf")
SPLEEN_FONT_SIZE = 16  # match the height in the font's filename (8x16 → 16)

# Try to load a font with the specified size
try:
    print("Loading font from:", SPLEEN_FONT_PATH)
    FONT = ImageFont.truetype(SPLEEN_FONT_PATH, SPLEEN_FONT_SIZE)
except Exception as e:
    print("Warning: Could not load TTF font, using default font. Error:", e)
    FONT = ImageFont.load_default()


@app.post("/prompt")
@require_api_key
def prompt():
    """
    Render a two-option prompt, make it the current image and store a callback.

    Expected JSON:
        {'message': 'text', 'options': ['a', 'b'], 'callback': 'https://...'}

    The message is drawn as wrapped text and the two options are drawn
    vertically in the left margin. The callback URL is stored under the hash
    of the generated image so /prompt-response can look it up later.
    """
    # Just make sure that the caller knows how to pass arguments..
    data = _json_object()
    print("Data given: ", data)
    if not data:
        return jsonify(ok=False, error="No data given"), 400
    if 'message' not in data:
        return jsonify(ok=False, error="No message provided"), 400
    options_given = data.get('options')
    if not isinstance(options_given, list):
        return jsonify(ok=False, error="No options provided"), 400
    if len(options_given) != 2:
        return jsonify(ok=False, error=f"{len(options_given)} options provided. Please pass exactly 2."), 400
    callback = data.get('callback')
    if not isinstance(callback, str) or not callback:
        return jsonify(ok=False, error="Please provide a callback to return response to."), 400

    # We're good, let's make images.
    text = data['message']
    options = " | ".join(str(option) for option in reversed(options_given))

    # Define look of these text pages
    margin_x = 40
    margin_y = 10

    try:
        image = text_to_image(text, margin_x, margin_y)
        image = add_vertical_text_left_margin(image, options, margin_w=30, margin_top=20, font=FONT)

        # Dump the image to disk in a format that the screen likes.
        image_name = f"prompt_{int(time.time())}"
        pack_to_disk(image, image_name)

        # Set as current image for the screen to show.
        set_current_image_name(image_name)

        # Store callback for later, ugly but works
        image_hash = get_current_image_hash()
        store_callback(image_hash, callback)

        return jsonify(ok=True, name=image_name, current=True)
    except Exception as e:
        return jsonify(ok=False, error=str(e)), 500


@app.post("/prompt-response")
@require_api_key
def prompt_response():
    """Handle response from glyph device. It sends the hash of the current image.

    Expected JSON: {'hash': '<image hash>', 'response': '1' or '2'}
    """
    data = _json_object() or {}
    client_hash = data.get('hash')
    response = data.get('response')  # currently just either '1' or '2'
    if not client_hash:
        return jsonify(ok=False, error="Please provide the hash of the prompt you're responding to."), 400
    if not response:
        return jsonify(ok=False, error="Please provide the response."), 400

    # Look up callback, attach response, send it out, Bob's your uncle.
    callback = read_callback(client_hash)
    if not callback:
        # That's ok, we'll just ignore button presses like that
        return jsonify(ok=True, message=f"There is no callback registered for {client_hash}.")

    try:
        # Let requests build the query string so the value is URL-encoded and
        # callbacks that already carry a query string still work.
        r = requests.get(callback, params={"response": response}, timeout=5)
        r.raise_for_status()
    except HTTPError as hte:
        if hte.response is not None and hte.response.status_code == 409:
            print("N8N callback was already validated. Moving on.")
        else:
            return jsonify(ok=False, error=f"Callback failed: {hte}"), 502
    except RequestException as e:
        # Connection errors, timeouts, invalid URLs, ...
        return jsonify(ok=False, error=f"Callback failed: {e}"), 502

    return jsonify(ok=True, message="Executed callback.")


# The view is named text_post so it no longer shares a name with the
# text_to_image() rendering helper below; the Flask endpoint name is kept.
@app.post("/text", endpoint="text_to_image")
@require_api_key
def text_post():
    """
    Convert text to an image and set it as current.
    Expected JSON: {'text': 'your text here'}

    Automatically generates a timestamp-based name and sets as current.
    Text will be wrapped to fit within the 200px width. The margins are fixed;
    'x' / 'y' keys in the request are currently ignored.
    """
    # Just make sure that the caller knows how to pass arguments..
    data = _json_object()
    if not data or 'text' not in data:
        return jsonify(ok=False, error="No text provided"), 400

    text = data['text']

    # Define look of these text pages
    margin_x = 30
    margin_y = 10

    try:
        im = text_to_image(text, margin_x, margin_y)

        # Dump the image to disk in a format that the screen likes.
        image_name = f"text_{int(time.time())}"
        pack_to_disk(im, image_name)

        # Set as current image for the screen to show.
        set_current_image_name(image_name)

        return jsonify(ok=True, name=image_name, current=True)
    except Exception as e:
        return jsonify(ok=False, error=str(e)), 500


def pack_to_disk(image: Image.Image, name: str):
    """
    Make sure that the image is stored properly, in 2 formats (png, data buffer).
    """
    image.save(os.path.join(IMAGES_DIR, f"{name}.png"))

    # Pack to 1bpp format
    buffer, _, _ = pack_1bpp(image, invert=False, msb_first=True)

    # Save packed data to disk, as well
    with open(_image_path(name), "wb") as f:
        f.write(buffer)


# Storing callbacks like this is messy, but I'm trying to prevent issues with multiple workers running the server by persisting
# callback data to disk instead of holding information in memory. That would lead to sync issues between workers. This is
# probably fine for a simple PoC like this.
def store_callback(identifier: str, callback: str):
    """Store callback for when the response comes in.

    Returns the path of the file the callback was written to.
    Raises ValueError if `identifier` cannot be used as a file name.
    """
    if not _is_safe_name(identifier):
        raise ValueError(f"Invalid callback identifier: {identifier!r}")
    os.makedirs(CALLBACKS_DIR, exist_ok=True)
    file_name = f"{CALLBACKS_DIR}/{identifier}.callback"  # This is ugly, but I don't want to spin up a database for this PoC
    with open(file_name, "w") as fh:
        fh.write(callback)
    return file_name


def read_callback(identifier: str):
    """Read back the callback stored for `identifier`.

    Returns None if the identifier is invalid or nothing is stored for it.
    """
    # Identifiers arrive from clients; never let them point outside CALLBACKS_DIR.
    if not _is_safe_name(identifier):
        return None
    file_name = f"{CALLBACKS_DIR}/{identifier}.callback"  # This is ugly, but I don't want to spin up a database for this PoC
    try:
        with open(file_name, "r") as fh:
            # We trust that the file contents are ok. Should probably check.
            return fh.read()
    except FileNotFoundError:
        return None


def text_to_image(text, margin_x, margin_y) -> Image.Image:
    """Render `text`, word-wrapped, onto a white square 1-bit image.

    Args:
        text: Text to draw; it is split on whitespace and re-wrapped.
        margin_x: Left offset of the text, in pixels.
        margin_y: Top offset of the text, in pixels.

    Returns:
        A new mode '1' image of IMAGE_SIZE_SQ x IMAGE_SIZE_SQ pixels.

    Rendering errors propagate to the caller instead of being turned into a
    None return value, which would only fail later with a misleading error.
    """
    # Create a white 200x200 image
    im = Image.new('1', (IMAGE_SIZE_SQ, IMAGE_SIZE_SQ), 1)  # 1 = white in mode '1'
    draw = ImageDraw.Draw(im)

    # Word wrap text to fit within width
    max_width = IMAGE_SIZE_SQ - margin_x - 10  # Leave margin on right
    lines = []
    current_line = []

    for word in text.split():
        test_line = ' '.join(current_line + [word])
        bbox = draw.textbbox((0, 0), test_line, font=FONT)
        if bbox[2] - bbox[0] <= max_width:
            current_line.append(word)
        else:
            if current_line:
                lines.append(' '.join(current_line))
            # A single word wider than the line still gets a line of its own.
            current_line = [word]

    if current_line:
        lines.append(' '.join(current_line))

    # Draw wrapped text (0 = black in mode '1')
    draw.multiline_text((margin_x, margin_y), '\n'.join(lines), fill=0, font=FONT)
    return im


def add_vertical_text_left_margin(
    im: Image.Image,
    text: str,
    margin_w: int,
    margin_top: int,
    font,
    padding: int = 6,
    direction: str = "bottom-to-top",  # or "top-to-bottom"
) -> Image.Image:
    """
    Adds white vertical text on a black strip in the *existing* left margin of `im`.

    If `im` is already mode '1' (PIL 1-bit) it is mutated and returned;
    otherwise a converted copy is drawn on and returned.

    Raises ValueError if `direction` is not one of the two supported values.
    """
    if direction not in ("bottom-to-top", "top-to-bottom"):
        raise ValueError("direction must be 'bottom-to-top' or 'top-to-bottom'")

    if im.mode != "1":
        im = im.convert("1")

    W, H = im.size
    draw = ImageDraw.Draw(im)

    # Paint the left strip black
    draw.rectangle((0, 0, margin_w - 1, H - 1), fill=0)  # 0 = black in mode '1'

    # Measure text
    dummy = Image.new("1", (1, 1), 0)
    d = ImageDraw.Draw(dummy)
    bbox = d.textbbox((0, 0), text, font=font)
    tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
    if tw <= 0 or th <= 0:
        # Nothing to draw (e.g. empty text); keep just the black strip.
        return im

    # Render text into a temp image (black bg, white text)
    txt = Image.new("1", (tw, th), 0)
    td = ImageDraw.Draw(txt)
    td.text((-bbox[0], -bbox[1]), text, fill=1, font=font)  # 1 = white

    # Rotate so it becomes vertical
    if direction == "bottom-to-top":
        txt = txt.rotate(90, expand=True)   # reads bottom->top along the margin
    else:
        txt = txt.rotate(-90, expand=True)  # reads top->bottom

    rw, rh = txt.size

    # Center in the left strip
    x = max(padding, (margin_w - rw) // 2)
    y = margin_top

    # Paste using itself as mask (works well for mode '1')
    im.paste(txt, (x, y), txt)
    return im


if __name__ == "__main__":
    # Just for testing from the commandline, so we don't have to run the server + curl.
    im = text_to_image("Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud", margin_x=40, margin_y=10)
    im = add_vertical_text_left_margin(im, "NO | YES", margin_w=30, margin_top=20, font=FONT)
    im.save("test.png")

    # cheap test for callback storage "system"
    callback = "https://my-callback.com/some-uuid"
    image_name = "foo.image"
    store_callback(image_name, callback)
    assert read_callback(image_name) == callback
    print("Success.")

    # Make sure we fail gracefully
    store_callback(image_name, callback)
    assert read_callback(image_name + "fail") is None
    print("Success.")