diff --git a/mixer.py b/mixer.py index 0bccd66..a2aeaa2 100755 --- a/mixer.py +++ b/mixer.py @@ -12,20 +12,12 @@ class Mixer: def __init__(self, ip, port): self.sock = socket.create_connection((ip, port)) - ALLEN_HEATH_ID = [0x00, 0x00, 0x1A] + A_H_ID = [0x00, 0x00, 0x1A] QU_MIXER = [0x50, 0x11] MAJOR_MINOR = [0x01, 0x00] ALL_CALL_MIDI_CHANNEL = [0x7F] - SYSEX_HEADER = ALLEN_HEATH_ID + QU_MIXER + MAJOR_MINOR - SYSEX_ALL_CALL = SYSEX_HEADER + ALL_CALL_MIDI_CHANNEL - class SysExMessageId(Enum): - GET_NAME_FROM_QU_REQUEST = 0x01, - GET_NAME_FROM_QU_RESPONSE = 0x02, - GET_SYSTEM_STATE_REQUEST = 0x10, - GET_SYSTEM_STATE_RESPONSE = 0x11, - GET_METER_DATA_REQUEST = 0x12, - GET_METER_DATA_RESPONSE = 0x13 + sysex_allcall = A_H_ID + QU_MIXER + MAJOR_MINOR + ALL_CALL_MIDI_CHANNEL def recv(self): p = mido.Parser() @@ -40,20 +32,16 @@ class Mixer: if p.pending(): msg = p.get_message() - - print(msg.type) - print(vars(msg)) - - if hasattr(msg, "data"): - print("Hex: ", " ".join(f"{b:02X}" for b in msg.data)) + print(msg) + print("Hex: ", " ".join(f"{b:02X}" for b in msg.data)) return msg def get_system_state(self): - msg_id = self.SysExMessageId.GET_SYSTEM_STATE_REQUEST + id = 0x10 i_pad_flag = 0x01 - data = self.SYSEX_ALL_CALL + [msg_id, i_pad_flag] + data = self.sysex_allcall + [id, i_pad_flag] msg = mido.Message('sysex', data=data) msg_bytes = bytes(msg.bytes()) @@ -94,106 +82,25 @@ class Mixer: print(f"Model: {QuModel(i_pad_flag)}") print(f"Firmware Version: {major_ver}.{minor_ver}") - def get_name_from_qu(self, channel_no, name_to_set): - msg_id = self.SysExMessageId.GET_NAME_FROM_QU_REQUEST - - data = self.SYSEX_HEADER + [int(0), msg_id, int(channel_no)] - - if name_to_set: - data += list(name_to_set.encode('utf-8')) - - msg = mido.Message('sysex', data=data) - msg_bytes = bytes(msg.bytes()) - - print("Sent:", " ".join(f"{b:02X}" for b in msg_bytes)) - self.sock.sendall(msg_bytes) - - response = self.recv() - - sysex_header = response.data[:8] - - midi_channel = int(response.data[7]) - id = int(response.data[8]) - channel_name = bytes(response.data[9:]).decode("utf-8") - - print(f"sysex_header: {sysex_header}") - print(f"MIDI channel: {midi_channel}") - print(f"ID: 0x{id:02X}") - print(f"Channel Name: {channel_name}") - - def nrpn_parameter_control(self, midi_ch, mixer_ch, id, va, vx): - a = mido.Message('control_change', channel=midi_ch, control=0x63, value=mixer_ch) - b = mido.Message('control_change', channel=midi_ch, control=0x62, value=id) - c = mido.Message('control_change', channel=midi_ch, control=0x06, value=va) - d = mido.Message('control_change', channel=midi_ch, control=0x26, value=vx) - - msg_bytes = bytes(a.bytes() + b.bytes() + c.bytes() + d.bytes()) - - print(' '.join(f"{b:02X}" for b in msg_bytes)) - self.sock.sendall(msg_bytes) - - def shutdown(self): - self.nrpn_parameter_control(midi_ch=0, mixer_ch=0, id=0x5F, va=0x00, vx=0x00) - - def set_bank_1(self): - msb = mido.Message('control_change', channel=0, control=0x00, value=0x00) - lsb = mido.Message('control_change', channel=0, control=0x20, value=0x00) - - msg_bytes = bytes(msb.bytes() + lsb.bytes()) - - print(' '.join(f"{b:02X}" for b in msg_bytes)) - self.sock.sendall(msg_bytes) - - def scene_recall(self, scene_id): - print(f"scene_recall: scene_id={scene_id}", scene_id) - - self.set_bank_1() - msb = mido.Message('program_change', channel=0, program=scene_id) - - msg_bytes = bytes(msb.bytes()) - - print(' '.join(f"{b:02X}" for b in msg_bytes)) - self.sock.sendall(msg_bytes) - - def watch(self): - while True: - self.recv() - def main(): parser = argparse.ArgumentParser(description="Allen & Heath Qu Remote Control") parser.add_argument("ip", help="IP of the mixer") - - subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands") - - channel_naming_parser = subparsers.add_parser("get_name_from_qu", help="Channel naming") - channel_naming_parser.add_argument("channel_id", type=int, choices=range(0, 16), help="Number of channel to get or set its name") - channel_naming_parser.add_argument("-n", "--name", help="Channel name to set") - - subparsers.add_parser("get_system_state", help="Get system information") - subparsers.add_parser("shutdown", help="Shut down the mixer") - - scene_parser = subparsers.add_parser("scene_recall", help="Recall a specific scene") - scene_parser.add_argument("scene_number", type=int, choices=range(0, 100), help="Scene number to recall") - - subparsers.add_parser("scene_recall_default", help="Set the default scene 0") - subparsers.add_parser("watch", help="Just receive data from mixer and print it to console") + parser.add_argument("command", help="Command to execute") args = parser.parse_args() + if not args.ip: + print("ip is missing!") + return 1 + print(f"IP: {args.ip}") - mixer = Mixer(args.ip, MIXER_PORT) if args.command: print(f"Command: {args.command}") - print(f"Args: {vars(args)}") - match args.command: - case 'get_name_from_qu': mixer.get_name_from_qu(args.channel_id, args.name) - case 'get_system_state': mixer.get_system_state() - case 'shutdown': mixer.shutdown() - case 'scene_recall': mixer.scene_recall(args.scene_number) - case 'scene_recall_default': mixer.scene_recall(0) - case 'watch': mixer.watch() + if args.command == 'get_system_state': + mixer = Mixer(args.ip, MIXER_PORT) + mixer.get_system_state() if __name__ == '__main__': main()