简介

1735112489261.png

什么是事件总线?

事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。

事件总线是一种机制,它允许不同的组件彼此通信而不彼此了解。 组件可以将事件发送到Eventbus,而无需知道是谁来接听或有多少其他人来接听。 组件也可以侦听Eventbus上的事件,而无需知道谁发送了事件。 这样,组件可以相互通信而无需相互依赖。 同样,很容易替换一个组件。 只要新组件了解正在发送和接收的事件,其他组件就永远不会知道.

快速开始

场景一:用户注册账号后,需要发送手机验证码,发送邮箱验证码,发放会员资格

场景二:安全平台预警后,需要通知紧急联系人、消防、医护、公安平台

CAP文档:https://cap.dotnetcore.xyz/

安装nuget包

# nuget包:
DotNetCore.CAP
DotNetCore.CAP.InMemoryStorage
Savorboard.CAP.InMemoryMessageQueue

配置StartUp.cs

// 注册到容器
builder.Services.AddTransient<ITestSubscriber1, TestSubscriber1>();
builder.Services.AddTransient<ITestSubscriber2, TestSubscriber2>();
builder.Services.AddTransient<ITestSubscriber3, TestSubscriber3>();

// 简易配置
builder.Services.AddCap(x =>
{
    x.UseInMemoryStorage();
    x.UseInMemoryMessageQueue();
});

创建订阅者(处理程序)

前提条件

  1. 处理类注册到容器中
  2. 继承ICapSubscribe
  3. 添加订阅特性CapSubscribe
public interface ITestSubscriber1
{
    Task CheckReceivedMessage1Async();
}

public interface ITestSubscriber2
{
    Task CheckReceivedMessage2Async();
}

public interface ITestSubscriber3
{
    Task CheckReceivedMessage3Async();
}

public class TestSubscriber1 : ITestSubscriber1,ICapSubscribe
{
    [CapSubscribe(EventBusHandler.TestEvent ,Group = "group1")]
    public async Task CheckReceivedMessage1Async()
    {
        Console.WriteLine($"DateTime - {DateTime.Now:yyyy-MM-dd hh:mm:ss zz}");
        Thread.Sleep(1000);
        await Task.CompletedTask;
    }
}

public class TestSubscriber2 : ITestSubscriber2,ICapSubscribe
{
    [CapSubscribe(EventBusHandler.TestEvent, Group = "group2")]
    public async Task CheckReceivedMessage2Async()
    {
        Console.WriteLine($"DateTime - {DateTime.Now:yyyy-MM-dd hh:mm:ss zz}");
        Thread.Sleep(1000);
        await Task.CompletedTask;
    }
}

public class TestSubscriber3 : ITestSubscriber3,ICapSubscribe
{
    [CapSubscribe(EventBusHandler.TestEvent, Group = "group3")]
    public async Task CheckReceivedMessage3Async()
    {
        Console.WriteLine($"DateTime - {DateTime.Now:yyyy-MM-dd hh:mm:ss zz}");
        Thread.Sleep(1000);
        await Task.CompletedTask;
    }
}

创建发布者(调用程序)

前提条件

  1. 构造函数获取ICapPublisher
[ApiController]
[Route("[controller]/[action]")]
public class TestPublisherController : ControllerBase
{
    private readonly ICapPublisher _publisher;

    public TestPublisherController(ICapPublisher publisher)
    {
        _publisher = publisher;
    }
    
    [HttpGet(Name = "GetPublisher")]
    public Task<string> GetPublisher()
    {
        _publisher.Publish(EventBusHandler.TestEvent,"");
        
        return Task.FromResult("ok");
    }
}

公共属性

public class EventBusHandler
{
    public const string TestEvent = "TestEvent";
}

调用及结果

1735114425891.png

1735114459532.png

进阶

监控面板

访问 http://localhost:xxx/cap

1735114478639.png

安装nuget包

# nuget包:
DotNetCore.CAP.Dashboard

配置StartUp.cs

builder.Services.AddCap(x =>
{
    x.UseInMemoryStorage();
    x.UseInMemoryMessageQueue();
    x.UseDashboard();
});

持久化数据

1735114497283.png

  1. 移除缓存存储方式

安装nuget包

# nuget包:
DotNetCore.CAP.MySql

配置StartUp.cs

NAMEDESCRIPTIONTYPEDEFAULT
TableNamePrefixCap表名前缀stringcap
ConnectionString数据库连接字符串stringnull
    x.UseMySql(options =>
    {
        options.ConnectionString = "Server=localhost;Database=cap_event_bus;Uid=root;Pwd=Pwd@123;";
        options.TableNamePrefix = "cap_";
    });
    x.UseInMemoryMessageQueue();
    x.UseDashboard();
最后修改:2024 年 12 月 25 日
如果觉得我的文章对你有用,请随意赞赏