From 0a4ebf1fecce704385dbfa638d23afe665bbd30f Mon Sep 17 00:00:00 2001 From: Marcus Brown Date: Sun, 20 Oct 2024 18:44:47 +1100 Subject: [PATCH] Replace paho-mqtt with gmqtt for modern MQTT 5.0 support - Switched to `gmqtt` for better MQTT 5.0 compatibility and to address deprecation warnings. - Refactored the MQTT client to use asynchronous handling with `asyncio`. - Updated snapshot processing and MQTT communication to use async functions. --- dayglo_detector/dayglo_detector.py | 117 ++++++++++++++--------------- 1 file changed, 55 insertions(+), 62 deletions(-) diff --git a/dayglo_detector/dayglo_detector.py b/dayglo_detector/dayglo_detector.py index 3d85171..e29e813 100644 --- a/dayglo_detector/dayglo_detector.py +++ b/dayglo_detector/dayglo_detector.py @@ -1,12 +1,13 @@ import os import time -import paho.mqtt.client as mqtt import numpy as np import cv2 import base64 import json import tempfile import sys +import asyncio +from gmqtt import Client as MQTTClient # Configuration MQTT_BROKER = os.environ.get('MQTT_BROKER', '10.59.221.172') @@ -21,29 +22,41 @@ DISCOVERY_PREFIX = "homeassistant" last_rating = 0 # Default color thresholds for dayglo detection -LOWER_COLOR_GREEN = np.array([40, 70, 70]) -UPPER_COLOR_GREEN = np.array([90, 255, 255]) -LOWER_COLOR_YELLOW = np.array([20, 100, 100]) -UPPER_COLOR_YELLOW = np.array([35, 255, 255]) +# Expanded color thresholds for dayglo detection +LOWER_COLOR_GREEN = np.array([35, 50, 50]) +UPPER_COLOR_GREEN = np.array([100, 255, 255]) +LOWER_COLOR_YELLOW = np.array([15, 80, 80]) +UPPER_COLOR_YELLOW = np.array([40, 255, 255]) # Track if the initial snapshot has been processed initial_snapshot_processed = False +connected_once = False -def on_connect(client, userdata, flags, reasonCode, properties=None): - try: - if reasonCode == 0: +class DaygloDetectorMQTTClient(MQTTClient): + async def on_connect(self, client, flags, rc, properties): + global connected_once + if rc == 0 and not connected_once: print("Connected successfully to MQTT broker") - client.subscribe(MQTT_SNAPSHOT_TOPIC, qos=1) + await self.subscribe(MQTT_SNAPSHOT_TOPIC, qos=1) print(f"Subscribed to topic: {MQTT_SNAPSHOT_TOPIC}") - publish_discovery_configurations() + connected_once = True + await publish_discovery_configurations(self) # Publish initial rating of 0 - publish_rating(0) + await publish_rating(self, 0) else: - print(f"Failed to connect, return code {reasonCode}") - except Exception as e: - print(f"Caught exception in on_connect: {e}") + print(f"Failed to connect, return code {rc}") -def publish_discovery_configurations(): + async def on_message(self, client, topic, payload, qos, properties): + if topic == MQTT_SNAPSHOT_TOPIC: + print("Snapshot received") + if len(payload) == 0: + print("Received an empty payload, skipping processing.") + else: + print(f"Payload length: {len(payload)} bytes") + print(f"Payload (first 100 bytes): {payload[:100]}...") + await process_snapshot(self, payload) + +async def publish_discovery_configurations(client): rating_config = { "name": "Dayglo Rating", "state_topic": MQTT_TOPIC_PUBLISH, @@ -52,26 +65,14 @@ def publish_discovery_configurations(): "icon": "mdi:brush", "unique_id": "mqtt_dayglo_rating" } - client.publish(f"{DISCOVERY_PREFIX}/sensor/dayglo_rating/config", json.dumps(rating_config), retain=True) + await client.publish(f"{DISCOVERY_PREFIX}/sensor/dayglo_rating/config", json.dumps(rating_config), retain=True) -def publish_rating(rating): +async def publish_rating(client, rating): global last_rating last_rating = rating - client.publish(MQTT_TOPIC_PUBLISH, json.dumps({"rating": rating})) + await client.publish(MQTT_TOPIC_PUBLISH, json.dumps({"rating": rating})) -def on_message(client, userdata, msg): - global initial_snapshot_processed - if msg.topic == MQTT_SNAPSHOT_TOPIC: - print("Snapshot received") - if len(msg.payload) == 0: - print("Received an empty payload, skipping processing.") - else: - print(f"Payload length: {len(msg.payload)} bytes") - print(f"Payload (first 100 bytes): {msg.payload[:100]}...") - process_snapshot(msg.payload) - initial_snapshot_processed = True - -def process_snapshot(payload): +async def process_snapshot(client, payload): if not payload: print("Empty payload received, skipping processing.") return @@ -89,7 +90,7 @@ def process_snapshot(payload): if image is not None: rating = calculate_dayglo_rating(image) print("Dayglo Rating calculated:", rating) - publish_rating(rating) + await publish_rating(client, rating) else: print("Invalid image format or corrupted image received") except Exception as e: @@ -107,42 +108,34 @@ def calculate_dayglo_rating(image): hsv_image = cv2.cvtColor(cropped_image, cv2.COLOR_BGR2HSV) mask_green = cv2.inRange(hsv_image, LOWER_COLOR_GREEN, UPPER_COLOR_GREEN) mask_yellow = cv2.inRange(hsv_image, LOWER_COLOR_YELLOW, UPPER_COLOR_YELLOW) - mask = cv2.bitwise_or(mask_green, mask_yellow) + mask = cv2.addWeighted(mask_green, 1.0, mask_yellow, 1.0, 0) dayglo_pixels = cv2.countNonZero(mask) total_pixels = cropped_image.shape[0] * cropped_image.shape[1] rating = (dayglo_pixels / total_pixels) * 100 return rating -def connect_mqtt(): - while True: - try: - client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60) - break - except Exception as e: - print(f"Connection failed: {e}. Retrying in 5 seconds...") - time.sleep(5) - -# Handle command line argument for image file -if len(sys.argv) > 1: - image_file = sys.argv[1] - if os.path.exists(image_file): - print(f"Processing image from file: {image_file}") - image = cv2.imread(image_file) - if image is not None: - rating = calculate_dayglo_rating(image) - print("Dayglo Rating calculated from file:", rating) +async def main(): + # Handle command line argument for image file + if len(sys.argv) > 1: + image_file = sys.argv[1] + if os.path.exists(image_file): + print(f"Processing image from file: {image_file}") + image = cv2.imread(image_file) + if image is not None: + rating = calculate_dayglo_rating(image) + print("Dayglo Rating calculated from file:", rating) + else: + print("Invalid image file provided.") else: - print("Invalid image file provided.") - else: - print(f"File not found: {image_file}") + print(f"File not found: {image_file}") -# Set up MQTT client for normal operation -client = mqtt.Client(client_id="dayglo_detector", protocol=mqtt.MQTTv5) -client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) -client.on_connect = on_connect -client.on_message = on_message -client.enable_logger() + # Set up MQTT client for normal operation + client = DaygloDetectorMQTTClient("dayglo_detector") + client.set_auth_credentials(MQTT_USERNAME, MQTT_PASSWORD) -connect_mqtt() -client.loop_forever() + await client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60) + client.loop_forever() + +if __name__ == "__main__": + asyncio.run(main())