.NET Core 使用 CAP 实现分布式事务
原本是由于 TransactionScope
暂时在 .NET Core 中无法使用(见 这里),所以想找一个替代的方案,然后就查到了 CAP 项目。
先把结论写在这里:CAP 并不是用来实现数据库的分布式事务的,和 TransactionScope
的功能完全不同。
虽然不能用来实现我的需求,但其功能还是常见的需求(发布/订阅模式),使用起来也很方便。
而且查都查了,也就把调查过程记录了下来。
安装所需要的包
因为项目使用的是 .NET Core 2.1 ,DotNetCore.CAP 的最新版需要依赖 2.2,所以这里使用的是 DotNetCore.CAP 2.3.1 版。
Install-Package DotNetCore.CAP -Version 2.3.1
CAP 支持 RabbitMQ、Kafka 和 AzureService 作为消息队列。
Install-Package DotNetCore.CAP.RabbitMQ -Version 2.3.1
CAP 的事件日志存储支持 SqlServer、MySql、PostgreSql 和 MongoDB (需要 MongoDB 4.0+ cluster)作为事件日志存储。
所谓事件日志,个人理解其类似于消息队列中的消息。这里使用上述四种数据库来存储并确保该消息被消费并成功消费结束。
而且其消费并不保证只被执行一次,即消息接受的接口可能会被调用多次。这点也类似于消息队列的消费。所以也就需要开发者保证消费处理的幂等性。
Install-Package DotNetCore.CAP.SqlServer -Version 2.3.1
如果需要安装其他的消息队列或数据库,请参考 https://github.com/dotnetcore/CAP 。
配置
在 Startup.cs 的 ConfigureServices
方法中配置 CAP。
任务日志这里使用 ADO.NET(ORM 框架使用的是修改过的 FluentData)和 EntityFramework ,更多类型请参考 GitHub。
在 AddCap 配置数据库的意义在于创建用于存储 CAP 事件日志的表(Cap.Published 和 Cap.Received,其中 Cap 是 Schema,可以通过 o.Schema
属性配置)。如果对应的数据库中没有该表,则在 _capBus.Publish
发布消息时报 对象名 'Cap.Published' 无效 的错误。
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.Configure<CookiePolicyOptions>(options =>
{
// This lambda determines whether user consent for non-essential cookies is needed for a given request.
options.CheckConsentNeeded = context => true;
options.MinimumSameSitePolicy = SameSiteMode.None;
});
services.AddDbContext<Models.TestA.TEST_AContext>(); //Options, If you are using EF as the ORM
services.AddDbContext<Models.TestB.TEST_BContext>(); //Options, If you are using EF as the ORM
services.AddCap(x =>
{
// If you are using EF, you need to add the configuration:
x.UseEntityFramework<Models.TestA.TEST_AContext>(); //Options, Notice: You don't need to config x.UseSqlServer(""") again! C
x.UseEntityFramework<Models.TestB.TEST_BContext>(); //Options, Notice: You don't need to config x.UseSqlServer(""") again! C
// If you are using ADO.NET, choose to add configuration you needed:
x.UseSqlServer(o =>
{
// sqlserverOptions.ConnectionString
o.ConnectionString = Config.CONNECTION_STRING_SQL_SERVER_A;
});
x.UseSqlServer(o =>
{
// sqlserverOptions.ConnectionString
o.ConnectionString = Config.CONNECTION_STRING_SQL_SERVER_B;
});
// CAP support RabbitMQ,Kafka,AzureService as the MQ, choose to add configuration you needed:
x.UseRabbitMQ(o => {
// rabbitmq options.
o.HostName = Config.RABBIT_MQ_HOSTNAME;
o.UserName = Config.RABBIT_MQ_USERNAME;
o.Password = Config.RABBIT_MQ_PASSWORD;
});
});
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new Info { Title = "My API", Version = "v1" });
});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
另外这里还配置了 SwaggerUI,方便调用接口。如需使用,先安装 Swashbuckle.AspNetCore 包。
Install-Package Swashbuckle.AspNetCore -Version 4.0.1
使用 EF & CAP
另外如果使用 EF 需要安装如下包:
Install-Package Microsoft.EntityFrameworkCore.SqlServer -Version 2.1.11
Install-Package Microsoft.EntityFrameworkCore.Tools -Version 2.1.11
Install-Package Microsoft.EntityFrameworkCore.SqlServer.Design -Version 1.1.6
2
3
如何生成 EF 对应的 DbContext 和 Models 请参考 【.NET Core】Entity Framework Core 。
这里使用了两个数据库 TEST_A 和 TEST_B,分别有 TableA 和 TableB。
下面的代码测试了在一个库中新增数据,然后通知 CAP,在之后的处理中在另一个库中再新增一条数据。
/// <summary>
/// [分布式事务测试][EF]
/// </summary>
[Route("api/test/scope/ef")]
[ApiController]
public class EFScopeTestController : ControllerBase
{
Models.TestA.TEST_AContext _dbContextA;
Models.TestB.TEST_BContext _dbContextB;
private readonly ICapPublisher _capBus;
public EFScopeTestController(ICapPublisher capPublisher, Models.TestA.TEST_AContext dbContextA, Models.TestB.TEST_BContext dbContextB)
{
_capBus = capPublisher;
_dbContextA = dbContextA;
_dbContextB = dbContextB;
}
/// <summary>
/// [分布式事务测试][EF] 成功
/// </summary>
/// <returns></returns>
[HttpPost("success-a")]
public void EFSuccess()
{
using (var trans = _dbContextA.Database.BeginTransaction(_capBus, autoCommit: false))
{
//your business logic code
_dbContextA.TableA.Add(new Models.TestA.TableA()
{
Guid = Guid.NewGuid(),
Name = "EF 测试数据 A",
CreateTime = DateTime.Now,
ModifyTime = DateTime.Now,
});
_dbContextA.SaveChanges();
_capBus.Publish("test.scope.ef.success.a.check", DateTime.Now);
trans.Commit();
}
}
/// <summary>
/// [分布式事务测试][EF]Check
/// </summary>
/// <param name="datetime"></param>
/// <param name="dbContextB"></param>
[HttpPost("check-message-a")]
[CapSubscribe("test.scope.ef.success.a.check")]
public void CheckTestAReceivedMessage(DateTime datetime)
{
Console.WriteLine(datetime);
using (var trans = _dbContextB.Database.BeginTransaction())
{
//your business logic code
_dbContextB.TableB.Add(new Models.TestB.TableB()
{
Guid = Guid.NewGuid(),
Name = "EF 测试 Check 数据 B",
CreateTime = DateTime.Now,
ModifyTime = DateTime.Now,
});
_dbContextB.SaveChanges();
trans.Commit();
}
}
/// <summary>
/// [分布式事务测试][EF] 成功
/// </summary>
/// <returns></returns>
[HttpPost("success-b")]
public void EFSuccessB()
{
using (var trans = _dbContextB.Database.BeginTransaction(_capBus, autoCommit: false))
{
//your business logic code
_dbContextB.TableB.Add(new Models.TestB.TableB()
{
Guid = Guid.NewGuid(),
Name = "EF 测试数据 B",
CreateTime = DateTime.Now,
ModifyTime = DateTime.Now,
});
_dbContextB.SaveChanges();
_capBus.Publish("test.scope.ef.success.b.check", DateTime.Now);
trans.Commit();
}
}
/// <summary>
/// [分布式事务测试][EF]Check
/// </summary>
/// <param name="datetime"></param>
/// <param name="dbContextB"></param>
[HttpPost("check-message-b")]
[CapSubscribe("test.scope.ef.success.b.check")]
public void CheckTestBReceivedMessage(DateTime datetime)
{
Console.WriteLine(datetime);
using (var trans = _dbContextA.Database.BeginTransaction())
{
//your business logic code
_dbContextA.TableA.Add(new Models.TestA.TableA()
{
Guid = Guid.NewGuid(),
Name = "EF 测试 Check 数据 A",
CreateTime = DateTime.Now,
ModifyTime = DateTime.Now,
});
_dbContextA.SaveChanges();
trans.Commit();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
这里测试了两种情况,分别是先在 TEST_A 和 TEST_B 库新增数据,结果 Cap.Published 和 Cap.Received 中的事件日志和想象中的并不一样。
原以为会分别在 TEST_A 和 TEST_B 中的 Cap.Published 和 Cap.Received 表中各有一条数据,实际结果是 TEST_A.Cap.Received 中没有数据,而 TEST_B.Cap.Received 中有两条数据。
TEST_A.Cap.Published
Id | Name | Content | Retries | Added | ExpiresAt | StatusName |
---|---|---|---|---|---|---|
1137912972419485696 | test.scope.ef.success.a.check | {"Id":"5cfdc3a8039e53669cffb77b","Timestamp":"2019-06-10T10:42:48.6969171+08:00","Content":"2019/6/10 10:42:48","CallbackName":null} | 0 | 2019-06-10 10:42:48.6800000 | NULL | Scheduled |
TEST_A.Cap.Received
Id | Name | Group | Content | Retries | Added | ExpiresAt | StatusName |
---|---|---|---|---|---|---|---|
TEST_B.Cap.Published
Id | Name | Content | Retries | Added | ExpiresAt | StatusName |
---|---|---|---|---|---|---|
1137912987879690240 | test.scope.ef.success.b.check | {"Id":"5cfdc3ac039e53669cffb77d","Timestamp":"2019-06-10T10:42:52.3689171+08:00","Content":"2019/6/10 10:42:52","CallbackName":null} | 0 | 2019-06-10 10:42:52.3700000 | 2019-06-11 10:42:52.3830000 | Succeeded |
TEST_B.Cap.Received
Id | Name | Group | Content | Retries | Added | ExpiresAt | StatusName |
---|---|---|---|---|---|---|---|
1137912972738252800 | test.scope.ef.success.a.check | cap.queue.ds_dnc_cap | {"Id":"5cfdc3a8039e53669cffb77b","Timestamp":"2019-06-10T10:42:48.6969171+08:00","Content":"2019/6/10 10:42:48","CallbackName":null} | 0 | 2019-06-10 10:42:48.7600000 | 2019-06-11 10:42:48.8530000 | Succeeded |
1137912987955187712 | test.scope.ef.success.b.check | cap.queue.ds_dnc_cap | {"Id":"5cfdc3ac039e53669cffb77d","Timestamp":"2019-06-10T10:42:52.3689171+08:00","Content":"2019/6/10 10:42:52","CallbackName":null} | 0 | 2019-06-10 10:42:52.3870000 | 2019-06-11 10:42:52.4000000 | Succeeded |
ADO.NET (FluentData) & CAP
/// <summary>
/// [分布式事务测试][ADO.NET]
/// </summary>
[Route("api/test/scope/adodotnet")]
[ApiController]
public class AdoDotNetScopeTestController : ControllerBase
{
private readonly ICapPublisher _capBus;
public AdoDotNetScopeTestController(ICapPublisher capPublisher)
{
_capBus = capPublisher;
}
/// <summary>
/// [分布式事务测试][ADO.NET] 成功
/// </summary>
/// <returns></returns>
[HttpPost("success")]
public void Success()
{
using (var db_wirte = new DbContext().ConnectionString(Config.CONNECTION_STRING_SQL_SERVER_A, new SqlServerProvider()).UseTransactionScope(_capBus))
{
db_wirte.Insert("Table_A", new TestModel()
{
Guid = Guid.NewGuid(),
Name = "ADO.NET 测试数据 A",
CreateTime = DateTime.Now,
ModifyTime = DateTime.Now,
}).AutoMap().Execute();
_capBus.Publish("test.scope.adodotnet.success.check", DateTime.Now);
db_wirte.Commit();
}
}
/// <summary>
/// [分布式事务测试][ADO.NET]Check
/// </summary>
/// <param name="datetime"></param>
[HttpPost("check-messagee")]
[CapSubscribe("test.scope.adodotnet.success.check")]
public void CheckReceivedMessage(DateTime datetime)
{
Console.WriteLine(datetime);
using (var db_prod = new DbContext().ConnectionString(Config.CONNECTION_STRING_SQL_SERVER_B, new SqlServerProvider()).UseTransactionScope(_capBus))
{
db_prod.Insert("Table_B", new TestModel()
{
Guid = Guid.NewGuid(),
Name = "ADO.NET 测试 Check 数据 B",
CreateTime = DateTime.Now,
ModifyTime = DateTime.Now,
}).AutoMap().Execute();
db_prod.Commit();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
IDbContext.cs 中增加 UseTransactionScope 接口定义。
IDbContext UseTransactionScope(ICapPublisher capBus);
Transactions.cs 中增加该接口的实现:
public IDbContext UseTransactionScope(ICapPublisher capBus)
{
Data.UseTransaction = true;
Data.CapBus = capBus;
return this;
}
2
3
4
5
6
修改 ExecuteQueryHandler.cs 的 PrepareDbCommand
方法中开启事务的代码。
if (_command.Data.Context.Data.UseTransaction)
{
if(_command.Data.Context.Data.Transaction == null)
{
if (_command.Data.Context.Data.CapBus != null)
{
_command.Data.Context.Data.Transaction = _command.Data.Context.Data.Connection.BeginTransaction(_command.Data.Context.Data.CapBus, autoCommit: false);
}
else
{
_command.Data.Context.Data.Transaction = _command.Data.Context.Data.Connection.BeginTransaction((System.Data.IsolationLevel)_command.Data.Context.Data.IsolationLevel);
}
}
_command.Data.InnerCommand.Transaction = _command.Data.Context.Data.Transaction;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
回调函数
_capBus.Publish
方法可以通过 callbackName
参数指定回调函数,在事件成功结束后,会执行回调的事件(回调事件不支持参数)。
不过在事件失败的情况下,并不会执行回调的事件。下面就是一个事件失败的示例代码,回调事件永远不会被执行。
/// <summary>
/// [分布式事务测试][EF] 失败
/// </summary>
/// <returns></returns>
[HttpPost("fail-a")]
public void EFFail()
{
using (var trans = _dbContextA.Database.BeginTransaction(_capBus, autoCommit: false))
{
//your business logic code
var model = new Models.TestA.TableA()
{
Guid = Guid.NewGuid(),
Name = "EF 失败测试数据 A",
CreateTime = DateTime.Now,
ModifyTime = DateTime.Now,
};
_dbContextA.TableA.Add(model);
_dbContextA.SaveChanges();
_capBus.Publish("test.scope.ef.fail.a.check", model.Guid, "test.scope.ef.fail.callback.a.check");
trans.Commit();
}
}
/// <summary>
/// [分布式事务测试][EF]Check Fail
/// </summary>
/// <param name="guid"></param>
/// <param name="dbContextB"></param>
[HttpPost("check-fail-message-a")]
[CapSubscribe("test.scope.ef.fail.a.check")]
public void CheckTestAReceivedFailMessage(Guid guid)
{
Console.WriteLine(guid);
throw new Exception("There is a exception at check-fail-message-a process.");
}
/// <summary>
/// [分布式事务测试][EF]Check Fail Callback
/// </summary>
/// <param name="guid"></param>
/// <param name="dbContextB"></param>
[HttpPost("check-fail-callback-message-a")]
[CapSubscribe("test.scope.ef.fail.callback.a.check")]
public void CheckTestAReceivedFailCallbackMessage()
{
Console.WriteLine($"fail callback {DateTime.Now}");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
感想
原本是由于 TransactionScope
暂时在 .NET Core 中无法使用(见 这里),所以想找一个替代的方案。
可惜通过上面的示例可以看出,CAP 和 TransactionScope 实现的目标是完全不同的。
TransactionScope 的目标是保证分布式数据的一致性和原子性,而 CAP 则是通过事务日志来确保后续处理一定会被调用,但并不保证其执行成功,也不能保证数据的原子性。
如果 CAP 能分别设置成功和失败的回调且支持参数的话,可扩展性应该会更好些。
综上,个人认为 CAP 是发布/订阅模式的 .NET Core 易用版,简化了开发者的配置和使用,但和数据库的分布式事务是完全不同的东西。