Python

MQTT for Python

MQTT (Message Queuing Telemetry Transport) là một giao thức gởi dạng publish/subscribe sử dụng cho các thiết bị Internet of Things với băng thông thấp, độ tin cậy cao và khả năng được sử dụng trong mạng lưới không ổn định.

INSTALL

python3 -m pip install paho-mqtt

python -m pip install paho-mqtt


CONNECT/DISCONNECT

Giao thức MQTT gồm Broker và các client, trong đó:

Client – MQTT client có thể đơn giản là một MCU, một chiếc máy tính hoặc một server. Sau khi kết nối với Broker, client có thể đăng tải nội dụng vào các topic, cũng như đăng ký nhận dữ liệu từ các topic nó mong muốn.

Broker – MQTT Broker chịu trách nhiệm nhận và phân loại các nội dung được đăng tải lên từ các clients, sau đó gửi các nội dung này đến các client đã đăng ký các topic tương ứng.

publish_flow

Figure 1 – Broker – Client

  • Import MQTT Client library:

import paho.mqtt.client as paho

  • Để tạo một MQTT client ta sử dụng function:

client = paho.Client(client_id=””, clean_session=True, userdata=None, protocol=paho.MQTTv31)

Trong đó:
+ client_id – tên của client khi kết nối với Broker.
+ clean_session – a boolean that determines the client type. If True, the broker will remove all information about this client when it disconnects. If False, the client is a durable client and subscription information and queued messages will be retained when the client disconnects.
+ userdata – user defined data of any type that is passed as the userdata parameter to callbacks.
+ protocol – the version of the MQTT protocol to use for this client. Can be either MQTTv31 or MQTTv311.

  • Để kết nối vào MQTT Broker, ta sử dụng function:

client.connect(host=”localhost”, port=1883, keepalive=60, bind_address=””)

Trong đó:
+ host – địa chỉ của Broker.
+ port – the network port of the server host to connect to. Defaults to 1883.
+ keepalive – maximum period in seconds allowed between communications with the broker. If no other messages are being exchanged, this controls the rate at which the client will send ping messages to the broker.

  • Để giữ kết nối giữa Client và Broker, ta sử dụng các function sau:
  1. client.loop_forever() will block, processing the network traffic and reconnecting automatically as necessary. This function is most useful if you are only subscribing to the broker and acting on the messages you receive.
  2. client.loop_start() starts a background thread to handle the network traffic and will return immediately. This is better suited to the situation where you wish to do other tasks in your program. It is complemented by the client.loop_stop() function, which stops the background thread.
  • Để ngắt kết nối với Broker, ta sử dụng function:

client.disconnect()

  • Trong trường hợp client bị mất kết nối với Broker mà không phải do function client.disconnect(), ta sử dụng function will_set() để nhận thông báo về việc này:

client.will_set(topic, payload=None, qos=0, retain=False)

Trong đó:
+ topic – tên của topic mà thông báo sẽ được gửi đến khi client bị disconnect ngoài ý muốn.
+ payload – message string.
+ qos – Quality of Service level.

  • Callback functions:

client.on_connecton_connect_callback(client, userdata, flags, rc)

client.on_disconnect = on_disconnect_callback(client, userdata, rc)

Trong đó:
+ client – the client instance for this callback.
+ userdata – the private user data as set in Client().
+ flags – response flags sent by the broker.
+ rc – the connection result. 0: Connection successful; 1: Connection refused – incorrect protocol version; 2: Connection refused – invalid client identifier; 3: Connection refused – server unavailable; 4: Connection refused – bad username or password; 5: Connection refused – not authorised; 6-255: Currently unused.

Example 1: Connect to broker

import paho.mqtt.client as paho

def on_connect_callback(client, userdata, flags, rc):
    print("CONNACK received with code {}.".format(rc))

client = paho.Client("RPi")
client.on_connect = on_connect_callback
client.connect("iot.eclipse.org", 1883)
client.loop_forever()

Output:
CONNACK received with code 0.


PUBLISH

Giao thức MQTT dựa trên nguyên lý “publishing messages” và “subscribing to topics”, còn gọi là “pub/sub”. Các client kết nối với Broker và đăng ký nhận dữ liệu từ các topic mà nó mong muốn, tương tự như chúng ta subscribe Youtube channels vậy. Các client cũng có thể kết nối với Broker để đăng tải các nội dung vào các topic.

  • Để publish một nội dụng lên topic, ta sử dụng function:

client.publish(topic, payload=None, qos=0, retain=False)

Trong đó:
+ topic – tên của topic cần đăng tải nội dung.
+ payload – message, ở dạng String.
+ qos – Quality of Service level, [0-2]
+ retain – khi được set True, Broker sẽ lưu message này lại sau khi đã gửi nó đến tất cả các subscriber. Khi có một client mới subscribe topic, Broke sẽ gửi message đã được lưu tới client này. Cơ chế này được gọi là “Last known good”.

  • Callback function:

client.on_publish = on_publish_callback(client, userdata, mid)

Trong đó:
+ client – the client instance for this callback.
+ userdata – the private user data as set in Client().
+ mid – matches the mid variable returned from the corresponding publish() call, to allow outgoing messages to be tracked.

Example 2: Publishing to the topic

import paho.mqtt.client as paho
import time

def on_connect_callback(client, userdata, flags, rc):
    print("CONNACK received with code {}.".format(rc))
def on_publish_callback(client, userdata, mid):
    print("mid: "+str(mid))

client = paho.Client("RPi")
client.on_connect = on_connect_callback
client.connect("iot.eclipse.org", 1883)
client.loop_start()

while True:
    client.publish("/demo", "publishing", 2)
    time.sleep(1)

Output:
CONNACK received with code 0.
mid: 1
mid: 2
mid: 3


SUBSCRIBE/UNSUBSCRIBE

  • Để đăng ký nhận message từ các topic, ta sử dụng function:

Single topic subscription : client.subscribe(topic, qos=0)

Multiple topic subscriptions: client.subscribe([(topic1, 0), (topic2, 1), (topic3, 2)])

Trong đó: đối với Multiple topic subscriptions, tên topic và qos phải nằm trong một tuple (topic, qos). Tất cả các topic tuples phải nằm trong một list.

  • Để ngừng đăng ký topic, ta sử dụng function:

client.unsubscribe(topic)

  • Callback functions:

client.on_subscribe = on_subscribe_callback(client, userdata, mid, granted_qos)

client.on_message = on_message_callback(client, userdata, msg)

client.on_unsubscribe = on_unsubscribe_callback(client, userdata, mid)

Trong đó:
+ client – the client instance for this callback.
+ userdata – the private user data as set in Client().
+ mid – matches the mid variable returned from the corresponding subscribe() call.
+ granted_qos – a list of integers that give the QoS level the broker has granted for each of the different subscription requests.
+ msg – an instance of MQTTMessage. This is a class with members topic, payload, qos, retain.

  • Trong trường hợp chúng ta sử dụng wildcard để đăng ký nhiều topic, để thêm callback function cho một topic cụ thể ta sử dụng function:

client.message_callback_add(sub, callback)

Trong đó:
+ sub – the subscription filter to match against for this callback. Only one callback may be defined per literal sub string.
+ callback – the callback function to be used. Tương đương với on_message_callback() ở phía trên.

Example 3: Subscription

import paho.mqtt.client as paho

def on_connect_callback(client, userdata, flags, rc):
    print("CONNACK received with code {}.".format(rc))
def on_subscribe_callback(client, userdata, mid, granted_qos):
    print("Subscribed: "+str(mid)+" "+str(granted_qos))
def on_message_callback(client, userdata, msg):
    print(msg.topic+" "+str(msg.qos)+" "+str(msg.payload))

client = paho.Client("RPi")
client.on_connect = on_connect_callback
client.on_subscribe = on_subscribe_callback
client.on_message = on_message_callback
client.connect("iot.eclipse.org", 1883)
client.subscribe("/hello", 2)
client.loop_start()

while True:
    pass

Output:
CONNACK received with code 0.
Subscribed: 1 (2,)
/hello 2 b’world’
/hello 2 b’world’


TOPICS/SUBSCRIPTIONS

Messages in MQTT are published on topics. There is no need to configure a topic, publishing on it is enough. Topics are treated as a hierarchy, using a slash (/) as a separator. This allows sensible arrangement of common themes to be created, much in the same way as a filesystem.

Clients can receive messages by creating subscriptions. A subscription may be to an explicit topic, in which case only messages to that topic will be received, or it may include wildcards. Two wildcards are available, + or #.

+ can be used as a wildcard for a single level of hierarchy.

# can be used as a wildcard for all remaining levels of hierarchy. This means that it must be the final character in a subscription.

For example, for a topic of “a/b/c/d“, the following example subscriptions will match: a/b/c/d; +/b/c/d; a/+/c/d; a/+/+/d; +/+/+/+; #; a/#; a/+/#. The following subscriptions will not match: a/b/c; b/+/c/d; +/+/+.

Zero length topic levels are valid, which can lead to some slightly non-obvious behaviour. For example, a topic of “a//topic” would correctly match against a subscription of “a/+/topic“. Likewise, zero length topic levels can exist at both the beginning and the end of a topic string, so “/a/topic” would match against a subscription of “+/a/topic“, “#” or “/#“, and a topic “a/topic/” would match against a subscription of “a/topic/+” or “a/topic/#“.


QUALITY OF SERVICE (QoS)

Quality of Service (QoS) là một cơ chế đảm bảo việc truyền nhận message giữa client và Broker. QoS gồm 3 level: “at most once”, “at least once” và “exactly once”. QoS càng cao thì việc truyền nhận càng đáng tin cậy, nhưng độ trễ cao và yêu cầu băng thông cao hơn.

QoS = 0 – The minimal level is zero and it guarantees a best effort delivery. A message won’t be acknowledged by the receiver or stored and redelivered by the sender. This is often called “fire and forget” and provides the same guarantee as the underlying TCP protocol.

Được sử dụng khi:

  • Chúng ta có một kết nối ổn định giữa client và Broker.
  • Việc mất dữ liệu không đáng ngại.

publish_qos0_flow

Figure 2 – QoS = 0

QoS = 1 – When using QoS level 1, it is guaranteed that a message will be delivered at least once to the receiver. But the message can also be delivered more than once. The sender will store the message until it gets an acknowledgement in form of a PUBACK command message from the receiver.

Được sử dụng khi:

  • Chúng ta muốn nhận được tất cả dữ liệu, việc trùng lặp không đáng ngại hoặc có thể xử lý được.
  • Cần đảm bảo sự tin cậy và tốc độ truyền nhận.

publish_qos1_flow

Figure 3 – QoS = 1

QoS = 2 – The highest QoS is 2, it guarantees that each message is received only once by the counterpart. It is the safest and also the slowest quality of service level. The guarantee is provided by two flows there and back between sender and receiver. If a receiver gets a QoS 2 PUBLISH it will process the publish message accordingly and acknowledge it to the sender with a PUBREC message. The receiver will store a reference to the packet identifier until it has send the PUBCOMP. This is important for avoid processing the message a second time. When the sender receives the PUBREC it can safely discard the initial publish, because it knows that the counter part has successfully received the message. It will store the PUBREC and respond with a PUBREL. After the receiver gets the PUBREL it can discard every stored state and answer with a PUBCOMP. The same is true when the sender receives the PUBCOMP.

Được sử dụng khi:

  • Chúng ta muốn nhận tất cả dữ liệu mà không bị trùng lặp. Trong trường hợp này, việc trùng lặp có thể gây ảnh hưởng xấu đến chương trình.
  • Tốc độ truyền nhận không cao.

publish_qos2_flow

Figure 4 – QoS = 2


SUMMARY

Qua bài viết này chúng ta đã tìm hiểu về giao thức MQTT, cảm ơn các bạn đã theo dõi.

Thân ái và quyết thắng.

Reference:
[1] MQTT
[2] MQTT Client Library Paho Python
[3] MQTT Essentials: QoS
[4] Qualities of service provided by an MQTT client
[5] MQTT Topics.
[6] MQTT Paho Library API.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s