diff --git a/mixer.py b/mixer.py index a2aeaa2..0bccd66 100755 --- a/mixer.py +++ b/mixer.py @@ -12,12 +12,20 @@ class Mixer: def __init__(self, ip, port): self.sock = socket.create_connection((ip, port)) - A_H_ID = [0x00, 0x00, 0x1A] + ALLEN_HEATH_ID = [0x00, 0x00, 0x1A] QU_MIXER = [0x50, 0x11] MAJOR_MINOR = [0x01, 0x00] ALL_CALL_MIDI_CHANNEL = [0x7F] + SYSEX_HEADER = ALLEN_HEATH_ID + QU_MIXER + MAJOR_MINOR + SYSEX_ALL_CALL = SYSEX_HEADER + ALL_CALL_MIDI_CHANNEL - sysex_allcall = A_H_ID + QU_MIXER + MAJOR_MINOR + ALL_CALL_MIDI_CHANNEL + class SysExMessageId(Enum): + GET_NAME_FROM_QU_REQUEST = 0x01, + GET_NAME_FROM_QU_RESPONSE = 0x02, + GET_SYSTEM_STATE_REQUEST = 0x10, + GET_SYSTEM_STATE_RESPONSE = 0x11, + GET_METER_DATA_REQUEST = 0x12, + GET_METER_DATA_RESPONSE = 0x13 def recv(self): p = mido.Parser() @@ -32,16 +40,20 @@ class Mixer: if p.pending(): msg = p.get_message() - print(msg) - print("Hex: ", " ".join(f"{b:02X}" for b in msg.data)) + + print(msg.type) + print(vars(msg)) + + if hasattr(msg, "data"): + print("Hex: ", " ".join(f"{b:02X}" for b in msg.data)) return msg def get_system_state(self): - id = 0x10 + msg_id = self.SysExMessageId.GET_SYSTEM_STATE_REQUEST i_pad_flag = 0x01 - data = self.sysex_allcall + [id, i_pad_flag] + data = self.SYSEX_ALL_CALL + [msg_id, i_pad_flag] msg = mido.Message('sysex', data=data) msg_bytes = bytes(msg.bytes()) @@ -82,25 +94,106 @@ class Mixer: print(f"Model: {QuModel(i_pad_flag)}") print(f"Firmware Version: {major_ver}.{minor_ver}") + def get_name_from_qu(self, channel_no, name_to_set): + msg_id = self.SysExMessageId.GET_NAME_FROM_QU_REQUEST + + data = self.SYSEX_HEADER + [int(0), msg_id, int(channel_no)] + + if name_to_set: + data += list(name_to_set.encode('utf-8')) + + msg = mido.Message('sysex', data=data) + msg_bytes = bytes(msg.bytes()) + + print("Sent:", " ".join(f"{b:02X}" for b in msg_bytes)) + self.sock.sendall(msg_bytes) + + response = self.recv() + + sysex_header = response.data[:8] + + midi_channel = int(response.data[7]) + id = int(response.data[8]) + channel_name = bytes(response.data[9:]).decode("utf-8") + + print(f"sysex_header: {sysex_header}") + print(f"MIDI channel: {midi_channel}") + print(f"ID: 0x{id:02X}") + print(f"Channel Name: {channel_name}") + + def nrpn_parameter_control(self, midi_ch, mixer_ch, id, va, vx): + a = mido.Message('control_change', channel=midi_ch, control=0x63, value=mixer_ch) + b = mido.Message('control_change', channel=midi_ch, control=0x62, value=id) + c = mido.Message('control_change', channel=midi_ch, control=0x06, value=va) + d = mido.Message('control_change', channel=midi_ch, control=0x26, value=vx) + + msg_bytes = bytes(a.bytes() + b.bytes() + c.bytes() + d.bytes()) + + print(' '.join(f"{b:02X}" for b in msg_bytes)) + self.sock.sendall(msg_bytes) + + def shutdown(self): + self.nrpn_parameter_control(midi_ch=0, mixer_ch=0, id=0x5F, va=0x00, vx=0x00) + + def set_bank_1(self): + msb = mido.Message('control_change', channel=0, control=0x00, value=0x00) + lsb = mido.Message('control_change', channel=0, control=0x20, value=0x00) + + msg_bytes = bytes(msb.bytes() + lsb.bytes()) + + print(' '.join(f"{b:02X}" for b in msg_bytes)) + self.sock.sendall(msg_bytes) + + def scene_recall(self, scene_id): + print(f"scene_recall: scene_id={scene_id}", scene_id) + + self.set_bank_1() + msb = mido.Message('program_change', channel=0, program=scene_id) + + msg_bytes = bytes(msb.bytes()) + + print(' '.join(f"{b:02X}" for b in msg_bytes)) + self.sock.sendall(msg_bytes) + + def watch(self): + while True: + self.recv() + def main(): parser = argparse.ArgumentParser(description="Allen & Heath Qu Remote Control") parser.add_argument("ip", help="IP of the mixer") - parser.add_argument("command", help="Command to execute") + + subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands") + + channel_naming_parser = subparsers.add_parser("get_name_from_qu", help="Channel naming") + channel_naming_parser.add_argument("channel_id", type=int, choices=range(0, 16), help="Number of channel to get or set its name") + channel_naming_parser.add_argument("-n", "--name", help="Channel name to set") + + subparsers.add_parser("get_system_state", help="Get system information") + subparsers.add_parser("shutdown", help="Shut down the mixer") + + scene_parser = subparsers.add_parser("scene_recall", help="Recall a specific scene") + scene_parser.add_argument("scene_number", type=int, choices=range(0, 100), help="Scene number to recall") + + subparsers.add_parser("scene_recall_default", help="Set the default scene 0") + subparsers.add_parser("watch", help="Just receive data from mixer and print it to console") args = parser.parse_args() - if not args.ip: - print("ip is missing!") - return 1 - print(f"IP: {args.ip}") + mixer = Mixer(args.ip, MIXER_PORT) if args.command: print(f"Command: {args.command}") + print(f"Args: {vars(args)}") - if args.command == 'get_system_state': - mixer = Mixer(args.ip, MIXER_PORT) - mixer.get_system_state() + match args.command: + case 'get_name_from_qu': mixer.get_name_from_qu(args.channel_id, args.name) + case 'get_system_state': mixer.get_system_state() + case 'shutdown': mixer.shutdown() + case 'scene_recall': mixer.scene_recall(args.scene_number) + case 'scene_recall_default': mixer.scene_recall(0) + case 'watch': mixer.watch() if __name__ == '__main__': main()