使用场景描述:
网络请求中经常会遇到发送的请求, 服务端响应是成功的, 但是返回的时候出现网络故障, 导致客户端无法接收到请求结果, 那么客户端程序可能判断为网络故障, 而重复发送同一个请求. 当然如果接口中定义了请求结果查询接口, 那么这种重复会相对少一些. 特别是交易类的数据, 这种操作更是需要避免重复发送请求. 另外一种情况是用户过于快速的点击界面按钮, 产生连续的相同内容请求, 那么后端也需要进行过滤, 这种一般出现在系统对接上, 无法去控制第三方系统的业务逻辑, 需要从自身业务逻辑里面去限定.
其他需求描述:
这类请求一般存在时间范围和高并发的特点, 就是短时间内会出现重复的请求, 因此对模块需要支持高并发性.
技术实现:
对请求的业务内容进行 MD5 摘要, 并且将 MD5 摘要存储到缓存中, 每个请求数据都通过这个一个公共的调用的方法进行判断.
代码实现:
公共调用代码 UniqueCheck 采用单例模式创建唯一对象, 便于在多线程调用的时候, 只访问一个统一的缓存库
- /*
- * volatile 就像大家更熟悉的 const 一样, volatile 是一个类型修饰符 (type specifier).
- * 它是被设计用来修饰被不同线程访问和修改的变量.
- * 如果没有 volatile, 基本上会导致这样的结果: 要么无法编写多线程程序, 要么编译器失去大量优化的机会.
- */
- private static readonly object lockHelper = new object();
- private volatile static UniqueCheck _instance;
- /// <summary>
- /// 获取单一实例
- /// </summary>
- /// <returns></returns>
- public static UniqueCheck GetInstance()
- {
- if (_instance == null)
- {
- lock (lockHelper)
- {
- if (_instance == null)
- _instance = new UniqueCheck();
- }
- }
- return _instance;
- }
这里需要注意 volatile 的修饰符, 在实际测试过程中, 如果没有此修饰符, 在高并发的情况下会出现报错.
自定义一个可以进行并发处理队列, 代码如下: ConcurrentLinkedQueue
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading;
- namespace PackgeUniqueCheck
- {
- /// <summary>
- /// 非加锁并发队列, 处理 100 个并发数以内
- /// </summary>
- /// <typeparam name="T"></typeparam>
- public class ConcurrentLinkedQueue<T>
- {
- private class Node<K>
- {
- internal K Item;
- internal Node<K> Next;
- public Node(K item, Node<K> next)
- {
- this.Item = item;
- this.Next = next;
- }
- }
- private Node<T> _head;
- private Node<T> _tail;
- public ConcurrentLinkedQueue()
- {
- _head = new Node<T>(default(T), null);
- _tail = _head;
- }
- public bool IsEmpty
- {
- get { return (_head.Next == null); }
- }
- /// <summary>
- /// 进入队列
- /// </summary>
- /// <param name="item"></param>
- public void Enqueue(T item)
- {
- Node<T> newNode = new Node<T>(item, null);
- while (true)
- {
- Node<T> curTail = _tail;
- Node<T> residue = curTail.Next;
- // 判断_tail 是否被其他 process 改变
- if (curTail == _tail)
- {
- //A 有其他 process 执行 C 成功,_tail 应该指向新的节点
- if (residue == null)
- {
- //C 其他 process 改变了 tail 节点, 需要重新取 tail 节点
- if (Interlocked.CompareExchange<Node<T>>(
- ref curTail.Next, newNode, residue) == residue)
- {
- //D 尝试修改 tail
- Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);
- return;
- }
- }
- else
- {
- //B 帮助其他线程完成 D 操作
- Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);
- }
- }
- }
- }
- /// <summary>
- /// 队列取数据
- /// </summary>
- /// <param name="result"></param>
- /// <returns></returns>
- public bool TryDequeue(out T result)
- {
- Node<T> curHead;
- Node<T> curTail;
- Node<T> next;
- while (true)
- {
- curHead = _head;
- curTail = _tail;
- next = curHead.Next;
- if (curHead == _head)
- {
- if (next == null) //Queue 为空
- {
- result = default(T);
- return false;
- }
- if (curHead == curTail) //Queue 处于 Enqueue 第一个 node 的过程中
- {
- // 尝试帮助其他 Process 完成操作
- Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);
- }
- else
- {
- // 取 next.Item 必须放到 CAS 之前
- result = next.Item;
- // 如果_head 没有发生改变, 则将_head 指向 next 并退出
- if (Interlocked.CompareExchange<Node<T>>(ref _head,
- next, curHead) == curHead)
- break;
- }
- }
- }
- return true;
- }
- /// <summary>
- /// 尝试获取最后一个对象
- /// </summary>
- /// <param name="result"></param>
- /// <returns></returns>
- public bool TryGetTail(out T result)
- {
- result = default(T);
- if (_tail == null)
- {
- return false;
- }
- result = _tail.Item;
- return true;
- }
- }
- }
虽然是一个非常简单的唯一性校验逻辑, 但是要做到高效率, 高并发支持, 高可靠性, 以及低内存占用, 需要实现这样的需求, 需要做细致的模拟测试.
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading;
- using System.Collections;
- namespace PackgeUniqueCheck
- {
- public class UniqueCheck
- {
- /*
- * volatile 就像大家更熟悉的 const 一样, volatile 是一个类型修饰符 (type specifier).
- * 它是被设计用来修饰被不同线程访问和修改的变量.
- * 如果没有 volatile, 基本上会导致这样的结果: 要么无法编写多线程程序, 要么编译器失去大量优化的机会.
- */
- private static readonly object lockHelper = new object();
- private volatile static UniqueCheck _instance;
- /// <summary>
- /// 获取单一实例
- /// </summary>
- /// <returns></returns>
- public static UniqueCheck GetInstance()
- {
- if (_instance == null)
- {
- lock (lockHelper)
- {
- if (_instance == null)
- _instance = new UniqueCheck();
- }
- }
- return _instance;
- }
- private UniqueCheck()
- {
- // 创建一个线程安全的哈希表, 作为字典缓存
- _DataKey = Hashtable.Synchronized(new Hashtable());
- Queue myqueue = new Queue();
- _DataQueue = Queue.Synchronized(myqueue);
- _Myqueue = new ConcurrentLinkedQueue<string>();
- _Timer = new Thread(DoTicket);
- _Timer.Start();
- }
- #region 公共属性设置
- /// <summary>
- /// 设定定时线程的休眠时间长度: 默认为 1 分钟
- /// 时间范围: 1-7200000, 值为 1 毫秒到 2 小时
- /// </summary>
- /// <param name="value"></param>
- public void SetTimeSpan(int value)
- {
- if (value> 0&& value <=7200000)
- {
- _TimeSpan = value;
- }
- }
- /// <summary>
- /// 设定缓存 Cache 中的最大记录条数
- /// 值范围: 1-5000000,1 到 500 万
- /// </summary>
- /// <param name="value"></param>
- public void SetCacheMaxNum(int value)
- {
- if (value> 0 && value <= 5000000)
- {
- _CacheMaxNum = value;
- }
- }
- /// <summary>
- /// 设置是否在控制台中显示日志
- /// </summary>
- /// <param name="value"></param>
- public void SetIsShowMsg(bool value)
- {
- Helper.IsShowMsg = value;
- }
- /// <summary>
- /// 线程请求阻塞增量
- /// 值范围: 1-CacheMaxNum, 建议设置为缓存最大值的 10%-20%
- /// </summary>
- /// <param name="value"></param>
- public void SetBlockNumExt(int value)
- {
- if (value> 0 && value <= _CacheMaxNum)
- {
- _BlockNumExt = value;
- }
- }
- /// <summary>
- /// 请求阻塞时间
- /// 值范围: 1-max, 根据阻塞增量设置请求阻塞时间
- /// 阻塞时间越长, 阻塞增量可以设置越大, 但是请求实时响应就越差
- /// </summary>
- /// <param name="value"></param>
- public void SetBlockSpanTime(int value)
- {
- if (value> 0)
- {
- _BlockSpanTime = value;
- }
- }
- #endregion
- #region 私有变量
- /// <summary>
- /// 内部运行线程
- /// </summary>
- private Thread _runner = null;
- /// <summary>
- /// 可处理高并发的队列
- /// </summary>
- private ConcurrentLinkedQueue<string> _Myqueue = null;
- /// <summary>
- /// 唯一内容的时间健值对
- /// </summary>
- private Hashtable _DataKey = null;
- /// <summary>
- /// 内容时间队列
- /// </summary>
- private Queue _DataQueue = null;
- /// <summary>
- /// 定时线程的休眠时间长度: 默认为 1 分钟
- /// </summary>
- private int _TimeSpan = 3000;
- /// <summary>
- /// 定时计时器线程
- /// </summary>
- private Thread _Timer = null;
- /// <summary>
- /// 缓存 Cache 中的最大记录条数
- /// </summary>
- private int _CacheMaxNum = 500000;
- /// <summary>
- /// 线程请求阻塞增量
- /// </summary>
- private int _BlockNumExt = 10000;
- /// <summary>
- /// 请求阻塞时间
- /// </summary>
- private int _BlockSpanTime = 100;
- #endregion
- #region 私有方法
- private void StartRun()
- {
- _runner = new Thread(DoAction);
- _runner.Start();
- Helper.ShowMsg("内部线程启动成功!");
- }
- private string GetItem()
- {
- string tp = string.Empty;
- bool result = _Myqueue.TryDequeue(out tp);
- return tp;
- }
- /// <summary>
- /// 执行循环操作
- /// </summary>
- private void DoAction()
- {
- while (true)
- {
- while (!_Myqueue.IsEmpty)
- {
- string item = GetItem();
- _DataQueue.Enqueue(item);
- if (!_DataKey.ContainsKey(item))
- {
- _DataKey.Add(item, DateTime.Now);
- }
- }
- //Helper.ShowMsg("当前数组已经为空, 处理线程进入休眠状态...");
- Thread.Sleep(2);
- }
- }
- /// <summary>
- /// 执行定时器的动作
- /// </summary>
- private void DoTicket()
- {
- while (true)
- {
- Helper.ShowMsg("当前数据队列个数:" + _DataQueue.Count.ToString());
- if (_DataQueue.Count> _CacheMaxNum)
- {
- while (true)
- {
- Helper.ShowMsg(string.Format("当前队列数:{0}, 已经超出最大长度:{1}, 开始进行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString()));
- string item = _DataQueue.Dequeue().ToString();
- if (!string.IsNullOrEmpty(item))
- {
- if (_DataKey.ContainsKey(item))
- {
- _DataKey.Remove(item);
- }
- if (_DataQueue.Count <= _CacheMaxNum)
- {
- Helper.ShowMsg("清理完成, 开始休眠清理线程...");
- break;
- }
- }
- }
- }
- Thread.Sleep(_TimeSpan);
- }
- }
- /// <summary>
- /// 线程进行睡眠等待
- /// 如果当前负载压力大大超出了线程的处理能力
- /// 那么需要进行延时调用
- /// </summary>
- private void BlockThread()
- {
- if (_DataQueue.Count> _CacheMaxNum + _BlockNumExt)
- {
- Thread.Sleep(_BlockSpanTime);
- }
- }
- #endregion
- #region 公共方法
- /// <summary>
- /// 开启服务线程
- /// </summary>
- public void Start()
- {
- if (_runner == null)
- {
- StartRun();
- }
- else
- {
- if (_runner.IsAlive == false)
- {
- StartRun();
- }
- }
- }
- /// <summary>
- /// 关闭服务线程
- /// </summary>
- public void Stop()
- {
- if (_runner != null)
- {
- _runner.Abort();
- _runner = null;
- }
- }
- /// <summary>
- /// 添加内容信息
- /// </summary>
- /// <param name="item"> 内容信息 </param>
- /// <returns>true: 缓存中不包含此值, 队列添加成功, false: 缓存中包含此值, 队列添加失败 </returns>
- public bool AddItem(string item)
- {
- BlockThread();
- item = Helper.MakeMd5(item);
- if (_DataKey.ContainsKey(item))
- {
- return false;
- }
- else
- {
- _Myqueue.Enqueue(item);
- return true;
- }
- }
- /// <summary>
- /// 判断内容信息是否已经存在
- /// </summary>
- /// <param name="item"> 内容信息 </param>
- /// <returns>true: 信息已经存在于缓存中, false: 信息不存在于缓存中 </returns>
- public bool CheckItem(string item)
- {
- item = Helper.MakeMd5(item);
- return _DataKey.ContainsKey(item);
- }
- #endregion
- }
- }
模拟测试代码:
- private static string _example = Guid.NewGuid().ToString();
- private static UniqueCheck _uck = null;
- static void Main(string[] args)
- {
- _uck = UniqueCheck.GetInstance();
- _uck.Start();
- _uck.SetIsShowMsg(false);
- _uck.SetCacheMaxNum(20000000);
- _uck.SetBlockNumExt(1000000);
- _uck.SetTimeSpan(6000);
- _uck.AddItem(_example);
- Thread[] threads = new Thread[20];
- for (int i = 0; i < 20; i++)
- {
- threads[i] = new Thread(AddInfo);
- threads[i].Start();
- }
- Thread checkthread = new Thread(CheckInfo);
- checkthread.Start();
- string value = Console.ReadLine();
- checkthread.Abort();
- for (int i = 0; i < 50; i++)
- {
- threads[i].Abort();
- }
- _uck.Stop();
- }
- static void AddInfo()
- {
- while (true)
- {
- _uck.AddItem(Guid.NewGuid().ToString());
- }
- }
- static void CheckInfo()
- {
- while (true)
- {
- Console.WriteLine("开始时间:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
- Console.WriteLine("插入结果:{0}", _uck.AddItem(_example));
- Console.WriteLine("结束时间:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
- // 调整进程休眠时间, 可以测试高并发的情况
- //Thread.Sleep(1000);
- }
- }
测试截图:
来源: https://www.cnblogs.com/huangyoulong/p/11611145.html