keys rpo throws max del 键值 先进先出 instance
这里编写了一个 Java 序列化的工具类, 主要是将对象转化为 byte 数组, 以及根据 byte 数组反序列化成 Java 对象; 主要用到了 ByteArrayOutputStream 和 ByteArrayInputStream; 注意: 每个需要序列化的对象都要实现 Serializable 接口; 其代码如下:
- 1 package Utils;
- 2 import java.io. * ;
- 3
/**
 * Converts objects to and from their serialized {@code byte[]} form using
 * standard Java serialization.
 *
 * <p>Created by Kinglf on 2016/10/17.
 */
public class ObjectUtil {

    /**
     * Serializes an object into a byte array.
     *
     * @param obj the object to serialize; it (and everything it references)
     *            must implement {@link Serializable}
     * @return the serialized bytes
     * @throws IOException if the object cannot be written
     */
    public static byte[] object2Bytes(Object obj) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(buffer);
        try {
            out.writeObject(obj);
            // ObjectOutputStream buffers internally; flush before the bytes
            // are read back so nothing is left behind in that buffer.
            out.flush();
        } finally {
            out.close();
        }
        return buffer.toByteArray();
    }

    /**
     * Rebuilds an object from bytes produced by {@link #object2Bytes(Object)}.
     *
     * <p>NOTE(review): Java deserialization of untrusted data is unsafe; only
     * feed this method bytes written by a trusted producer.
     *
     * @param bytes the serialized form; may be {@code null}
     * @return the deserialized object, or {@code null} when {@code bytes} is
     *         {@code null} (for example, nothing was available to read)
     * @throws Exception if the bytes are not a valid serialized object or the
     *                   class of the object cannot be found
     */
    public static Object bytes2Object(byte[] bytes) throws Exception {
        if (bytes == null) {
            // Nothing to decode; callers already check the result for null.
            return null;
        }
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
        try {
            return in.readObject();
        } finally {
            in.close();
        }
    }

}
- package Model;
- import java.io.Serializable;
/**
 * Payload sent through the queue: a numeric id plus a text body.
 * Implements {@link Serializable} so it can be converted to and from bytes.
 *
 * <p>Created by Kinglf on 2016/10/17.
 */
public class Message implements Serializable {

    private static final long serialVersionUID = -389326121047047723L;

    /** Identifier of this message. */
    private int id;

    /** Text body of this message. */
    private String content;

    /**
     * Creates a message.
     *
     * @param id      identifier of the message
     * @param content text body of the message
     */
    public Message(int id, String content) {
        this.content = content;
        this.id = id;
    }

    /** @return the identifier of this message */
    public int getId() {
        return this.id;
    }

    /** @return the text body of this message */
    public String getContent() {
        return this.content;
    }

    /** @param id the new identifier */
    public void setId(int id) {
        this.id = id;
    }

    /** @param content the new text body */
    public void setContent(String content) {
        this.content = content;
    }
}
利用 Redis 做队列, 我们采用的是 Redis 中 list 的 push 和 pop 操作; 结合队列的特点:
- 只允许在一端插入
- 新元素只能在队列的尾部
- FIFO: 先进先出原则
Redis 中 lpush (头入) 配合 rpop (尾出), 或 rpush (尾入) 配合 lpop (头出) 都可以满足要求, 而 Redis 中 list 要 push 或 pop 的对象仅需要转换成 byte[] 即可。
上代码:
- Java 采用 Jedis 进行 Redis 的存储和 Redis 的连接池设置
- package Utils;
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- import redis.clients.jedis.JedisPoolConfig;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- /**
- * Created by Kinglf on 2016/10/17.
- */
- public class JedisUtil {
- private static String JEDIS_IP;
- private static int JEDIS_PORT;
- private static String JEDIS_PASSWORD;
- private static JedisPool jedisPool;
- static {
- //Configuration自行写的配置文件解析类,继承自Properties
- Configuration conf=Configuration.getInstance();
- JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");
- JEDIS_PORT=conf.getInt("jedis.port",6379);
- JEDIS_PASSWORD=conf.getString("jedis.password",null);
- JedisPoolConfig config=new JedisPoolConfig();
- config.setMaxActive(5000);
- config.setMaxIdle(256);
- config.setMaxWait(5000L);
- config.setTestOnBorrow(true);
- config.setTestOnReturn(true);
- config.setTestWhileIdle(true);
- config.setMinEvictableIdleTimeMillis(60000L);
- config.setTimeBetweenEvictionRunsMillis(3000L);
- config.setNumTestsPerEvictionRun(-1);
- jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);
- }
- /**
- * 获取数据
- * @param key
- * @return
- */
- public static String get(String key){
- String value=null;
- Jedis jedis=null;
- try{
- jedis=jedisPool.getResource();
- value=jedis.get(key);
- }catch (Exception e){
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- }finally {
- close(jedis);
- }
- return value;
- }
- private static void close(Jedis jedis) {
- try{
- jedisPool.returnResource(jedis);
- }catch (Exception e){
- if(jedis.isConnected()){
- jedis.quit();
- jedis.disconnect();
- }
- }
- }
- public static byte[] get(byte[] key){
- byte[] value = null;
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- value = jedis.get(key);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- return value;
- }
- public static void set(byte[] key, byte[] value) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.set(key, value);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- }
- public static void set(byte[] key, byte[] value, int time) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.set(key, value);
- jedis.expire(key, time);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- }
- public static void hset(byte[] key, byte[] field, byte[] value) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.hset(key, field, value);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- }
- public static void hset(String key, String field, String value) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.hset(key, field, value);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- }
- /**
- * 获取数据
- *
- * @param key
- * @return
- */
- public static String hget(String key, String field) {
- String value = null;
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- value = jedis.hget(key, field);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- return value;
- }
- /**
- * 获取数据
- *
- * @param key
- * @return
- */
- public static byte[] hget(byte[] key, byte[] field) {
- byte[] value = null;
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- value = jedis.hget(key, field);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- return value;
- }
- public static void hdel(byte[] key, byte[] field) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.hdel(key, field);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- }
- /**
- * 存储REDIS队列 顺序存储
- * @param key reids键名
- * @param value 键值
- */
- public static void lpush(byte[] key, byte[] value) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.lpush(key, value);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- }
- /**
- * 存储REDIS队列 反向存储
- * @param key reids键名
- * @param value 键值
- */
- public static void rpush(byte[] key, byte[] value) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.rpush(key, value);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- }
- /**
- * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
- * @param key reids键名
- * @param destination 键值
- */
- public static void rpoplpush(byte[] key, byte[] destination) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.rpoplpush(key, destination);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- }
- /**
- * 获取队列数据
- * @param key 键名
- * @return
- */
- public static List lpopList(byte[] key) {
- List list = null;
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- list = jedis.lrange(key, 0, -1);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- return list;
- }
- /**
- * 获取队列数据
- * @param key 键名
- * @return
- */
- public static byte[] rpop(byte[] key) {
- byte[] bytes = null;
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- bytes = jedis.rpop(key);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- return bytes;
- }
- public static void hmset(Object key, Map hash) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.hmset(key.toString(), hash);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- }
- public static void hmset(Object key, Map hash, int time) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.hmset(key.toString(), hash);
- jedis.expire(key.toString(), time);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- }
- public static List hmget(Object key, String... fields) {
- List result = null;
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- result = jedis.hmget(key.toString(), fields);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- return result;
- }
- public static Set hkeys(String key) {
- Set result = null;
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- result = jedis.hkeys(key);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- return result;
- }
- public static List lrange(byte[] key, int from, int to) {
- List result = null;
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- result = jedis.lrange(key, from, to);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- return result;
- }
- public static Map hgetAll(byte[] key) {
- Map result = null;
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- result = jedis.hgetAll(key);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- return result;
- }
- public static void del(byte[] key) {
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.del(key);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- }
- public static long llen(byte[] key) {
- long len = 0;
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource();
- jedis.llen(key);
- } catch (Exception e) {
- //释放redis对象
- jedisPool.returnBrokenResource(jedis);
- e.printStackTrace();
- } finally {
- //返还到连接池
- close(jedis);
- }
- return len;
- }
- }
- package Utils;
- import java.io.IOException;
- import java.io.InputStream;
- import java.util.Properties;
/**
 * Application settings loaded from the XML properties resource
 * {@code config.xml} on the system class path, with typed accessors that fall
 * back to a caller-supplied default.
 *
 * <p>Created by Kinglf on 2016/10/17.
 */
public class Configuration extends Properties {
    private static final long serialVersionUID = -2296275030489943706L;
    private static Configuration instance = null;

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the shared configuration
     */
    public static synchronized Configuration getInstance() {
        if (instance == null) {
            instance = new Configuration();
        }
        return instance;
    }

    /**
     * Looks up a property, treating an empty value the same as a missing one.
     *
     * @param key          the property name
     * @param defaultValue the value to use when the property is missing or empty
     * @return the property value or {@code defaultValue}
     */
    @Override
    public String getProperty(String key, String defaultValue) {
        String val = getProperty(key);
        return (val == null || val.isEmpty()) ? defaultValue : val;
    }

    /**
     * Same as {@link #getProperty(String, String)}.
     *
     * @param name         the property name
     * @param defaultValue the value to use when the property is missing or empty
     * @return the property value or {@code defaultValue}
     */
    public String getString(String name, String defaultValue) {
        return this.getProperty(name, defaultValue);
    }

    /**
     * @param name         the property name
     * @param defaultValue the value to use when the property is missing or blank
     * @return the property parsed as an {@code int}, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not an integer
     */
    public int getInt(String name, int defaultValue) {
        String val = numericText(name);
        return val == null ? defaultValue : Integer.parseInt(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue the value to use when the property is missing or blank
     * @return the property parsed as a {@code long}, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a long
     */
    public long getLong(String name, long defaultValue) {
        String val = numericText(name);
        // Parse as long: Integer.parseInt would reject values beyond the int range.
        return val == null ? defaultValue : Long.parseLong(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue the value to use when the property is missing or blank
     * @return the property parsed as a {@code float}, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a number
     */
    public float getFloat(String name, float defaultValue) {
        String val = numericText(name);
        return val == null ? defaultValue : Float.parseFloat(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue the value to use when the property is missing or blank
     * @return the property parsed as a {@code double}, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a number
     */
    public double getDouble(String name, double defaultValue) {
        String val = numericText(name);
        return val == null ? defaultValue : Double.parseDouble(val);
    }

    /**
     * @param name         the property name
     * @param defaultValue the value to use when the property is missing or blank
     * @return the property parsed as a {@code byte}, or {@code defaultValue}
     * @throws NumberFormatException if the value is present but not a byte
     */
    public byte getByte(String name, byte defaultValue) {
        String val = numericText(name);
        return val == null ? defaultValue : Byte.parseByte(val);
    }

    /**
     * Returns the trimmed text of a property, or {@code null} when the
     * property is missing or contains only whitespace.
     */
    private String numericText(String name) {
        String val = this.getProperty(name);
        if (val == null) {
            return null;
        }
        String trimmed = val.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }

    /**
     * Loads {@code config.xml} from the system class loader. When the resource
     * is absent the configuration stays empty, so every accessor returns its
     * default. A read or format error is reported on standard error and
     * otherwise ignored.
     */
    public Configuration() {
        InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
        if (in == null) {
            // No config file on the class path: rely on the callers' defaults.
            return;
        }
        try {
            this.loadFromXML(in);
        } catch (IOException ioe) {
            ioe.printStackTrace();
        } finally {
            try {
                in.close();
            } catch (IOException ignored) {
                // Nothing useful can be done if closing the resource fails.
            }
        }
    }
}
- import Model.Message;
- import Utils.JedisUtil;
- import Utils.ObjectUtil;
- import redis.clients.jedis.Jedis;
- import java.io.IOException;
- /**
- * Created by Kinglf on 2016/10/17.
- */
- public class TestRedisQueue {
- public static byte[] redisKey = "key".getBytes();
- static {
- try {
- init();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- private static void init() throws IOException {
- for (int i = 0; i < 1000000; i++) {
- Message message = new Message(i, "这是第" + i + "个内容");
- JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
- }
- }
- public static void main(String[] args) {
- try {
- pop();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- private static void pop() throws Exception {
- byte[] bytes = JedisUtil.rpop(redisKey);
- Message msg = (Message) ObjectUtil.bytes2Object(bytes);
- if (msg != null) {
- System.out.println(msg.getId() + "----" + msg.getContent());
- }
- }
- }
- 每执行一次 pop() 方法, 就会从队列中取出并打印一条消息;
- 多次执行的结果如下:
- 1----这是第1个内容
- 2----这是第2个内容
- 3----这是第3个内容
- 4----这是第4个内容
- 至此,整个Redis消息队列的生产者和消费者代码已经完成
- 需要传送的实体类(需实现Serializable接口)
- Redis的配置读取类,继承自Properties
- 将对象和byte数组双向转换的工具类
- 通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类
Java 利用 Redis 实现消息队列
来源: http://www.bubuko.com/infodetail-2054533.html