#!/bin/python import argparse import socket import mido from enum import Enum from dataclasses import dataclass # For Allen & Heath Qu MIDI protocol documentation see: # https://www.allen-heath.com/content/uploads/2023/06/Qu_MIDI_Protocol_V1.9.pdf class Mixer: @dataclass class SystemState: class QuModel(Enum): QU16 = 1 QU24 = 2 QU32 = 3 QUPAC = 4 QUSB = 5 def __str__(self): labels = { 1: "Qu-16", 2: "Qu-24", 3: "Qu-32", 4: "Qu-Pac", 5: "Qu-SB", } return labels[self.value] sys_ex_header : list midi_channel : int id : int model : QuModel major_ver : int minor_ver : int def __init__(self, ip): self.MIXER_PORT = 51325 self.sock = socket.create_connection((ip, self.MIXER_PORT)) self.mido_parser = mido.Parser() ALLEN_HEATH_ID = [0x00, 0x00, 0x1A] QU_MIXER = [0x50, 0x11] MAJOR_MINOR = [0x01, 0x00] ALL_CALL_MIDI_CHANNEL = [0x7F] SYSEX_HEADER = ALLEN_HEATH_ID + QU_MIXER + MAJOR_MINOR SYSEX_ALL_CALL = SYSEX_HEADER + ALL_CALL_MIDI_CHANNEL class SysExMessageId(Enum): GET_NAME_FROM_QU_REQUEST = 0x01 GET_NAME_FROM_QU_RESPONSE = 0x02 GET_SYSTEM_STATE_REQUEST = 0x10 GET_SYSTEM_STATE_RESPONSE = 0x11 GET_METER_DATA_REQUEST = 0x12 GET_METER_DATA_RESPONSE = 0x13 def recv_sys_ex(self, response_msg_filter: SysExMessageId = None): while True: data = self.sock.recv(1024) if not data: break self.mido_parser.feed(data) if self.mido_parser.pending(): msg = self.mido_parser.get_message() print(vars(msg)) if hasattr(msg, "data"): print("Hex: ", " ".join(f"{b:02X}" for b in msg.data)) if response_msg_filter is not None: if msg.type != "sysex": continue msg_id = int(msg.data[8]) if msg_id != response_msg_filter.value: continue return msg def get_system_state(self): msg_id = self.SysExMessageId.GET_SYSTEM_STATE_REQUEST.value i_pad_flag = 0x00 data = self.SYSEX_ALL_CALL + [msg_id, i_pad_flag] msg = mido.Message("sysex", data=data) msg_bytes = bytes(msg.bytes()) self.sock.sendall(msg_bytes) print("Sent:", " ".join(f"{b:02X}" for b in msg_bytes)) response = self.recv_sys_ex(self.SysExMessageId.GET_SYSTEM_STATE_RESPONSE) i_pad_flag = int(response.data[9]) state = self.SystemState( sys_ex_header=response.data[:8], midi_channel=int(response.data[7]), id=int(response.data[8]), model=self.SystemState.QuModel(i_pad_flag), major_ver=int(response.data[10]), minor_ver = int(response.data[11]) ) return state def get_name_from_qu(self, channel_no, name_to_set): msg_id = self.SysExMessageId.GET_NAME_FROM_QU_REQUEST.value data = self.SYSEX_HEADER + [int(0), msg_id, int(channel_no)] if name_to_set: data += list(name_to_set.encode("utf-8")) msg = mido.Message("sysex", data=data) msg_bytes = bytes(msg.bytes()) print("Sent:", " ".join(f"{b:02X}" for b in msg_bytes)) self.sock.sendall(msg_bytes) response = self.recv_sys_ex(self.SysExMessageId.GET_NAME_FROM_QU_RESPONSE) sysex_header = response.data[:8] midi_channel = int(response.data[7]) id = int(response.data[8]) channel_name = bytes(response.data[9:]).decode("utf-8") print(f"sysex_header: {sysex_header}") print(f"MIDI channel: {midi_channel}") print(f"ID: 0x{id:02X}") print(f"Channel Name: {channel_name}") def nrpn_parameter_control(self, midi_ch, mixer_ch, id, va, vx): a = mido.Message("control_change", channel=midi_ch, control=0x63, value=mixer_ch) b = mido.Message("control_change", channel=midi_ch, control=0x62, value=id) c = mido.Message("control_change", channel=midi_ch, control=0x06, value=va) d = mido.Message("control_change", channel=midi_ch, control=0x26, value=vx) msg_bytes = bytes(a.bytes() + b.bytes() + c.bytes() + d.bytes()) print(" ".join(f"{b:02X}" for b in msg_bytes)) self.sock.sendall(msg_bytes) def shutdown(self): self.nrpn_parameter_control(midi_ch=0, mixer_ch=0, id=0x5F, va=0x00, vx=0x00) def set_bank(self, bank_no): if bank_no < 1 or bank_no > 16384: raise ValueError(f"bank_no param must be between 0 and 16384, got {bank_no}") msb_val = ((bank_no - 1) >> 7) & 0xFF lsb_val = (bank_no - 1) & 0x7F control_change_msb = mido.Message("control_change", channel=0, control=0x00, value=msb_val) control_change_lsb = mido.Message("control_change", channel=0, control=0x20, value=lsb_val) msg_bytes = bytes(control_change_msb.bytes() + control_change_lsb.bytes()) print(" ".join(f"{b:02X}" for b in msg_bytes)) self.sock.sendall(msg_bytes) def scene_recall(self, scene_id): print(f"scene_recall: scene_id={scene_id}") self.set_bank(1) msb = mido.Message("program_change", channel=0, program=scene_id) msg_bytes = bytes(msb.bytes()) print(" ".join(f"{b:02X}" for b in msg_bytes)) self.sock.sendall(msg_bytes) def watch(self): while True: self.recv_sys_ex() def main(): parser = argparse.ArgumentParser(description="Allen & Heath Qu Remote Control") parser.add_argument("ip", help="IP of the mixer") subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands") channel_naming_parser = subparsers.add_parser("get_name_from_qu", help="Channel naming") channel_naming_parser.add_argument( "channel_id", type=int, choices=range(0, 16), help="Number of channel to get or set its name", ) channel_naming_parser.add_argument("-n", "--name", help="Channel name to set") subparsers.add_parser("get_system_state", help="Get system information") subparsers.add_parser("shutdown", help="Shut down the mixer") scene_parser = subparsers.add_parser("scene_recall", help="Recall a specific scene") scene_parser.add_argument("scene_number", type=int, choices=range(0, 100), help="Scene number to recall") subparsers.add_parser("scene_recall_default", help="Set the default scene 0") subparsers.add_parser("watch", help="Just receive data from mixer and print it to console") args = parser.parse_args() print(f"IP: {args.ip}") mixer = Mixer(args.ip) if args.command: print(f"Command: {args.command}") print(f"Args: {vars(args)}") match args.command: case "get_name_from_qu": mixer.get_name_from_qu(args.channel_id, args.name) case "get_system_state": state = mixer.get_system_state() print(f"sysex_header: {state.sys_ex_header}") print(f"MIDI channel: {state.midi_channel}") print(f"ID: 0x{state.id:02X}") print(f"Model: {state.model}") print(f"Firmware Version: {state.major_ver}.{state.minor_ver}") case "shutdown": mixer.shutdown() case "scene_recall": mixer.scene_recall(args.scene_number) case "scene_recall_default": mixer.scene_recall(0) case "watch": mixer.watch() if __name__ == "__main__": main()