From 3fd01f45512bbe941c58ba5bce71a6c0529421b3 Mon Sep 17 00:00:00 2001 From: Marcus Brown Date: Sun, 20 Oct 2024 14:04:42 +1100 Subject: [PATCH] Improve --- dayglo_detector/dayglo_detector.py | 131 ----------------------------- 1 file changed, 131 deletions(-) diff --git a/dayglo_detector/dayglo_detector.py b/dayglo_detector/dayglo_detector.py index ab4b049..a031d3b 100644 --- a/dayglo_detector/dayglo_detector.py +++ b/dayglo_detector/dayglo_detector.py @@ -1,134 +1,3 @@ -import os -import paho.mqtt.client as mqtt -import json -import requests -import cv2 -import numpy as np -import time -import threading - -# Configuration from environment variables -MQTT_BROKER = os.environ.get('MQTT_BROKER', 'frigate') -MQTT_PORT = int(os.environ.get('MQTT_PORT', '1883')) -MQTT_USERNAME = os.environ.get('MQTT_USERNAME') -MQTT_PASSWORD = os.environ.get('MQTT_PASSWORD') -MQTT_TOPIC_SUBSCRIBE = 'frigate/events' -MQTT_TOPIC_PUBLISH = 'homeassistant/sensor/dayglo_rating/state' -FRIGATE_URL = os.environ.get('FRIGATE_URL', 'http://frigate:5000') -INTERESTED_ZONES = ['Door_Front'] - -# Debug mode flag -DEBUG_MODE = True - -def debug_print(message): - """ Helper function to print debug messages if debug mode is enabled """ - if DEBUG_MODE: - print(f"[DEBUG] {message}") - -def calculate_dayglo_rating(image): - debug_print("Calculating dayglo rating...") - hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) - - # Define HSV range for "dayglo" colors (adjust as needed) - lower_color = np.array([20, 100, 100]) - upper_color = np.array([40, 255, 255]) - - # Create a mask for the colors - mask = cv2.inRange(hsv_image, lower_color, upper_color) - - # Calculate the percentage of "dayglo" pixels - dayglo_pixels = cv2.countNonZero(mask) - total_pixels = image.shape[0] * image.shape[1] - dayglo_rating = (dayglo_pixels / total_pixels) * 100 - - debug_print(f"Dayglo Rating calculated: {dayglo_rating:.2f}%") - return dayglo_rating - -def on_connect(client, userdata, flags, rc): - debug_print(f"Connected to MQTT broker with result code {rc}") - client.subscribe(MQTT_TOPIC_SUBSCRIBE) - -def on_message(client, userdata, msg): - debug_print("MQTT message received") - payload = json.loads(msg.payload) - event_type = payload.get('type') - after = payload.get('after', {}) - label = after.get('label') - - if event_type == 'new' and label == 'person': - zones = after.get('entered_zones', []) - if any(zone in INTERESTED_ZONES for zone in zones): - debug_print(f"Person detected in zones: {zones}") - threading.Thread(target=process_event, args=(after,)).start() - -def process_event(event_data): - event_id = event_data.get('id') - camera = event_data.get('camera') - debug_print(f"Processing event {event_id} from camera {camera}") - - # Get the snapshot URL for the event - snapshot_url = f"{FRIGATE_URL}/api/events/{event_id}/snapshot.jpg" - - # Retrieve the snapshot - try: - response = requests.get(snapshot_url) - if response.status_code == 200: - debug_print(f"Snapshot retrieved successfully from {snapshot_url}") - np_arr = np.frombuffer(response.content, np.uint8) - image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) - if image is not None: - dayglo_rating = calculate_dayglo_rating(image) - debug_print(f"Publishing dayglo rating: {dayglo_rating:.2f}%") - client.publish(MQTT_TOPIC_PUBLISH, f"{dayglo_rating:.2f}") - else: - debug_print("Failed to decode the snapshot image") - else: - debug_print(f"Failed to retrieve snapshot. HTTP Status code: {response.status_code}") - except Exception as e: - debug_print(f"Error retrieving snapshot: {e}") - -def assess_latest_image(): - """ Assess the most recent image when the script first starts """ - debug_print("Assessing the most recent image on startup...") - - # Retrieve the most recent event from Frigate - events_url = f"{FRIGATE_URL}/api/events?limit=1&has_snapshot=1" - try: - response = requests.get(events_url) - if response.status_code == 200: - events = response.json() - if events: - event_data = events[0] - debug_print(f"Most recent event ID: {event_data.get('id')}") - process_event(event_data) - else: - debug_print("No recent events found with snapshots") - else: - debug_print(f"Failed to retrieve events. HTTP Status code: {response.status_code}") - except Exception as e: - debug_print(f"Error retrieving events: {e}") - -# MQTT setup -#client = mqtt.Client() -client = mqtt.Client(protocol=mqtt.MQTTv5) -#client = mqtt.Client(protocol=mqtt.MQTTv311) - -if MQTT_USERNAME and MQTT_PASSWORD: - client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) - -client.on_connect = on_connect -client.on_message = on_message - -# Start MQTT connection -debug_print(f"Connecting to MQTT broker at {MQTT_BROKER}:{MQTT_PORT}...") -client.connect(MQTT_BROKER, MQTT_PORT, 60) - -# Assess the latest image when the script starts -assess_latest_image() - -# Start the MQTT loop -client.loop_forever() - import os import paho.mqtt.client as mqtt import numpy as np