"""Publish a "dayglo rating" for person snapshots to Home Assistant over MQTT.

The rating is the percentage of pixels in a snapshot whose HSV value falls
inside a fixed range (see ``calculate_dayglo_rating``).

This file contains two variants of the script, one after the other:

1. A Frigate-events variant: listens on ``frigate/events``, downloads the
   event snapshot over HTTP and publishes the rating as a plain number.
2. An MQTT-snapshot variant: receives the snapshot itself over MQTT,
   publishes Home Assistant discovery configs and the rating as JSON.

Variant 1 ends with ``client.loop_forever()``, so variant 2 is only reached
if that call returns.
"""

import os
import json
import time
import threading

import paho.mqtt.client as mqtt
import requests
import cv2
import numpy as np

# Configuration from environment variables
MQTT_BROKER = os.environ.get('MQTT_BROKER', 'frigate')
MQTT_PORT = int(os.environ.get('MQTT_PORT', '1883'))
MQTT_USERNAME = os.environ.get('MQTT_USERNAME')
MQTT_PASSWORD = os.environ.get('MQTT_PASSWORD')
MQTT_TOPIC_SUBSCRIBE = 'frigate/events'
MQTT_TOPIC_PUBLISH = 'homeassistant/sensor/dayglo_rating/state'
FRIGATE_URL = os.environ.get('FRIGATE_URL', 'http://frigate:5000')
INTERESTED_ZONES = ['Door_Front']

# Upper bound (seconds) for every HTTP call to Frigate, so a stalled server
# cannot block startup or a worker thread indefinitely.
HTTP_TIMEOUT_SECONDS = 10

# Debug mode flag
DEBUG_MODE = True


def debug_print(message: str) -> None:
    """Print *message* with a ``[DEBUG]`` prefix when DEBUG_MODE is enabled."""
    if DEBUG_MODE:
        print(f"[DEBUG] {message}")


def calculate_dayglo_rating(image: np.ndarray) -> float:
    """Return the percentage (0-100) of pixels inside the "dayglo" HSV range.

    Args:
        image: Image array that ``cv2.cvtColor`` can convert with
            ``COLOR_BGR2HSV`` (callers pass the result of ``cv2.imdecode``).

    Returns:
        Share of matching pixels as a percentage; 0.0 for an image that
        has no pixels.
    """
    debug_print("Calculating dayglo rating...")
    total_pixels = image.shape[0] * image.shape[1]
    if total_pixels == 0:
        # Nothing to measure; avoid a ZeroDivisionError below.
        debug_print("Image has no pixels; dayglo rating is 0")
        return 0.0

    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)

    # Define HSV range for "dayglo" colors (adjust as needed)
    lower_color = np.array([20, 100, 100])
    upper_color = np.array([40, 255, 255])

    # Create a mask for the colors
    mask = cv2.inRange(hsv_image, lower_color, upper_color)

    # Calculate the percentage of "dayglo" pixels
    dayglo_pixels = cv2.countNonZero(mask)
    dayglo_rating = (dayglo_pixels / total_pixels) * 100
    debug_print(f"Dayglo Rating calculated: {dayglo_rating:.2f}%")
    return dayglo_rating


def on_connect(client, userdata, flags, rc, properties=None):
    """Subscribe to the Frigate event topic once the broker accepts us.

    ``properties`` is optional so the same callback works for MQTT v3.1.1
    (four arguments) and MQTT v5 (five arguments); the client below is
    created with ``protocol=mqtt.MQTTv5``.
    """
    debug_print(f"Connected to MQTT broker with result code {rc}")
    client.subscribe(MQTT_TOPIC_SUBSCRIBE)


def on_message(client, userdata, msg):
    """Start snapshot processing for new person events in a watched zone.

    Malformed payloads are logged and ignored so that one bad message does
    not raise out of the MQTT callback.
    """
    debug_print("MQTT message received")
    try:
        payload = json.loads(msg.payload)
    except ValueError as exc:  # covers JSONDecodeError and UnicodeDecodeError
        debug_print(f"Ignoring event payload that is not valid JSON: {exc}")
        return
    if not isinstance(payload, dict):
        debug_print("Ignoring event payload that is not a JSON object")
        return

    # 'after' may be present but null, so `.get('after', {})` is not enough.
    after = payload.get('after') or {}
    if not isinstance(after, dict):
        debug_print("Ignoring event whose 'after' field is not an object")
        return

    # NOTE(review): only events of type 'new' are considered; confirm that
    # 'entered_zones' is already populated at that point for your cameras.
    if payload.get('type') != 'new' or after.get('label') != 'person':
        return

    zones = after.get('entered_zones') or []
    if any(zone in INTERESTED_ZONES for zone in zones):
        debug_print(f"Person detected in zones: {zones}")
        # Fetching and analysing the snapshot is slow; keep it off the
        # MQTT network thread.
        threading.Thread(target=process_event, args=(after,)).start()


def process_event(event_data):
    """Download the snapshot of an event, rate it and publish the rating.

    Args:
        event_data: Mapping describing the event; ``id`` and ``camera`` are
            read from it.

    All failures are reported through ``debug_print``; nothing is raised,
    because this runs both in worker threads and during startup.
    """
    event_id = event_data.get('id')
    camera = event_data.get('camera')
    debug_print(f"Processing event {event_id} from camera {camera}")
    if not event_id:
        # Without an id the URL would point at ".../events/None/...".
        debug_print("Event has no id; skipping snapshot retrieval")
        return

    # Get the snapshot URL for the event
    snapshot_url = f"{FRIGATE_URL}/api/events/{event_id}/snapshot.jpg"

    # Retrieve the snapshot
    try:
        response = requests.get(snapshot_url, timeout=HTTP_TIMEOUT_SECONDS)
        if response.status_code != 200:
            debug_print(f"Failed to retrieve snapshot. HTTP Status code: {response.status_code}")
            return

        debug_print(f"Snapshot retrieved successfully from {snapshot_url}")
        np_arr = np.frombuffer(response.content, np.uint8)
        image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
        if image is None:
            debug_print("Failed to decode the snapshot image")
            return

        dayglo_rating = calculate_dayglo_rating(image)
        debug_print(f"Publishing dayglo rating: {dayglo_rating:.2f}%")
        client.publish(MQTT_TOPIC_PUBLISH, f"{dayglo_rating:.2f}")
    except Exception as e:  # best-effort boundary: log and carry on
        debug_print(f"Error retrieving snapshot: {e}")


def assess_latest_image():
    """Assess the most recent image when the script first starts."""
    debug_print("Assessing the most recent image on startup...")

    # Retrieve the most recent event from Frigate
    events_url = f"{FRIGATE_URL}/api/events?limit=1&has_snapshot=1"
    try:
        response = requests.get(events_url, timeout=HTTP_TIMEOUT_SECONDS)
        if response.status_code != 200:
            debug_print(f"Failed to retrieve events. HTTP Status code: {response.status_code}")
            return

        events = response.json()
        if not events:
            debug_print("No recent events found with snapshots")
            return

        event_data = events[0]
        debug_print(f"Most recent event ID: {event_data.get('id')}")
        process_event(event_data)
    except Exception as e:  # best-effort boundary: startup must continue
        debug_print(f"Error retrieving events: {e}")


# MQTT setup
# Alternatives that were tried: mqtt.Client() and
# mqtt.Client(protocol=mqtt.MQTTv311)
client = mqtt.Client(protocol=mqtt.MQTTv5)
if MQTT_USERNAME and MQTT_PASSWORD:
    client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect
client.on_message = on_message

# Start MQTT connection
debug_print(f"Connecting to MQTT broker at {MQTT_BROKER}:{MQTT_PORT}...")
client.connect(MQTT_BROKER, MQTT_PORT, 60)

# Assess the latest image when the script starts
assess_latest_image()

# Start the MQTT loop
client.loop_forever()


# ---------------------------------------------------------------------------
# Second variant: snapshot delivered over MQTT, Home Assistant discovery.
# Everything below rebinds names defined above and only executes once the
# loop_forever() call above has returned.
# ---------------------------------------------------------------------------

import os
import base64
import json

import paho.mqtt.client as mqtt
import numpy as np
import cv2

# Configuration
MQTT_BROKER = os.environ.get('MQTT_BROKER', '10.59.221.172')
MQTT_PORT = int(os.environ.get('MQTT_PORT', '1883'))
MQTT_SNAPSHOT_TOPIC = "/frigate/patiocam/person/snapshot"
MQTT_TOPIC_PUBLISH = "homeassistant/sensor/dayglo_rating/state"
DAYGLO_THRESHOLD_TOPIC = "homeassistant/sensor/dayglo_threshold/state"
DISCOVERY_PREFIX = "homeassistant"

# Default threshold
dayglo_threshold = 50


def on_connect(client, userdata, flags, rc, properties=None):
    """Subscribe to the input topics and announce the sensors.

    Both the snapshot topic and the threshold topic are subscribed;
    ``on_message`` has a branch for each of them.
    """
    print("Connected with result code", rc)
    client.subscribe(MQTT_SNAPSHOT_TOPIC)
    client.subscribe(DAYGLO_THRESHOLD_TOPIC)
    publish_discovery_configurations()


def publish_discovery_configurations():
    """Publish retained Home Assistant discovery configs for both sensors."""
    rating_config = {
        "name": "Dayglo Rating",
        "state_topic": MQTT_TOPIC_PUBLISH,
        "unit_of_measurement": "%",
        "value_template": "{{ value_json.rating }}",
        "icon": "mdi:brush",
        "unique_id": "mqtt_dayglo_rating",
    }
    threshold_config = {
        "name": "Dayglo Threshold",
        "state_topic": DAYGLO_THRESHOLD_TOPIC,
        "unit_of_measurement": "%",
        "value_template": "{{ value_json.threshold }}",
        "icon": "mdi:tune-vertical",
        "unique_id": "mqtt_dayglo_threshold",
    }
    client.publish(
        f"{DISCOVERY_PREFIX}/sensor/dayglo_rating/config",
        json.dumps(rating_config),
        retain=True,
    )
    client.publish(
        f"{DISCOVERY_PREFIX}/sensor/dayglo_threshold/config",
        json.dumps(threshold_config),
        retain=True,
    )


def _parse_threshold(raw):
    """Return the threshold in *raw* as a float.

    Accepts a bare number (``b"50"``) or a JSON object with a ``threshold``
    key, matching the ``value_json.threshold`` template announced in the
    discovery config. Raises ValueError or TypeError for anything else.
    """
    text = raw.decode('utf-8').strip()
    try:
        value = json.loads(text)
    except ValueError:
        value = text  # let float() decide (e.g. "inf")
    if isinstance(value, dict):
        value = value.get('threshold')
    return float(value)


def _decode_snapshot(payload):
    """Decode *payload* into an image, or return None if that fails.

    The payload is first treated as base64 text and, if that does not
    yield a decodable image, as raw encoded image bytes.
    """
    candidates = []
    try:
        candidates.append(base64.b64decode(payload))
    except ValueError:  # binascii.Error is a ValueError subclass
        pass
    candidates.append(bytes(payload))

    for raw in candidates:
        if not raw:
            continue  # cv2.imdecode rejects empty buffers
        image = cv2.imdecode(np.frombuffer(raw, np.uint8), cv2.IMREAD_COLOR)
        if image is not None:
            return image
    return None


def on_message(client, userdata, msg):
    """Dispatch snapshot and threshold messages by topic."""
    global dayglo_threshold
    if msg.topic == MQTT_SNAPSHOT_TOPIC:
        print("Snapshot received")
        process_snapshot(msg.payload)
    elif msg.topic == DAYGLO_THRESHOLD_TOPIC:
        try:
            dayglo_threshold = _parse_threshold(msg.payload)
        except (ValueError, TypeError) as exc:
            # Keep the previous threshold instead of raising in the callback.
            print("Ignoring invalid dayglo threshold payload:", exc)
            return
        print("Dayglo threshold updated to:", dayglo_threshold)


def process_snapshot(payload):
    """Rate the snapshot in *payload* and publish ``{"rating": ...}``."""
    image = _decode_snapshot(payload)
    if image is None:
        print("Failed to decode snapshot payload")
        return
    rating = calculate_dayglo_rating(image)
    print("Dayglo Rating calculated:", rating)
    client.publish(MQTT_TOPIC_PUBLISH, json.dumps({"rating": rating}))


def calculate_dayglo_rating(image):
    """Return the percentage (0-100) of pixels inside the "dayglo" HSV range.

    Returns 0.0 for an image that has no pixels.
    """
    total_pixels = image.shape[0] * image.shape[1]
    if total_pixels == 0:
        return 0.0
    hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    lower_color = np.array([20, 100, 100])
    upper_color = np.array([40, 255, 255])
    mask = cv2.inRange(hsv_image, lower_color, upper_color)
    dayglo_pixels = cv2.countNonZero(mask)
    rating = (dayglo_pixels / total_pixels) * 100
    return rating


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_forever()