Tip: 此篇已加入 .NET Core微服務基礎系列文章索引
一、預備知識:資料一緻性
關于資料一緻性的文章,園子裡已經有很多了,如果你還不了解,那麼可以通過以下的幾篇文章去快速地了解了解,有個感性認識即可。
(1)左正,《
保證分布式系統資料一緻性的6種方案》
(2)成金之路,《
分布式系統的資料一緻性解決方案(3)E_Star,《
分布式環境下資料一緻性的設計總結(4)Itegel,《
分布式事務?No,最終一緻性必須要了解的點:_ACID、CAP、BASE、強一緻性、弱一緻性、最終一緻性_。
CAP理論由加州大學伯克利分校的計算機教授Eric Brewer在2000年提出,其核心思想是任何基于網絡的資料共享系統最多隻能滿足資料一緻性(Consistency)、可用性(Availability)和網絡分區容忍(Partition Tolerance)三個特性中的兩個(由此我們知道在分布式系統中,同時滿足CAP三個特性是不可能的),三個特性的定義如下:
C:資料一緻性(Consistency):如果系統對一個寫操作傳回成功,那麼之後的讀請求都必須讀到這個新資料;如果傳回失敗,那麼所有讀操作都不能讀到這個資料,對調用者而言資料具有強一緻性(Strong Consistency)(又叫原子性Atomic或線性一緻性Linerizable Consistency)
A:服務可用性(Availability):所有讀寫請求在一定時間内得到響應,可終止、不會一直等待
P:分區容錯性(Partition-Tolerance):在網絡分區的情況下,被分隔的節點仍能正常對外服務
- 強一緻性:當更新操作完成之後,任何多個後續程序或者線程的通路都會傳回最新的更新過的值。這種是對使用者最友好的,就是使用者上一次寫什麼,下一次就保證能讀到什麼。根據 CAP 理論,這種實作需要犧牲可用性。=> 在傳統單體式應用中,大部分都是強一緻性的應用,想想我們寫過多少工作單元模式的Code?
- 弱一緻性:系統并不保證續程序或者線程的通路都會傳回最新的更新過的值。系統在資料寫入成功之後,不承諾立即可以讀到最新寫入的值,也不會具體的承諾多久之後可以讀到。
- 最終一緻性:弱一緻性的特定形式。系統保證在沒有後續更新的前提下,系統最終傳回上一次更新操作的值。在沒有故障發生的前提下,不一緻視窗的時間主要受通信延遲,系統負載和複制副本的個數影響。
為保證可用性,網際網路分布式架構中經常将強一緻性需求轉換成最終一緻性的需求,并通過系統執行幂等性的保證,保證資料的最終一緻性。
在微服務架構中,各個微服務之間通常會使用事件驅動通信和釋出訂閱系統實作最終一緻性。
更多背景知識,還是得看上面列出的參考文章,這裡不再贅述。
二、MassTransit極簡介紹
MassTransit 是一個自由、開源、輕量級的消息總線, 用于使用. NET 架構建立分布式應用程式。MassTransit 在現有消息傳輸上提供了一組廣泛的功能, 進而使開發人員能夠友好地使用基于消息的會話模式異步連接配接服務。基于消息的通信是實作面向服務的體系結構的可靠和可擴充的方式。
官網位址:
http://masstransit-project.com/,GitHub位址:
https://github.com/MassTransit/MassTransit(目前:1590Star,719Fork)
類似的國外開源元件還有
NServiceBus,沒有用過,據說MassTransit比NServiceBus更加輕量級,并且在開發之初就選用了RabbitMQ作為消息傳輸元件,當然MassTransit還支援Azure Service Bus。類似的國内開源元件則有園友savorboard(楊曉東)的
CAP,這個我會在MassTransit學習結束後去使用使用,CAP在GitHub上已經有了超過1000個Star,是NCC的幾個千星項目之一。另外,張善友大隊長在他的NanoFabric項目中推薦我們使用Rebus和Ray,如下圖所示:
由于時間和精力,以及文檔資料的可見性,我在我的POC和這個系列博文的準備中,隻會使用到MassTransit和CAP這兩個開源項目。
三、MassTransit Quick Start
這裡以MassTransit + RabbitMQ為例子,首先請確定安裝了RabbitMQ,如果沒有安裝,可以閱讀我的《
基于EasyNetQ使用RabbitMQ消息隊列》去把RabbitMQ先安裝到你的電腦上。另外,RabbitMQ的背景知識也有一堆,有機會也還是要了解下Exchange,Channel、Queue等内容。
3.1 最簡單的發送/接收執行個體
(1)準備兩個控制台程式,一個為Sender(發送者),一個為Receiver(接收者),并分别通過NuGet安裝MassTransit以及MassTransit.RabbitMQ
NuGet>Install-Package MassTransit
NuGet>Install-Package MassTransit.RabbitMQ
(2)編寫Sender
public class Program
{
public static void Main(string[] args)
{
Console.Title = "MassTransit Client";
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
{
hst.Username("admin");
hst.Password("edison");
});
});
var uri = new Uri("rabbitmq://192.168.80.71/EDCVHOST/Qka.MassTransitTest");
var message = Console.ReadLine();
while (message != null)
{
Task.Run(() => SendCommand(bus, uri, message)).Wait();
message = Console.ReadLine();
}
Console.ReadKey();
}
private static async void SendCommand(IBusControl bus, Uri sendToUri, string message)
{
var endPoint = await bus.GetSendEndpoint(sendToUri);
var command = new Client()
{
Id = 100001,
Name = "Edison Zhou",
Birthdate = DateTime.Now.AddYears(-18),
Message = message
};
await endPoint.Send(command);
Console.WriteLine($"You Sended : Id = {command.Id}, Name = {command.Name}, Message = {command.Message}");
}
}
這裡首先連接配接到我的RabbitMQ服務,然後向指定的Queue發送消息(這裡是一個Client類型的執行個體對象)。
(3)編寫Receiver
public class Program
{
public static void Main(string[] args)
{
Console.Title = "MassTransit Server";
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
{
hst.Username("admin");
hst.Password("edison");
});
cfg.ReceiveEndpoint(host, "Qka.MassTransitTest", e=>
{
e.Consumer<TestConsumerClient>();
e.Consumer<TestConsumerAgent>();
});
});
bus.Start();
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
bus.Stop();
}
}
對于Receiver,要做的事就隻有兩件:一是連接配接到RabbitMQ,二是告訴RabbitMQ我要接收哪個消息隊列的什麼類型的消息。下面是TestConsumerClient和TestConsumerAgent的定義:
public class TestConsumerClient : IConsumer<Client>
{
public async Task Consume(ConsumeContext<Client> context)
{
Console.ForegroundColor = ConsoleColor.Red;
await Console.Out.WriteLineAsync($"Receive message: {context.Message.Id}, {context.Message.Name}, {context.Message.Birthdate.ToString()}");
Console.ResetColor();
}
}
public class Client
{
public int Id { get; set; }
public string Name { get; set; }
public DateTime Birthdate { get; set; }
public string Message { get; set; }
}
public class TestConsumerAgent : IConsumer<Agent>
{
public async Task Consume(ConsumeContext<Agent> context)
{
Console.ForegroundColor = ConsoleColor.Red;
await Console.Out.WriteLineAsync($"Receive message: {context.Message.AgentCode}, {context.Message.AgentName}, {context.Message.AgentRole}");
Console.ResetColor();
}
}
public class Agent
{
public int AgentCode { get; set; }
public string AgentName { get; set; }
public string AgentRole { get; set; }
public string Message { get; set; }
}
(4)測試一下:
3.2 最簡單的釋出/訂閱執行個體
除了簡單的發送/接收模式外,我們用的更多的是釋出/訂閱這種模式。
(1)準備下圖所示的類庫和控制台項目,并對除Messages類庫之外的其他項目安裝MassTransit以及MassTransit.RabbitMQ。
(2)Messages類庫:準備需要傳輸的Message
public class TestBaseMessage
{
public string Name { get; set; }
public DateTime Time { get; set; }
public string Message { get; set; }
}
public class TestCustomMessage
{
public string Name { get; set; }
public DateTime Time { get; set; }
public int Age { get; set; }
public string Message { get; set; }
}
(3)Publisher:接收我的消息吧騷年們
public class Program
{
public static void Main(string[] args)
{
Console.Title = "MassTransit Publisher";
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST/"), hst =>
{
hst.Username("admin");
hst.Password("edison");
});
});
do
{
Console.WriteLine("Please enter your message, if want to exit please press q.");
string message = Console.ReadLine();
if (message.ToLower().Equals("q"))
{
break;
}
bus.Publish(new TestBaseMessage()
{
Name = "Edison Zhou",
Time = DateTime.Now,
Message = message
});
bus.Publish(new TestCustomMessage()
{
Name = "Leo Dai",
Age = 27,
Time = DateTime.Now,
Message = message
});
} while (true);
bus.Stop();
}
這裡向RabbitMQ釋出了兩個不同類型的消息(TestBaseMessage和TestCustomMessage)
(4)SubscriberA:我隻接收TestBaseMessage類型的消息,其他的我不要
public class Program
{
public static void Main(string[] args)
{
Console.Title = "MassTransit SubscriberA";
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
{
hst.Username("admin");
hst.Password("edison");
});
cfg.ReceiveEndpoint(host, "Qka.MassTransitTestv2.CA", e =>
{
e.Consumer<ConsumerA>();
});
});
bus.Start();
Console.ReadKey(); // press Enter to Stop
bus.Stop();
}
}
這裡ConsumerA的定義如下:可以看出,它實作了一個泛型接口IConsumer,然後指定了TestBaseMessage為消費的消息類型。
public class ConsumerA : IConsumer<TestBaseMessage>
{
public async Task Consume(ConsumeContext<TestBaseMessage> context)
{
Console.ForegroundColor = ConsoleColor.Red;
await Console.Out.WriteLineAsync($"SubscriberA => ConsumerA received message : {context.Message.Name}, {context.Message.Time}, {context.Message.Message}, Type:{context.Message.GetType()}");
Console.ResetColor();
}
}
(5)SubscriberA:我隻接收TestCustomMessage類型的消息,其他的我不要
public class Program
{
public static void Main(string[] args)
{
Console.Title = "MassTransit SubscriberB";
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
{
hst.Username("admin");
hst.Password("edison");
});
cfg.ReceiveEndpoint(host, "Qka.MassTransitTestv2.CB", e =>
{
e.Consumer<ConsumerA>();
});
});
bus.Start();
Console.ReadKey(); // press Enter to Stop
bus.Stop();
}
}
這裡的ConsumerA的定義如下;它實作的接口是IConsumer
public class ConsumerA : IConsumer<TestCustomMessage>
{
public async Task Consume(ConsumeContext<TestCustomMessage> context)
{
Console.ForegroundColor = ConsoleColor.Red;
await Console.Out.WriteLineAsync($"SubscriberB => ConsumerA received message : {context.Message.Name}, {context.Message.Time}, {context.Message.Message}, Age: {context.Message.Age}, Type:{context.Message.GetType()}");
Console.ResetColor();
}
}
(6)測試一下:由于Publisher發送了兩個不同類型的消息,兩個Subscriber均隻接受其中的一個類型的消息。
3.3 帶傳回狀态消息的示例
之前的例子都是釋出之後,不管訂閱者有沒有收到以及收到後有沒有處理成功(即有沒有傳回消息,類似于HTTP請求和響應),在MassTransit中提供了這樣的一種模式,并且還可以結合GreenPipes的一些擴充方法實作重試、限流以及熔斷機制。這一部分詳見官方文檔:
http://masstransit-project.com/MassTransit/usage/request-response.html(1)準備下圖所示的三個項目:通過NuGet安裝MassTransit以及MassTransit.RabbitMQ
(2)Messages:準備請求和響應的消息傳輸類型
public interface IRequestMessage
{
int MessageId { get; set; }
string Content { get; set; }
}
public class RequestMessage : IRequestMessage
{
public int MessageId { get; set; }
public string Content { get; set; }
public int RequestId { get; set; }
}
public interface IResponseMessage
{
int MessageCode { get; set; }
string Content { get; set; }
}
public class ResponseMessage : IResponseMessage
{
public int MessageCode { get; set; }
public string Content { get; set; }
public int RequestId { get; set; }
}
(3)Sender 請求發送端
public class Program
{
public static void Main(string[] args)
{
Console.Title = "Masstransit Request Side";
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
{
hst.Username("admin");
hst.Password("edison");
});
// Retry : 重試
cfg.UseRetry(ret =>
{
ret.Interval(3, 10); // 消費失敗後重試3次,每次間隔10s
});
// RateLimit : 限流
cfg.UseRateLimit(1000, TimeSpan.FromMinutes(1)); // 1分鐘以内最多1000次調用通路
// CircuitBreaker : 熔斷
cfg.UseCircuitBreaker(cb =>
{
cb.TrackingPeriod = TimeSpan.FromMinutes(1);
cb.TripThreshold = 15; // 當失敗的比例至少達到15%才會啟動熔斷
cb.ActiveThreshold = 10; // 當失敗次數至少達到10次才會啟動熔斷
cb.ResetInterval = TimeSpan.FromMinutes(5);
}); // 當在1分鐘内消費失敗率達到15%或調用了10次還是失敗時,暫停5分鐘的服務通路
});
bus.Start();
SendMessage(bus);
bus.Stop();
}
private static void SendMessage(IBusControl bus)
{
var mqAddress = new Uri($"rabbitmq://192.168.80.71/EDCVHOST/Qka.MassTransitTestv3");
var client = bus.CreateRequestClient<IRequestMessage, IResponseMessage>(mqAddress,
TimeSpan.FromHours(10)); // 建立請求用戶端,10s之内木有回饋則認為是逾時(Timeout)
do
{
Console.WriteLine("Press q to exit if you want.");
string value = Console.ReadLine();
if (value.ToLower().Equals("q"))
{
break;
}
Task.Run(async () =>
{
var request = new RequestMessage()
{
MessageId = 10001,
Content = value,
RequestId = 10001
};
var response = await client.Request(request);
Console.WriteLine($"Request => MessageId={request.MessageId}, Content={request.Content}");
Console.WriteLine($"Response => MessageCode={response.MessageCode}, Content={response.Content}");
}).Wait();
} while (true);
}
}
這裡不再解釋,請看注釋。
(4)Receiver 接收端
public class Program
{
public static void Main(string[] args)
{
Console.Title = "MassTransit Response Side";
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
{
hst.Username("admin");
hst.Password("edison");
});
cfg.ReceiveEndpoint(host, "Qka.MassTransitTestv3", e =>
{
e.Consumer<RequestConsumer>();
});
});
bus.Start();
Console.WriteLine("Press any key to exit.");
Console.ReadKey();
bus.Stop();
}
}
其中,RequestConsumer的定義如下:
public class RequestConsumer : IConsumer<IRequestMessage>
{
public async Task Consume(ConsumeContext<IRequestMessage> context)
{
Console.ForegroundColor = ConsoleColor.Red;
await Console.Out.WriteLineAsync($"Received message: Id={context.Message.MessageId}, Content={context.Message.Content}");
Console.ResetColor();
var response = new ResponseMessage
{
MessageCode = 200,
Content = $"Success",
RequestId = context.Message.MessageId
};
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"Response message: Code={response.MessageCode}, Content={response.Content}, RequestId={response.RequestId}");
Console.ResetColor();
await context.RespondAsync(response);
}
}
(5)測試一下:
可以看出,請求調用方收到了來自接收方傳回的狀态消息,我們可以借助傳回值去check一些狀态。這裡不再示範發生異常進而啟用重試、熔斷等的示例,有興趣的園友可以自行測試。
3.4 Observer模式的釋出/訂閱示例
在某些場景中,我們需要針對一個消息進行類似于AoP(面向切面程式設計)或者監控的操作,比如在發送消息之前和結束後記日志等操作,我們可以借助MassTransit中的Observer模式來實作。(在MassTransit的消息接收中,可以通過兩種模式來實作:一種是基于實作IConsumer接口,另一種就是基于實作IObserver接口)關于這一部分,詳見官方文檔:
http://masstransit-project.com/MassTransit/usage/observers.html(1)準備以下圖所示的項目:
(2)Messages:定義要使用的Consumer和Observer
Consumer:
public class TestMessageConsumer : IConsumer<TestMessage>
{
public async Task Consume(ConsumeContext<TestMessage> context)
{
Console.ForegroundColor = ConsoleColor.Red;
await Console.Out.WriteLineAsync($"TestMessageConsumer => Type:{context.Message.GetType()}, ID:{context.Message.MessageId}, Content:{context.Message.Content}");
Console.ResetColor();
}
}
Observer:
public class PublishObserver : IPublishObserver
{
public Task PrePublish<T>(PublishContext<T> context) where T : class
{
Console.WriteLine("------------------PrePublish--------------------");
var message = context.Message as TestMessage;
Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
Console.WriteLine("----------------------------------------------------");
return Task.CompletedTask;
}
public Task PostPublish<T>(PublishContext<T> context) where T : class
{
Console.WriteLine("------------------PostPublish--------------------");
var message = context.Message as TestMessage;
Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
Console.WriteLine("----------------------------------------------------");
return Task.CompletedTask;
}
public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
{
Console.WriteLine("------------------PublishFault--------------------");
var message = context.Message as TestMessage;
Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
Console.WriteLine("------------------------------------------------------");
return Task.CompletedTask;
}
}
public class SendObserver : ISendObserver
{
public Task PreSend<T>(SendContext<T> context) where T : class
{
Console.WriteLine("------------------PreSend--------------------");
var message = context.Message as TestMessage;
Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
Console.WriteLine("-------------------------------------------------");
return Task.CompletedTask;
}
public Task PostSend<T>(SendContext<T> context) where T : class
{
Console.WriteLine("------------------PostSend-------------------");
var message = context.Message as TestMessage;
Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
Console.WriteLine("-------------------------------------------------");
return Task.CompletedTask;
}
public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
{
Console.WriteLine("------------------SendFault-----------------");
var message = context.Message as TestMessage;
Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
Console.WriteLine("-------------------------------------------------");
return Task.CompletedTask;
}
}
public class ReceiveObserver : IReceiveObserver
{
public Task PreReceive(ReceiveContext context)
{
Console.WriteLine("------------------PreReceive--------------------");
Console.WriteLine(Encoding.Default.GetString(context.GetBody()));
Console.WriteLine("--------------------------------------");
return Task.CompletedTask;
}
public Task PostReceive(ReceiveContext context)
{
Console.WriteLine("------------------PostReceive--------------------");
Console.WriteLine(Encoding.Default.GetString(context.GetBody()));
Console.WriteLine("------------------------------------------------------");
return Task.CompletedTask;
}
public Task ReceiveFault(ReceiveContext context, Exception exception)
{
Console.WriteLine("------------------ReceiveFault--------------------");
Console.WriteLine(Encoding.Default.GetString(context.GetBody()));
Console.WriteLine("-------------------------------------------------------");
return Task.CompletedTask;
}
public Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType) where T : class
{
Console.WriteLine("------------------PostConsume--------------------");
var message = context.Message as TestMessage;
Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
Console.WriteLine("--------------------------------------------------------");
return Task.CompletedTask;
}
public Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception) where T : class
{
Console.WriteLine("------------------ConsumeFault-------------------");
var message = context.Message as TestMessage;
Console.WriteLine($"ID={message.MessageId}, Content={message.Content},Time={message.Time}");
Console.WriteLine("--------------------------------------------------------");
return Task.CompletedTask;
}
}
(3)ObserverPublish
public class Program
{
public static void Main(string[] args)
{
Console.Title = "Masstransit Observer Publisher";
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
{
hst.Username("admin");
hst.Password("edison");
});
});
var observer1 = new SendObserver();
var handle1 = bus.ConnectSendObserver(observer1);
var observer2 = new PublishObserver();
var handle2 = bus.ConnectPublishObserver(observer2);
bus.Start();
do
{
Console.WriteLine("Presss q if you want to exit this program.");
string message = Console.ReadLine();
if (message.ToLower().Equals("q"))
{
break;
}
bus.Publish(new TestMessage
{
MessageId = 10001,
Content = message,
Time = DateTime.Now
});
} while (true);
bus.Stop();
}
}
可以看到,這裡我們使用了兩個用于發送消息的Observer:SendObserver和PublishObserver
(3)ObserverReceive
public class Program
{
public static void Main(string[] args)
{
Console.Title = "Masstransit Observer Receiver";
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://192.168.80.71/EDCVHOST"), hst =>
{
hst.Username("admin");
hst.Password("edison");
});
cfg.ReceiveEndpoint(host, "Qka.MassTransitTestv4", e =>
{
e.Consumer<TestMessageConsumer>();
});
});
var observer = new ReceiveObserver();
var handle = bus.ConnectReceiveObserver(observer);
bus.Start();
Console.ReadKey();
bus.Stop();
}
}
由此看出,我們可以借助Observer這種方式來截取到消息的一個lifecycle,用以進行不同階段的業務邏輯操作。
四、小結
本篇極簡的介紹了一下資料一緻性和MassTransit這個開源的元件,通過幾個例子介紹了在.NET環境下如何使用MassTransit操作RabbitMQ實作消息的接收/發送以及釋出/訂閱。後續我會繼續使用MassTransit結合Quartz.Net和Polly在ASP.NET Core環境下實作一個簡單的基于補償機制的資料一緻性的小案例,希望到時也可以和各位園友分享。
示例代碼
Click Here =>
點我下載下傳參考資料
(1)桂素偉,《
基于.NET Core的微服務(2)richieyangs(張陽),《
如何優雅的使用RabbitMQ》,《
使用Masstransit開發基于消息傳遞的分布式應用(3)青客寶團隊,《
MassTransit&Sagas分布式服務開發ppt分享(4)成天,《
MassTransit實作應用程式間的互動(5)娃娃都會打醬油了,《
MassTransit學習記錄(6)MassTransit 官方文檔,
http://masstransit-project.com/MassTransit/作者:
周旭龍出處:
http://edisonchou.cnblogs.com本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連結。