事件釋出/訂閱是一種非常強大的模式,它可以幫助業務元件間實作完全解耦,不同的業務元件隻依賴事件,隻關注哪些事件是需要自己處理的,而不用關注誰來處理自己釋出事件,事件追溯(Event Sourcing)也是基于事件釋出/訂閱的。
标題:ASP.NET Core中實作單體程式的事件釋出/訂閱
作者:Lamond Lu
位址:https://www.cnblogs.com/lwqlun/p/10468058.html
項目源代碼:https://github.com/lamondlu/EventHandlerInSingleApplication
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5SNxIDM1kjM0ETL5IzM3IDOxATM0AzMwkTMwITLxMDO1YzLcNDM5EDMy8CXxMDO1YzLcd2bsJ2Lc12bj5ycn9Gbi52YugTMwIzZtl2Lc9CX6MHc0RHaiojIsJye.png)
背景
事件釋出/訂閱是一種非常強大的模式,它可以幫助業務元件間實作完全解耦,不同的業務元件隻依賴事件,隻關注哪些事件是需要自己處理的,而不用關注誰來處理自己釋出事件,事件追溯(Event Sourcing)也是基于事件釋出/訂閱的。在微服務架構中,事件釋出/訂閱有非常多的應用場景。今天我給大家分享一個基于ASP.NET Core的單體程式使用事件釋出/訂閱的例子,針對分布式項目的事件釋出/訂閱比較複雜,難點是事務處理,後續我會另寫一篇博文來示範。
案例說明
目前我們有一個基于ASP.NET Core的電子商務系統,在項目的初期,業務非常簡單,隻有一個購物車子產品和一個訂單子產品,所有的代碼都放在一個項目中。
整個項目使用了一個簡單的三層架構。
這裡當使用者送出購物車的時候,程式會在
ShoppingCartManager
類的
SubmitShoppingCart
方法中執行3個操作
- 修改目前購物車的狀态為完成
- 根據購物車中的物品建立一個新訂單
- 給使用者發郵件
代碼如下:
public void SubmitShoppingCart(string shoppingCartId)
{
var shoppingCart = _unitOfWork.ShoppingCartRepository
.GetShoppingCart(shoppingCartId);
_unitOfWork.ShoppingCartRepository
.SubmitShoppingCart(shoppingCartId);
_unitOfWork.OrderRepository
.CreatOrder(new CreateOrderDTO
{
Items = shoppingCart.Items
.Select(p => new NewOrderItemDTO
{
ItemId = p.ItemId,
Name = p.Name,
Price = p.Price
}).ToList()
});
//這裡為了簡化代碼,我用指令行表示發送郵件的邏輯
Console.WriteLine("Confirm Email Sent.");
_unitOfWork.Save();
}
根據SOLID設計原則中的單一責任原則,如果一個類承擔的職責過多,就等于把這些職責耦合在一起了。這裡生成訂單和發送郵件都不應該是目前
SubmitShoppingCart
需要負責的,是以我們需要它們從這個方法中移出去,使用的方法就是事件訂閱/釋出。
新的架構圖
以下是使用事件釋出/訂閱之後的系統架構圖。
- 這裡我們會建立一個購物車送出事件
。ShoppingCartSubmittedEvent
- 當站點啟動的時候,我們會在一個名為
的類中注冊訂閱EventHandlerContainer
事件的2個處理類ShoppingCartSubmittedEvent
和CreateOrderHandler
ConfirmEmailSentHandler
- 在
方法中,我們會做2件事情:SubmitShoppingCart
- 更改目前購物車的狀态。
- 釋出
事件。ShoppingCartSubmittedEvent
-
事件處理器會調用CreateOrderHandler
類中的建立訂單方法。OrderManager
-
事件處理器會負責發送郵件。ConfirmEmailSentHandler
好的,下面讓我們來一步一步實作以上描述的代碼。
添加事件基類
這裡我們首先定義一個事件基類,其中暫時隻添加了一個屬性
OccuredOn
,它表示了目前事件的觸發時間。
public class EventBase
{
public EventBase()
{
OccuredOn = DateTime.Now;
}
protected DateTime OccuredOn
{
get;
set;
}
}
定義購物車送出事件
接下來我們就需要建立購物車送出事件類
ShoppingCartSubmittedEvent
, 它繼承自
EventBase
, 并提供了一個購物項集合
public class ShoppingCartSubmittedEvent : EventBase
{
public ShoppingCartSubmittedEvent()
{
Items = new List<ShoppingCartSubmittedItem>();
}
public List<ShoppingCartSubmittedItem> Items { get; set; }
}
public class ShoppingCartSubmittedItem
{
public string ItemId { get; set; }
public string Name { get; set; }
public decimal Price { get; set; }
}
定義事件處理器接口
為了添加事件處理器,我們首先需要定義一個泛型接口類
IEventHandler
public interface IEventHandler<T> where T : EventBase
{
void Run(T obj);
Task RunAsync(T obj);
}
這個泛型接口類的是泛型類型必須繼承自
EventBase
類。接口提供了2個方法
Run
RunAsync
。 它們定義了該接口的實作類必須實作同一個處理邏輯的同步和異步方法。
為購物車送出事件編寫事件處理器
有了事件處理器接口,接下來我們就可以開始為購物車送出事件添加事件處理器了。這裡我們為了實作前面定義的邏輯,我們需要建立2個處理器
CreateOrderHandler
ConfirmEmailSentHandler
CreateOrderHandler.cs
public class CreateOrderHandler : IEventHandler<ShoppingCartSubmittedEvent>
{
private IOrderManager _orderManager = null;
public CreateOrderHandler(IOrderManager orderManager)
{
_orderManager = orderManager;
}
public void Run(ShoppingCartSubmittedEvent obj)
{
_orderManager.CreateNewOrder(new Models.DTOs.CreateOrderDTO
{
Items = obj.Items.Select(p => new Models.DTOs.NewOrderItemDTO
{
ItemId = p.ItemId,
Name = p.Name,
Price = p.Price
}).ToList()
});
}
public Task RunAsync(ShoppingCartSubmittedEvent obj)
{
return Task.Run(() =>
{
Run(obj);
});
}
}
代碼解釋:
的構造函數中,我們注入了
CreateOrderHandler
接口對象,
IOrderManager
負責最終建立訂單的工作
CreateNewOrder
- 這裡為了簡化代碼,我直接使用了Task.Run,并在其中調用了同步方法實作
ConfirmEmailSentHandler.cs
public class ConfirmEmailSentHandler : IEventHandler<ShoppingCartSubmittedEvent>
{
public void Run(ShoppingCartSubmittedEvent obj)
{
Console.WriteLine("Confirm Email Sent.");
}
public Task RunAsync(ShoppingCartSubmittedEvent obj)
{
return Task.Run(() =>
{
Console.WriteLine("Confirm Email Sent.");
});
}
}
- 這個處理類非常簡單,為了簡化代碼,我僅輸出了一行文本來表示實際需要運作的代碼。
為 OrderManager
類添加建立訂單方法
OrderManager
IOrderManager.cs
public interface IOrderManager
{
string CreateNewOrder(CreateOrderDTO dto);
}
OrderManager.cs
public class OrderManager : IOrderManager
{
private IOrderRepository _orderRepository;
public OrderManager(IOrderRepository orderRepository)
{
_orderRepository = orderRepository;
}
public string CreateNewOrder(CreateOrderDTO dto)
{
var orderId = _orderRepository.CreatOrder(dto);
Console.WriteLine($"One order created: {JsonConvert.SerializeObject(dto)}");
return orderId;
}
}
建立 EventHandlerContainer
EventHandlerContainer
下面我們來編寫最核心的事件處理器容器。在這裡我們的事件處理器容器完成了3個功能
- 訂閱事件(Subscribe Event)
- 取消訂閱事件(Unsubscribe Event)
- 釋出事件(Publish Event)
public class EventHandlerContainer
{
private IServiceProvider _serviceProvider = null;
private static Dictionary<string, List<Type>> _mappings = new Dictionary<string, List<Type>>();
public EventHandlerContainer(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public static void Subscribe<T, THandler>()
where T : EventBase
where THandler : IEventHandler<T>
{
var name = typeof(T).Name;
if (!_mappings.ContainsKey(name))
{
_mappings.Add(name, new List<Type> { });
}
_mappings[name].Add(typeof(THandler));
}
public static void Unsubscribe<T, THandler>()
where T : EventBase
where THandler : IEventHandler<T>
{
var name = typeof(T).Name;
_mappings[name].Remove(typeof(THandler));
if (_mappings[name].Count == 0)
{
_mappings.Remove(name);
}
}
public void Publish<T>(T o) where T : EventBase
{
var name = typeof(T).Name;
if (_mappings.ContainsKey(name))
{
foreach (var handler in _mappings[name])
{
var service = (IEventHandler<T>)_serviceProvider.GetService(handler);
service.Run(o);
}
}
}
public async Task PublishAsync<T>(T o) where T : EventBase
{
var name = typeof(T).Name;
if (_mappings.ContainsKey(name))
{
foreach (var handler in _mappings[name])
{
var service = (IEventHandler<T>)_serviceProvider.GetService(handler);
await service.RunAsync(o);
}
}
}
}
- 這裡我沒有直接訂閱事件處理器的執行個體,而且訂閱了事件處理器的類型
- 多個事件處理器可以訂閱同一個事件
的構造函數中,我們注入了一個
EventHandlerContainer
,我們可以使用它來獲得對應事件處理器的執行個體。
IServiceProvider
在程式啟動時,注冊事件訂閱
現在我們來
Startup.cs
的
ConfigureServices
方法,這裡我們需要進行服務注冊,并完成事件訂閱。
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
services.AddScoped<IOrderManager, OrderManager>();
services.AddScoped<IShoppingCartManager, ShoppingCartManager>();
services.AddScoped<IShoppingCartRepository, ShoppingCartRepository>();
services.AddScoped<IOrderRepository, OrderRepository>();
services.AddScoped<IUnitOfWork, UnitOfWork>();
services.AddScoped<CreateOrderHandler>();
services.AddScoped<ConfirmEmailSentHandler>();
services.AddScoped<EventHandlerContainer>();
EventHandlerContainer.Subscribe<ShoppingCartSubmittedEvent, CreateOrderHandler>();
EventHandlerContainer.Subscribe<ShoppingCartSubmittedEvent, ConfirmEmailSentHandler>();
}
注意:這裡保證一個Api請求中的所有資料庫操作在一個事務裡,這裡我們使用作用域。這樣我們就可以在調用工作單元
Scoped
接口的
IUnitOfWork
代碼中啟用事務。
Save
修改ShoppingCartManager
最後我們來修改
ShoppingCartManager
, 改用釋出事件的方式來完成後續建立訂單和發送郵件的功能。
public void SubmitShoppingCart(string shoppingCartId)
{
var shoppingCart = _unitOfWork.ShoppingCartRepository
.GetShoppingCart(shoppingCartId);
_unitOfWork.ShoppingCartRepository
.SubmitShoppingCart(shoppingCartId);
_container.Publish(new ShoppingCartSubmittedEvent()
{
Items = shoppingCart
.Items
.Select(p => new ShoppingCartSubmittedItem
{
ItemId = p.ItemId,
Name = p.Name,
Price = p.Price
})
.ToList()
});
_unitOfWork.Save();
}
這樣
ShoppingCartManager
就隻需要關注購物車狀态的變更,而不需要關注發送确認郵件和建立訂單了。
最終效果
現在讓我們啟動項目,
首先我們使用[POST] /api/shoppingCarts來添加一個新的購物車, 這個API會傳回目前購物車的Id
然後我們使用[PUT] /api/shoppingCarts/ShoppingCart_636872897140555966來模拟送出購物車,程式傳回操作成功
最後我們檢視一下控制台的輸出日志
2個事件處理器都被正确觸發了。
總結
至此我們的代碼重構完成。 最終的代碼中,
SubmitShoppingCart
方法,僅負責修改購物車狀态并釋出一個購物車送出的事件。生成訂單和發送郵件的功能代碼都被移動到了獨立的處理類中。
這樣的方式的好處不僅僅是完成了代碼的解耦,針對後續的擴充也非常有利,想想一下,如果在未來目前項目需求追加這樣一個功能,當送出購物車的時候,除了要發送确認郵件,還要發送手機短信。這時候你根本不需要去修改
ShoppingCartManager
類,你隻需要針對
ShoppingCartSubmittedEvent
在再添加一個新的事件處理器即可,這也滿足的SOLID的開閉原則。