"""FastAPI application that stores moisture sensor readings in SQLite and relays pump commands."""

import contextlib
import datetime
import json
import random
import sqlite3
import string
from typing import List

import pandas as pd
import plotly
import plotly.express as px
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel  # For data validation

templates = Jinja2Templates(directory="templates")

# app = FastAPI()  # Comment this out
app = FastAPI(
    title="EFI Development",  # You can choose any title
    openapi_url="/openapi.json",  # API specification endpoint
    docs_url="/docs",  # Swagger UI endpoint
    redoc_url="/redoc",  # ReDoc UI endpoint
    root_path="/tunnel_durra",  # IMPORTANT: Adjust this (e.g., /tunnel_durra)
)

MOIST_DATABASE_NAME = "moisture_data_v2.db"  # SQLite database name


class MoistureReading(BaseModel):
    """Payload posted by a sensor: its id plus the raw and percent moisture values."""

    sensor_id: str
    raw_moisture_value: int
    moisture_percent: int


class MoistureReadingResponse(MoistureReading):
    """A stored reading: the sensor payload plus the timestamp saved with it."""

    timestamp: str


class PumpCommand(BaseModel):
    """Body of a pump command request."""

    command: str


# A temporary storage for the latest pump command
# (in a real application, you might use a database)
latest_pump_command = None


def _open_db(use_row_factory: bool = False):
    """Open the moisture database and return a context manager that closes it.

    ``sqlite3.Connection`` used directly in a ``with`` statement only manages
    the transaction and leaves the connection open, so the connection is
    wrapped in ``contextlib.closing`` to guarantee ``close()`` on every path.

    Args:
        use_row_factory: When true, rows are returned as ``sqlite3.Row`` so
            columns can be read by name.
    """
    conn = sqlite3.connect(MOIST_DATABASE_NAME)
    if use_row_factory:
        conn.row_factory = sqlite3.Row
    return contextlib.closing(conn)


def init_db() -> None:
    """
    Initialize the SQLite database and create the moisture_readings table
    if it doesn't exist.
    """
    with _open_db() as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS moisture_readings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sensor_id TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                raw_moisture_value INTEGER NOT NULL,
                moisture_percent INTEGER NOT NULL
            )
        ''')
        conn.commit()
    print(f"Database '{MOIST_DATABASE_NAME}' initialized and 'moisture_readings' table ensured.")


# Initialize the database when the script is run
init_db()


@app.post("/send_pump_command/")
async def send_command(pump_command: PumpCommand):
    """Store the posted command as the latest pump command, replacing any previous one."""
    global latest_pump_command
    latest_pump_command = pump_command.command
    return {"message": f"Command '{pump_command.command}' received."}


@app.get("/get_latest_pump_command/")
async def get_command():
    """Return the latest pump command (or None) and clear it so it is delivered once."""
    global latest_pump_command
    command = latest_pump_command
    # Clear the command after reading (optional, depending on desired behavior)
    latest_pump_command = None
    return {"command": command}


@app.post("/moisture_data")
async def receive_and_store_moisture_data_sqlite(reading: MoistureReading):
    """
    Receives structured moisture data, validates it, prints it,
    adds a server-side timestamp, and inserts it into an SQLite database.

    Raises:
        HTTPException: 500 when the insert fails or any other unexpected
            error occurs while handling the reading.
    """
    try:
        # Data is already validated by FastAPI by this point if it matches MoistureReading
        print("--- Received Validated Moisture Data (for SQLite) ---")
        print(f"Sensor ID: {reading.sensor_id}")
        print(f"Raw Value: {reading.raw_moisture_value}")
        print(f"Moisture Percent: {reading.moisture_percent}")
        print("---------------------------------------------------")

        # Add a server-side timestamp.
        # NOTE(review): this is naive local time stored as ISO text, and the
        # "latest reading" queries order by this text column — confirm the
        # host clock/timezone never shifts, or migrate to UTC.
        server_timestamp = datetime.datetime.now().isoformat()

        # Insert data into SQLite; the connection is closed even if the
        # insert or commit raises.
        with _open_db() as conn:
            conn.execute(
                '''
                INSERT INTO moisture_readings (sensor_id, timestamp, raw_moisture_value, moisture_percent)
                VALUES (?, ?, ?, ?)
                ''',
                (
                    reading.sensor_id,
                    server_timestamp,
                    reading.raw_moisture_value,
                    reading.moisture_percent,
                ),
            )
            conn.commit()

        return JSONResponse(
            content={"status": "success", "message": "Data received and logged to SQLite"},
            status_code=200,
        )
    except HTTPException:
        # Let an HTTPException raised inside the try block pass through unchanged
        raise
    except sqlite3.Error as e:
        print(f"--- SQLite Error: {e} ---")
        # Log the problematic data if possible
        # print(f"Problematic data: {reading.dict()}")
        raise HTTPException(status_code=500, detail=f"Database error: {e}") from e
    except Exception as e:
        # Catch any other unexpected errors
        print(f"--- Error processing or storing data: {e} ---")
        # print(f"Problematic data: {reading.dict() if 'reading' in locals() else 'N/A'}")
        raise HTTPException(
            status_code=500,
            detail=f"Internal server error while processing data: {e}",
        ) from e


@app.get("/latest_readings_all_sensors", response_model=List[MoistureReadingResponse])  # Added response_model for better docs
async def get_latest_readings_all_sensors():
    """
    Retrieves the most recent reading (including moisture_percent and
    timestamp) for each unique sensor_id.

    Raises:
        HTTPException: 500 when the database cannot be opened or queried.
    """
    query = """
        WITH RankedReadings AS (
            SELECT
                sensor_id,
                timestamp,
                raw_moisture_value,
                moisture_percent,
                ROW_NUMBER() OVER (PARTITION BY sensor_id ORDER BY timestamp DESC) as rn
            FROM moisture_readings
        )
        SELECT
            sensor_id,
            timestamp,
            raw_moisture_value,
            moisture_percent
        FROM RankedReadings
        WHERE rn = 1;
    """
    try:
        with _open_db(use_row_factory=True) as conn:
            rows = conn.execute(query).fetchall()
    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database query error: {e}") from e

    # Build plain dicts whose keys match MoistureReadingResponse.
    results = [
        {
            "sensor_id": row["sensor_id"],
            "raw_moisture_value": row["raw_moisture_value"],
            "moisture_percent": row["moisture_percent"],
            "timestamp": row["timestamp"],
        }
        for row in rows
    ]
    # A Response returned directly is sent as-is; response_model above then
    # only drives the generated API docs.
    return JSONResponse(content=results)


@app.get("/latest_reading/{sensor_id_str}", response_model=MoistureReadingResponse)
async def get_latest_reading_for_sensor(sensor_id_str: str):
    """
    Retrieves the most recent reading (including moisture_percent and timestamp)
    for a specific sensor_id.

    Raises:
        HTTPException: 404 when the sensor has no stored readings; 500 when
            the database cannot be opened or queried.
    """
    query = """
        SELECT sensor_id, timestamp, raw_moisture_value, moisture_percent
        FROM moisture_readings
        WHERE sensor_id = ?
        ORDER BY timestamp DESC
        LIMIT 1;
    """
    try:
        with _open_db(use_row_factory=True) as conn:
            # sensor_id is TEXT in DB now
            row = conn.execute(query, (sensor_id_str,)).fetchone()
    except sqlite3.Error as e:
        raise HTTPException(status_code=500, detail=f"Database query error: {e}") from e

    if row is None:
        raise HTTPException(status_code=404, detail=f"No data found for sensor ID {sensor_id_str}")
    # FastAPI will convert this dict to JSON based on response_model
    return dict(row)


# NOTE: everything commented out below is disabled legacy/prototype code,
# kept for reference only.

# @app.post("/moisture_data") # Define the path for POST requests
# async def receive_moisture_data(request: Request):
#     """
#     Minimal endpoint to receive any JSON data via POST,
#     print it to the console, and return a success message.
#     (Corresponds to Step 3 of the prototyping plan)
#     """
#     try:
#         # Await the raw body and parse it as JSON
#         data = await request.json()
#
#         # Print the received data to the FastAPI server console
#         print("--- Received Moisture Data ---")
#         print(data)
#         print("-----------------------------")
#
#         # Return a simple success response
#         return JSONResponse(content={"status": "success", "message": "Data received"}, status_code=200)
#     except json.JSONDecodeError:
#         # Handle cases where the body is not valid JSON
#         print("--- Received Invalid JSON ---")
#         try: # Corrected Python try block
#             # Try to print the raw body for debugging if it's not JSON
#             raw_body = await request.body()
#             print(raw_body.decode('utf-8', errors='ignore'))
#         except Exception: # Corrected Python except block
#             print("Could not decode raw body.")
#         print("-----------------------------")
#         raise HTTPException(status_code=400, detail="Invalid JSON data received")
#     except Exception as e:
#         # Catch any other unexpected errors
#         print(f"--- Error processing request: {e} ---")
#         raise HTTPException(status_code=500, detail=f"Internal server error: {e}")

# def calculate_heat_index(temperature: float, humidity: float) -> float:
#     c1 = -42.379
#     c2 = 2.04901523
#     c3 = 10.14333127
#     c4 = -0.22475541
#     c5 = -0.00683783
#     c6 = -0.05481717
#     c7 = 0.00122874
#     c8 = 0.00085282
#     c9 = -0.00000199
#     T = temperature
#     R = humidity
#     heat_index = (c1 + (c2 * T) + (c3 * R) + (c4 * T * R) +
#                   (c5 * T**2) + (c6 * R**2) + (c7 * T**2 * R) +
#                   (c8 * T * R**2) + (c9 * T**2 * R**2))
#     return round(heat_index)


@app.get("/")
async def root():
    """Return a fixed greeting string."""
    return "hey there"


@app.get("/basic_html")
async def basic_html_res():
    """Return a fixed one-line message as an HTML response with status 200."""
    output = """
It is a day in the history of the universe.
"""
    return HTMLResponse(content=output, status_code=200)


# @app.get("/heat_index")
# async def root2(rh: float, t:float):
#     return calculate_heat_index(t, rh)

# RHT_DB = "/Users/ahmaddurra/Documents/6.900/efi_test/exampleTime.db"

# @app.get("/heat_index2")
# def heat_index_function2(rh: float, t: float, history: int): #last thing does default argument
#     history_minutes_ago = datetime.datetime.now() - datetime.timedelta(seconds=history)
#     heat_index = calculate_heat_index(t,rh) #previously defined
#     conn = sqlite3.connect(RHT_DB) # connect to that database (will create if it doesn't already exist)
#     c = conn.cursor() # move cursor into database (allows us to execute commands)
#     c.execute('''CREATE TABLE IF NOT EXISTS rht_table (rh real,t real, heat_index real, timing timestamp);''') #jodalyst test
#     c.execute('''INSERT into rht_table VALUES (?,?,?,?);''', (rh,t,heat_index, datetime.datetime.now()))
#     if history is not None:
#         prev_data = c.execute('''SELECT * FROM rht_table WHERE timing > ? ORDER BY timing DESC;''',(history_minutes_ago,)).fetchall()
#     else:
#         prev_data = c.execute('''SELECT * FROM rht_table ORDER BY timing DESC;''').fetchone()
#     #rowid is an implicit 64 bit ordered row counter column that exists in all sqlite tables by default.
#     # you can use it to access most recents
#     conn.commit() # commit commands
#     conn.close() # close connection to database
#     outs = ""
#     for t in prev_data:
#         outs += f"rh: {t[0]} t: {t[1]} heat_index: {t[2]}! \n"
#     return outs

# RHT_LOG_DB = "/Users/ahmaddurra/Documents/6.900/efi_test/exampleTime.db"

# @app.get("/insert_dummy_readings")
# def insert_dummy_readings():
#     now = datetime.datetime.now()
#     conn = sqlite3.connect(RHT_LOG_DB)
#     c = conn.cursor()
#     c.execute('''
#         CREATE TABLE IF NOT EXISTS readings (
#             date TEXT,
#             time TEXT,
#             temperature REAL,
#             humidity REAL,
#             soc REAL
#         );
#     ''')
#     for i in range(20):
#         # Create a timestamp spaced 1 minute apart
#         timestamp = now - datetime.timedelta(minutes=i)
#         date_str = timestamp.strftime('%Y-%m-%d')
#         time_str = timestamp.strftime('%H:%M:%S')
#         # Create dummy measurement values
#         temperature = random.uniform(20, 30)
#         humidity = random.uniform(40, 60)
#         soc = random.uniform(70, 100)
#         c.execute('''
#             INSERT INTO readings (date, time, temperature, humidity, soc)
#             VALUES (?, ?, ?, ?, ?)
#         ''', (date_str, time_str, temperature, humidity, soc))
#     conn.commit()
#     conn.close()

# def get_sample_data(num_points: int, measurement: str):
#     with sqlite3.connect(RHT_LOG_DB) as conn:
#         if measurement == "temperature":
#             query = """
#                 SELECT date, time, temperature AS value
#                 FROM readings
#                 ORDER BY date DESC, time DESC
#                 LIMIT ?
#             """
#         elif measurement == "humidity":
#             query = """
#                 SELECT date, time, humidity AS value
#                 FROM readings
#                 ORDER BY date DESC, time DESC
#                 LIMIT ?
#             """
#         elif measurement == "soc":
#             query = """
#                 SELECT date, time, soc AS value
#                 FROM readings
#                 ORDER BY date DESC, time DESC
#                 LIMIT ?
#             """
#         else:
#             raise ValueError("Invalid measurement type specified.")
#         df = pd.read_sql_query(query, conn, params=(num_points,))
#     df['timestamp'] = pd.to_datetime(df['date'] + ' ' + df['time'])
#     df = df.sort_values('timestamp')
#     return df[['timestamp', 'value']]

# @app.get("/temp_plot")
# async def temp_plot(request: Request, num_points: int):
#     df = get_sample_data(num_points, "temperature")
#     #make a line plot using pandas:
#     fig = px.line(df, x='timestamp', y='value', labels={'value': 'Temperature (°C)'})
#     graphJSON = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
#     return templates.TemplateResponse(
#         request=request, name="plotly_template.html", context={"graphJSON": graphJSON}
#     )

# @app.get("/rh_plot")
# async def rh_plot(request: Request, num_points: int):
#     df = get_sample_data(num_points, "humidity")
#     fig = px.line(df, x='timestamp', y='value', labels={'value': 'Humidity (%)'})
#     graphJSON = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
#     return templates.TemplateResponse(
#         request=request, name="plotly_template.html", context={"graphJSON": graphJSON}
#     )

# @app.get("/soc_plot")
# async def soc_plot(request: Request, num_points: int):
#     df = get_sample_data(num_points, "soc")
#     fig = px.line(df, x='timestamp', y='value', labels={'value': 'State of Charge (%)'})
#     graphJSON = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
#     return templates.TemplateResponse(
#         request=request, name="plotly_template.html", context={"graphJSON": graphJSON}
#     )

# # @app.post("/efi_test/logger")
# # async def logger(temp: float, rh: float, kerberos: float, bat: float):