import os import paho.mqtt.client as mqtt import json import requests import cv2 import numpy as np import threading # Configuration from environment variables MQTT_BROKER = os.environ.get('MQTT_BROKER', 'frigate') MQTT_PORT = int(os.environ.get('MQTT_PORT', '1883')) MQTT_USERNAME = os.environ.get('MQTT_USERNAME') MQTT_PASSWORD = os.environ.get('MQTT_PASSWORD') MQTT_TOPIC_SUBSCRIBE = 'frigate/events' MQTT_TOPIC_PUBLISH = 'homeassistant/sensor/dayglo_rating/state' FRIGATE_URL = os.environ.get('FRIGATE_URL', 'http://frigate:5000') INTERESTED_ZONES = ['Door_Front'] def calculate_dayglo_rating(image): hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) # Define HSV range for "dayglo" colors (adjust as needed) lower_color = np.array([20, 100, 100]) upper_color = np.array([40, 255, 255]) # Create a mask for the colors mask = cv2.inRange(hsv_image, lower_color, upper_color) # Calculate the percentage of "dayglo" pixels dayglo_pixels = cv2.countNonZero(mask) total_pixels = image.shape[0] * image.shape[1] dayglo_rating = (dayglo_pixels / total_pixels) * 100 return dayglo_rating def on_connect(client, userdata, flags, rc): print("Connected to MQTT broker") client.subscribe(MQTT_TOPIC_SUBSCRIBE) def on_message(client, userdata, msg): payload = json.loads(msg.payload) event_type = payload.get('type') after = payload.get('after', {}) label = after.get('label') if event_type == 'new' and label == 'person': zones = after.get('entered_zones', []) if any(zone in INTERESTED_ZONES for zone in zones): threading.Thread(target=process_event, args=(after,)).start() def process_event(event_data): event_id = event_data.get('id') camera = event_data.get('camera') # Get the snapshot URL for the event snapshot_url = f"{FRIGATE_URL}/api/events/{event_id}/snapshot.jpg" # Retrieve the snapshot response = requests.get(snapshot_url) if response.status_code == 200: np_arr = np.frombuffer(response.content, np.uint8) image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) if image is not None: dayglo_rating = calculate_dayglo_rating(image) print(f"Dayglo Rating: {dayglo_rating:.2f}%") # Publish the rating to Home Assistant client.publish(MQTT_TOPIC_PUBLISH, f"{dayglo_rating:.2f}") else: print("Failed to decode image") else: print(f"Failed to retrieve snapshot: {response.status_code}") client = mqtt.Client() if MQTT_USERNAME and MQTT_PASSWORD: client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) client.on_connect = on_connect client.on_message = on_message client.connect(MQTT_BROKER, MQTT_PORT, 60) client.loop_forever()