简介
什么是事件总线?
事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。
事件总线是一种机制,它允许不同的组件彼此通信而不彼此了解。 组件可以将事件发送到Eventbus,而无需知道是谁来接听或有多少其他人来接听。 组件也可以侦听Eventbus上的事件,而无需知道谁发送了事件。 这样,组件可以相互通信而无需相互依赖。 同样,很容易替换一个组件。 只要新组件了解正在发送和接收的事件,其他组件就永远不会知道.
快速开始
场景一:用户注册账号后,需要发送手机验证码,发送邮箱验证码,发放会员资格
场景二:安全平台预警后,需要通知紧急联系人、消防、医护、公安平台
CAP文档:https://cap.dotnetcore.xyz/
安装nuget包
# nuget包:
DotNetCore.CAP
DotNetCore.CAP.InMemoryStorage
Savorboard.CAP.InMemoryMessageQueue
配置StartUp.cs
// 注册到容器
builder.Services.AddTransient<ITestSubscriber1, TestSubscriber1>();
builder.Services.AddTransient<ITestSubscriber2, TestSubscriber2>();
builder.Services.AddTransient<ITestSubscriber3, TestSubscriber3>();
// 简易配置
builder.Services.AddCap(x =>
{
x.UseInMemoryStorage();
x.UseInMemoryMessageQueue();
});
创建订阅者(处理程序)
前提条件
- 处理类注册到容器中
- 继承ICapSubscribe
- 添加订阅特性CapSubscribe
public interface ITestSubscriber1
{
Task CheckReceivedMessage1Async();
}
public interface ITestSubscriber2
{
Task CheckReceivedMessage2Async();
}
public interface ITestSubscriber3
{
Task CheckReceivedMessage3Async();
}
public class TestSubscriber1 : ITestSubscriber1,ICapSubscribe
{
[CapSubscribe(EventBusHandler.TestEvent ,Group = "group1")]
public async Task CheckReceivedMessage1Async()
{
Console.WriteLine($"DateTime - {DateTime.Now:yyyy-MM-dd hh:mm:ss zz}");
Thread.Sleep(1000);
await Task.CompletedTask;
}
}
public class TestSubscriber2 : ITestSubscriber2,ICapSubscribe
{
[CapSubscribe(EventBusHandler.TestEvent, Group = "group2")]
public async Task CheckReceivedMessage2Async()
{
Console.WriteLine($"DateTime - {DateTime.Now:yyyy-MM-dd hh:mm:ss zz}");
Thread.Sleep(1000);
await Task.CompletedTask;
}
}
public class TestSubscriber3 : ITestSubscriber3,ICapSubscribe
{
[CapSubscribe(EventBusHandler.TestEvent, Group = "group3")]
public async Task CheckReceivedMessage3Async()
{
Console.WriteLine($"DateTime - {DateTime.Now:yyyy-MM-dd hh:mm:ss zz}");
Thread.Sleep(1000);
await Task.CompletedTask;
}
}
创建发布者(调用程序)
前提条件
- 构造函数获取ICapPublisher
[ApiController]
[Route("[controller]/[action]")]
public class TestPublisherController : ControllerBase
{
private readonly ICapPublisher _publisher;
public TestPublisherController(ICapPublisher publisher)
{
_publisher = publisher;
}
[HttpGet(Name = "GetPublisher")]
public Task<string> GetPublisher()
{
_publisher.Publish(EventBusHandler.TestEvent,"");
return Task.FromResult("ok");
}
}
公共属性
public class EventBusHandler
{
public const string TestEvent = "TestEvent";
}
调用及结果
进阶
监控面板
访问 http://localhost:xxx/cap
安装nuget包
# nuget包:
DotNetCore.CAP.Dashboard
配置StartUp.cs
builder.Services.AddCap(x =>
{
x.UseInMemoryStorage();
x.UseInMemoryMessageQueue();
x.UseDashboard();
});
持久化数据
- 移除缓存存储方式
安装nuget包
# nuget包:
DotNetCore.CAP.MySql
配置StartUp.cs
NAME | DESCRIPTION | TYPE | DEFAULT |
---|---|---|---|
TableNamePrefix | Cap表名前缀 | string | cap |
ConnectionString | 数据库连接字符串 | string | null |
x.UseMySql(options =>
{
options.ConnectionString = "Server=localhost;Database=cap_event_bus;Uid=root;Pwd=Pwd@123;";
options.TableNamePrefix = "cap_";
});
x.UseInMemoryMessageQueue();
x.UseDashboard();
此处评论已关闭