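1 What is MQTT?
MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol originally developed at IBM, and it may well become an important building block of the Internet of Things. It is a publish/subscribe messaging protocol built on binary messages and is now an OASIS standard. Because the specification is simple, it suits IoT scenarios that need low power consumption and have limited network bandwidth.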
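To give a feel for how compact the binary format is, here is a minimal sketch that hand-encodes an MQTT 3.1.1 PUBLISH packet at QoS 0. The names PublishPacket, Encode and EncodeRemainingLength are made up for this illustration; a real application would leave the encoding to a library such as MQTTnet.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Illustrative only: hand-encodes a QoS 0 PUBLISH packet (MQTT 3.1.1).
public static class PublishPacket
{
    // "Remaining length" uses 7 data bits per byte; the high bit means more bytes follow.
    public static byte[] EncodeRemainingLength(int length)
    {
        if (length < 0 || length > 268435455)
            throw new ArgumentOutOfRangeException(nameof(length));

        var bytes = new List<byte>();
        do
        {
            byte digit = (byte)(length % 128);
            length /= 128;
            if (length > 0) digit |= 0x80;
            bytes.Add(digit);
        } while (length > 0);
        return bytes.ToArray();
    }

    // QoS 0, not retained: fixed header, topic length, topic, payload (no packet identifier).
    public static byte[] Encode(string topic, string payload)
    {
        byte[] topicBytes = Encoding.UTF8.GetBytes(topic);
        byte[] payloadBytes = Encoding.UTF8.GetBytes(payload);
        if (topicBytes.Length > ushort.MaxValue)
            throw new ArgumentException("Topic is too long.", nameof(topic));

        var packet = new List<byte>();
        packet.Add(0x30); // packet type 3 (PUBLISH), flags 0
        packet.AddRange(EncodeRemainingLength(2 + topicBytes.Length + payloadBytes.Length));
        packet.Add((byte)(topicBytes.Length >> 8));   // topic length, big-endian
        packet.Add((byte)(topicBytes.Length & 0xFF));
        packet.AddRange(topicBytes);
        packet.AddRange(payloadBytes);
        return packet.ToArray();
    }
}
```

Calling PublishPacket.Encode("a/b", "hi") yields nine bytes in total: two bytes of fixed header, two bytes of topic length, three bytes of topic and two bytes of payload.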
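2 MQTTnet
MQTTnet is a high-performance .NET library for MQTT-based communication. It provides both an MQTT client and an MQTT server (broker), and supports versions 3.1.0, 3.1.1 and 5.0.0 of the MQTT standard.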
3 Platforms supported by MQTTnet
.NET Standard 1.3+
.NET Core 1.1+
.NET Core App 1.1+
.NET Framework 4.5.2+ (x86, x64, AnyCPU)
Mono 5.2+
Universal Windows Platform (UWP) 10.0.10240+ (x86, x64, ARM, AnyCPU, Windows 10 IoT Core)
Xamarin.Android 7.5+
Xamarin.iOS 10.14+
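4 Create the server
An MQTT server, also called a message broker, can be an application or a device. It sits between message publishers and subscribers, and it can:
(1) accept network connections from clients;
(2) accept application messages published by clients;
(3) process subscribe and unsubscribe requests from clients;
(4) forward application messages to the clients whose subscriptions match.
To build the server, create a console application: in Visual Studio choose Console App (.NET Core), name the new project MqttNetServer, and add the MQTTnet and Newtonsoft.Json NuGet packages that the code below relies on. The code is as follows: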
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;

namespace MqttServerTest
{
    class Program
    {
        public static IMqttServer mqttServer;

        static void Main(string[] args)
        {
            mqttServer = new MqttFactory().CreateMqttServer();

            // Log every client that connects or disconnects.
            mqttServer.UseClientConnectedHandler(e =>
            {
                Console.WriteLine("***new connect:" + e.ClientId);
            });
            mqttServer.UseClientDisconnectedHandler(e =>
            {
                Console.WriteLine("*** disconnect:" + e.ClientId);
            });

            // Optional TLS certificate (used by the commented-out encryption options below):
            //var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
            //var certificate = new X509Certificate2(Path.Combine(currentPath, "certificate.pfx"), "yourPassword", X509KeyStorageFlags.Exportable);

            var optionsBuilder = new MqttServerOptionsBuilder()
                .WithConnectionBacklog(100)
                .WithDefaultEndpointPort(1884)
                .WithConnectionValidator(c =>
                {
                    // Examples of rejecting a connection:
                    //if (c.ClientId.Length < 10)
                    //{
                    //    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
                    //    //c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
                    //    return;
                    //}
                    //if (c.Username != "mySecretUser")
                    //{
                    //    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                    //    return;
                    //}
                    //if (c.Password != "mySecretPassword")
                    //{
                    //    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                    //    return;
                    //}

                    // This demo accepts every client.
                    c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
                    Console.WriteLine("***connect validator:" + c.ClientId);
                })
                //.WithEncryptionCertificate(certificate.Export(X509ContentType.Pfx))
                //.WithEncryptionSslProtocol(SslProtocols.Tls12) // needs using System.Security.Authentication;
                .WithApplicationMessageInterceptor(context =>
                {
                    //if (context.ApplicationMessage.Topic == "my/custom/topic")
                    //{
                    //    context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload.");
                    //}
                    //// It is possible to disallow the sending of messages for a certain client id like this:
                    //if (context.ClientId != "Someone")
                    //{
                    //    context.AcceptPublish = false;
                    //    return;
                    //}
                    // It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document.
                    // This is useful when the IoT device has no own clock and the creation time of the message might be important.

                    context.AcceptPublish = true;

                    // Payload is a byte array (null for an empty message), so decode it before printing.
                    var payload = context.ApplicationMessage.Payload ?? new byte[0];
                    Console.WriteLine("***Message:" + Encoding.UTF8.GetString(payload));
                })
                .WithSubscriptionInterceptor(context =>
                {
                    //if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
                    //{
                    //    context.AcceptSubscription = false;
                    //}

                    //if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
                    //{
                    //    context.AcceptSubscription = false;
                    //    context.CloseConnection = true;
                    //}

                    context.AcceptSubscription = true;
                    Console.WriteLine("***Subscript:" + context.TopicFilter.Topic);
                })
                //.WithStorage(new RetainedMessageHandler())
                ;
            var options = optionsBuilder.Build();

            //// Alternatively, set the storage on the built options:
            //options.Storage = new RetainedMessageHandler();

            // Block until the server has started so that start-up errors surface here
            // instead of being lost in a fire-and-forget async void call.
            StartServer(options).GetAwaiter().GetResult();

            Console.WriteLine("Press Enter to exit.");
            Console.ReadLine();

            mqttServer.StopAsync().GetAwaiter().GetResult();
        }

        public static async Task StartServer(IMqttServerOptions options)
        {
            await mqttServer.StartAsync(options);
        }
    }

    // The implementation of the storage:
    // This code uses the JSON library "Newtonsoft.Json".
    public class RetainedMessageHandler : IMqttServerStorage
    {
        private const string Filename = "C:\\MQTT\\RetainedMessages.json";

        public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
        {
            // Make sure the target directory exists before writing the file.
            Directory.CreateDirectory(Path.GetDirectoryName(Filename));
            File.WriteAllText(Filename, JsonConvert.SerializeObject(messages));
            return Task.FromResult(0);
        }

        public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
        {
            IList<MqttApplicationMessage> retainedMessages;
            if (File.Exists(Filename))
            {
                var json = File.ReadAllText(Filename);
                retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
            }
            else
            {
                retainedMessages = new List<MqttApplicationMessage>();
            }

            return Task.FromResult(retainedMessages);
        }
    }
}
Run the code as it is and you have a simple MQTT server.
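The interceptor comments in the listing mention extending a payload with a timestamp. A minimal sketch of that idea follows, using Newtonsoft.Json as the listing already does. PayloadStamper, AddTimestamp and the "timestamp" field name are hypothetical names chosen for this example; the sketch assumes the payload is a UTF-8 JSON object and returns anything else unchanged.

```csharp
using System;
using System.Text;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Hypothetical helper: adds a "timestamp" property to a JSON object payload.
public static class PayloadStamper
{
    public static byte[] AddTimestamp(byte[] payload, DateTime utcNow)
    {
        if (payload == null || payload.Length == 0)
            return payload; // nothing to extend

        try
        {
            JObject document = JObject.Parse(Encoding.UTF8.GetString(payload));
            document["timestamp"] = utcNow.ToString("o"); // ISO 8601 round-trip format
            return Encoding.UTF8.GetBytes(document.ToString(Formatting.None));
        }
        catch (JsonException)
        {
            // Not a JSON object: leave the payload as it was.
            return payload;
        }
    }
}
```

Inside the application message interceptor this could be applied as context.ApplicationMessage.Payload = PayloadStamper.AddTimestamp(context.ApplicationMessage.Payload, DateTime.UtcNow); which is useful when the publishing device has no clock of its own.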
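5 Create the Xamarin app
An MQTT client is an application or device that uses the MQTT protocol. It is always the client that opens the network connection to the server. A client can:
(1) publish messages that other clients may subscribe to;
(2) subscribe to messages published by other clients;
(3) unsubscribe from, or remove, application messages;
(4) disconnect from the server.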
In Visual Studio, create a Xamarin.Forms mobile app. Once it is created, search NuGet for mqttnet and add a reference to the MQTTnet package. Then change the page markup and its code-behind as follows:
<?xml version="1.0" encoding="utf-8" ?>
<ContentPage xmlns="http://xamarin.com/schemas/2014/forms"
             xmlns:x="http://schemas.microsoft.com/winfx/2009/xaml"
             xmlns:local="clr-namespace:CatShell"
             x:Class="CatShell.MainPage">

    <StackLayout>
        <!-- Place new controls here -->
        <Label Text="SubscribeTopic"/>
        <Entry x:Name="txtSubTopic" Placeholder="Subscribe Topic" />
        <Button Text="Subscribe" Clicked="SubButton_Clicked"/>
        <Entry x:Name="txtReceiveMessage"/>
        <Label Text="PublishTopic"/>
        <Entry x:Name="txtPubTopic"/>
        <Entry x:Name="txtSendMessage" />
        <Button Text="Publish" Clicked="PubButton_Clicked"/>
        <Editor>

        </Editor>

    </StackLayout>

</ContentPage>
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xamarin.Forms;

namespace CatShell
{
    public partial class MainPage : ContentPage
    {
        public IMqttClient mqttClient;
        public IMqttClientOptions options;

        public MainPage()
        {
            InitializeComponent();
            InitMqttClient();
            ConnectMqttServer();
        }

        public void InitMqttClient()
        {
            // Create a new MQTT client.
            var factory = new MqttFactory();
            mqttClient = factory.CreateMqttClient();

            // The handlers are not called on the UI thread, so every UI update is marshalled to the main thread.
            mqttClient.UseConnectedHandler(e =>
            {
                Device.BeginInvokeOnMainThread(() =>
                {
                    txtReceiveMessage.Text = txtReceiveMessage.Text + ">> connect success." + Environment.NewLine;
                });
            });
            mqttClient.UseDisconnectedHandler(e =>
            {
                Device.BeginInvokeOnMainThread(() =>
                {
                    txtReceiveMessage.Text = txtReceiveMessage.Text + ">> Disconnect." + Environment.NewLine;
                });
            });
            mqttClient.UseApplicationMessageReceivedHandler(e =>
            {
                // Payload is null for an empty message, so guard before decoding.
                string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]);
                Device.BeginInvokeOnMainThread(() =>
                {
                    txtReceiveMessage.Text = $">> {text}" + Environment.NewLine;
                });
            });

            // Create TCP based options using the builder.
            options = new MqttClientOptionsBuilder()
                .WithClientId("Client4")
                .WithTcpServer("10.100.1.247", 1884) // Use TCP connection; the port is optional
                //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection.
                //.WithCredentials("bud", "%spencer%")
                //.WithTls()
                //.WithTls(new MqttClientOptionsBuilderTlsParameters
                //{
                //    UseTls = true,
                //    CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) =>
                //    {
                //        // TODO: Check conditions of certificate by using above parameters.
                //        return true;
                //    }
                //})
                .WithCleanSession()
                .Build();
        }

        // Fire-and-forget from the constructor; exceptions are caught so that a failed
        // connection attempt does not crash the app.
        public async void ConnectMqttServer()
        {
            try
            {
                await mqttClient.ConnectAsync(options, CancellationToken.None); // CancellationToken overload since 3.0.5
            }
            catch (Exception ex)
            {
                Device.BeginInvokeOnMainThread(() =>
                {
                    txtReceiveMessage.Text = $">> connect failed: {ex.Message}" + Environment.NewLine;
                });
            }
        }

        private async void SubButton_Clicked(object sender, EventArgs e)
        {
            // Entry.Text is null until the user types something.
            string topic = txtSubTopic.Text?.Trim();

            if (string.IsNullOrEmpty(topic))
            {
                await DisplayAlert("MQTT", "The subscribe topic must not be empty.", "OK");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                await DisplayAlert("MQTT", "The MQTT client is not connected yet.", "OK");
                return;
            }

            // Subscribe to a topic
            await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());

            txtReceiveMessage.Text = txtReceiveMessage.Text + $"Subscribed to topic [{topic}]" + Environment.NewLine;
            // Lock the topic field after subscribing, mirroring the WinForms client
            // (the original set IsReadOnly to false, which had no effect).
            txtSubTopic.IsReadOnly = true;
        }

        private async void PubButton_Clicked(object sender, EventArgs e)
        {
            string topic = txtPubTopic.Text?.Trim();

            if (string.IsNullOrEmpty(topic))
            {
                await DisplayAlert("MQTT", "The publish topic must not be empty.", "OK");
                return;
            }

            string inputString = txtSendMessage.Text?.Trim() ?? string.Empty;

            await PublishMessages(topic, inputString);
        }

        // Returns a Task so callers can await the publish and observe failures.
        public async Task PublishMessages(string topicMsg, string payloadMsg)
        {
            if (!mqttClient.IsConnected)
            {
                await DisplayAlert("MQTT", "The MQTT client is not connected yet.", "OK");
                return;
            }

            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topicMsg)
                .WithPayload(payloadMsg)
                .WithExactlyOnceQoS()
                .WithRetainFlag()
                .Build();

            await mqttClient.PublishAsync(message);
        }
    }
}
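Run the code and the app can publish messages straight away.
6 Create a WinForms client (optional)
You can also create a WinForms client to exchange messages with the app. In Visual Studio, create a Windows Forms App (.NET Framework) and add the MQTTnet NuGet package. The form needs the controls that the code refers to: text boxes txtSubTopic, txtReceiveMessage, txtPubTopic and txtSendMessage, plus a subscribe button BtnSubscribe and a publish button (BtnPublish, going by the handler name).
The code-behind is as follows: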
using MQTTnet;
using MQTTnet.Client.Options;
using MQTTnet.Client;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace MqttClientWin
{
    public partial class Form1 : Form
    {
        public IMqttClient mqttClient;
        public IMqttClientOptions options;

        public Form1()
        {
            InitializeComponent();
            InitMqttClient();
            ConnectMqttServer();
        }

        public void InitMqttClient()
        {
            // Create a new MQTT client.
            var factory = new MqttFactory();
            mqttClient = factory.CreateMqttClient();

            mqttClient.UseApplicationMessageReceivedHandler(e =>
            {
                // Payload is null for an empty message, so guard before decoding.
                string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[0]);

                Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
                Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
                Console.WriteLine($"+ Payload = {text}");
                Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
                Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
                Console.WriteLine();

                // The handler is not called on the UI thread, so marshal the update to it.
                this.Invoke(new Action(() =>
                {
                    txtReceiveMessage.AppendText($">> {text}{Environment.NewLine}");
                }));

                //Task.Run(() => mqttClient.PublishAsync("hello/world"));
            });

            mqttClient.UseConnectedHandler(e =>
            {
                Console.WriteLine("### CONNECTED WITH SERVER ###");

                //// To subscribe as soon as the connection is up, make this lambda async and add:
                //await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
                //Console.WriteLine("### SUBSCRIBED ###");

                this.Invoke(new Action(() =>
                {
                    txtReceiveMessage.AppendText($">> connect success.{Environment.NewLine}");
                }));
            });

            mqttClient.UseDisconnectedHandler(e =>
            {
                this.Invoke(new Action(() =>
                {
                    txtReceiveMessage.AppendText($">> Disconnect .{Environment.NewLine}");
                }));
            });

            // Create TCP based options using the builder.
            options = new MqttClientOptionsBuilder()
                .WithClientId("Client5")
                .WithTcpServer("10.100.1.247", 1884) // Use TCP connection; the port is optional
                //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection.
                //.WithCredentials("bud", "%spencer%")
                //.WithTls()
                //.WithTls(new MqttClientOptionsBuilderTlsParameters
                //{
                //    UseTls = true,
                //    CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) =>
                //    {
                //        // TODO: Check conditions of certificate by using above parameters.
                //        return true;
                //    }
                //})
                .WithCleanSession()
                .Build();
        }

        // Fire-and-forget from the constructor; exceptions are caught so that a failed
        // connection attempt does not crash the application.
        public async void ConnectMqttServer()
        {
            try
            {
                await mqttClient.ConnectAsync(options, CancellationToken.None); // CancellationToken overload since 3.0.5
            }
            catch (Exception ex)
            {
                MessageBox.Show("Could not connect to the MQTT server: " + ex.Message);
            }
        }

        // Optional: call this once after InitMqttClient to retry every 5 seconds after a disconnect.
        // Note that it replaces the disconnected handler registered in InitMqttClient.
        public void ReconnectMqttServer()
        {
            mqttClient.UseDisconnectedHandler(async e =>
            {
                Console.WriteLine("### DISCONNECTED FROM SERVER ###");
                await Task.Delay(TimeSpan.FromSeconds(5));

                try
                {
                    await mqttClient.ConnectAsync(options, CancellationToken.None); // CancellationToken overload since 3.0.5
                }
                catch
                {
                    Console.WriteLine("### RECONNECTING FAILED ###");
                }
            });
        }

        // Returns a Task so callers can await the publish and observe failures.
        public async Task PublishMessages(string topicMsg, string payloadMsg)
        {
            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topicMsg)
                .WithPayload(payloadMsg)
                .WithExactlyOnceQoS()
                .WithRetainFlag()
                .Build();

            await mqttClient.PublishAsync(message);
        }

        private async void BtnSubscribe_Click(object sender, EventArgs e)
        {
            string topic = txtSubTopic.Text.Trim();

            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("The subscribe topic must not be empty!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                MessageBox.Show("The MQTT client is not connected yet!");
                return;
            }

            // Subscribe to a topic
            await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());

            txtReceiveMessage.AppendText($"Subscribed to topic [{topic}]" + Environment.NewLine);
            txtSubTopic.Enabled = false;
            BtnSubscribe.Enabled = false;
        }

        private async void BtnPublish_Click(object sender, EventArgs e)
        {
            string topic = txtPubTopic.Text.Trim();

            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("The publish topic must not be empty!");
                return;
            }

            if (!mqttClient.IsConnected)
            {
                MessageBox.Show("The MQTT client is not connected yet!");
                return;
            }

            string inputString = txtSendMessage.Text.Trim();

            await PublishMessages(topic, inputString);
        }
    }
}
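7 Subscriptions, topics and sessions in the MQTT protocol
I. Subscription
A subscription consists of a topic filter and a maximum quality of service (QoS). A subscription is associated with exactly one session, and a session can contain several subscriptions. Each subscription within a session has a different topic filter.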
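Because a subscription carries a maximum QoS, a message reaches the subscriber at the lower of the QoS it was published with and the maximum QoS granted for the subscription. A small sketch of that rule; the QoS enum and the Delivery class are illustrative names, not MQTTnet types.

```csharp
using System;

// Illustrative QoS levels as defined by the MQTT specification.
public enum QoS
{
    AtMostOnce = 0,
    AtLeastOnce = 1,
    ExactlyOnce = 2
}

public static class Delivery
{
    // The delivered QoS is the minimum of the published QoS and the subscription's maximum QoS.
    public static QoS EffectiveQoS(QoS published, QoS subscriptionMax)
    {
        return (QoS)Math.Min((int)published, (int)subscriptionMax);
    }
}
```

For example, a message published with ExactlyOnce is delivered as AtLeastOnce to a client whose subscription was granted a maximum of AtLeastOnce.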
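II. Session
Each client that connects to the server establishes a session, which is a stateful interaction between the client and the server. A session may last only as long as one network connection, or it may span several consecutive network connections between the client and the server.
III. Topic name
The label attached to an application message, which the server matches against the subscriptions it knows about. The server sends a copy of the message to every client that has a matching subscription.
IV. Topic filter
An expression used in a subscription to indicate interest in one or more topics. A topic filter may contain wildcard characters so that it matches several topic names.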
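How a topic filter is matched against a topic name can be sketched in a few lines. The TopicMatcher class below is a simplified illustration written for this article, not the broker's actual implementation: '+' matches exactly one topic level, '#' matches all remaining levels and must be the last level of the filter, and topics starting with '$' are not matched by filters that start with a wildcard.

```csharp
using System;

// Simplified illustration of MQTT topic filter matching.
public static class TopicMatcher
{
    public static bool IsMatch(string topic, string filter)
    {
        // Topics beginning with '$' are never matched by a filter that starts with a wildcard.
        if (topic.StartsWith("$", StringComparison.Ordinal) &&
            (filter.StartsWith("#", StringComparison.Ordinal) ||
             filter.StartsWith("+", StringComparison.Ordinal)))
        {
            return false;
        }

        string[] topicLevels = topic.Split('/');
        string[] filterLevels = filter.Split('/');

        for (int i = 0; i < filterLevels.Length; i++)
        {
            if (filterLevels[i] == "#")
            {
                // Multi-level wildcard: valid only as the last level; it also matches the parent level.
                return i == filterLevels.Length - 1;
            }
            if (i >= topicLevels.Length)
            {
                return false; // the filter has more levels than the topic
            }
            if (filterLevels[i] != "+" && filterLevels[i] != topicLevels[i])
            {
                return false; // a literal level differs
            }
        }

        // Without '#', both must have the same number of levels.
        return topicLevels.Length == filterLevels.Length;
    }
}
```

With this sketch, "sport/tennis/#" matches both "sport/tennis/player1" and "sport/tennis", while "sport/+" matches "sport/tennis" but not "sport/tennis/player1".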
V. Payload
The actual content of the message that subscribers receive.
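8 Methods in the MQTT protocol
The MQTT protocol defines a number of methods (also called actions) that indicate the operation to perform on a given resource. The resource may represent pre-existing data or dynamically generated data, depending on the server implementation. Generally speaking, a resource is a file or an output on the server. The main methods are:
(1) Connect: waits for a connection to the server to be established.
(2) Disconnect: waits for the MQTT client to finish its work and then closes the TCP/IP session with the server.
(3) Subscribe: waits for the subscription to complete.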
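The three methods above map onto MQTTnet client calls roughly as in the following sketch. It assumes the MQTTnet 3.0.x API used in the earlier listings and a broker listening on 127.0.0.1:1884 (for instance the server from section 4 running locally); LifecycleDemo, the client id and the topic demo/topic are made-up names, and the parameterless DisconnectAsync call is assumed to be available in that version.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;

// Sketch only: needs the MQTTnet package (3.0.x assumed) and a reachable broker.
public static class LifecycleDemo
{
    public static async Task RunAsync()
    {
        IMqttClient client = new MqttFactory().CreateMqttClient();

        IMqttClientOptions options = new MqttClientOptionsBuilder()
            .WithClientId("LifecycleDemo")     // hypothetical client id
            .WithTcpServer("127.0.0.1", 1884)  // assumed local broker
            .WithCleanSession()
            .Build();

        // Connect: completes once the server has accepted the connection.
        await client.ConnectAsync(options, CancellationToken.None);

        // Subscribe: completes once the server has acknowledged the subscription.
        await client.SubscribeAsync(new TopicFilterBuilder().WithTopic("demo/topic").Build());

        // Disconnect: tells the server the client is done and closes the connection.
        await client.DisconnectAsync();
    }
}
```

Each call returns a Task, so awaiting it is what "waits for" means in the list above.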