Files
frigate/dayglo_detector/dayglo_detector.py
T
marcus 296775bc2e Fix improper indentation in process_snapshot function
- Corrected inconsistent indentation to avoid runtime errors.
- Ensured that all indentation levels match Python's expected format.
2024-10-20 19:10:44 +11:00

147 lines
5.6 KiB
Python

import os
import time
import numpy as np
import cv2
import base64
import json
import tempfile
import sys
import asyncio
from gmqtt import Client as MQTTClient
# --- MQTT connection settings (each can be overridden via the environment) ---
MQTT_BROKER = os.getenv("MQTT_BROKER", "10.59.221.172")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
MQTT_USERNAME = os.getenv("MQTT_USERNAME", "your_username")
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "your_password")

# --- MQTT topics ---
# Topic the detector subscribes to for incoming snapshots.
# NOTE(review): this topic starts with "/" -- confirm it matches the topic
# prefix the snapshot publisher is actually configured with.
MQTT_SNAPSHOT_TOPIC = "/frigate/patiocam/person/snapshot"
# Topic the computed rating is published to, and the discovery prefix used
# to build the sensor's config topic.
MQTT_TOPIC_PUBLISH = "homeassistant/sensor/dayglo_rating/state"
DISCOVERY_PREFIX = "homeassistant"

# --- Dayglo colour thresholds ---
# Lower/upper bounds handed to cv2.inRange on the HSV-converted snapshot.
# The green and yellow bands are matched separately and then combined.
LOWER_COLOR_GREEN = np.array([35, 50, 50])
UPPER_COLOR_GREEN = np.array([100, 255, 255])
LOWER_COLOR_YELLOW = np.array([15, 80, 80])
UPPER_COLOR_YELLOW = np.array([40, 255, 255])

# --- Mutable module state ---
# Most recently published rating; starts at 0 until a snapshot is scored.
last_rating = 0
# Flag intended to record that a first snapshot was handled (not referenced
# by any other code visible in this file).
initial_snapshot_processed = False
# Set once the first successful broker connection has been handled.
connected_once = False
class DaygloDetectorMQTTClient(MQTTClient):
    """MQTT client that rates incoming snapshots for dayglo colours.

    The handlers are registered with gmqtt in ``__init__``. Without that
    registration gmqtt never invokes them, because it only calls whatever
    is assigned to its ``on_connect`` / ``on_message`` attributes.
    """

    def __init__(self, *args, **kwargs):
        """Create the client and wire the handlers into gmqtt's callbacks.

        All arguments are forwarded unchanged to the gmqtt client.
        """
        super().__init__(*args, **kwargs)
        # Strong references to in-flight handler tasks so they cannot be
        # garbage-collected before they finish.
        self._handler_tasks = set()
        # gmqtt invokes on_connect as a plain function, so the coroutine
        # handler has to be scheduled from a synchronous trampoline.
        self.on_connect = self._dispatch_on_connect
        # gmqtt accepts a coroutine function for on_message directly.
        self.on_message = self.handle_on_message

    def _dispatch_on_connect(self, client, flags, rc, properties):
        """Schedule ``handle_on_connect`` on the running event loop."""
        task = asyncio.ensure_future(
            self.handle_on_connect(client, flags, rc, properties)
        )
        self._handler_tasks.add(task)
        task.add_done_callback(self._handler_tasks.discard)

    async def handle_on_connect(self, client, flags, rc, properties):
        """Subscribe to the snapshot topic after a successful connection.

        Args:
            client: The connected MQTT client.
            flags: Connection flags reported by the broker (unused).
            rc: Connection result code; ``0`` means success.
            properties: Connection properties reported by the broker (unused).
        """
        global connected_once

        if rc != 0:
            print(f"Failed to connect, return code {rc}")
            return

        print("Connected successfully to MQTT broker")
        # Subscribe on every successful (re)connection so the subscription
        # is restored after a reconnect. gmqtt's subscribe() is a plain
        # method, so it must not be awaited.
        client.subscribe(MQTT_SNAPSHOT_TOPIC, qos=1)
        print(f"Subscribed to topic: {MQTT_SNAPSHOT_TOPIC}")

        if connected_once:
            return
        connected_once = True

        await publish_discovery_configurations(client)
        # Publish the initial rating. last_rating is 0 until something has
        # been scored; using it (instead of a literal 0) avoids resetting a
        # rating that was published before this task got to run.
        await publish_rating(client, last_rating)

    async def handle_on_message(self, client, topic, payload, qos, properties):
        """Process a snapshot message received on the snapshot topic.

        Messages on any other topic are ignored, and empty payloads are
        reported and skipped.

        Args:
            client: The MQTT client that received the message.
            topic: Topic the message arrived on.
            payload: Raw message payload.
            qos: Quality-of-service level of the message (unused).
            properties: Message properties (unused).
        """
        if topic != MQTT_SNAPSHOT_TOPIC:
            return

        print("Snapshot received")
        if len(payload) == 0:
            print("Received an empty payload, skipping processing.")
            return

        print(f"Payload length: {len(payload)} bytes")
        print(f"Payload (first 100 bytes): {payload[:100]}...")
        await process_snapshot(self, payload)
async def publish_discovery_configurations(client):
    """Publish the retained discovery config for the dayglo rating sensor.

    Args:
        client: MQTT client whose ``publish`` method is used to send the
            configuration message.
    """
    rating_config = {
        "name": "Dayglo Rating",
        "state_topic": MQTT_TOPIC_PUBLISH,
        "unit_of_measurement": "%",
        "value_template": "{{ value_json.rating }}",
        "icon": "mdi:brush",
        "unique_id": "mqtt_dayglo_rating",
    }
    # gmqtt's publish() is a regular method that returns None; awaiting its
    # result raises TypeError, so it is called without ``await``. The
    # function itself stays a coroutine so existing callers can await it.
    client.publish(
        f"{DISCOVERY_PREFIX}/sensor/dayglo_rating/config",
        json.dumps(rating_config),
        retain=True,
    )
async def publish_rating(client, rating):
    """Record ``rating`` as the latest rating and publish it as JSON.

    Args:
        client: MQTT client whose ``publish`` method is used to send the
            state message.
        rating: Rating value to publish under the ``"rating"`` key.
    """
    global last_rating
    last_rating = rating
    # gmqtt's publish() is a regular method that returns None; awaiting its
    # result raises TypeError, so it is called without ``await``. The
    # function itself stays a coroutine so existing callers can await it.
    client.publish(MQTT_TOPIC_PUBLISH, json.dumps({"rating": rating}))
def _decode_image_bytes(data):
    """Decode encoded image bytes into a BGR array, or None if undecodable."""
    if not data:
        return None
    buffer = np.frombuffer(data, dtype=np.uint8)
    return cv2.imdecode(buffer, cv2.IMREAD_COLOR)


def _decode_snapshot(payload):
    """Decode a snapshot payload given as raw image bytes or as base64 text."""
    if isinstance(payload, (bytes, bytearray)):
        # Try the payload as an encoded image first; base64 text is not a
        # valid image, so it falls through to the base64 path below.
        image = _decode_image_bytes(payload)
        if image is not None:
            return image
    return _decode_image_bytes(base64.b64decode(payload))


async def process_snapshot(client, payload):
    """Decode a snapshot payload, rate it and publish the rating.

    The image is decoded in memory, so no temporary file is written (the
    previous implementation left one undeleted file behind per snapshot).
    Both raw encoded image bytes and base64-encoded image data are accepted.

    Args:
        client: MQTT client used to publish the resulting rating.
        payload: Snapshot payload as received from the broker.
    """
    if not payload:
        print("Empty payload received, skipping processing.")
        return

    print("Processing snapshot...")
    try:
        image = _decode_snapshot(payload)
        if image is None:
            print("Invalid image format or corrupted image received")
            return
        rating = calculate_dayglo_rating(image)
        print("Dayglo Rating calculated:", rating)
        await publish_rating(client, rating)
    except Exception as e:
        # Message-handler boundary: report the failure and keep the client
        # running rather than letting one bad snapshot kill the handler.
        print(f"Error processing snapshot: {e}")
def calculate_dayglo_rating(image, crop_margin=0.1):
    """Return the percentage of dayglo-coloured pixels in the image centre.

    The image is cropped by ``crop_margin`` on every side, converted from
    BGR to HSV, and every pixel falling inside the green or the yellow
    threshold band is counted.

    Args:
        image: BGR image array as produced by OpenCV's image readers.
        crop_margin: Fraction of the height/width trimmed from each edge
            before rating. Must satisfy ``0 <= crop_margin < 0.5``.

    Returns:
        Share of matching pixels in the cropped area as a float between
        0 and 100. Returns 0.0 when the cropped area contains no pixels.

    Raises:
        ValueError: If ``crop_margin`` is outside ``[0, 0.5)``.
    """
    if not 0 <= crop_margin < 0.5:
        raise ValueError("crop_margin must satisfy 0 <= crop_margin < 0.5")

    print("Calculating dayglo rating...")
    # Crop the image to focus on the center area
    height, width = image.shape[:2]
    top, bottom = int(height * crop_margin), int(height * (1 - crop_margin))
    left, right = int(width * crop_margin), int(width * (1 - crop_margin))
    cropped_image = image[top:bottom, left:right]

    total_pixels = cropped_image.shape[0] * cropped_image.shape[1]
    if total_pixels == 0:
        # Very small inputs can crop down to nothing; cvtColor rejects an
        # empty array, so report "no dayglo" instead of raising.
        return 0.0

    hsv_image = cv2.cvtColor(cropped_image, cv2.COLOR_BGR2HSV)
    mask_green = cv2.inRange(hsv_image, LOWER_COLOR_GREEN, UPPER_COLOR_GREEN)
    mask_yellow = cv2.inRange(hsv_image, LOWER_COLOR_YELLOW, UPPER_COLOR_YELLOW)
    # A pixel counts if either band matched; overlapping matches count once.
    mask = cv2.bitwise_or(mask_green, mask_yellow)
    dayglo_pixels = cv2.countNonZero(mask)
    return (dayglo_pixels / total_pixels) * 100
async def _publish_rating_from_file(client, image_file):
    """Rate the image stored at ``image_file`` and publish the result."""
    if not os.path.exists(image_file):
        print(f"File not found: {image_file}")
        return

    print(f"Processing image from file: {image_file}")
    image = cv2.imread(image_file)
    if image is None:
        print("Invalid image file provided.")
        return

    rating = calculate_dayglo_rating(image)
    print("Dayglo Rating calculated from file:", rating)
    await publish_rating(client, rating)


async def main():
    """Connect to the broker, optionally rate a file, then run forever.

    When a path is given as the first command-line argument, that image is
    rated and its rating published before the detector settles into normal
    operation.
    """
    print("Starting Dayglo Detector...")

    # Use one client for both the optional file rating and normal operation.
    # Connecting two clients that share the client id "dayglo_detector"
    # would make them compete for the same broker session.
    client = DaygloDetectorMQTTClient("dayglo_detector")
    client.set_auth_credentials(MQTT_USERNAME, MQTT_PASSWORD)
    await client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)

    # Handle command line argument for image file
    if len(sys.argv) > 1:
        await _publish_rating_from_file(client, sys.argv[1])

    # Wait on a future that is never resolved so the event loop (and with
    # it the MQTT client) keeps running until the process is stopped.
    await asyncio.get_running_loop().create_future()


if __name__ == "__main__":
    asyncio.run(main())