#!/bin/python import argparse import json import paho.mqtt.client as mqtt from mixer import Mixer class MqttClient: MQTT_SERVER_PORT = 1883 MQTT_DISCOVERY_TOPIC = "homeassistant/button/mixer/config" MQTT_COMMAND_TOPIC = "livingroom/voc/allen_heath_qu16_mixer" def __init__(self, mixer_ip): self.mixer = Mixer(mixer_ip) self.client = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2) def mqtt_autodiscovery(self): j = json.dumps( { "name": "Reset Settings", "command_topic": self.MQTT_COMMAND_TOPIC, "command_template": '{"command": "scene_recall", "args": {"scene_id": 0}}', "device": { "name": "Mischpult", "model": "Qu-16", "manufacturer": "Allen & Heath", "suggested_area": "livingroom", "identifiers": "Mischpult", }, "unique_id": "mischpult_reset_settings", } ) self.client.publish(self.MQTT_DISCOVERY_TOPIC, payload=j, qos=0, retain=False) def on_connect(self, client, userdata, flags, reason_code, properties): print(f"Connected: {reason_code}") client.subscribe(self.MQTT_COMMAND_TOPIC) self.mqtt_autodiscovery(client) def on_message(self, client, userdata, msg): payload = msg.payload.decode() print(f"{msg.topic}: {payload}") j = json.loads(payload) match j["command"]: case "scene_recall": self.mixer.scene_recall(j["args"]["scene_id"]) case "shutdown": self.mixer.shutdown() def run(self, hostname): self.client.on_connect = self.on_connect self.client.on_message = self.on_message self.client.connect(hostname, self.MQTT_SERVER_PORT, 60) self.client.loop_forever() def main(): parser = argparse.ArgumentParser(description="Allen & Heath Qu MQTT Remote Control") parser.add_argument("mixer_ip", help="IP of the mixer") parser.add_argument("mqtt_hostname", help="IP of the MQTT broker") subparsers = parser.add_subparsers( dest="command", required=True, help="Available commands" ) subparsers.add_parser("shutdown", help="Shut down the mixer") scene_parser = subparsers.add_parser("scene_recall", help="Recall a specific scene") scene_parser.add_argument( "scene_number", type=int, choices=range(0, 100), help="Scene number to recall" ) args = parser.parse_args() print(f"Mixer IP: {args.mixer_ip}, MQTT broker hostname: {args.mqtt_hostname}") client = MqttClient(args.mixer_ip) client.run(args.mqtt_hostname) if __name__ == "__main__": main()