天天看點

Xamarin Mqtt

1 什麼是MQTT?

  mqtt (Message Queuing Telemetry Transport,消息隊列遙測傳輸)是 IBM 開發的一個即時通訊協定,有可能成為物聯網的重要組成部分。MQTT 是基于二進制消息的釋出/訂閱程式設計模式的消息協定,如今已經成為 OASIS 規範,由于規範很簡單,非常适合需要低功耗和網絡帶寬有限的 IoT 場景。

2 MQTTnet

  MQTTnet 是一個基于MQTT協定高度專業的.net庫,它同時提供MQTT client和MQTT server(broke),支援v3.1.0,v3.1.1和v5.0.0的标準MQTT協定.

3 MQTTnet支援範圍

  .Net Standard 1.3+

  .Net Core 1.1+

  .Net Core App 1.1+

  .Net Framework 4.5.2+(x86,x64,AnyCPU)

  Mono 5.2+

  Universal Windows Platform(UWP) 10.0.1024+(x86,x64,ARM,AnyCPU,Windwos 10 IoT Core)

  Xamarin.Android 7.5+

  Xamarin.iOS 10.14+

4 建立伺服器

  MQTT伺服器以稱為"消息代理"(Broker),可以是一個應用程式或一台裝置。它是位于消息釋出者和訂閱者之間,它可以:

  (1)接受來自客戶的網絡連接配接;

  (2)接受客戶釋出的應用資訊;

  (3)處理來自用戶端的訂閱和退訂請求;

  (4)向訂閱的客戶轉發應用程式消息。

  伺服器建立一個控制台應用,可選>>控制台應用(.NET Core)建立新項目MqttNetServer,代碼如下:

1 using MQTTnet;
  2 using MQTTnet.Protocol;
  3 using MQTTnet.Server;
  4 using Newtonsoft.Json;
  5 using System;
  6 using System.Collections.Generic;
  7 using System.IO;
  8 using System.Reflection;
  9 using System.Security.Cryptography.X509Certificates;
 10 using System.Text;
 11 using System.Threading.Tasks;
 12 
 13 namespace MqttServerTest
 14 {
 15     class Program
 16     {
 17         public static IMqttServer mqttServer;
 18         static void Main(string[] args)
 19         {
 20             mqttServer = new MQTTnet.MqttFactory().CreateMqttServer();
 21             mqttServer.UseClientConnectedHandler(e =>
 22             {
 23                 Console.WriteLine("***new connect:" + e.ClientId);
 24                 
 25             });
 26             mqttServer.UseClientDisconnectedHandler(e =>
 27             {
 28                 Console.WriteLine("*** disconnect:" + e.ClientId);
 29             });
 30 
 31             //var options = new MqttServerOptions();
 32             //await mqttServer.StartAsync(options);
 33 
 34             //var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
 35             //var certificate = new X509Certificate2(Path.Combine(currentPath, "certificate.pfx"), "yourPassword", X509KeyStorageFlags.Exportable);
 36 
 37 
 38             var optionsBuilder = new MqttServerOptionsBuilder()
 39                 .WithConnectionBacklog(100)
 40                 .WithDefaultEndpointPort(1884)
 41                 .WithConnectionValidator(c=> {
 42                     //c.SessionItems.
 43                     //if (c.ClientId.Length < 10)
 44                     //{
 45                     //    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
 46                     //    //c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
 47                     //    return;
 48                     //}
 49                     //if (c.Username != "mySecretUser")
 50                     //{
 51                     //    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
 52                     //    return;
 53                     //}
 54 
 55                     //if (c.Password != "mySecretPassword")
 56                     //{
 57                     //    c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
 58                     //    return;
 59                     //}
 60 
 61                     c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted;
 62                     Console.WriteLine("***connect validator:"+c.ClientId);
 63                 })
 64                 //.WithEncryptionCertificate(certificate.Export(X509ContentType.Pfx))
 65                 //.WithEncryptionSslProtocol(SslProtocols.Tls12)
 66                 .WithApplicationMessageInterceptor(context=> {
 67                     //if (context.ApplicationMessage.Topic == "my/custom/topic")
 68                     //{
 69                     //    context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload.");
 70                     //}
 71                     //// It is possible to disallow the sending of messages for a certain client id like this:
 72                     //if (context.ClientId != "Someone")
 73                     //{
 74                     //    context.AcceptPublish = false;
 75                     //    return;
 76                     //}
 77                     // It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document.
 78                     // This is useful when the IoT device has no own clock and the creation time of the message might be important.
 79 
 80                     context.AcceptPublish = true;
 81                     Console.WriteLine("***Message:" + context.ApplicationMessage.Payload);
 82                 })
 83                 .WithSubscriptionInterceptor(context=>
 84                 {
 85                     //if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin")
 86                     //{
 87                     //    context.AcceptSubscription = false;
 88                     //}
 89 
 90                     //if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator")
 91                     //{
 92                     //    context.AcceptSubscription = false;
 93                     //    context.CloseConnection = true;
 94                     //}
 95 
 96                     context.AcceptSubscription = true;
 97                     Console.WriteLine("***Subscript:" + context.TopicFilter);
 98                 })
 99                 //.WithStorage(new RetainedMessageHandler())
100                 ;
101             var options = optionsBuilder.Build();
102 
103             //// Setting the options
104             //options.Storage=new RetainedMessageHandler();
105 
106             StartServer(options);
107 
108             
109 
110             Console.WriteLine("Press any key to exit.");
111             Console.ReadLine();
112 
113             //await mqttServer.StopAsync();
114         }
115 
116         public static async void StartServer(IMqttServerOptions options)
117         {
118             await mqttServer.StartAsync(options);
119         }
120 
121     }
122 
123     // The implementation of the storage:
124     // This code uses the JSON library "Newtonsoft.Json".
125     public class RetainedMessageHandler : IMqttServerStorage
126     {
127         private const string Filename = "C:\\MQTT\\RetainedMessages.json";
128 
129         public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
130         {
131             File.WriteAllText(Filename, JsonConvert.SerializeObject(messages));
132             return Task.FromResult(0);
133         }
134 
135         public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
136         {
137             IList<MqttApplicationMessage> retainedMessages;
138             if (File.Exists(Filename))
139             {
140                 var json = File.ReadAllText(Filename);
141                 retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json);
142             }
143             else
144             {
145                 retainedMessages = new List<MqttApplicationMessage>();
146             }
147 
148             return Task.FromResult(retainedMessages);
149         }
150     }
151 
152 }           
Xamarin Mqtt

代碼直接運作起來,就是一個簡單的Mqtt server。

5 建立xamarin APP

  一個使用MQTT協定的應用程式或者裝置,它總是建立到伺服器的網絡連接配接。用戶端可以:

  (1)釋出其他用戶端可能會訂閱的資訊;

  (2)訂閱其它用戶端釋出的消息;

  (3)退訂或删除應用程式的消息;

  (4)斷開與伺服器連接配接。

  在VS中建立一個xamarin.Forms的移動應用,建立好後在Nuget上搜尋mqttnet,添加對MQTTnet包的引用。更改代碼如下:

  

Xamarin Mqtt
1 <?xml version="1.0" encoding="utf-8" ?>
 2 <ContentPage xmlns="http://xamarin.com/schemas/2014/forms"
 3              xmlns:x="http://schemas.microsoft.com/winfx/2009/xaml"
 4              xmlns:local="clr-namespace:CatShell"
 5              x:Class="CatShell.MainPage">
 6 
 7     <StackLayout>
 8         <!-- Place new controls here -->
 9         <Label Text="SubscribeTopic"/>
10         <Entry x:Name="txtSubTopic" Placeholder="Subscribe Topic" />
11         <Button Text="BtnSubscribe" Clicked="SubButton_Clicked"/>
12         <Entry x:Name="txtReceiveMessage"/>
13         <Label Text="PublishTopic"/>
14         <Entry x:Name="txtPubTopic"/>
15         <Entry x:Name="txtSendMessage" />
16         <Button Text="Publish" Clicked="PubButton_Clicked"/>
17         <Editor>
18             
19         </Editor>
20 
21     </StackLayout>
22 
23 </ContentPage>           
Xamarin Mqtt
Xamarin Mqtt
1 using MQTTnet;
  2 using MQTTnet.Client;
  3 using MQTTnet.Client.Options;
  4 using System;
  5 using System.Collections.Generic;
  6 using System.Linq;
  7 using System.Text;
  8 using System.Threading;
  9 using System.Threading.Tasks;
 10 using Xamarin.Forms;
 11 
 12 namespace CatShell
 13 {
 14     public partial class MainPage : ContentPage
 15     {
 16         public IMqttClient mqttClient;
 17         public IMqttClientOptions options;
 18         public MainPage()
 19         {
 20             InitializeComponent();
 21             InitMqttClient();
 22             ConnectMqttServer();
 23         }
 24 
 25         public void InitMqttClient()
 26         {
 27             // Create a new MQTT client.
 28             var factory = new MqttFactory();
 29             mqttClient = factory.CreateMqttClient();
 30 
 31             mqttClient.UseConnectedHandler(e => {
 32 
 33                 Device.BeginInvokeOnMainThread(() =>
 34                 {
 35                     txtReceiveMessage.Text = txtReceiveMessage.Text + $">> connect success." + Environment.NewLine;
 36                 });
 37             });
 38             mqttClient.UseDisconnectedHandler(e =>
 39             {
 40                 Device.BeginInvokeOnMainThread(() =>
 41                 {
 42                     txtReceiveMessage.Text = txtReceiveMessage.Text + $">> Disconnect." + Environment.NewLine;
 43                 });
 44             });
 45             mqttClient.UseApplicationMessageReceivedHandler(e =>
 46             {
 47                 Device.BeginInvokeOnMainThread(() =>
 48                 {
 49                     txtReceiveMessage.Text = $">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}" + Environment.NewLine;
 50                 });
 51             });
 52 
 53             // Create TCP based options using the builder.
 54             options = new MqttClientOptionsBuilder()
 55                 .WithClientId("Client4")
 56                 .WithTcpServer("10.100.1.247", 1884) // Use TCP connection, Port is opptinal
 57                                                      //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection.
 58                                                      //.WithCredentials("bud", "%spencer%")
 59                                                      //.WithTls()
 60                                                      //.WithTls(new MqttClientOptionsBuilderTlsParameters
 61                                                      //{
 62                                                      //    UseTls = true,
 63                                                      //    CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) =>
 64                                                      //    {
 65                                                      //        // TODO: Check conditions of certificate by using above parameters.
 66                                                      //        return true;
 67                                                      //    }
 68                                                      //})
 69                 .WithCleanSession()
 70                 .Build();
 71 
 72         }
 73 
 74         public async void ConnectMqttServer()
 75         {
 76             await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
 77 
 78         }
 79 
 80         private async void SubButton_Clicked(object sender, EventArgs e)
 81         {
 82             
 83             string topic = txtSubTopic.Text.Trim();
 84 
 85             if (string.IsNullOrEmpty(topic))
 86             {
 87                 //MessageBox.Show("訂閱主題不能為空!");
 88                 return;
 89             }
 90 
 91             if (!mqttClient.IsConnected)
 92             {
 93                 //MessageBox.Show("MQTT用戶端尚未連接配接!");
 94                 return;
 95             }
 96 
 97             // Subscribe to a topic
 98             await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());
 99 
100             txtReceiveMessage.Text = txtReceiveMessage.Text + $"已訂閱[{topic}]主題" + Environment.NewLine;
101             txtSubTopic.IsReadOnly = false;
102             //BtnSubscribe.Enabled = false;
103         }
104 
105         private async void PubButton_Clicked(object sender, EventArgs e)
106         {
107             string topic = txtPubTopic.Text.Trim();
108 
109             if (string.IsNullOrEmpty(topic))
110             {
111                 //MessageBox.Show("釋出主題不能為空!");
112                 return;
113             }
114 
115             string inputString = txtSendMessage.Text.Trim();
116 
117             PublishMessages(topic, inputString);
118         }
119 
120         public async void PublishMessages(string topicMsg, string payloadMsg)
121         {
122             var message = new MqttApplicationMessageBuilder()
123                 .WithTopic(topicMsg)
124                 .WithPayload(payloadMsg)
125                 .WithExactlyOnceQoS()
126                 .WithRetainFlag()
127                 .Build();
128 
129             await mqttClient.PublishAsync(message);
130         }
131 
132     }
133 }           
Xamarin Mqtt

代碼運作起來,在APP上可以直接發資訊。

6 建立winForm client(可選)

  可以建立一個winForm來互相互動,在VS上建立一個windows窗體應用(.NET Framework),界面設計如下

Xamarin Mqtt

  背景代碼如下:

Xamarin Mqtt
1 using MQTTnet;
  2 using MQTTnet.Client.Options;
  3 using MQTTnet.Client;
  4 using System;
  5 using System.Collections.Generic;
  6 using System.ComponentModel;
  7 using System.Data;
  8 using System.Drawing;
  9 using System.Linq;
 10 using System.Net.Security;
 11 using System.Security.Cryptography.X509Certificates;
 12 using System.Text;
 13 using System.Threading;
 14 using System.Threading.Tasks;
 15 using System.Windows.Forms;
 16 
 17 namespace MqttClientWin
 18 {
 19     public partial class Form1 : Form
 20     {
 21         public IMqttClient mqttClient;
 22         public IMqttClientOptions options;
 23         public Form1()
 24         {
 25             InitializeComponent();
 26             InitMqttClient();
 27             ConnectMqttServer();
 28         }
 29 
 30         public void InitMqttClient()
 31         {
 32             // Create a new MQTT client.
 33             var factory = new MqttFactory();
 34             mqttClient = factory.CreateMqttClient();
 35 
 36             mqttClient.UseApplicationMessageReceivedHandler(e =>
 37             {
 38                 Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
 39                 Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}");
 40                 Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
 41                 Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
 42                 Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}");
 43                 Console.WriteLine();
 44 
 45                 this.Invoke(new Action(() =>
 46                 {
 47                     txtReceiveMessage.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
 48                 }));
 49 
 50                 //Task.Run(() => mqttClient.PublishAsync("hello/world"));
 51             });
 52 
 53             mqttClient.UseConnectedHandler(async e =>
 54             {
 55                 Console.WriteLine("### CONNECTED WITH SERVER ###");
 56 
 57                 //// Subscribe to a topic
 58                 //await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build());
 59 
 60                 //Console.WriteLine("### SUBSCRIBED ###");
 61 
 62                 this.Invoke(new Action(() =>
 63                 {
 64                     txtReceiveMessage.AppendText($">> connect success.{Environment.NewLine}");
 65                 }));
 66             });
 67 
 68             mqttClient.UseDisconnectedHandler(e =>
 69             {
 70                 this.Invoke(new Action(() =>
 71                 {
 72                     txtReceiveMessage.AppendText($">> Disconnect .{Environment.NewLine}");
 73                 }));
 74             });
 75 
 76             // Create TCP based options using the builder.
 77             options = new MqttClientOptionsBuilder()
 78                 .WithClientId("Client5")
 79                 .WithTcpServer("10.100.1.247", 1884) // Use TCP connection, Port is opptinal
 80                                                           //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection.
 81                 //.WithCredentials("bud", "%spencer%")
 82                 //.WithTls()
 83                 //.WithTls(new MqttClientOptionsBuilderTlsParameters
 84                 //{
 85                 //    UseTls = true,
 86                 //    CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) =>
 87                 //    {
 88                 //        // TODO: Check conditions of certificate by using above parameters.
 89                 //        return true;
 90                 //    }
 91                 //})
 92                 .WithCleanSession()
 93                 .Build();
 94             
 95         }
 96 
 97         public async void ConnectMqttServer()
 98         {
 99             await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
100             
101         }
102 
103         public void ReconnectMqttServer()
104         {
105             mqttClient.UseDisconnectedHandler(async e =>
106             {
107                 Console.WriteLine("### DISCONNECTED FROM SERVER ###");
108                 await Task.Delay(TimeSpan.FromSeconds(5));
109 
110                 try
111                 {
112                     await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken
113                 }
114                 catch
115                 {
116                     Console.WriteLine("### RECONNECTING FAILED ###");
117                 }
118             });
119         }
120 
121         public async void PublishMessages(string topicMsg,string payloadMsg)
122         {
123             var message = new MqttApplicationMessageBuilder()
124                 .WithTopic(topicMsg)
125                 .WithPayload(payloadMsg)
126                 .WithExactlyOnceQoS()
127                 .WithRetainFlag()
128                 .Build();
129 
130             await mqttClient.PublishAsync(message);
131         }
132 
133         private async void BtnSubscribe_Click(object sender, EventArgs e)
134         {
135             string topic = txtSubTopic.Text.Trim();
136 
137             if (string.IsNullOrEmpty(topic))
138             {
139                 MessageBox.Show("訂閱主題不能為空!");
140                 return;
141             }
142 
143             if (!mqttClient.IsConnected)
144             {
145                 MessageBox.Show("MQTT用戶端尚未連接配接!");
146                 return;
147             }
148             
149             // Subscribe to a topic
150             await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());
151 
152             txtReceiveMessage.AppendText($"已訂閱[{topic}]主題" + Environment.NewLine);
153             txtSubTopic.Enabled = false;
154             BtnSubscribe.Enabled = false;
155         }
156 
157         private void BtnPublish_Click(object sender, EventArgs e)
158         {
159             string topic = txtPubTopic.Text.Trim();
160 
161             if (string.IsNullOrEmpty(topic))
162             {
163                 MessageBox.Show("釋出主題不能為空!");
164                 return;
165             }
166 
167             string inputString = txtSendMessage.Text.Trim();
168 
169             PublishMessages(topic, inputString);
170             
171         }
172     }
173 }           
Xamarin Mqtt

7 MQTT協定中的訂閱、主題、會話

  一、訂閱(Subscription)

  訂閱包含主題篩選器(Topic Filter)和最大服務品質(QoS)。訂閱會與一個會話(Session)關聯。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。

  二、會話(Session)

  每個用戶端與伺服器建立連接配接後就是一個會話,用戶端和伺服器之間有狀态互動。會話存在于一個網絡之間,也可能在用戶端和伺服器之間跨越多個連續的網絡連接配接。

  三、主題名(Topic Name)

  連接配接到一個應用程式消息的标簽,該标簽與伺服器的訂閱相比對。伺服器會将消息發送給訂閱所比對标簽的每個用戶端。

  四、主題篩選器(Topic Filter)

  一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所比對到的多個主題。

  五、負載(Payload)

  消息訂閱者所具體接收的内容。

8 MQTT協定中的方法

  MQTT協定中定義了一些方法(也被稱為動作),來于表示對确定資源所進行操作。這個資源可以代表預先存在的資料或動态生成資料,這取決于伺服器的實作。通常來說,資源指伺服器上的檔案或輸出。主要方法有:

  (1)Connect。等待與伺服器建立連接配接。

  (2)Disconnect。等待MQTT用戶端完成所做的工作,并與伺服器斷開TCP/IP會話。

  (3)Subscribe。等待完成訂閱。