234 lines
7.4 KiB
Python
Executable file
234 lines
7.4 KiB
Python
Executable file
#!/bin/python
|
|
|
|
import argparse
|
|
import socket
|
|
import mido
|
|
|
|
from enum import Enum
|
|
from dataclasses import dataclass
|
|
|
|
# For Allen & Heath Qu MIDI protocol documentation see:
|
|
# https://www.allen-heath.com/content/uploads/2023/06/Qu_MIDI_Protocol_V1.9.pdf
|
|
|
|
|
|
class Mixer:
|
|
@dataclass
|
|
class SystemState:
|
|
class QuModel(Enum):
|
|
QU16 = 1
|
|
QU24 = 2
|
|
QU32 = 3
|
|
QUPAC = 4
|
|
QUSB = 5
|
|
|
|
def __str__(self):
|
|
labels = {
|
|
1: "Qu-16",
|
|
2: "Qu-24",
|
|
3: "Qu-32",
|
|
4: "Qu-Pac",
|
|
5: "Qu-SB",
|
|
}
|
|
|
|
return labels[self.value]
|
|
|
|
sys_ex_header : list
|
|
midi_channel : int
|
|
id : int
|
|
model : QuModel
|
|
major_ver : int
|
|
minor_ver : int
|
|
|
|
def __init__(self, ip):
|
|
self.MIXER_PORT = 51325
|
|
self.sock = socket.create_connection((ip, self.MIXER_PORT))
|
|
self.mido_parser = mido.Parser()
|
|
|
|
ALLEN_HEATH_ID = [0x00, 0x00, 0x1A]
|
|
QU_MIXER = [0x50, 0x11]
|
|
MAJOR_MINOR = [0x01, 0x00]
|
|
ALL_CALL_MIDI_CHANNEL = [0x7F]
|
|
SYSEX_HEADER = ALLEN_HEATH_ID + QU_MIXER + MAJOR_MINOR
|
|
SYSEX_ALL_CALL = SYSEX_HEADER + ALL_CALL_MIDI_CHANNEL
|
|
|
|
class SysExMessageId(Enum):
|
|
GET_NAME_FROM_QU_REQUEST = 0x01
|
|
GET_NAME_FROM_QU_RESPONSE = 0x02
|
|
GET_SYSTEM_STATE_REQUEST = 0x10
|
|
GET_SYSTEM_STATE_RESPONSE = 0x11
|
|
GET_METER_DATA_REQUEST = 0x12
|
|
GET_METER_DATA_RESPONSE = 0x13
|
|
|
|
def recv_sys_ex(self, response_msg_filter: SysExMessageId = None):
|
|
while True:
|
|
data = self.sock.recv(1024)
|
|
|
|
if not data:
|
|
break
|
|
|
|
self.mido_parser.feed(data)
|
|
|
|
if self.mido_parser.pending():
|
|
msg = self.mido_parser.get_message()
|
|
|
|
print(vars(msg))
|
|
|
|
if hasattr(msg, "data"):
|
|
print("Hex: ", " ".join(f"{b:02X}" for b in msg.data))
|
|
|
|
if response_msg_filter is not None:
|
|
if msg.type != "sysex":
|
|
continue
|
|
|
|
msg_id = int(msg.data[8])
|
|
|
|
if msg_id != response_msg_filter.value:
|
|
continue
|
|
|
|
return msg
|
|
|
|
def get_system_state(self):
|
|
msg_id = self.SysExMessageId.GET_SYSTEM_STATE_REQUEST.value
|
|
i_pad_flag = 0x01
|
|
|
|
data = self.SYSEX_ALL_CALL + [msg_id, i_pad_flag]
|
|
|
|
msg = mido.Message("sysex", data=data)
|
|
msg_bytes = bytes(msg.bytes())
|
|
|
|
self.sock.sendall(msg_bytes)
|
|
print("Sent:", " ".join(f"{b:02X}" for b in msg_bytes))
|
|
|
|
response = self.recv_sys_ex(self.SysExMessageId.GET_SYSTEM_STATE_RESPONSE)
|
|
|
|
i_pad_flag = int(response.data[9])
|
|
|
|
state = self.SystemState(
|
|
sys_ex_header=response.data[:8],
|
|
midi_channel=int(response.data[7]),
|
|
id=int(response.data[8]),
|
|
model=self.SystemState.QuModel(i_pad_flag),
|
|
major_ver=int(response.data[10]),
|
|
minor_ver = int(response.data[11])
|
|
)
|
|
|
|
return state
|
|
|
|
def get_name_from_qu(self, channel_no, name_to_set):
|
|
msg_id = self.SysExMessageId.GET_NAME_FROM_QU_REQUEST.value
|
|
|
|
data = self.SYSEX_HEADER + [int(0), msg_id, int(channel_no)]
|
|
|
|
if name_to_set:
|
|
data += list(name_to_set.encode("utf-8"))
|
|
|
|
msg = mido.Message("sysex", data=data)
|
|
msg_bytes = bytes(msg.bytes())
|
|
|
|
print("Sent:", " ".join(f"{b:02X}" for b in msg_bytes))
|
|
self.sock.sendall(msg_bytes)
|
|
|
|
response = self.recv_sys_ex(self.SysExMessageId.GET_NAME_FROM_QU_RESPONSE)
|
|
|
|
sysex_header = response.data[:8]
|
|
|
|
midi_channel = int(response.data[7])
|
|
id = int(response.data[8])
|
|
channel_name = bytes(response.data[9:]).decode("utf-8")
|
|
|
|
print(f"sysex_header: {sysex_header}")
|
|
print(f"MIDI channel: {midi_channel}")
|
|
print(f"ID: 0x{id:02X}")
|
|
print(f"Channel Name: {channel_name}")
|
|
|
|
def nrpn_parameter_control(self, midi_ch, mixer_ch, id, va, vx):
|
|
a = mido.Message("control_change", channel=midi_ch, control=0x63, value=mixer_ch)
|
|
b = mido.Message("control_change", channel=midi_ch, control=0x62, value=id)
|
|
c = mido.Message("control_change", channel=midi_ch, control=0x06, value=va)
|
|
d = mido.Message("control_change", channel=midi_ch, control=0x26, value=vx)
|
|
|
|
msg_bytes = bytes(a.bytes() + b.bytes() + c.bytes() + d.bytes())
|
|
|
|
print(" ".join(f"{b:02X}" for b in msg_bytes))
|
|
self.sock.sendall(msg_bytes)
|
|
|
|
def shutdown(self):
|
|
self.nrpn_parameter_control(midi_ch=0, mixer_ch=0, id=0x5F, va=0x00, vx=0x00)
|
|
|
|
def set_bank_1(self):
|
|
msb = mido.Message("control_change", channel=0, control=0x00, value=0x00)
|
|
lsb = mido.Message("control_change", channel=0, control=0x20, value=0x00)
|
|
|
|
msg_bytes = bytes(msb.bytes() + lsb.bytes())
|
|
|
|
print(" ".join(f"{b:02X}" for b in msg_bytes))
|
|
self.sock.sendall(msg_bytes)
|
|
|
|
def scene_recall(self, scene_id):
|
|
print(f"scene_recall: scene_id={scene_id}")
|
|
|
|
self.set_bank_1()
|
|
msb = mido.Message("program_change", channel=0, program=scene_id)
|
|
|
|
msg_bytes = bytes(msb.bytes())
|
|
|
|
print(" ".join(f"{b:02X}" for b in msg_bytes))
|
|
self.sock.sendall(msg_bytes)
|
|
|
|
def watch(self):
|
|
while True:
|
|
self.recv_sys_ex()
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Allen & Heath Qu Remote Control")
|
|
parser.add_argument("ip", help="IP of the mixer")
|
|
|
|
subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands")
|
|
|
|
channel_naming_parser = subparsers.add_parser("get_name_from_qu", help="Channel naming")
|
|
channel_naming_parser.add_argument(
|
|
"channel_id",
|
|
type=int,
|
|
choices=range(0, 16),
|
|
help="Number of channel to get or set its name",
|
|
)
|
|
channel_naming_parser.add_argument("-n", "--name", help="Channel name to set")
|
|
|
|
subparsers.add_parser("get_system_state", help="Get system information")
|
|
subparsers.add_parser("shutdown", help="Shut down the mixer")
|
|
|
|
scene_parser = subparsers.add_parser("scene_recall", help="Recall a specific scene")
|
|
scene_parser.add_argument("scene_number", type=int, choices=range(0, 100), help="Scene number to recall")
|
|
|
|
subparsers.add_parser("scene_recall_default", help="Set the default scene 0")
|
|
subparsers.add_parser("watch", help="Just receive data from mixer and print it to console")
|
|
|
|
args = parser.parse_args()
|
|
|
|
print(f"IP: {args.ip}")
|
|
mixer = Mixer(args.ip)
|
|
|
|
if args.command:
|
|
print(f"Command: {args.command}")
|
|
print(f"Args: {vars(args)}")
|
|
|
|
match args.command:
|
|
case "get_name_from_qu": mixer.get_name_from_qu(args.channel_id, args.name)
|
|
case "get_system_state":
|
|
state = mixer.get_system_state()
|
|
|
|
print(f"sysex_header: {state.sys_ex_header}")
|
|
print(f"MIDI channel: {state.midi_channel}")
|
|
print(f"ID: 0x{state.id:02X}")
|
|
print(f"Model: {state.model}")
|
|
print(f"Firmware Version: {state.major_ver}.{state.minor_ver}")
|
|
|
|
case "shutdown": mixer.shutdown()
|
|
case "scene_recall": mixer.scene_recall(args.scene_number)
|
|
case "scene_recall_default": mixer.scene_recall(0)
|
|
case "watch": mixer.watch()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|