Replace paho-mqtt with gmqtt for modern MQTT 5.0 support

- Switched to `gmqtt` for better MQTT 5.0 compatibility and to address deprecation warnings.
- Refactored the MQTT client to use asynchronous handling with `asyncio`.
- Updated snapshot processing and MQTT communication to use async functions.
This commit is contained in:
2024-10-20 18:44:47 +11:00
parent 795b527f90
commit 0a4ebf1fec
+41 -48
View File
@@ -1,12 +1,13 @@
import os import os
import time import time
import paho.mqtt.client as mqtt
import numpy as np import numpy as np
import cv2 import cv2
import base64 import base64
import json import json
import tempfile import tempfile
import sys import sys
import asyncio
from gmqtt import Client as MQTTClient
# Configuration # Configuration
MQTT_BROKER = os.environ.get('MQTT_BROKER', '10.59.221.172') MQTT_BROKER = os.environ.get('MQTT_BROKER', '10.59.221.172')
@@ -21,29 +22,41 @@ DISCOVERY_PREFIX = "homeassistant"
last_rating = 0 last_rating = 0
# Default color thresholds for dayglo detection # Default color thresholds for dayglo detection
LOWER_COLOR_GREEN = np.array([40, 70, 70]) # Expanded color thresholds for dayglo detection
UPPER_COLOR_GREEN = np.array([90, 255, 255]) LOWER_COLOR_GREEN = np.array([35, 50, 50])
LOWER_COLOR_YELLOW = np.array([20, 100, 100]) UPPER_COLOR_GREEN = np.array([100, 255, 255])
UPPER_COLOR_YELLOW = np.array([35, 255, 255]) LOWER_COLOR_YELLOW = np.array([15, 80, 80])
UPPER_COLOR_YELLOW = np.array([40, 255, 255])
# Track if the initial snapshot has been processed # Track if the initial snapshot has been processed
initial_snapshot_processed = False initial_snapshot_processed = False
connected_once = False
def on_connect(client, userdata, flags, reasonCode, properties=None): class DaygloDetectorMQTTClient(MQTTClient):
try: async def on_connect(self, client, flags, rc, properties):
if reasonCode == 0: global connected_once
if rc == 0 and not connected_once:
print("Connected successfully to MQTT broker") print("Connected successfully to MQTT broker")
client.subscribe(MQTT_SNAPSHOT_TOPIC, qos=1) await self.subscribe(MQTT_SNAPSHOT_TOPIC, qos=1)
print(f"Subscribed to topic: {MQTT_SNAPSHOT_TOPIC}") print(f"Subscribed to topic: {MQTT_SNAPSHOT_TOPIC}")
publish_discovery_configurations() connected_once = True
await publish_discovery_configurations(self)
# Publish initial rating of 0 # Publish initial rating of 0
publish_rating(0) await publish_rating(self, 0)
else: else:
print(f"Failed to connect, return code {reasonCode}") print(f"Failed to connect, return code {rc}")
except Exception as e:
print(f"Caught exception in on_connect: {e}")
def publish_discovery_configurations(): async def on_message(self, client, topic, payload, qos, properties):
if topic == MQTT_SNAPSHOT_TOPIC:
print("Snapshot received")
if len(payload) == 0:
print("Received an empty payload, skipping processing.")
else:
print(f"Payload length: {len(payload)} bytes")
print(f"Payload (first 100 bytes): {payload[:100]}...")
await process_snapshot(self, payload)
async def publish_discovery_configurations(client):
rating_config = { rating_config = {
"name": "Dayglo Rating", "name": "Dayglo Rating",
"state_topic": MQTT_TOPIC_PUBLISH, "state_topic": MQTT_TOPIC_PUBLISH,
@@ -52,26 +65,14 @@ def publish_discovery_configurations():
"icon": "mdi:brush", "icon": "mdi:brush",
"unique_id": "mqtt_dayglo_rating" "unique_id": "mqtt_dayglo_rating"
} }
client.publish(f"{DISCOVERY_PREFIX}/sensor/dayglo_rating/config", json.dumps(rating_config), retain=True) await client.publish(f"{DISCOVERY_PREFIX}/sensor/dayglo_rating/config", json.dumps(rating_config), retain=True)
def publish_rating(rating): async def publish_rating(client, rating):
global last_rating global last_rating
last_rating = rating last_rating = rating
client.publish(MQTT_TOPIC_PUBLISH, json.dumps({"rating": rating})) await client.publish(MQTT_TOPIC_PUBLISH, json.dumps({"rating": rating}))
def on_message(client, userdata, msg): async def process_snapshot(client, payload):
global initial_snapshot_processed
if msg.topic == MQTT_SNAPSHOT_TOPIC:
print("Snapshot received")
if len(msg.payload) == 0:
print("Received an empty payload, skipping processing.")
else:
print(f"Payload length: {len(msg.payload)} bytes")
print(f"Payload (first 100 bytes): {msg.payload[:100]}...")
process_snapshot(msg.payload)
initial_snapshot_processed = True
def process_snapshot(payload):
if not payload: if not payload:
print("Empty payload received, skipping processing.") print("Empty payload received, skipping processing.")
return return
@@ -89,7 +90,7 @@ def process_snapshot(payload):
if image is not None: if image is not None:
rating = calculate_dayglo_rating(image) rating = calculate_dayglo_rating(image)
print("Dayglo Rating calculated:", rating) print("Dayglo Rating calculated:", rating)
publish_rating(rating) await publish_rating(client, rating)
else: else:
print("Invalid image format or corrupted image received") print("Invalid image format or corrupted image received")
except Exception as e: except Exception as e:
@@ -107,21 +108,13 @@ def calculate_dayglo_rating(image):
hsv_image = cv2.cvtColor(cropped_image, cv2.COLOR_BGR2HSV) hsv_image = cv2.cvtColor(cropped_image, cv2.COLOR_BGR2HSV)
mask_green = cv2.inRange(hsv_image, LOWER_COLOR_GREEN, UPPER_COLOR_GREEN) mask_green = cv2.inRange(hsv_image, LOWER_COLOR_GREEN, UPPER_COLOR_GREEN)
mask_yellow = cv2.inRange(hsv_image, LOWER_COLOR_YELLOW, UPPER_COLOR_YELLOW) mask_yellow = cv2.inRange(hsv_image, LOWER_COLOR_YELLOW, UPPER_COLOR_YELLOW)
mask = cv2.bitwise_or(mask_green, mask_yellow) mask = cv2.addWeighted(mask_green, 1.0, mask_yellow, 1.0, 0)
dayglo_pixels = cv2.countNonZero(mask) dayglo_pixels = cv2.countNonZero(mask)
total_pixels = cropped_image.shape[0] * cropped_image.shape[1] total_pixels = cropped_image.shape[0] * cropped_image.shape[1]
rating = (dayglo_pixels / total_pixels) * 100 rating = (dayglo_pixels / total_pixels) * 100
return rating return rating
def connect_mqtt(): async def main():
while True:
try:
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
break
except Exception as e:
print(f"Connection failed: {e}. Retrying in 5 seconds...")
time.sleep(5)
# Handle command line argument for image file # Handle command line argument for image file
if len(sys.argv) > 1: if len(sys.argv) > 1:
image_file = sys.argv[1] image_file = sys.argv[1]
@@ -137,12 +130,12 @@ if len(sys.argv) > 1:
print(f"File not found: {image_file}") print(f"File not found: {image_file}")
# Set up MQTT client for normal operation # Set up MQTT client for normal operation
client = mqtt.Client(client_id="dayglo_detector", protocol=mqtt.MQTTv5) client = DaygloDetectorMQTTClient("dayglo_detector")
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) client.set_auth_credentials(MQTT_USERNAME, MQTT_PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
client.enable_logger()
connect_mqtt() await client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
client.loop_forever() client.loop_forever()
if __name__ == "__main__":
asyncio.run(main())