Skip to content
This repository was archived by the owner on Apr 5, 2023. It is now read-only.

Bidirectional MIDI controls #19

Merged
merged 8 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 130 additions & 47 deletions main.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env python3
from __future__ import division
from websocket import WebSocketApp
from tinydb import TinyDB
Expand Down Expand Up @@ -28,6 +29,16 @@
"message-id": "%d",
"sourceName": "%s",
"filterName": "%s"
}""",
"SetCurrentScene": """{
"request-type": "GetCurrentScene",
"message-id": "%d",
"_unused": "%s"
}""",
"SetPreviewScene": """{
"request-type": "GetPreviewScene",
"message-id": "%d",
"_unused": "%s"
}"""
}

Expand Down Expand Up @@ -58,23 +69,43 @@ def __init__(self, device, deviceid):
self.log = get_logger("midi_to_obs_device")
self._id = deviceid
self._devicename = device["devicename"]
self._port = 0
self._port_in = 0
self._port_out = 0

try:
self.log.debug("Attempting to open midi port `%s`" % self._devicename)
self._port = mido.open_input(name=self._devicename, callback=self.callback)
# a device can be input, output or ioport. in the latter case it can also be the other two
# so we first check if we can use it as an ioport
if self._devicename in mido.get_ioport_names():
self._port_in = mido.open_ioport(name=self._devicename, callback=self.callback, autoreset=True)
self._port_out = self._port_in
# otherwise we try to use it separately as input and output
else:
if self._devicename in mido.get_input_names():
self._port_in = mido.open_input(name=self._devicename, callback=self.callback)
if self._devicename in mido.get_output_names():
self._port_out = mido.open_output(name=self._devicename, callback=self.callback, autoreset=True)
except:
self.log.critical("\nCould not open", self._devicename)
self.log.critical("The midi device might be used by another application/not plugged in/have a different name.")
self.log.critical("Please close the device in the other application/plug it in/select the rename option in the device management menu and restart this script.\n")
self.log.critical("Please close the device in the other application/plug it in/select the rename option in the device management menu and restart this script.")
self.log.critical("Currently connected devices:")
for name in mido.get_input_names():
self.log.critical(" - %s" % name)
# EIO 5 (Input/output error)
exit(5)

def callback(self, msg):
handler.handle_midi_input(msg, self._id, self._devicename)

def close(self):
self._port_close()
if self._port_in:
self._port_in.close()
# when it's an ioport we don't want to close the port twice
if self._port_out and self._port_out != self._port_in:
self._port_out.close()
self._port_in = 0
self._port_out = 0

class MidiHandler:
# Initializes the handler class
Expand Down Expand Up @@ -113,7 +144,7 @@ def __init__(self, config_path="config.json", ws_server="localhost", ws_port=444
exit(2)

self.log.debug("Successfully imported mapping database")

result = tiny_devdb.all()
if not result:
self.log.critical("Config file %s doesn't exist or is damaged" % config_path)
Expand All @@ -125,21 +156,27 @@ def __init__(self, config_path="config.json", ws_server="localhost", ws_port=444
self.log.debug("Retrieved MIDI port name(s) `%s`" % result)
#create new class with handler and open from there, just create new instances
for device in result:
self._portobjects.append(DeviceHandler(device, device.doc_id))
self._portobjects.append((DeviceHandler(device, device.doc_id), device.doc_id))

self.log.info("Successfully initialized midi port(s)")
del result

# close tinydb
tiny_database.close()

# setting up a Websocket client
self.log.debug("Attempting to connect to OBS using websocket protocol")
self.obs_socket = WebSocketApp("ws://%s:%d" % (ws_server, ws_port))
self.obs_socket.on_message = self.handle_obs_message
self.obs_socket.on_error = self.handle_obs_error
self.obs_socket.on_close = self.handle_obs_close
self.obs_socket.on_open = self.handle_obs_open
self.obs_socket.on_message = lambda ws, message: self.handle_obs_message(ws, message)
self.obs_socket.on_error = lambda ws, error: self.handle_obs_error(ws, error)
self.obs_socket.on_close = lambda ws: self.handle_obs_close(ws)
self.obs_socket.on_open = lambda ws: self.handle_obs_open(ws)

def getPortObject(self, mapping):
deviceID = mapping.get("out_deviceID", mapping["deviceID"])
for portobject, _deviceID in self._portobjects:
if _deviceID == deviceID:
return portobject

def handle_midi_input(self, message, deviceID, deviceName):
self.log.debug("Received %s %s %s %s %s", str(message), "from device", deviceID, "/", deviceName)
Expand Down Expand Up @@ -201,7 +238,7 @@ def handle_midi_fader(self, deviceID, control, value):
if command == "SetSourceRotation" or command == "SetTransitionDuration" or command == "SetSyncOffset" or command == "SetSourcePosition":
self.obs_socket.send(action % int(scaled))

def handle_obs_message(self, message):
def handle_obs_message(self, ws, message):
self.log.debug("Received new message from OBS")
payload = json.loads(message)

Expand All @@ -211,39 +248,69 @@ def handle_obs_message(self, message):
self.log.error("OBS returned error: %s" % payload["error"])
return

message_id = payload["message-id"]
if "message-id" in payload:
message_id = payload["message-id"]

self.log.debug("Looking for action with message id `%s`" % message_id)
for action in self._action_buffer:
(buffered_id, template, kind) = action
self.log.debug("Looking for action with message id `%s`" % message_id)
for action in self._action_buffer:
(buffered_id, template, kind) = action

if buffered_id != int(payload["message-id"]):
continue
if buffered_id != int(payload["message-id"]):
continue

del buffered_id
self.log.info("Action `%s` was requested by OBS" % kind)

if kind == "ToggleSourceVisibility":
# Dear lain, I so miss decent ternary operators...
invisible = "false" if payload["visible"] else "true"
self.obs_socket.send(template % invisible)
elif kind == "ReloadBrowserSource":
source = payload["sourceSettings"]["url"]
target = source[0:-1] if source[-1] == '#' else source + '#'
self.obs_socket.send(template % target)
elif kind == "ToggleSourceFilter":
invisible = "false" if payload["enabled"] else "true"
self.obs_socket.send(template % invisible)

self.log.debug("Removing action with message id %s from buffer" % message_id)
self._action_buffer.remove(action)
break

if message_id == "MIDItoOBSscreenshot":
if payload["status"] == "ok":
with open(str(time()) + ".png", "wb") as fh:
fh.write(base64.decodebytes(payload["img"][22:].encode()))

del buffered_id
self.log.info("Action `%s` was requested by OBS" % kind)

if kind == "ToggleSourceVisibility":
# Dear lain, I so miss decent ternary operators...
invisible = "false" if payload["visible"] else "true"
self.obs_socket.send(template % invisible)
elif kind == "ReloadBrowserSource":
source = payload["sourceSettings"]["url"]
target = source[0:-1] if source[-1] == '#' else source + '#'
self.obs_socket.send(template % target)
elif kind == "ToggleSourceFilter":
invisible = "false" if payload["enabled"] else "true"
self.obs_socket.send(template % invisible)
elif kind in ["SetCurrentScene", "SetPreviewScene"]:
self.sceneChanged(kind, payload["name"])

self.log.debug("Removing action with message id %s from buffer" % message_id)
self._action_buffer.remove(action)
break

if message_id == "MIDItoOBSscreenshot":
if payload["status"] == "ok":
with open(str(time()) + ".png", "wb") as fh:
fh.write(base64.decodebytes(payload["img"][22:].encode()))

elif "update-type" in payload:
update_type = payload["update-type"]

request_types = {"PreviewSceneChanged": "SetPreviewScene", "SwitchScenes": "SetCurrentScene"}
if update_type in request_types:
scene_name = payload["scene-name"]
self.sceneChanged(request_types[update_type], scene_name)

def sceneChanged(self, event_type, scene_name):
self.log.debug("Scene changed, event: %s, name: %s" % (event_type, scene_name))
# only buttons can change the scene, so we can limit our search to those
results = self.mappingdb.getmany(self.mappingdb.find('input_type == "button" and bidirectional == 1'))
if not results:
return
for result in results:
j = json.loads(result["action"])
if j["request-type"] != event_type:
continue
msgNoC = result.get("out_msgNoC", result["msgNoC"])
portobject = self.getPortObject(result)
if portobject and portobject._port_out:
if result["msg_type"] == "control_change":
value = 127 if j["scene-name"] == scene_name else 0
portobject._port_out.send(mido.Message(type="control_change", channel=0, control=msgNoC, value=value))
elif result["msg_type"] == "note_on":
velocity = 1 if j["scene-name"] == scene_name else 0
portobject._port_out.send(mido.Message(type="note_on", channel=0, note=msgNoC, velocity=velocity))

def handle_obs_error(self, ws, error=None):
# Protection against potential inconsistencies in `inspect.ismethod`
Expand All @@ -252,17 +319,21 @@ def handle_obs_error(self, ws, error=None):

if isinstance(error, (KeyboardInterrupt, SystemExit)):
self.log.info("Keyboard interrupt received, gracefully exiting...")
self.close(teardown=True)
else:
self.log.error("Websocket error: %" % str(error))

def handle_obs_close(self, ws):
self.log.error("OBS has disconnected, timed out or isn't running")
self.log.error("Please reopen OBS and restart the script")
self.close(teardown=True)

def handle_obs_open(self, ws):
self.log.info("Successfully connected to OBS")

# initialize bidirectional controls
self.send_action({"action": 'GetCurrentScene', "request": "SetCurrentScene", "target": ":-)"})
self.send_action({"action": 'GetPreviewScene', "request": "SetPreviewScene", "target": ":-)"})

def send_action(self, action_request):
action = action_request.get("action")
if not action:
Expand Down Expand Up @@ -308,10 +379,22 @@ def start(self):
self.obs_socket.run_forever()

def close(self, teardown=False):
# set bidirectional controls to their 0 state (i.e., turn off LEDs)
self.log.debug("Attempting to turn off bidirectional controls")
result = self.mappingdb.getmany(self.mappingdb.find('bidirectional == 1'))
if result:
for row in result:
msgNoC = row.get("out_msgNoC", row["msgNoC"])
portobject = self.getPortObject(row)
if portobject and portobject._port_out:
if row["msg_type"] == "control_change":
portobject._port_out.send(mido.Message(type="control_change", channel=0, control=msgNoC, value=0))
elif row["msg_type"] == "note_on":
portobject._port_out.send(mido.Message(type="note_on", channel=0, note=msgNoC, velocity=0))

self.log.debug("Attempting to close midi port(s)")
result = self.devdb.all()
for device in result:
device.close()
for portobject, _ in self._portobjects:
portobject.close()

self.log.info("Midi connection has been closed successfully")

Expand Down
20 changes: 13 additions & 7 deletions setup.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env python3
import mido, threading, sys, atexit, json, time, signal
from tinydb import TinyDB, Query
from websocket import create_connection
Expand Down Expand Up @@ -247,13 +248,15 @@ def setupButtonEvents(action, NoC, msgType, deviceID):
if action == "SetCurrentScene":
updateSceneList()
scene = printArraySelect(sceneListShort)
bidirectional = askForBidirectional()
action = jsonArchive["SetCurrentScene"] % scene
saveButtonToFile(msgType, NoC, "button" , action, deviceID)
saveButtonToFile(msgType, NoC, "button" , action, deviceID, bidirectional)
elif action == "SetPreviewScene":
updateSceneList()
scene = printArraySelect(sceneListShort)
bidirectional = askForBidirectional()
action = jsonArchive["SetPreviewScene"] % scene
saveButtonToFile(msgType, NoC, "button" , action, deviceID)
saveButtonToFile(msgType, NoC, "button" , action, deviceID, bidirectional)
elif action == "TransitionToProgram":
updateTransitionList()
print("Please select a transition to be used:")
Expand Down Expand Up @@ -524,15 +527,13 @@ def saveFaderToFile(msg_type, msgNoC, input_type, action, scale, cmd, deviceID):
else:
db.insert({"msg_type": msg_type, "msgNoC": msgNoC, "input_type": input_type, "scale_low": scale[0], "scale_high": scale[1], "action": action, "cmd": cmd, "deviceID": deviceID})

def saveButtonToFile(msg_type, msgNoC, input_type, action, deviceID):
print("Saved %s with note/control %s for action %s on device %s" % (msg_type, msgNoC, action, deviceID))
def saveButtonToFile(msg_type, msgNoC, input_type, action, deviceID, bidirectional=False):
print("Saved %s with note/control %s for action %s on device %s, bidirectional: %d" % (msg_type, msgNoC, action, deviceID, bidirectional))
Search = Query()
result = db.search((Search.msg_type == msg_type) & (Search.msgNoC == msgNoC) & (Search.deviceID == deviceID))
if result:
db.remove((Search.msgNoC == msgNoC) & (Search.deviceID == deviceID))
db.insert({"msg_type": msg_type, "msgNoC": msgNoC, "input_type": input_type, "action" : action, "deviceID": deviceID})
else:
db.insert({"msg_type": msg_type, "msgNoC": msgNoC, "input_type": input_type, "action" : action, "deviceID": deviceID})
db.insert({"msg_type": msg_type, "msgNoC": msgNoC, "input_type": input_type, "action" : action, "deviceID": deviceID, "bidirectional": bidirectional})

def saveTODOButtonToFile(msg_type, msgNoC, input_type, action, request, target, field2, deviceID):
print("Saved %s with note/control %s for action %s on device %s" % (msg_type, msgNoC, action, deviceID))
Expand Down Expand Up @@ -560,6 +561,11 @@ def askForInputScaling():
high = int(input("Select higher output value: "))
return low, high

def askForBidirectional():
print("Do you want the control to be bidirectional?\n1: Yes\n2: No")
bidirectional = int(input("Select 1 or 2: "))
return bidirectional == 1

def updateTransitionList():
global transitionList
ws = create_connection("ws://" + serverIP + ":" + serverPort)
Expand Down