Files
frigate/dayglo_detector/dayglo_detector.py
T
marcus 96bf583eb7 Fix coroutine warning by properly awaiting async MQTT methods
- Wrapped `on_connect` and `on_message` methods with `asyncio.create_task()` to ensure they are properly awaited.
- Created separate handler functions for connection and message handling.
2024-10-20 18:52:00 +11:00

144 lines
5.4 KiB
Python

import asyncio
import base64
import inspect
import json
import os
import sys
import tempfile
import time

import cv2
import numpy as np
from gmqtt import Client as MQTTClient
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Broker connection settings; each one can be overridden through an
# environment variable of the same name.
MQTT_BROKER = os.getenv('MQTT_BROKER', '10.59.221.172')
MQTT_PORT = int(os.getenv('MQTT_PORT', '1883'))
MQTT_USERNAME = os.getenv('MQTT_USERNAME', 'your_username')
MQTT_PASSWORD = os.getenv('MQTT_PASSWORD', 'your_password')

# Topic snapshots are read from, topic the rating is written to, and the
# prefix under which the sensor's discovery config is published.
# NOTE(review): the snapshot topic starts with "/", which MQTT treats as a
# different topic from "frigate/..." -- confirm it matches the publisher.
MQTT_SNAPSHOT_TOPIC = "/frigate/patiocam/person/snapshot"
MQTT_TOPIC_PUBLISH = "homeassistant/sensor/dayglo_rating/state"
DISCOVERY_PREFIX = "homeassistant"

# Most recently published rating (starts at 0).
last_rating = 0

# Lower/upper bounds handed to cv2.inRange() on the HSV-converted snapshot;
# pixels inside either the green or the yellow band count as dayglo.
LOWER_COLOR_GREEN, UPPER_COLOR_GREEN = np.array([35, 50, 50]), np.array([100, 255, 255])
LOWER_COLOR_YELLOW, UPPER_COLOR_YELLOW = np.array([15, 80, 80]), np.array([40, 255, 255])

# Process-wide state flags: whether the first snapshot has been handled and
# whether the first successful broker connection has been set up.
initial_snapshot_processed = False
connected_once = False
class DaygloDetectorMQTTClient(MQTTClient):
    """MQTT client that turns person snapshots into dayglo ratings.

    The connect/message callbacks are registered in ``__init__`` as small
    synchronous dispatchers. Each dispatcher schedules the matching async
    ``handle_on_*`` coroutine as a task, so the coroutine is actually run
    (and awaited by the event loop) instead of being dropped.
    """

    def __init__(self, *args, **kwargs):
        """Create the client and hook up the connect/message callbacks.

        All arguments are forwarded unchanged to the base client.
        """
        super().__init__(*args, **kwargs)
        # Strong references to in-flight handler tasks: the event loop only
        # holds weak references, so an unreferenced task could be collected
        # before it finishes.
        self._handler_tasks = set()
        self.on_connect = self._dispatch_connect
        self.on_message = self._dispatch_message

    def _schedule(self, coro):
        """Run *coro* as a background task and keep it referenced until done."""
        task = asyncio.create_task(coro)
        self._handler_tasks.add(task)
        task.add_done_callback(self._finish_task)
        return task

    def _finish_task(self, task):
        """Forget a finished handler task and report any error it raised."""
        self._handler_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            print(f"MQTT handler failed: {error!r}")

    @staticmethod
    async def _resolve(result):
        """Await *result* only if it is awaitable; return the final value.

        gmqtt's ``subscribe``/``publish`` are plain calls, so awaiting their
        return value unconditionally raises ``TypeError``.
        """
        if inspect.isawaitable(result):
            return await result
        return result

    def _dispatch_connect(self, client, flags, rc, properties):
        """Synchronous ``on_connect`` callback: schedule the async handler."""
        self._schedule(self.handle_on_connect(client, flags, rc, properties))

    def _dispatch_message(self, client, topic, payload, qos, properties):
        """Synchronous ``on_message`` callback: schedule the async handler."""
        self._schedule(
            self.handle_on_message(client, topic, payload, qos, properties)
        )

    async def handle_on_connect(self, client, flags, rc, properties):
        """Subscribe to the snapshot topic after a successful connection.

        On the first successful connection the discovery configuration and
        an initial rating of 0 are published as well. Later successful
        connections (reconnects) only renew the subscription.

        Args:
            client: The connected MQTT client.
            flags: Connection flags reported by the broker (unused).
            rc: Connection return code; 0 means success.
            properties: Connection properties (unused).
        """
        global connected_once
        if rc != 0:
            print(f"Failed to connect, return code {rc}")
            return

        print("Connected successfully to MQTT broker")
        # Subscribe on every successful connect so that a reconnect with a
        # fresh session does not silently stop snapshot delivery.
        await self._resolve(client.subscribe(MQTT_SNAPSHOT_TOPIC, qos=1))
        print(f"Subscribed to topic: {MQTT_SNAPSHOT_TOPIC}")

        if connected_once:
            return
        connected_once = True
        await publish_discovery_configurations(client)
        # Publish initial rating of 0
        await publish_rating(client, 0)

    async def handle_on_message(self, client, topic, payload, qos, properties):
        """Process a message received on the snapshot topic.

        Messages on any other topic are ignored, and empty payloads are
        logged and skipped.

        Args:
            client: The MQTT client that received the message.
            topic: Topic the message arrived on.
            payload: Raw message payload.
            qos: Quality-of-service level of the message (unused).
            properties: Message properties (unused).
        """
        if topic != MQTT_SNAPSHOT_TOPIC:
            return
        print("Snapshot received")
        if not payload:
            print("Received an empty payload, skipping processing.")
            return
        print(f"Payload length: {len(payload)} bytes")
        print(f"Payload (first 100 bytes): {payload[:100]}...")
        await process_snapshot(self, payload)
async def publish_discovery_configurations(client):
    """Publish the retained discovery config for the dayglo rating sensor.

    The config is sent to ``<DISCOVERY_PREFIX>/sensor/dayglo_rating/config``
    and points the sensor at ``MQTT_TOPIC_PUBLISH``, reading the ``rating``
    key of the JSON state payload.

    Args:
        client: MQTT client whose ``publish`` method is used to send the
            config. ``publish`` may be a plain call or return an awaitable.
    """
    rating_config = {
        "name": "Dayglo Rating",
        "state_topic": MQTT_TOPIC_PUBLISH,
        "unit_of_measurement": "%",
        "value_template": "{{ value_json.rating }}",
        "icon": "mdi:brush",
        "unique_id": "mqtt_dayglo_rating",
    }
    result = client.publish(
        f"{DISCOVERY_PREFIX}/sensor/dayglo_rating/config",
        json.dumps(rating_config),
        retain=True,
    )
    # gmqtt's publish() is a plain call whose return value is not awaitable;
    # awaiting it unconditionally raised TypeError after the message had
    # already been queued. Only await when there is something to await.
    if inspect.isawaitable(result):
        await result
async def publish_rating(client, rating):
    """Record *rating* as the latest value and publish it as JSON.

    The payload ``{"rating": <rating>}`` is sent to ``MQTT_TOPIC_PUBLISH``
    and the module-level ``last_rating`` is updated.

    Args:
        client: MQTT client whose ``publish`` method is used. ``publish``
            may be a plain call or return an awaitable.
        rating: The rating value to publish.
    """
    global last_rating
    last_rating = rating
    result = client.publish(MQTT_TOPIC_PUBLISH, json.dumps({"rating": rating}))
    # gmqtt's publish() is a plain call whose return value is not awaitable;
    # awaiting it unconditionally raised TypeError. Only await when needed.
    if inspect.isawaitable(result):
        await result
def _imdecode_or_none(data):
    """Decode encoded image bytes to a BGR array, or None if not an image."""
    if not data:
        # cv2.imdecode rejects an empty buffer, so treat it as "no image".
        return None
    return cv2.imdecode(np.frombuffer(data, dtype=np.uint8), cv2.IMREAD_COLOR)


def _decode_snapshot_image(payload):
    """Return the image carried by *payload*, or None if it is not decodable.

    The payload is first tried as raw encoded image bytes (Frigate's MQTT
    docs describe the snapshot topic as a JPEG frame) and then, for
    backward compatibility, as base64 text wrapping those bytes.
    """
    raw = payload.encode("utf-8") if isinstance(payload, str) else bytes(payload)
    image = _imdecode_or_none(raw)
    if image is not None:
        return image
    try:
        decoded = base64.b64decode(raw)
    except ValueError:  # binascii.Error is a ValueError subclass
        return None
    return _imdecode_or_none(decoded)


async def process_snapshot(client, payload):
    """Decode a snapshot payload, rate it, and publish the rating.

    The image is decoded in memory, so no temporary file is left behind
    for every snapshot. Any error while decoding, rating, or publishing
    is logged and swallowed so one bad snapshot cannot stop the service.

    Args:
        client: MQTT client passed on to ``publish_rating``.
        payload: Snapshot payload; raw encoded image bytes or base64 text.
    """
    if not payload:
        print("Empty payload received, skipping processing.")
        return
    print("Processing snapshot...")
    try:
        image = _decode_snapshot_image(payload)
        if image is None:
            print("Invalid image format or corrupted image received")
            return
        rating = calculate_dayglo_rating(image)
        print("Dayglo Rating calculated:", rating)
        await publish_rating(client, rating)
    except Exception as e:
        print(f"Error processing snapshot: {e}")
def calculate_dayglo_rating(image):
    """Return the percentage of dayglo-coloured pixels in *image*.

    A 10% margin is trimmed from every edge, the remaining area is
    converted from BGR to HSV, and pixels inside either the green or the
    yellow threshold band are counted.

    Args:
        image: Image array in BGR channel order (it is converted with
            ``cv2.COLOR_BGR2HSV``).

    Returns:
        The share of matching pixels in the inspected area, as a
        percentage between 0 and 100.
    """
    print("Calculating dayglo rating...")
    # Crop the image to focus on the center area
    height, width = image.shape[:2]
    crop_margin = 0.1  # 10% margin
    top, bottom = int(height * crop_margin), int(height * (1 - crop_margin))
    left, right = int(width * crop_margin), int(width * (1 - crop_margin))
    cropped_image = image[top:bottom, left:right]
    if cropped_image.size == 0:
        # Very small images lose every pixel to the margin; rate the whole
        # frame instead of failing on an empty crop.
        cropped_image = image

    total_pixels = cropped_image.shape[0] * cropped_image.shape[1]
    if total_pixels == 0:
        return 0.0

    hsv_image = cv2.cvtColor(cropped_image, cv2.COLOR_BGR2HSV)
    mask_green = cv2.inRange(hsv_image, LOWER_COLOR_GREEN, UPPER_COLOR_GREEN)
    mask_yellow = cv2.inRange(hsv_image, LOWER_COLOR_YELLOW, UPPER_COLOR_YELLOW)
    # Union of the two masks; the bands overlap, so a pixel matching both
    # must still be counted only once.
    mask = cv2.bitwise_or(mask_green, mask_yellow)
    dayglo_pixels = cv2.countNonZero(mask)
    return (dayglo_pixels / total_pixels) * 100
async def main():
    """Optionally rate an image file, then run the MQTT detector.

    If a command line argument is given, it is treated as an image path:
    the file is rated and the result printed. The MQTT client is then
    started and runs until the task is cancelled (for example by Ctrl+C),
    at which point the broker connection is closed.
    """
    # Handle command line argument for image file
    if len(sys.argv) > 1:
        image_file = sys.argv[1]
        if not os.path.exists(image_file):
            print(f"File not found: {image_file}")
        else:
            print(f"Processing image from file: {image_file}")
            image = cv2.imread(image_file)
            if image is None:
                print("Invalid image file provided.")
            else:
                rating = calculate_dayglo_rating(image)
                print("Dayglo Rating calculated from file:", rating)

    # NOTE(review): file mode still falls through into MQTT mode, as before;
    # confirm whether it was meant to exit after rating the file.

    # Set up MQTT client for normal operation
    client = DaygloDetectorMQTTClient("dayglo_detector")
    client.set_auth_credentials(MQTT_USERNAME, MQTT_PASSWORD)
    await client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
    try:
        # Park on a future that is never resolved; callbacks do the work.
        await asyncio.get_running_loop().create_future()
    finally:
        # Close the broker connection cleanly when the task is cancelled.
        await client.disconnect()
# Script entry point: start an event loop and run main() only when this file
# is executed directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())