在以前的文章中,我介紹了如何使用Paho開源項目建立MQTTClient_pulish用戶端。今天我就結合新的例子,給大家講解一下Paho使用MQTT用戶端的主要過程。
在例子中,我實作了在一個用戶端裡面具備訂閱和發送的功能,并可以通過訂閱端接收到的消息來控制發送端。同時,對這些過程進行了講解。
最後,我對paho中的常用函數、結構體等進行了講解。
概述
在文章Paho - MQTT C Cient的實作中,我介紹了如何使用Paho開源項目建立MQTTClient_pulish用戶端。但隻是簡單的介紹了使用方法,而且用戶端的結果與之前介紹的并不吻合,今天我就結合新的例子,給大家講解一下Paho使用MQTT用戶端的主要過程。
如同前面介紹的,MQTT用戶端分為同步用戶端和異步用戶端。今天主要講解的是同步用戶端,結構還是如同步用戶端中介紹的:
1.建立一個用戶端對象;
2.設定連接配接MQTT伺服器的選項;
3.如果多線程(異步模式)操作被使用則設定回調函數(詳見 Asynchronous >vs synchronous client applications);
4.訂閱用戶端需要接收的任意話題;
5.重複以下操作直到結束:
a.釋出用戶端需要的任意資訊;
b.處理所有接收到的資訊;
6.斷開用戶端連接配接;
7.釋放用戶端使用的所有記憶體。
實作
好,直接上代碼,MQTT簡單的同步用戶端。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#if !defined(WIN32)
#include <unistd.h>
#else
#include <windows.h>
#endif
#define NUM_THREADS 2
#define ADDRESS "tcp://localhost:1883" //更改此處位址
#define CLIENTID "aaabbbccc_pub" //更改此處用戶端ID
#define SUB_CLIENTID "aaabbbccc_sub" //更改此處用戶端ID
#define TOPIC "topic01" //更改發送的話題
#define PAYLOAD "Hello Man, Can you see me ?!" //
#define QOS 1
#define TIMEOUT 10000L
#define USERNAME "test_user"
#define PASSWORD "jim777"
#define DISCONNECT "out"
int CONNECT = 1;
volatile MQTTClient_deliveryToken deliveredtoken;
void delivered(void *context, MQTTClient_deliveryToken dt)
{
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
if(strcmp(payloadptr, DISCONNECT) == 0){
printf(" \n out!!");
CONNECT = 0;
}
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
printf("\n");
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void connlost(void *context, char *cause)
{
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
void *subClient(void *threadid){
long tid;
tid = (long)threadid;
printf("Hello World! It's me, thread #%ld!\n", tid);
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
int rc;
int ch;
MQTTClient_create(&client, ADDRESS, SUB_CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = USERNAME;
conn_opts.password = PASSWORD;
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
"Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
MQTTClient_subscribe(client, TOPIC, QOS);
do
{
ch = getchar();
} while(ch!='Q' && ch != 'q');
MQTTClient_unsubscribe(client, TOPIC);
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
pthread_exit(NULL);
}
void *pubClient(void *threadid){
long tid;
tid = (long)threadid;
int count = 0;
printf("Hello World! It's me, thread #%ld!\n", tid);
//聲明一個MQTTClient
MQTTClient client;
//初始化MQTT Client選項
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
//#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 }
MQTTClient_message pubmsg = MQTTClient_message_initializer;
//聲明消息token
MQTTClient_deliveryToken token;
int rc;
//使用參數建立一個client,并将其指派給之前聲明的client
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = USERNAME;
conn_opts.password = PASSWORD;
//使用MQTTClient_connect将client連接配接到伺服器,使用指定的連接配接選項。成功則傳回MQTTCLIENT_SUCCESS
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
while(CONNECT){
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
printf("Waiting for up to %d seconds for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered\n", token);
usleep(3000000L);
}
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
}
int main(int argc, char* argv[])
{
pthread_t threads[NUM_THREADS];
long t;
pthread_create(&threads[0], NULL, subClient, (void *)0);
pthread_create(&threads[1], NULL, pubClient, (void *)1);
pthread_exit(NULL);
}
在代碼中,我建立了兩個線程,分别用來處理訂閱用戶端和釋出用戶端。
整體詳解
接下來我講解一下這個簡單的用戶端,其中,大體的流程如下:
大體的流程如圖所示,在用戶端啟動之後,會啟動線程,建立一個訂閱用戶端,它會監聽消息的到達,在消息到達之後會觸發相應的回調函數以對消息進行處理;後在啟動一個線程,建立一個發送用戶端,用來發送消息的,每次發送消息之前會判斷是否要掉線,如CONNECT=0則會掉線,否則發送消息給topic01。
訂閱用戶端詳解
以下函數完成的是訂閱的功能。
void *subClient(void *threadid)
過程大概如下:
第一步:聲明用戶端,并通過函數給其指派;
MQTTClient client;
MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
第二步:設定連接配接MQTT伺服器的選項;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
第三步:設定回調函數;
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
//相應的回調函數connlost,msgarrvd,delivered我的代碼中都有
第四步:使用用戶端和連接配接選項連接配接伺服器;
MQTTClient_connect(client, &conn_opts))
第五步訂閱話題;
MQTTClient_subscribe(client, TOPIC, QOS);
第六步一直等待,知道輸入'Q' 或'q';
do
{
ch = getchar();
} while(ch!='Q' && ch != 'q');
第六步一直等待,直到輸入'Q' 或'q';
do
{
ch = getchar();
} while(ch!='Q' && ch != 'q');
第七步取消訂閱;
MQTTClient_unsubscribe(client, TOPIC);
第八步.斷開用戶端連接配接;
MQTTClient_disconnect(client, 10000);
第九步.釋放用戶端使用的所有記憶體;
MQTTClient_destroy(&client);
至此,訂閱用戶端就結束了。一般訂閱用戶端的大體結構都是這樣。不同的是回調函數的個性化上。
發送用戶端詳解
以下函數完成的是發送的功能。
void *pubClient(void *threadid)
MQTTClient client;
MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
第三步:使用用戶端和連接配接選項連接配接伺服器;
MQTTClient_connect(client, &conn_opts)
第四步設定發送消息的屬性;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
第五步循環發送消息;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
第六步一直等待,當CONNECT=0時退出該用戶端;
第七步.斷開用戶端連接配接;
MQTTClient_disconnect(client, 10000);
第八步.釋放用戶端使用的所有記憶體;
MQTTClient_destroy(&client);
至此,發送用戶端就結束了。一般的發送用戶端大體結構也如此,但異步用戶端可能有些許不同,無非就是設計回調函數,然後在連接配接,斷開連接配接等時可以使用回調函數做一些操作而已,具體的可以自己研究。
為了讓大家能夠更深入了解,我把自己學到的一些函數和結構體大緻在下面講解了一下。
相關結構體
MQTTClient
定義:typedef void* MQTTClient;
含義:代表MQTT用戶端的句柄。成功調用MQTTClient_create()後,可以得到有效的用戶端句柄。
MQTTClient_connectOptions
定義:
typedef struct
{
char struct_id[4];//結構體的識别序列,必須為MQTC
int struct_version;//結構體版本
/**
在0,1,2,3,4,5中取值:
0-表示沒有SSL選項且沒有serverURIs;
1-表示沒有serverURIs;
2-表示沒有MQTTVersion
3-表示沒有傳回值;
4-表示沒有二進制密碼選項
*/
int keepAliveInterval;
/**
在這段時間内沒有資料相關的消息時,用戶端發送一個非常小的MQTT“ping”消息,伺服器将會确認這個消息
*/
int cleansession;
/**
當cleansession為true時,會話狀态資訊在連接配接和斷開連接配接時被丢棄。 将cleansession設定為false将保留會話狀态資訊
*/
int reliable;
/*
将該值設定為true意味着必須完成釋出的消息(已收到确認),才能發送另一個消息
*/
MQTTClient_willOptions* will;
/*
如果程式不使用最後的意願和遺囑功能,請将此指針設定為NULL。
*/
const char* username;//使用者名
const char* password;//密碼
int connectTimeout;//允許嘗試連接配接的過時時間
int retryInterval;//嘗試重連的時間
MQTTClient_SSLOptions* ssl;
/*
如果程式不使用最後的ssl,請将此指針設定為NULL。
*/
int serverURIcount;
char* const* serverURIs;
/*
連接配接伺服器的url,以protocol:// host:port為格式
*/
int MQTTVersion;
/*
MQTT的版本,MQTTVERSION_3_1(3),MQTTVERSION_3_1_1 (4)
*/
struct
{
const char* serverURI;
int MQTTVersion;
int sessionPresent;
} returned;
struct {
int len;
const void* data;
} binarypwd;
} MQTTClient_connectOptions;
含義:用來設定MQTTClient的連接配接選項的結構體。
MQTTClient_message
typedef struct
{
char struct_id[4];//結構體的識别序列,必須為MQTM
int struct_version;//結構體的版本,必須為0
int payloadlen;//MQTT資訊的長度
void* payload;//指向消息負載的指針
int qos;//服務品質
int retained;//保留标志
int dup;dup//标志訓示這個消息是否是重複的。 隻有在收到QoS1消息時才有意義。 如果為true,則用戶端應用程式應采取适當的措施來處理重複的消息。
int msgid;//消息辨別符通常保留供MQTT用戶端和伺服器内部使用。
} MQTTClient_message;
含義:代表MQTT資訊的結構體。
相關函數詳解
MQTTClient_create
DLLExport int MQTTClient_create(
MQTTClient * handle,
const char * serverURI,
const char * clientId,
int persistence_type,
void * persistence_context
)
作用:該函數建立了一個用于連接配接到特定伺服器,使用特定持久存儲的MQTT用戶端。
| 參數 | 含義 |
| ---|-------------|
| handle | 指向MQTT用戶端句柄的指針。句柄被成功從函數中傳回的用戶端引用所填充 |
|serverURI | 以空結尾的字元串,其指定用戶端将連接配接到的伺服器。其格式為protocol://host:port。現在的(protocol)協定必須是tcp或ssl,而host可以指定為IP位址或域名。例如, 要使用預設 MQTT 端口連接配接到本地計算機上運作的伺服器, 請指定為 tcp://localhost:1883。 |
| clientId|用戶端辨別符(clientId)是一個以空結尾的 UTF-8 編碼字元串,用戶端連接配接到伺服器時将它傳遞過去。 |
| persistence_type|用戶端所使用的持久類型。MQTTCLIENT_PERSISTENCE_NONE-使用記憶體持久化。如果用戶端運作的裝置或系統出故障或關閉, 則任何正在運作的消息的目前狀态都将丢失, 甚至在 QoS1 和 QoS2 中也可能無法傳遞某些消息; MQTTCLIENT_PERSISTENCE_DEFAULT-使用預設的持久化機制(檔案系統)。正在運作消息的狀态被儲存在持久存儲中,以便在意外出現時對消息的丢失提供一些保護; MQTTCLIENT_PERSISTENCE_USER-使用程式指定的持久化實作。使用這種類型,應用程式可對持久化機制進行控制,應用程式必須實作MQTTClient_persistence 接口。 |
| persistence_context|如果應用程式使用的是MQTTCLIENT_PERSISTENCE_NONE持久化,該參數不使用,而且值應該設定為NULL。對于MQTTCLIENT_PERSISTENCE_DEFAULT持久化,應該設定持久化目錄的位置(如果設定為NULL,則使用工作目錄作為持久化目錄)。使用MQTTCLIENT_PERSISTENCE_USER持久化,則将此參數指向有效的MQTTClient_persistence結構。|
MQTTClient_setCallbacks
DLLExport int MQTTClient_setCallbacks (
MQTTClient handle,
void * context,
MQTTClient_connectionLost * cl,
MQTTClient_messageArrived * ma,
MQTTClient_deliveryComplete * dc
)
作用:該函數為特定的用戶端建立回調函數。如果您的用戶端應用程式不使用特定的回調函數,請将相關參數設定為NULL。 調用MQTTClient_setCallbacks()使用戶端進入多線程模式。 任何必要的消息确認和狀态通信都在背景處理,而不需要用戶端應用程式的任何幹預。
注意:在調用該函數時,MQTT用戶端必須斷開連接配接。(即先要調用該函數在連接配接用戶端)。
| context| 指向任何應用程式特定上下文的指針。 上下文指針被傳遞給每個回調函數,以提供對回調中的上下文資訊的通路。|
|cl|指向MQTTClient_connectionLost()回調函數的指針。 如果您的應用程式不處理斷開連接配接,您可以将其設定為NULL。|
|ma|指向MQTTClient_messageArrived()回調函數的指針。 當您調用MQTTClient_setCallbacks()時,必須指定此回調函數。|
|dc|指向MQTTClient_deliveryComplete()回調函數的指針。 如果您的應用程式同步釋出,或者您不想檢查是否成功發送,則可以将其設定為NULL。|
MQTTClient_connect
DLLExport int MQTTClient_connect (
MQTTClient handle,
MQTTClient_connectOptions * options
)
作用:此函數嘗試使用指定的選項将先前建立的用戶端連接配接到MQTT伺服器。
| options| 指向有效的MQTTClient_connectOptions結構的指針。|
| 傳回值 | 含義 |
| 0| 連接配接成功 |
| 1| 拒絕連接配接:不可接受的協定版本。|
| 2| 拒絕連接配接:辨別符被拒絕。|
|3| 拒絕連接配接:伺服器不可用。|
| 4| 拒絕連接配接:使用者名或密碼錯誤。|
| 5| 拒絕連接配接:未經授權。|
| 6| 保留給未來用。|
MQTTClient_subscribe
DLLExport int MQTTClient_subscribe (
MQTTClient handle,
const char * topic,
int qos
)
作用:此功能嘗試将客戶訂閱到單個主題,該主題可能包含通配符。 此函數還指定服務品質。
| topic| 訂閱的主題,可使用通配符。|
|qos|訂閱的請求服務品質|
MQTTClient_publishMessage
DLLExport int MQTTClient_publishMessage (
MQTTClient handle,
const char * topicName,
MQTTClient_message * msg,
MQTTClient_deliveryToken * dt
)
| topicName| 與資訊相關的主題。|
|msg|指向有效的 MQTTClient_message 結構的指針, 其中包含要釋出消息的有效負載和屬性|
|dt|指向MQTTClient_deliveryToken的指針。當函數成功傳回時,dt會被指派為代表消息的token。如果程式中沒有使用傳遞token,将其設定為NULL。|
MQTTClient_waitForCompletion
DLLExport int MQTTClient_waitForCompletion (
MQTTClient handle,
MQTTClient_deliveryToken dt,
unsigned long timeout
)
作用:用戶端應用程式調用此函數來将主線程的執行與消息的完成釋出同步。 被調用時,MQTTClient_waitForCompletion()阻塞執行,直到消息成功傳遞或已超過指定的時間。
|dt|代表消息的MQTTClient_deliveryToken用來檢測是否成功傳遞。傳遞token由釋出函數MQTTClient_publish () 和 MQTTClient_publishMessage ()所産生。|
|timeout|等待的最大毫秒數。|
傳回值:
消息成功傳遞則傳回MQTTCLIENT_SUCCESS(0) ,如果時間已過期或檢測token時出問題,則傳回錯誤碼。
對paho用戶端的講解就到此結束了,如有不明白的,可以給我留言,一起讨論,一起進步。
作者:阿進的寫字台
出處:https://www.cnblogs.com/homejim/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。