这里有新鲜出炉的 Redis 设计与实现(第一版),程序狗速度看过来!
Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。
消息传递这一应用广泛存在于各个网站中,这个功能也是一个网站必不可少的。本文主要介绍了 php 中 Redis 的应用 -- 消息传递。下面跟着小编一起来看下吧
阅读目录
1、摘要
2、实现方法
3、一对一消息传递
4、多对多消息传递
1、摘要
消息传递这一应用广泛存在于各个网站中,这个功能也是一个网站必不可少的。常见的消息传递应用有,新浪微博中的 @我呀、给你评论然后的提示呀、赞赞赞提示、私信呀、甚至是发微博分享的新鲜事;知乎中的私信呀、live 发送过来的消息、知乎团队消息呀等等。
2、实现方法
消息传递即两个或者多个客户端在相互发送和接收消息。
通常有两种方法实现:
第一种为消息推送。Redis 内置有这种机制,publish 往频道推送消息、subscribe 订阅频道。这种方法有一个缺点就是必须保证接收者时刻在线(即是此时程序不能停下来,一直保持监控状态,假若断线后就会出现客户端丢失信息)
第二种为消息拉取。所谓消息拉取,就是客户端自主去获取存储在服务器中的数据。Redis 内部没有实现消息拉取这种机制。因此我们需要自己手动编写代码去实现这个功能。
在这里我们,我们进一步将消息传递再细分为一对一的消息传递,多对多的消息传递(群组消息传递)。
【注:两个类的代码相对较多,因此将其折叠起来了】
3、一对一消息传递
例子 1:一对一消息发送与获取
模块要求:
1、提示有多少个联系人发来新消息
2、信息包含发送人、时间、信息内容
3、能够获取之前的旧消息
4、并且消息能够保持 7 天,过期将会被动触发删除
Redis 实现思路:
1、新消息与旧消息分别采用两个链表来存储
2、原始消息的结构采用数组的形式存放,并且含有发送人、时间戳、信息内容
3、在推入 redis 的链表前,需要将数据转换为 json 类型然后再进行存储
4、在取出新信息时应该使用 rpoplpush 来实现,将已读的新消息推入旧消息链表中
5、取出旧消息时,应该用旧消息的时间与现在的时间进行对比,若超时,则直接删除后面的全部数据(因为数据是按时间一个一个压进链表中的,所以对于时间是有序排列的)
数据存储结构图:
PHP 的实现代码:
#SinglePullMessage.class.php
- <?php
- #单接接收者接收消息
- class SinglePullMessage
- {
- private $redis=''; #存储redis对象
- /**
- * @desc 构造函数
- *
- * @param $host string | redis主机
- * @param $port int | 端口
- */
- public function __construct($host,$port=6379)
- {
- $this->redis=new Redis();
- $this->redis->connect($host,$port);
- }
- /**
- * @desc 发送消息(一个人)
- *
- * @param $toUser string | 接收人
- * @param $messageArr array | 发送的消息数组,包含sender、message、time
- *
- * @return bool
- */
- public function sendSingle($toUser,$messageArr)
- {
- $json_message=json_encode($messageArr); #编码成json数据
- return $this->redis->lpush($toUser,$json_message); #将数据推入链表
- }
- /**
- * @desc 用户获取新消息
- *
- * @param $user string | 用户名
- *
- * @return array 返回数组,包含多少个用户发来新消息,以及具体消息
- */
- public function getNewMessage($user)
- {
- #接收新信息数据,并且将数据推入旧信息数据链表中,并且在原链表中删除
- $messageArr=array();
- while($json_message=$this->redis->rpoplpush($user, 'preMessage_'.$user))
- {
- $temp=json_decode($json_message); #将json数据变成对象
- $messageArr[$temp->sender][]=$temp; #转换成数组信息
- }
- if($messageArr)
- {
- $arr['count']=count($messageArr); #统计有多少个用户发来信息
- $arr['messageArr']=$messageArr;
- return $arr;
- }
- return false;
- }
- public function getPreMessage($user)
- {
- ##取出旧消息
- $messageArr=array();
- $json_pre=$this->redis->lrange('preMessage_'.$user, 0, -1); #一次性将全部旧消息取出来
- foreach ($json_pre as $k => $v)
- {
- $temp=json_decode($v); #json反编码
- $timeout=$temp->time+60*60*24*7; #数据过期时间 七天过期
- if($timeout<time()) #判断数据是否过期
- {
- if($k==0) #若是最迟插入的数据都过期了,则将所有数据删除
- {
- $this->redis->del('preMessage_'.$user);
- break;
- }
- $this->redis->ltrim('preMessage_'.$user, 0, $k); #若检测出有过期的,则将比它之前插入的所有数据删除
- break;
- }
- $messageArr[$temp->sender][]=$temp;
- }
- return $messageArr;
- }
- /**
- * @desc 消息处理,没什么特别的作用。在这里这是用来处理数组信息,然后将其输出。
- *
- * @param $arr array | 需要处理的信息数组
- *
- * @return 返回打印输出
- */
- public function dealArr($arr)
- {
- foreach ($arr as $k => $v)
- {
- foreach ($v as $k1 => $v2)
- {
- echo '发送人:'.$v2->sender.' 发送时间:'.date('Y-m-d h:i:s',$v2->time).'<br/>';
- echo '消息内容:'.$v2->message.'<br/>';
- }
- echo "<hr/>";
- }
- }
- }
测试:
1、发送消息
#建立 test1.php
- include './SinglePullMessage.class.php';
- $object=new SinglePullMessage('192.168.95.11');
- #发送消息
- $sender='boss'; #发送者
- $to='jane'; #接收者
- $message='How are you'; #信息
- $time=time();
- $arr=array('sender'=>$sender,'message'=>$message,'time'=>$time);
- echo $object->sendSingle($to,$arr);
2、获取新消息
#建立 test2.php
- include './SinglePullMessage.class.php';
- $object=new SinglePullMessage('192.168.95.11');
- #获取新消息
- $arr=$object->getNewMessage('jane');
- if($arr)
- {
- echo $arr['count']."个联系人发来新消息<br/><hr/>";
- $object->dealArr($arr['messageArr']);
- }
- else
- echo "无新消息";
访问结果:
3、获取旧消息
#建立 test3.php
- include './SinglePullMessage.class.php';
- $object=new SinglePullMessage('192.168.95.11');
- #获取旧消息
- $arr=$object->getPreMessage('jane');
- if($arr)
- {
- $object->dealArr($arr);
- }
- else
- echo "无旧数据";
4、多对多消息传递
例子 2:多对多消息发送与获取(即是群组)
模块要求:
1、用户能够自行创建群组,并成为群主
2、群主可以拉人进来作为群组成员、并且可以踢人
3、用户可以直接退出群组
4、可以发送消息,每一位成员都可以拉取消息
5、群组的消息最大容纳量为 5000 条
6、成员可以拉取新消息,并提示有多少新消息
7、成员可以分页获取之前已读的旧消息
。。。。。功能就写这几个吧,有需要或者想练习的同学们可以增加其他功能,例如禁言、匿名消息发送、文件发送等等。
Redis 实现思路:
1、群组的消息以及群组的成员组成采用有序集合进行存储。群组消息有序集合的 member 存储用户发送的 json 数据消息,score 存储唯一值,将采用原子操作 incr 获取 string 中的自增长值进行存储;群组成员有序集合的 member 存储 user,score 存储非零数字(在这里这个 score 意义不大,我的例子代码中使用数字 1 为群主的 score,其他的存储为 2。当然这使用这个数据还可以扩展别的功能,例如群组中成员等级)可参考下面数据存储结构简图。
2、用户所加入的群组也是采用有序集合进行存储。其中,member 存储群组 ID,score 存储用户已经获取该群组的最大消息分值(对应群组消息的 score 值)
3、用户创建群组的时候,通过原子操作 incr 从而获取一个唯一 ID
4、用户在群中发送消息时,也是通过原子操作 incr 获取一个唯一自增长有序 ID
5、在执行 incr 时,为防止并发导致竞争关系,因此需要进行加锁操作【redis 详细锁的讲解可以参考:Redis 构建分布式锁 / article/17/0724/340294.html】
6、创建群组方法简要思路,任何一个用户都可以创建群组聊天,在创建的同时,可以选择时是否添加群组成员(参数通过数组的形式)。创建过程将会为这个群组建立一个群组成员有序集合(群组信息有序集合暂时不创建),接着将群主添加进去,再将群 ID 添加用户所参加的群组有序集合中。
数据存储结构图:
PHP 的代码实现:
#ManyPullMessage.class.php
- <?php
- class ManyPullMessage
- {
- private $redis=''; #存储redis对象
- /**
- * @desc 构造函数
- *
- * @param $host string | redis主机
- * @param $port int | 端口
- */
- public function __construct($host,$port=6379)
- {
- $this->redis=new Redis();
- $this->redis->connect($host,$port);
- }
- /**
- * @desc 用于创建群组的方法,在创建的同时还可以拉人进群组
- *
- * @param $user string | 用户名,创建群组的主人
- * @param $addUser array | 其他用户构成的数组
- *
- * @param $lockName string | 锁的名字,用于获取群组ID的时候用
- * @return int 返回群组ID
- */
- public function createGroupChat($user, $addUser=array(), $lockName='chatIdLock')
- {
- $identifier=$this->getLock($lockName); #获取锁
- if($identifier)
- {
- $id=$this->redis->incr('groupChatID'); #获取群组ID
- $this->releaseLock($lockName,$identifier); #释放锁
- }
- else
- return false;
- $messageCount=$this->redis->set('countMessage_'.$id, 0); #初始化这个群组消息计数器
- #开启非事务型流水线,一次性将所有redis命令传给redis,减少与redis的连接
- $pipe=$this->redis->pipeline();
- $this->redis->zadd('groupChat_'.$id, 1, $user); #创建群组成员有序集合,并添加群主
- #将这个群组添加到user所参加的群组有序集合中
- $this->redis->zadd('hasGroupChat_'.$user, 0, $id);
- foreach ($addUser as $v) #创建群组的同时需要添加的用户成员
- {
- $this->redis->zadd('groupChat_'.$id, 2, $v);
- $this->redis->zadd('hasGroupChat_'.$v, 0, $id);
- }
- $pipe->exec();
- return $id; #返回群组ID
- }
- /**
- * @desc 群主主动拉人进群
- *
- * @param $user string | 群主名
- * @param $groupChatID int | 群组ID
- * @param $addMembers array | 需要拉进群的用户
- *
- * @return bool
- */
- public function addMembers($user, $groupChatID, $addMembers=array())
- {
- $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #将groupChatName的群主取出来
- if($groupMasterScore==1) #判断user是否是群主
- {
- $pipe=$this->redis->pipeline(); #开启非事务流水线
- foreach ($addMembers as $v)
- {
- $this->redis->zadd('groupChat_'.$groupChatID, 2, $v); #添加进群
- $this->redis->zadd('hasGroupChat_'.$v, 0, $groupChatID); #添加群名到用户的有序集合中
- }
- $pipe->exec();
- return true;
- }
- return false;
- }
- /**
- * @desc 群主删除成员
- *
- * @param $user string | 群主名
- * @param $groupChatID int | 群组ID
- * @param $delMembers array | 需要删除的成员名字
- *
- * @return bool
- */
- public function delMembers($user, $groupChatID, $delMembers=array())
- {
- $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user);
- if($groupMasterScore==1) #判断user是否是群主
- {
- $pipe=$this->redis->pipeline(); #开启非事务流水线
- foreach ($delMembers as $v)
- {
- $this->redis->zrem('groupChat_'.$groupChatID, $v);
- $this->redis->zrem('hasGroupChat_'.$v, $groupChatID);
- }
- $pipe->exec();
- return true;
- }
- return false;
- }
- /**
- * @desc 退出群组
- *
- * @param $user string | 用户名
- * @param $groupChatID int | 群组名
- */
- public function quitGroupChat($user, $groupChatID)
- {
- $this->redis->zrem('groupChat_'.$groupChatID, $user);
- $this->redis->zrem('hasGroupChat_'.$user, $groupChatID);
- return true;
- }
- /**
- * @desc 发送消息
- *
- * @param $user string | 用户名
- * @param $groupChatID int | 群组ID
- * @param $messageArr array | 包含发送消息的数组
- * @param $preLockName string | 群消息锁前缀,群消息锁全名为countLock_群ID
- *
- * @return bool
- */
- public function sendMessage($user, $groupChatID, $messageArr, $preLockName='countLock_')
- {
- $memberScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #成员score
- if($memberScore)
- {
- $identifier=$this->getLock($preLockName.$groupChatID); #获取锁
- if($identifier) #判断获取锁是否成功
- {
- $messageCount=$this->redis->incr('countMessage_'.$groupChatID);
- $this->releaseLock($preLockName.$groupChatID,$identifier); #释放锁
- }
- else
- return false;
- $json_message=json_encode($messageArr);
- $this->redis->zadd('groupChatMessage_'.$groupChatID, $messageCount, $json_message);
- $count=$this->redis->zcard('groupChatMessage_'.$groupChatID); #查看信息量大小
- if($count>5000) #判断数据量有没有达到5000条
- { #数据量超5000,则需要清除旧数据
- $start=5000-$count;
- $this->redis->zremrangebyrank('groupChatMessage_'.$groupChatID, $start, $count);
- }
- return true;
- }
- return false;
- }
- /**
- * @desc 获取新信息
- *
- * @param $user string | 用户名
- *
- * @return 成功则放回json数据数组,无新信息返回false
- */
- public function getNewMessage($user)
- {
- $arrID=$this->redis->zrange('hasGroupChat_'.$user, 0, -1, 'withscores'); #获取用户拥有的群组ID
- $json_message=array(); #初始化
- foreach ($arrID as $k => $v) #遍历循环所有群组,查看是否有新消息
- {
- $messageCount=$this->redis->get('countMessage_'.$k); #群组最大信息分值数
- if($messageCount>$v) #判断用户是否存在未读新消息
- {
- $json_message[$k]['message']=$this->redis->zrangebyscore('groupChatMessage_'.$k, $v+1, $messageCount);
- $json_message[$k]['count']=count($json_message[$k]['message']); #统计新消息数量
- $this->redis->zadd('hasGroupChat_'.$user, $messageCount, $k); #更新已获取消息
- }
- }
- if($json_message)
- return $json_message;
- return false;
- }
- /**
- * @desc 分页获取群组信息
- *
- * @param $user string | 用户名
- * @param $groupChatID int | 群组ID
- * @param $page int | 第几页
- * @param $size int | 每页多少条数据
- *
- * @return 成功返回json数据,失败返回false
- */
- public function getPartMessage($user, $groupChatID, $page=1, $size=10)
- {
- $start=$page*$size-$size; #开始截取数据位置
- $stop=$page*$size-1; #结束截取数据位置
- $json_message=$this->redis->zrevrange('groupChatMessage_'.$groupChatID, $start, $stop);
- if($json_message)
- return $json_message;
- return false;
- }
- /**
- * @desc 加锁方法
- *
- * @param $lockName string | 锁的名字
- * @param $timeout int | 锁的过期时间
- *
- * @return 成功返回identifier/失败返回false
- */
- public function getLock($lockName, $timeout=2)
- {
- $identifier=uniqid(); #获取唯一标识符
- $timeout=ceil($timeout); #确保是整数
- $end=time()+$timeout;
- while(time()<$end) #循环获取锁
- {
- /*
- #这里的set操作可以等同于下面那个if操作,并且可以减少一次与redis通讯
- if($this->redis->set($lockName, $identifier array('nx', 'ex'=>$timeout)))
- return $identifier;
- */
- if($this->redis->setnx($lockName, $identifier)) #查看$lockName是否被上锁
- {
- $this->redis->expire($lockName, $timeout); #为$lockName设置过期时间
- return $identifier; #返回一维标识符
- }
- elseif ($this->redis->ttl($lockName)===-1)
- {
- $this->redis->expire($lockName, $timeout); #检测是否有设置过期时间,没有则加上
- }
- usleep(0.001); #停止0.001ms
- }
- return false;
- }
- /**
- * @desc 释放锁
- *
- * @param $lockName string | 锁名
- * @param $identifier string | 锁的唯一值
- *
- * @param bool
- */
- public function releaseLock($lockName,$identifier)
- {
- if($this->redis->get($lockName)==$identifier) #判断是锁有没有被其他客户端修改
- {
- $this->redis->multi();
- $this->redis->del($lockName); #释放锁
- $this->redis->exec();
- return true;
- }
- else
- {
- return false; #其他客户端修改了锁,不能删除别人的锁
- }
- }
- }
- ?>
测试:
1、建立 createGroupChat.php(测试创建群组功能)
执行代码并创建 568、569 群组(群主为 jack)
- include './ManyPullMessage.class.php';
- $object=new ManyPullMessage('192.168.95.11');
- #创建群组
- $user='jack';
- $arr=array('jane1','jane2');
- $a=$object->createGroupChat($user,$arr);
- echo "<pre>";
- print_r($a);
- echo "</pre>";die;
2、建立 addMembers.php(测试添加成员功能)
执行代码并添加新成员
- include './ManyPullMessage.class.php';
- $object=new ManyPullMessage('192.168.95.11');
- $b=$object->addMembers('jack','568',array('jane1','jane2','jane3','jane4'));
- echo "<pre>";
- print_r($b);
- echo "</pre>";die;
3、建立 delete.php(测试群主删除成员功能)
- include './ManyPullMessage.class.php';
- $object=new ManyPullMessage('192.168.95.11');
- #群主删除成员
- $c=$object->delMembers('jack', '568', array('jane1','jane4'));
- echo "<pre>";
- print_r($c);
- echo "</pre>";die;
4、建立 sendMessage.php(测试发送消息功能)
多执行几遍,568、569 都发几条
- include './ManyPullMessage.class.php';
- $object=new ManyPullMessage('192.168.95.11');
- #发送消息
- $user='jane2';
- $message='go go go';
- $groupChatID=568;
- $arr=array('sender'=>$user, 'message'=>$message, 'time'=>time());
- $d=$object->sendMessage($user,$groupChatID,$arr);
- echo "<pre>";
- print_r($d);
- echo "</pre>";die;
5、建立 getNewMessage.php(测试用户获取新消息功能)
- include './ManyPullMessage.class.php';
- $object=new ManyPullMessage('192.168.95.11');
- #用户获取新消息
- $e=$object->getNewMessage('jane2');
- echo "<pre>";
- print_r($e);
- echo "</pre>";die;
6、建立 getPartMessage.php(测试用户获取某个群组部分消息)
(多发送几条消息,用于测试。568 中共 18 条数据)
- include './ManyPullMessage.class.php';
- $object=new ManyPullMessage('192.168.95.11');
- #用户获取某个群组部分消息
- $f=$object->getPartMessage('jane2', 568, 1, 10);
- echo "<pre>";
- print_r($f);
- echo "</pre>";die;
page=1,size=10
page=2,size=10
测试完毕,还需要别的功能可以自己进行修改添加测试。
这次整理这篇文章相对比较赶,心里已经想着快点整理完赶紧学习其他的技术啦,哈哈。各位大神请留步,恳请各位给点学习 redis 的指导意见,本人职业方向是 PHP
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持 PHPERZ!
来源: http://www.phperz.com/article/17/0824/340297.html