"""Juggy: hydroponic watering controller (MicroPython script).

Runs a supervisor loop that checks overflow sensors, tops up two gardens,
periodically samples EC/temperature in a shared test bath, doses nutrients
when EC is below the weekly baseline, and appends a daily JSON summary to
``logs/daily_summary.log``. Status is mirrored to an SSD1306 OLED and to
``logs/juggy.log``.
"""
import json
import logging
import sys
import time

import utime
from machine import Pin, UART, I2C
from picozero import pico_led

import ssd1306
from ds18b20 import DS18B20
from ecsensor import ECSensor
from float_sensor import FloatSensor
from pumps import Pump, BiDirectionalPump
from smiley_test import draw_smiley_mouth
from water_level_analog import WaterLevelAnalog

SECONDS_PER_DAY = 86400
SECONDS_PER_WEEK = 604800

logger = logging.getLogger('juggy')
logger.setLevel(logging.INFO)
log_file = 'logs/juggy.log'
# Visually separate each boot in the log file.
with open(log_file, 'a') as f:
    f.write('\n\n*******************\n\n')

led = Pin("LED", Pin.OUT)
# Grove - OLED 0.96" (SSD1315)
i2c = I2C(0, scl=Pin(5), sda=Pin(4), freq=200000)

# Default to None so a failed display init does not leave `oled` unbound
# (which would turn every later screen access into a NameError).
oled = None
for _ in range(3):
    try:
        oled = ssd1306.SSD1306_I2C(128, 64, i2c)
        break
    except Exception as e:
        logger.error(f"Error initializing OLED: {e}")
        time.sleep(1)
lines_on_screen = 0

uart = UART(0, 9600)
uart.init(bits=8, parity=None, stop=1)

# Input Devices
temp = DS18B20(19)  # Temperature sensor
ec_sensor = ECSensor(26, 16, temp_sensor=temp)  # EC Sensor
garden_level_1 = FloatSensor(20, name="G1 Float Sensor")  # Garden Level 1
garden_level_2 = FloatSensor(21, name="G2 Float Sensor")  # Garden Level 2
overflow_1 = Pin(22, Pin.IN)  # Overflow 1
overflow_2 = Pin(28, Pin.IN)  # Overflow 2
test_bath_level = Pin(27, Pin.IN)  # Test Bath Level
print(overflow_1.value())
print(overflow_2.value())
print(test_bath_level.value())

# Target EC per week of growth, indexed by week number.
EC_VALUES = [0.65, 0.65, 0.8, 1.2, 1.2, 1.35, 1.35, 1.55, 1.55, 1.55, 1.7,
             1.55, 1.55, 1.55, 1.3, 1.2, 1.1, 1.55, 1.7, 1.55, 1.55, 1.55,
             1.3, 1.2, 1.1, 1.55, 1.7, 1.55, 1.55, 1.55, 1.3, 1.2, 1.1, 1.55,
             1.7, 1.55, 1.55, 1.55, 1.3, 1.2, 1.1]
START_AT_DAY = 30  # change this value if restarting the system
current_day = START_AT_DAY
current_week = START_AT_WEEK = current_day // 7
EC_READING_BASELINE = EC_VALUES[current_week]
EC_READING_INTERVAL = 43200
SLEEP_TIME = 3600
water_in_test_bath = False
days_since_reset = 0
last_daily_summary = 0
start_time = time.time()

# Output Devices
print("Initializing pumps")
pump_1_1 = Pump(10, "1-1")  # 1-1: A Nutrients for Garden 1
pump_1_2 = Pump(13, "1-2")  # 1-2: B Nutrients for Garden 1
pump_1_3 = BiDirectionalPump(6, 7, "1-3")  # 1-3: Drain between Garden 1 and Test Bath
pump_1_4 = Pump(8, "1-4")  # 1-4: Add water to Garden 1
pump_2_1 = Pump(11, "2-1")  # 2-1: A Nutrients for Garden 2
pump_2_2 = Pump(12, "2-2")  # 2-2: B Nutrients for Garden 2
pump_2_3 = BiDirectionalPump(2, 3, "2-3")  # 2-3: Drain between Garden 2 and Test Bath
pump_2_4 = Pump(9, "2-4")  # 2-4: Add water to Garden 2
flush_pump = Pump(14, "Flush")  # Flush: Fills Test Bath with water after testing

# NOTE(review): this runs pump 1-3 forward for 75 s on every boot, before any
# sensor check. Kept as in the original; confirm it is intentional and not
# leftover bench-test code.
pump_1_3.on(duration=75, direction='forward')


def _exercise_pumps(pumps):
    """Run each pump for 5 seconds (both directions if bidirectional)."""
    for pump in pumps:
        print(f"Testing pump {pump}")
        if pump.bidirectional:
            pump.on(duration=5, direction='forward')
            time.sleep(1)
            pump.on(duration=5, direction='backward')
        else:
            pump.on(duration=5)
        time.sleep(1)


def test_pumps():
    """Exercise every pump in the system for 5 seconds each."""
    _exercise_pumps([pump_1_1, pump_1_2, pump_1_3, pump_1_4,
                     pump_2_1, pump_2_2, pump_2_3, pump_2_4, flush_pump])


def test_garden_1_pumps():
    """Exercise the four Garden 1 pumps for 5 seconds each."""
    _exercise_pumps([pump_1_1, pump_1_2, pump_1_3, pump_1_4])


def test_garden_2_pumps():
    """Exercise the four Garden 2 pumps for 5 seconds each."""
    _exercise_pumps([pump_2_1, pump_2_2, pump_2_3, pump_2_4])


def test_screen():
    """Draw a sequence of demo patterns and wrapped messages on the OLED."""
    oled.fill(0)  # clear
    oled.text("Hello,World!", 0, 0)
    oled.show()
    time.sleep(0.5)

    oled.fill(0)
    oled.rect(0, 0, 128, 32, 1)
    oled.show()
    time.sleep(3)

    oled.fill(0)
    oled.fill_rect(0, 0, 32, 32, 1)
    oled.fill_rect(2, 2, 28, 28, 0)
    oled.vline(9, 8, 22, 1)
    oled.vline(16, 2, 22, 1)
    oled.vline(23, 8, 22, 1)
    oled.fill_rect(26, 24, 2, 4, 1)
    oled.text('MicroPython', 40, 0, 1)
    oled.text('SSD1306', 40, 12, 1)
    oled.text('OLED 128x64', 40, 24, 1)
    oled.show()
    time.sleep(1)

    oled.fill(0)
    oled.show()

    # One row of text every 8 pixels: shows all eight text rows.
    oled.fill(0)
    oled.text('12345678901234567890', 0, 0)
    oled.text('2', 0, 8)
    oled.text('3', 0, 16)
    oled.text('4', 0, 24)
    oled.text('5', 0, 32)
    oled.text('6', 0, 40)
    oled.text('7', 0, 48)
    oled.text('8', 0, 56)
    oled.show()
    time.sleep(5)

    oled.fill(0)
    write_to_screen("Hello my name is Sam Laney and I am making the Juggy watering sytstem")
    time.sleep(3)
    write_to_screen("This is a really long message that should wrap around to the next line")
    time.sleep(3)
    write_to_screen("This is a really long message that should wrap around to the next line and then wrap around again")


def uart_test_loop():
    """Send `test <i>` over UART 200 times, showing anything received."""
    uart_receieved = False
    data = ''
    for i in range(200):
        oled.fill(0)
        oled.text(f'waiting for UART', 0, 0)
        oled.text(f'message (i={i})', 0, 10)
        oled.text(f'{uart_receieved}', 0, 20)
        oled.text(f'msg = {data}', 0, 30)
        oled.show()
        if uart.any():
            uart_receieved = True
            data = uart.read()
            print(data)
            oled.fill(0)
            oled.text('OMG', 0, 0)
            oled.text(f'{data}', 0, 10)
            oled.show()
            time.sleep(1)
            oled.fill(0)
        uart.write(f'test {i}\n')
        time.sleep(1)


def disable_pumps():
    """Switch off every pump in the system."""
    pumps = [pump_1_1, pump_1_2, pump_1_3, pump_1_4,
             pump_2_1, pump_2_2, pump_2_3, pump_2_4, flush_pump]
    for pump in pumps:
        pump.off()


def write_to_screen(message):
    """Append `message` to the OLED, wrapping at 16 characters per row.

    The screen holds 8 rows; it is cleared when full or when the message is
    estimated not to fit. A word cut at the row boundary gets a trailing
    hyphen. Does nothing if the display failed to initialize.
    """
    global lines_on_screen
    if oled is None:
        return

    # NOTE(review): the estimate divides by 14 although rows wrap at 16
    # characters; kept as in the original. The recursive call re-checks the
    # row count, so an under-estimate still clears the screen when needed.
    lines_needed = len(message) // 14
    if lines_on_screen == 8 or ((lines_on_screen + lines_needed) > 8):
        oled.fill(0)
        lines_on_screen = 0
        oled.show()
        time.sleep(0.5)

    # Drop one leading space left over from a wrap; startswith() is safe on
    # an empty message where message[0] would raise IndexError.
    if message.startswith(" "):
        message = message[1:]

    remaining_message = None
    if len(message) > 16:
        if (message[15] != " ") and (message[16] != " ") and (message[14] != " "):
            # Cutting through a word: hyphenate and carry the rest over.
            remaining_message = message[15:]
            message = message[:15] + "-"
        else:
            remaining_message = message[16:]
            message = message[:16]

    oled.text(message, 0, lines_on_screen * 8)
    lines_on_screen += 1
    oled.show()
    time.sleep(0.75)
    if remaining_message:
        write_to_screen(remaining_message)


def log_message(message, screen_message='duplicate', level=logging.INFO):
    """Write `message` to the log file, logger and stdout, and to the OLED.

    Args:
        message: Text appended to the log file, logged and printed.
        screen_message: Text for the OLED. The default 'duplicate' shows
            `message` itself; a falsy value skips the screen.
        level: Logging level passed to the logger.
    """
    with open(log_file, 'a+') as f:
        f.write(message + '\n')
    logger.log(level, message)
    print(message)

    if not screen_message:
        return
    if screen_message == 'duplicate':
        screen_message = message
    # The display is best-effort: retry a few times, then give up quietly.
    for _ in range(3):
        try:
            write_to_screen(screen_message)
            return
        except Exception as e:
            logger.error(f"Error writing to screen: {e}")
            time.sleep(1)


def _log_traceback(exc):
    """Print the traceback of `exc` to stdout and append it to the log file.

    Uses MicroPython's sys.print_exception when present; on interpreters
    without it this is a no-op.
    """
    print_exception = getattr(sys, "print_exception", None)
    if print_exception is None:
        return
    print_exception(exc)
    try:
        with open(log_file, 'a+') as f:
            print_exception(exc, f)
    except OSError as err:
        logger.error(f"Error writing traceback to log: {err}")


# Make sure nothing is running before the control loop starts.
disable_pumps()
#test_screen()

motor_dict = {
    "1": {"1": pump_1_1, "2": pump_1_2, "3": pump_1_3, "4": pump_1_4},
    "2": {"1": pump_2_1, "2": pump_2_2, "3": pump_2_3, "4": pump_2_4},
    "flush": flush_pump,
}
garden_level_checkers = {"1": garden_level_1, "2": garden_level_2}
overflow_sensors = {"1": overflow_1, "2": overflow_2}
last_ec = {
    "1": {"time": 0, "reading": 0, "temperature": 0},
    "2": {"time": 0, "reading": 0, "temperature": 0},
}


def test_bath_water_check():
    """Read the test bath level pin and update `water_in_test_bath`."""
    global water_in_test_bath
    print(test_bath_level.value())
    if test_bath_level.value() == 1:
        log_message("Test bath has water", "Test bath full")
        water_in_test_bath = True
    else:
        log_message("Test bath has no water", "Test bath empty")
        water_in_test_bath = False


def check_overflows():
    """Check both overflow sensors; on any overflow stop pumps and exit.

    Raises:
        SystemExit: if either overflow sensor reads 1.
    """
    overflow_detected = False
    for overflow_number, sensor in overflow_sensors.items():
        if sensor.value() == 1:
            log_message(f"Overflow {overflow_number} has water",
                        f"Overflow!! - {overflow_number}",
                        level=logging.CRITICAL)
            # TODO: Notify about overflow
            overflow_detected = True
        else:
            log_message(f"Overflow {overflow_number} all clear",
                        f"Overflow {overflow_number} clear")

    if overflow_detected:
        disable_pumps()
        log_message("Pumps disabled due to overflow", "Pumps disabled due to overflow")
        # sys.exit() raises SystemExit, which the main loop's
        # `except Exception` does not catch. A bare exit() is a CPython
        # convenience that may be missing on MicroPython; the resulting
        # NameError would be swallowed and pumping would resume.
        sys.exit()


def check_garden_water(garden_number):
    """Top up a garden until its float sensor trips (at most 4 attempts).

    Water is taken from the test bath first if it holds any, otherwise from
    the jug via the garden's "4" pump.

    Args:
        garden_number: "1" or "2".
    """
    global water_in_test_bath
    garden_checker = garden_level_checkers[garden_number]
    print(f"Checking Garden {garden_number}")
    if garden_checker.check_float() == 1:
        log_message(f"Garden {garden_number} has water", f"Garden {garden_number} full")
        return

    counter = 0
    while garden_checker.check_float() == 0 and counter < 4:
        counter += 1
        if water_in_test_bath:
            log_message(f"Garden {garden_number} has no water, draining from test bath",
                        f"Filling Garden {garden_number} from test bath")
            motor_dict[garden_number]["3"].on(duration=75, direction='forward')  # TODO: Confirm direction and duration
            water_in_test_bath = False
        else:
            log_message(f"Garden {garden_number} has no water, turning on pump {garden_number}-4 for 10 seconds",
                        f"Jug -> Garden {garden_number}")
            motor_dict[garden_number]["4"].on(duration=10)  # TODO: Confirm duration


def take_ec_reading(garden_number):
    """Sample EC and temperature for a garden if a reading is due.

    A reading is taken only when more than EC_READING_INTERVAL seconds have
    passed since the garden's last one and the test bath is empty. The bath
    is filled from the garden, sampled, emptied and washed, then nutrients
    are adjusted.

    Args:
        garden_number: "1" or "2".
    """
    global water_in_test_bath
    #test_bath_water_check()
    current_time = time.time()
    reading_due = (current_time - last_ec[garden_number]["time"]) > EC_READING_INTERVAL
    if not reading_due or water_in_test_bath:
        return

    log_message(f"Taking EC reading for Garden {garden_number}", f"Testing EC({garden_number})")
    log_message("Filling Test Bath")
    fill_duration = fill_test_bath(garden_number)
    log_message("Test Bath filled", None)

    log_message("Taking EC reading")
    temperature = temp.read_temp(samples=5)
    reading = ec_sensor.get_ec_reading(samples=5)
    last_ec[garden_number]["reading"] = reading
    log_message(f"G{garden_number} EC: {reading}")
    log_message(f"G{garden_number} Temp: {temperature}")
    last_ec[garden_number]["time"] = current_time
    # NOTE(review): the stored temperature is a second sample taken after
    # the EC reading, not the one logged above. Kept as in the original.
    temperature = temp.read_temp(samples=5)
    last_ec[garden_number]["temperature"] = temperature

    empty_test_bath(garden_number, fill_duration)
    adjust_nutrients(garden_number, reading)


def fill_test_bath(garden_number):
    """Pump water from a garden into the test bath for a fixed 75 seconds.

    Args:
        garden_number: "1" or "2".

    Returns:
        The fill duration in seconds.
    """
    global water_in_test_bath
    # TODO: fill in 5 s steps until test_bath_level reads 1, with a timeout
    # limit, instead of using a fixed duration.
    fill_duration = 75
    motor_dict[garden_number]["3"].on(duration=fill_duration, direction='backward')
    water_in_test_bath = True
    return fill_duration


def empty_test_bath(garden_number, fill_duration):
    """Return the sample to its garden, then wash and refill the test bath.

    Args:
        garden_number: "1" or "2".
        fill_duration: Seconds the bath was filled for; the drain runs 5
            seconds longer.
    """
    global water_in_test_bath
    log_message("Emptying Test Bath into Garden " + garden_number)
    motor_dict[garden_number]["3"].on(duration=fill_duration + 5, direction='forward')
    # Wash test bath and probe
    log_message("Washing Test Bath", "Washing")
    motor_dict["flush"].on(duration=75)
    log_message("Test Bath washed and filled")
    # The flush leaves the bath full of water.
    water_in_test_bath = True


def adjust_nutrients(garden_number, reading):
    """Dose A then B nutrients if `reading` is below the current baseline.

    Args:
        garden_number: "1" or "2".
        reading: The EC reading to compare against EC_READING_BASELINE.
    """
    if reading < EC_READING_BASELINE:
        log_message(f"EC reading is too low, adding nutrients to Garden {garden_number}",
                    f"Adding nutrients to Garden {garden_number}")
        motor_dict[garden_number]["1"].on(duration=3)  # TODO: Test duration
        time.sleep(30)
        motor_dict[garden_number]["2"].on(duration=3)
        log_message(f"Nutrients added to Garden {garden_number}")


def daily_summary():
    """Roll the weekly EC baseline if due and append the daily summary.

    Appends one JSON line to logs/daily_summary.log, advances the day
    counters and resets the pump usage totals that were reported.
    """
    global days_since_reset, last_daily_summary
    global EC_READING_BASELINE, current_week, current_day

    # Use whole elapsed weeks. Comparing the fractional value made this
    # true on the first call after boot, skipping a week's baseline.
    elapsed_weeks = int(time.time() - start_time) // SECONDS_PER_WEEK
    if elapsed_weeks > (current_week - START_AT_WEEK):
        current_week = START_AT_WEEK + elapsed_weeks
        # Hold the last value once the schedule runs out instead of raising
        # IndexError.
        EC_READING_BASELINE = EC_VALUES[min(current_week, len(EC_VALUES) - 1)]
        log_message(f"New week! EC baseline updated to {EC_READING_BASELINE}",
                    f"New week! EC baseline updated to {EC_READING_BASELINE}")

    # NOTE(review): nutrient_usage reports pumps 1-1 and 1-2 only (Garden 1's
    # A and B); Garden 2's nutrient pumps are neither reported nor reset.
    # Kept as in the original; confirm whether that is intended.
    summary = {
        "days_since_reset": days_since_reset,
        "current_day": current_day,
        "current_week": current_week,
        "ec_baseline": EC_VALUES[min(current_week, len(EC_VALUES) - 1)],
        "last_ec_readings": last_ec,
        "water_usage": {
            "1": motor_dict["1"]["4"].total_time_on,
            "2": motor_dict["2"]["4"].total_time_on,
        },
        "nutrient_usage": {
            "1": motor_dict["1"]["1"].total_time_on,
            "2": motor_dict["1"]["2"].total_time_on,
        },
    }
    days_since_reset += 1
    current_day += 1
    motor_dict["1"]["4"].total_time_on = 0
    motor_dict["2"]["4"].total_time_on = 0
    motor_dict["1"]["1"].total_time_on = 0
    motor_dict["1"]["2"].total_time_on = 0
    with open('logs/daily_summary.log', 'a+') as f:
        f.write(json.dumps(summary) + '\n')
    last_daily_summary = time.time()


log_message("Juggy initialized", "Juggy init")

while True:
    try:
        log_message('checking sensors...')
        if uart.any():
            data = uart.read()
            #log_message(f'Recv from UART: {data}')
            #TODO: fix when ESP is fixed
            # read() may return None; skip command matching in that case.
            if not data:
                pass
            elif b'test_ec_1' in data:
                log_message("Testing EC 1")
                take_ec_reading("1")
            elif b'test_ec_2' in data:
                log_message("Testing EC 2")
                take_ec_reading("2")
            elif b'test_screen' in data:
                test_screen()
            elif b'test_pumps' in data:
                test_pumps()
            elif b'test_uart' in data:
                uart_test_loop()
            elif b'device' in data:
                # TODO implement testing each pump
                pass

        # Check for water in overflows and handle overflow situation
        check_overflows()

        # Check for water in gardens
        for garden_number in ["1", "2"]:
            check_garden_water(garden_number)

        # Take EC readings for each garden
        for garden_number in ["1", "2"]:
            take_ec_reading(garden_number)

        # Check if we've written the daily summary
        if (time.time() - last_daily_summary) > SECONDS_PER_DAY:
            daily_summary()
        if current_day == START_AT_DAY:
            log_message("Logging Day 1 Summary")
            daily_summary()

        log_message('Sensor checks done')
        time.sleep(10)
        # Draw smiley and sleep for an hour
        if oled is not None:
            draw_smiley_mouth(oled, 16, 0, 100, 20)
        time.sleep(SLEEP_TIME)
    except Exception as e:
        log_message(f"Err: {e}", level=logging.ERROR)
        # e.with_traceback() needs a traceback argument (and is absent on
        # MicroPython), so calling it here raised inside the handler and
        # killed the loop. Print the traceback explicitly instead.
        _log_traceback(e)
        time.sleep(5)
        continue