#!/bin/python import argparse import json import paho.mqtt.client as mqtt from mixer import Mixer class MqttClient: MQTT_SERVER_PORT = 1883 MQTT_DISCOVERY_TOPIC = "homeassistant/button/mixer/config" MQTT_COMMAND_TOPIC = "livingroom/voc/allen_heath_qu16_mixer" def __init__(self, mixer_ip): self.mixer_ip = mixer_ip self.client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2) def mqtt_autodiscovery(self): j = json.dumps( { "name": "Reset Settings", "command_topic": self.MQTT_COMMAND_TOPIC, "command_template": '{"command": "scene_recall", "args": {"scene_id": 0}}', "device": { "name": "Mischpult", "model": "Qu-16", "manufacturer": "Allen & Heath", "suggested_area": "livingroom", "identifiers": "Mischpult", }, "unique_id": "mischpult_reset_settings", } ) self.client.publish(self.MQTT_DISCOVERY_TOPIC, payload=j, qos=0, retain=True) def on_connect(self, client, userdata, flags, reason_code, properties): print(f"Connected: {reason_code}") client.subscribe(self.MQTT_COMMAND_TOPIC) self.mqtt_autodiscovery() def on_message(self, client, userdata, msg): payload = msg.payload.decode() print(f"{msg.topic}: {payload}") j = json.loads(payload) mixer = Mixer(self.mixer_ip) match j["command"]: case "scene_recall": mixer.scene_recall(j["args"]["scene_id"]) case "shutdown": mixer.shutdown() def run(self, hostname): self.client.on_connect = self.on_connect self.client.on_message = self.on_message self.client.connect(hostname, self.MQTT_SERVER_PORT, 60) self.client.loop_forever() def main(): parser = argparse.ArgumentParser(description="Allen & Heath Qu MQTT Remote Control") parser.add_argument("mixer_ip", help="IP of the mixer") parser.add_argument("mqtt_hostname", help="IP of the MQTT broker") args = parser.parse_args() print(f"Mixer IP: {args.mixer_ip}, MQTT broker hostname: {args.mqtt_hostname}") client = MqttClient(args.mixer_ip) client.run(args.mqtt_hostname) if __name__ == "__main__": main()