MQTT_Aug7CreativeTech/godot_client/addons/mqtt/mqtt.gd
2024-08-06 11:33:18 -04:00

479 lines
14 KiB
GDScript

extends Node
# MQTT client implementation in GDScript
# Loosely based on https://github.com/pycom/pycom-libraries/blob/master/lib/mqtt/mqtt.py
# and initial work by Alex J Lennon <ajlennon@dynamicdevices.co.uk>
# but then heavily rewritten to follow https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
# mosquitto_sub -h test.mosquitto.org -v -t "metest/#"
# mosquitto_pub -h test.mosquitto.org -t "metest/retain" -m "retained message" -r
@export var client_id = ""
@export var verbose_level = 2 # 0 quiet, 1 connections and subscriptions, 2 all messages
@export var binarymessages = false
@export var pinginterval = 30
var socket = null
var sslsocket = null
var websocket = null
const BCM_NOCONNECTION = 0
const BCM_WAITING_WEBSOCKET_CONNECTION = 1
const BCM_WAITING_SOCKET_CONNECTION = 2
const BCM_WAITING_SSL_SOCKET_CONNECTION = 3
const BCM_FAILED_CONNECTION = 5
const BCM_WAITING_CONNMESSAGE = 10
const BCM_WAITING_CONNACK = 19
const BCM_CONNECTED = 20
var brokerconnectmode = BCM_NOCONNECTION
var regexbrokerurl = RegEx.new()
const DEFAULTBROKERPORT_TCP = 1883
const DEFAULTBROKERPORT_SSL = 8886
const DEFAULTBROKERPORT_WS = 8080
const DEFAULTBROKERPORT_WSS = 8081
const CP_PINGREQ = 0xc0
const CP_PINGRESP = 0xd0
const CP_CONNACK = 0x20
const CP_CONNECT = 0x10
const CP_PUBLISH = 0x30
const CP_SUBSCRIBE = 0x82
const CP_UNSUBSCRIBE = 0xa2
const CP_PUBREC = 0x40
const CP_SUBACK = 0x90
const CP_UNSUBACK = 0xb0
var pid = 0
var user = null
var pswd = null
var keepalive = 120
var lw_topic = null
var lw_msg = null
var lw_qos = 0
var lw_retain = false
signal received_message(topic, message)
signal broker_connected()
signal broker_disconnected()
signal broker_connection_failed()
signal publish_acknowledge(pid)
var receivedbuffer : PackedByteArray = PackedByteArray()
var common_name = null
func senddata(data):
var E = 0
if sslsocket != null:
E = sslsocket.put_data(data)
elif socket != null:
E = socket.put_data(data)
elif websocket != null:
E = websocket.put_packet(data)
if E != 0:
print("bad senddata packet E=", E)
func receiveintobuffer():
if sslsocket != null:
var sslsocketstatus = sslsocket.get_status()
if sslsocketstatus == StreamPeerTLS.STATUS_CONNECTED or sslsocketstatus == StreamPeerTLS.STATUS_HANDSHAKING:
sslsocket.poll()
var n = sslsocket.get_available_bytes()
if n != 0:
var sv = sslsocket.get_data(n)
assert (sv[0] == 0) # error code
receivedbuffer.append_array(sv[1])
elif socket != null and socket.get_status() == StreamPeerTCP.STATUS_CONNECTED:
socket.poll()
var n = socket.get_available_bytes()
if n != 0:
var sv = socket.get_data(n)
assert (sv[0] == 0) # error code
receivedbuffer.append_array(sv[1])
elif websocket != null:
websocket.poll()
while websocket.get_available_packet_count() != 0:
receivedbuffer.append_array(websocket.get_packet())
var pingticksnext0 = 0
func _process(delta):
if brokerconnectmode == BCM_NOCONNECTION:
pass
elif brokerconnectmode == BCM_WAITING_WEBSOCKET_CONNECTION:
websocket.poll()
var websocketstate = websocket.get_ready_state()
if websocketstate == WebSocketPeer.STATE_CLOSED:
if verbose_level:
print("WebSocket closed with code: %d, reason %s." % [websocket.get_close_code(), websocket.get_close_reason()])
brokerconnectmode = BCM_FAILED_CONNECTION
emit_signal("broker_connection_failed")
elif websocketstate == WebSocketPeer.STATE_OPEN:
brokerconnectmode = BCM_WAITING_CONNMESSAGE
if verbose_level:
print("Websocket connection now open")
elif brokerconnectmode == BCM_WAITING_SOCKET_CONNECTION:
socket.poll()
var socketstatus = socket.get_status()
if socketstatus == StreamPeerTCP.STATUS_ERROR:
if verbose_level:
print("TCP socket error")
brokerconnectmode = BCM_FAILED_CONNECTION
emit_signal("broker_connection_failed")
if socketstatus == StreamPeerTCP.STATUS_CONNECTED:
brokerconnectmode = BCM_WAITING_CONNMESSAGE
elif brokerconnectmode == BCM_WAITING_SSL_SOCKET_CONNECTION:
socket.poll()
var socketstatus = socket.get_status()
if socketstatus == StreamPeerTCP.STATUS_ERROR:
if verbose_level:
print("TCP socket error before SSL")
brokerconnectmode = BCM_FAILED_CONNECTION
emit_signal("broker_connection_failed")
if socketstatus == StreamPeerTCP.STATUS_CONNECTED:
if sslsocket == null:
sslsocket = StreamPeerTLS.new()
if verbose_level:
print("Connecting socket to SSL with common_name=", common_name)
var E3 = sslsocket.connect_to_stream(socket, common_name)
if E3 != 0:
print("bad sslsocket.connect_to_stream E=", E3)
brokerconnectmode = BCM_FAILED_CONNECTION
emit_signal("broker_connection_failed")
sslsocket = null
if sslsocket != null:
sslsocket.poll()
var sslsocketstatus = sslsocket.get_status()
if sslsocketstatus == StreamPeerTLS.STATUS_CONNECTED:
brokerconnectmode = BCM_WAITING_CONNMESSAGE
elif sslsocketstatus >= StreamPeerTLS.STATUS_ERROR:
print("bad sslsocket.connect_to_stream")
emit_signal("broker_connection_failed")
elif brokerconnectmode == BCM_WAITING_CONNMESSAGE:
senddata(firstmessagetoserver())
brokerconnectmode = BCM_WAITING_CONNACK
elif brokerconnectmode == BCM_WAITING_CONNACK or brokerconnectmode == BCM_CONNECTED:
receiveintobuffer()
while wait_msg():
pass
if brokerconnectmode == BCM_CONNECTED and pingticksnext0 < Time.get_ticks_msec():
pingreq()
pingticksnext0 = Time.get_ticks_msec() + pinginterval*1000
elif brokerconnectmode == BCM_FAILED_CONNECTION:
cleanupsockets()
func _ready():
regexbrokerurl.compile('^(tcp://|wss://|ws://|ssl://)?([^:\\s]+)(:\\d+)?(/\\S*)?$')
if client_id == "":
randomize()
client_id = "rr%d" % randi()
func set_last_will(stopic, smsg, retain=false, qos=0):
assert((0 <= qos) and (qos <= 2))
assert(stopic)
self.lw_topic = stopic.to_ascii_buffer()
self.lw_msg = smsg if binarymessages else smsg.to_ascii_buffer()
self.lw_qos = qos
self.lw_retain = retain
if verbose_level:
print("LASTWILL%s topic=%s msg=%s" % [ " <retain>" if retain else "", stopic, smsg])
func set_user_pass(suser, spswd):
if suser != null:
self.user = suser.to_ascii_buffer()
self.pswd = spswd.to_ascii_buffer()
else:
self.user = null
self.pswd = null
func firstmessagetoserver():
var clean_session = true
var msg = PackedByteArray()
msg.append(CP_CONNECT);
msg.append(0x00);
msg.append(0x00);
msg.append(0x04);
msg.append_array("MQTT".to_ascii_buffer());
msg.append(0x04);
msg.append(0x02);
msg.append(0x00);
msg.append(0x3C);
msg[1] = 10 + 2 + len(self.client_id)
msg[9] = (1<<1) if clean_session else 0
if self.user != null:
msg[1] += 2 + len(self.user) + 2 + len(self.pswd)
msg[9] |= 0xC0
if self.keepalive:
assert(self.keepalive < 65536)
msg[10] |= self.keepalive >> 8
msg[11] |= self.keepalive & 0x00FF
if self.lw_topic:
msg[1] += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
msg[9] |= 1<<5 if self.lw_retain else 0
msg.append(len(self.client_id) >> 8)
msg.append(self.client_id.length() & 0xFF)
msg.append_array(self.client_id.to_ascii_buffer())
if self.lw_topic:
msg.append(len(self.lw_topic) >> 8)
msg.append(len(self.lw_topic) & 0xFF)
msg.append_array(self.lw_topic)
msg.append(len(self.lw_msg) >> 8)
msg.append(len(self.lw_msg) & 0xFF)
msg.append_array(self.lw_msg)
if self.user != null:
msg.append(len(self.user) >> 8)
msg.append(len(self.user) & 0xFF)
msg.append_array(self.user)
msg.append(len(self.pswd) >> 8)
msg.append(len(self.pswd) & 0xFF)
msg.append_array(self.pswd)
return msg
func cleanupsockets(retval=false):
if verbose_level:
print("cleanupsockets")
if socket:
if sslsocket:
sslsocket = null
socket.disconnect_from_host()
socket = null
else:
assert (sslsocket == null)
if websocket:
websocket.close()
websocket = null
brokerconnectmode = BCM_NOCONNECTION
return retval
func connect_to_broker(brokerurl):
assert (brokerconnectmode == BCM_NOCONNECTION)
var brokermatch = regexbrokerurl.search(brokerurl)
if brokermatch == null:
print("ERROR: unrecognized brokerurl pattern:", brokerurl)
return cleanupsockets(false)
var brokercomponents = brokermatch.strings
var brokerprotocol = brokercomponents[1]
var brokerserver = brokercomponents[2]
var iswebsocket = (brokerprotocol == "ws://" or brokerprotocol == "wss://")
var isssl = (brokerprotocol == "ssl://" or brokerprotocol == "wss://")
var brokerport = ((DEFAULTBROKERPORT_WSS if isssl else DEFAULTBROKERPORT_WS) if iswebsocket else (DEFAULTBROKERPORT_SSL if isssl else DEFAULTBROKERPORT_TCP))
if brokercomponents[3]:
brokerport = int(brokercomponents[3].substr(1))
var brokerpath = brokercomponents[4] if brokercomponents[4] else ""
common_name = null
if iswebsocket:
websocket = WebSocketPeer.new()
websocket.supported_protocols = PackedStringArray(["mqttv3.1"])
var websocketurl = ("wss://" if isssl else "ws://") + brokerserver + ":" + str(brokerport) + brokerpath
if verbose_level:
print("Connecting to websocketurl: ", websocketurl)
var E = websocket.connect_to_url(websocketurl)
if E != 0:
print("ERROR: websocketclient.connect_to_url Err: ", E)
return cleanupsockets(false)
print("Websocket get_requested_url ", websocket.get_requested_url())
brokerconnectmode = BCM_WAITING_WEBSOCKET_CONNECTION
else:
socket = StreamPeerTCP.new()
if verbose_level:
print("Connecting to %s:%s" % [brokerserver, brokerport])
var E = socket.connect_to_host(brokerserver, brokerport)
if E != 0:
print("ERROR: socketclient.connect_to_url Err: ", E)
return cleanupsockets(false)
if isssl:
brokerconnectmode = BCM_WAITING_SSL_SOCKET_CONNECTION
common_name = brokerserver
else:
brokerconnectmode = BCM_WAITING_SOCKET_CONNECTION
return true
func disconnect_from_server():
if brokerconnectmode == BCM_CONNECTED:
senddata(PackedByteArray([0xE0, 0x00]))
emit_signal("broker_disconnected")
cleanupsockets()
func publish(stopic, smsg, retain=false, qos=0):
var msg = smsg.to_ascii_buffer() if not binarymessages else smsg
var topic = stopic.to_ascii_buffer()
var pkt = PackedByteArray()
pkt.append(CP_PUBLISH | (2 if qos else 0) | (1 if retain else 0));
pkt.append(0x00);
var sz = 2 + len(topic) + len(msg)
if qos > 0:
sz += 2
assert(sz < 2097152)
var i = 1
while sz > 0x7f:
pkt[i] = (sz & 0x7f) | 0x80
sz >>= 7
i += 1
if i + 1 > len(pkt):
pkt.append(0x00);
pkt[i] = sz
pkt.append(len(topic) >> 8)
pkt.append(len(topic) & 0xFF)
pkt.append_array(topic)
if qos > 0:
pid += 1
pkt.append(pid >> 8)
pkt.append(pid & 0xFF)
pkt.append_array(msg)
senddata(pkt)
if verbose_level >= 2:
print("CP_PUBLISH%s%s topic=%s msg=%s" % [ "[%d]"%pid if qos else "", " <retain>" if retain else "", stopic, smsg])
return pid
func subscribe(stopic, qos=0):
pid += 1
var topic = stopic.to_ascii_buffer()
var length = 2 + 2 + len(topic) + 1
var msg = PackedByteArray()
msg.append(CP_SUBSCRIBE);
msg.append(length)
msg.append(pid >> 8)
msg.append(pid & 0xFF)
msg.append(len(topic) >> 8)
msg.append(len(topic) & 0xFF)
msg.append_array(topic)
msg.append(qos);
if verbose_level:
print("SUBSCRIBE[%d] topic=%s" % [pid, stopic])
senddata(msg)
func pingreq():
if verbose_level >= 2:
print("PINGREQ")
senddata(PackedByteArray([CP_PINGREQ, 0x00]))
func unsubscribe(stopic):
pid += 1
var topic = stopic.to_ascii_buffer()
var length = 2 + 2 + len(topic)
var msg = PackedByteArray()
msg.append(CP_UNSUBSCRIBE);
msg.append(length)
msg.append(pid >> 8)
msg.append(pid & 0xFF)
msg.append(len(topic) >> 8)
msg.append(len(topic) & 0xFF)
msg.append_array(topic)
if verbose_level:
print("UNSUBSCRIBE[%d] topic=%s" % [pid, stopic])
senddata(msg)
func wait_msg():
var n = receivedbuffer.size()
if n < 2:
return false
var op = receivedbuffer[0]
var i = 1
var sz = receivedbuffer[i] & 0x7f
while (receivedbuffer[i] & 0x80):
i += 1
if i == n:
return false
sz += (receivedbuffer[i] & 0x7f) << ((i-1)*7)
i += 1
if n < i + sz:
return false
if op == CP_PINGRESP:
assert (sz == 0)
if verbose_level >= 2:
print("PINGRESP")
elif op & 0xf0 == 0x30:
var topic_len = (receivedbuffer[i]<<8) + receivedbuffer[i+1]
var im = i + 2
var topic = receivedbuffer.slice(im, im + topic_len).get_string_from_ascii()
im += topic_len
var pid1 = 0
if op & 6:
pid1 = (receivedbuffer[im]<<8) + receivedbuffer[im+1]
im += 2
var data = receivedbuffer.slice(im, i + sz)
var msg = data if binarymessages else data.get_string_from_ascii()
if verbose_level >= 2:
print("received topic=", topic, " msg=", msg)
emit_signal("received_message", topic, msg)
if op & 6 == 2:
senddata(PackedByteArray([0x40, 0x02, (pid1 >> 8), (pid1 & 0xFF)]))
elif op & 6 == 4:
assert(0)
elif op == CP_CONNACK:
assert (sz == 2)
var retcode = receivedbuffer[i+1]
if verbose_level:
print("CONNACK ret=%02x" % retcode)
if retcode == 0x00:
brokerconnectmode = BCM_CONNECTED
emit_signal("broker_connected")
else:
if verbose_level:
print("Bad connection retcode=", retcode) # see https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
emit_signal("broker_connection_failed")
elif op == CP_PUBREC:
assert (sz == 2)
var apid = (receivedbuffer[i]<<8) + receivedbuffer[i+1]
if verbose_level >= 2:
print("PUBACK[%d]" % apid)
emit_signal("publish_acknowledgewait_msg", apid)
elif op == CP_SUBACK:
assert (sz == 3)
var apid = (receivedbuffer[i]<<8) + receivedbuffer[i+1]
if verbose_level:
print("SUBACK[%d] ret=%02x" % [apid, receivedbuffer[i+2]])
#if receivedbuffer[i+2] == 0x80:
# E = FAILED
elif op == CP_UNSUBACK:
assert (sz == 2)
var apid = (receivedbuffer[i]<<8) + receivedbuffer[i+1]
if verbose_level:
print("UNSUBACK[%d]" % apid)
else:
if verbose_level:
print("Unknown MQTT opcode op=%x" % op)
trimreceivedbuffer(i + sz)
return true
func trimreceivedbuffer(n):
if n == receivedbuffer.size():
receivedbuffer = PackedByteArray()
else:
assert (n <= receivedbuffer.size())
receivedbuffer = receivedbuffer.slice(n)