今天我们来了解一下一些高并发的业务场景如何做到数据一致性的。
1、有数据表:ConCurrency,
- 1 CREATE TABLE [dbo].[ConCurrency](
- 2 [ID] [int] NOT NULL,
- 3 [Total] [int] NULL
- 4 )
2、初始值:ID=1,Total = 0
3、现要求每一次客户端请求 Total + 1
- 1 static void Main(string[] args)
- 2 {
- 3 ...
- 4 new Thread(Run).Start();
- 5 ...
- 6 }
- 7
- 8 public static void Run()
- 9 {
- 10 for (int i = 1; i <= 100; i++)
- 11 {
- 12 var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
- 13 var value = int.Parse(total) + 1;
- 14
- 15 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
- 16 Thread.Sleep(1);
- 17 }
- 18 }
2.1 按要求,正常情况下应该输出:100
2.2 运行结果
貌似没有问题。
3.1 Main 改一下
- 1 static void Main(string[] args)
- 2 {
- 3 ...
- 4 new Thread(Run).Start();
- 5 new Thread(Run).Start();
- 6 ...
- 7 }
3.2 我们预期应该是要输出 200
3.3 运行结果
很遗憾,却是 150,造成这个结果的原因是这样的:T1、T2 获取 Total(假设此时值为 10),T1 更新一次或多次后,T2 才更新(Total:10)
这就造成之前 T1 提交的被覆盖了
3.4 如何避免呢?一般做法加锁就可以了,如 Run 改成如下
- 1 public static void Run() 2 {
- 3
- for (int i = 1; i <= 100; i++) 4 {
- 5 lock(resource) 6 {
- 7
- var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
- 8
- var value = int.Parse(total) + 1;
- 9 10 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
- 11
- }
- 12 13 Thread.Sleep(1);
- 14
- }
- 15
- }
3.5 再次运行
4.1、定义队列
- static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
- 1 /// <summary>生产者</summary>
- 2 public static void Produce() 3 {
- 4
- for (int i = 1; i <= 100; i++) 5 {
- 6 queue.Enqueue(i);
- 7
- }
- 8
- }
- 9 10 /// <summary>消费者</summary>
- 11 public static void Consume() 12 {
- 13 int times;
- 14
- while (queue.TryDequeue(out times)) 15 {
- 16
- var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
- 17
- var value = int.Parse(total) + 1;
- 18 19 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
- 20 Thread.Sleep(1);
- 21
- }
- 22
- }
4.2 Main 改一下
- 1 static void Main(string[] args)
- 2 {
- 3 ...
- 4 new Thread(Produce).Start();
- 5 new Thread(Produce).Start();
- 6 Consume();
- 7 ...
- 8 }
4.3 预期输出 200,看运行结果
4.4 集群环境下测试,2 台机器
有问题!最后运行的那台机器居然是 379,数据库也是 379。
这超出了我们的预期结果,看来即便加锁,对于高并发场景也是不能解决所有问题的
5.1 解决上边问题可以用分布式队列,这里用的是 redis 队列
- 1 /// <summary>生产者</summary>
- 2 public static void ProduceToRedis() 3 {
- 4 using(var client = RedisManager.GetClient()) 5 {
- 6
- for (int i = 1; i <= 100; i++) 7 {
- 8 client.EnqueueItemOnList("EnqueueName", i.ToString());
- 9
- }
- 10
- }
- 11
- }
- 12 13 /// <summary>消费者</summary>
- 14 public static void ConsumeFromRedis() 15 {
- 16 using(var client = RedisManager.GetClient()) 17 {
- 18
- while (client.GetListCount("EnqueueName") > 0) 19 {
- 20
- if (client.SetValueIfNotExists("lock", "lock")) 21 {
- 22
- var item = client.DequeueItemFromList("EnqueueName");
- 23
- var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
- 24
- var value = int.Parse(total) + 1;
- 25 26 DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
- 27 28 client.Remove("lock");
- 29
- }
- 30 31 Thread.Sleep(5);
- 32
- }
- 33
- }
- 34
- }
5.2 Main 也要改改
- 1 static void Main(string[] args)
- 2 {
- 3 ...
- 4 new Thread(ProduceToRedis).Start();
- 5 new Thread(ProduceToRedis).Start();
- 6 Thread.Sleep(1000 * 10);
- 7
- 8 ConsumeFromRedis();
- 9 ...
- 10 }
5.3 在集群里再试试,2 个都是 400,没有错(因为每个站点开了 2 个线程)
可以看到数据完全正确!
来源: http://www.cnblogs.com/lanxiaoke/p/6657935.html