mixer-rpc/mixer.py

190 lines
5.3 KiB
Python
Executable file

#!/bin/python
import argparse
import socket
import mido
from enum import Enum
MIXER_PORT = 51325
class Mixer:
def __init__(self, ip, port):
self.sock = socket.create_connection((ip, port))
ALLEN_HEATH_ID = [0x00, 0x00, 0x1A]
QU_MIXER = [0x50, 0x11]
MAJOR_MINOR = [0x01, 0x00]
ALL_CALL_MIDI_CHANNEL = [0x7F]
SYSEX_HEADER = ALLEN_HEATH_ID + QU_MIXER + MAJOR_MINOR
SYSEX_ALL_CALL = SYSEX_HEADER + ALL_CALL_MIDI_CHANNEL
def recv(self):
p = mido.Parser()
while True:
data = self.sock.recv(1024)
if not data:
break
p.feed(data)
if p.pending():
msg = p.get_message()
# print(msg.type)
print(vars(msg))
if hasattr(msg, "data"):
print("Hex: ", " ".join(f"{b:02X}" for b in msg.data))
return msg
def get_system_state(self):
msg_id = 0x10
i_pad_flag = 0x01
data = self.SYSEX_ALL_CALL + [msg_id, i_pad_flag]
msg = mido.Message('sysex', data=data)
msg_bytes = bytes(msg.bytes())
self.sock.sendall(msg_bytes)
print("Sent:", " ".join(f"{b:02X}" for b in msg_bytes))
response = self.recv()
class QuModel(Enum):
QU16 = 1
QU24 = 2
QU32 = 3
QUPAC = 4
QUSB = 5
def __str__(self):
labels = {
1: "Qu-16",
2: "Qu-24",
3: "Qu-32",
4: "Qu-Pac",
5: "Qu-SB",
}
return labels[self.value]
sysex_header = response.data[:8]
midi_channel = int(response.data[7])
id = int(response.data[8])
i_pad_flag = int(response.data[9])
major_ver = int(response.data[10])
minor_ver = int(response.data[11])
print(f"sysex_header: {sysex_header}")
print(f"MIDI channel: {midi_channel}")
print(f"ID: 0x{id:02X}")
print(f"Model: {QuModel(i_pad_flag)}")
print(f"Firmware Version: {major_ver}.{minor_ver}")
def get_name_from_qu(self, channel_no):
msg_id = 0x01
data = self.SYSEX_HEADER + [int(0), msg_id, int(channel_no)]
msg = mido.Message('sysex', data=data)
msg_bytes = bytes(msg.bytes())
self.sock.sendall(msg_bytes)
print("Sent:", " ".join(f"{b:02X}" for b in msg_bytes))
response = self.recv()
sysex_header = response.data[:8]
midi_channel = int(response.data[7])
id = int(response.data[8])
channel_name = bytes(response.data[9:]).decode("utf-8")
print(f"sysex_header: {sysex_header}")
print(f"MIDI channel: {midi_channel}")
print(f"ID: 0x{id:02X}")
print(f"Channel Name: {channel_name}")
def nrpn_parameter_control(self, midi_ch, mixer_ch, id, va, vx):
a = mido.Message('control_change', channel=midi_ch, control=0x63, value=mixer_ch)
b = mido.Message('control_change', channel=midi_ch, control=0x62, value=id)
c = mido.Message('control_change', channel=midi_ch, control=0x06, value=va)
d = mido.Message('control_change', channel=midi_ch, control=0x26, value=vx)
msg_bytes = bytes(a.bytes() + b.bytes() + c.bytes() + d.bytes())
print(' '.join(f"{b:02X}" for b in msg_bytes))
self.sock.sendall(msg_bytes)
def shutdown(self):
self.nrpn_parameter_control(midi_ch=0, mixer_ch=0, id=0x5F, va=0x00, vx=0x00)
def set_bank_1(self):
msb = mido.Message('control_change', channel=0, control=0x00, value=0x00)
lsb = mido.Message('control_change', channel=0, control=0x20, value=0x00)
msg_bytes = bytes(msb.bytes() + lsb.bytes())
print(' '.join(f"{b:02X}" for b in msg_bytes))
self.sock.sendall(msg_bytes)
def scene_recall(self, scene_id):
self.set_bank_1()
msb = mido.Message('program_change', channel=0, program=scene_id)
msg_bytes = bytes(msb.bytes())
print(' '.join(f"{b:02X}" for b in msg_bytes))
self.sock.sendall(msg_bytes)
def watch(self):
while True:
self.recv()
def main():
parser = argparse.ArgumentParser(description="Allen & Heath Qu Remote Control")
parser.add_argument("ip", help="IP of the mixer")
parser.add_argument("command", help="Command to execute")
args = parser.parse_args()
if not args.ip:
print("ip is missing!")
return 1
print(f"IP: {args.ip}")
if args.command:
print(f"Command: {args.command}")
match args.command:
case 'get_name_from_qu':
mixer = Mixer(args.ip, MIXER_PORT)
mixer.get_name_from_qu(0)
case 'get_system_state':
mixer = Mixer(args.ip, MIXER_PORT)
mixer.get_system_state()
case 'shutdown':
mixer = Mixer(args.ip, MIXER_PORT)
mixer.shutdown()
case 'set_default_layer':
mixer = Mixer(args.ip, MIXER_PORT)
mixer.scene_recall(0)
case 'watch':
mixer = Mixer(args.ip, MIXER_PORT)
mixer.watch()
case 'scene_recall':
mixer = Mixer(args.ip, MIXER_PORT)
mixer.scene_recall(0)
if __name__ == '__main__':
main()