需求:读取 hbase 数据,根据某一些条件,过滤掉不符合情况的行,实现数据在服务器端的过滤。
解决方法:通过翻阅《HBase 权威指南》发现,实现这个需求有以下几种方法
① 使用行过滤器(RowFilter),基于行健来过滤数据。通过比较返回比符合条件的行健。
② 单列值过滤器(SingleColumnValueFilter),此过滤器使用的情况比较多。可以用一列的值决定是否一行数据被过滤。
问题:使用 SingleColumnValueFilter 组成一个 FilterList 来过滤数据,实现数据在服务器端的过滤,但是当数据量非常大的情况下,如何结合 MapReduce 实现数据的查询。
解决方法:过滤器在声明 SCAN 时进行定义,得到的是一整行的数据,而不是自己需要的那一列。使其在 Map 端得到自己需要的某一列的值。
代码:
- public static void ParallelRange(String tableName, String columnFamily, String columnName, String startrowkey, String stoprowkey, String[] args) throws Exception {
- Configuration conf = new Configuration(); //把需要查询的列族和列名 传递给 map函数 conf.set("columnFamily", columnFamily); conf.set("columnName", columnName); Scan scan = new Scan(); // scan.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName)); scan.setStartRow(Bytes.toBytes(startrowkey)); scan.setStopRow(Bytes.toBytes(stoprowkey)); args=GetLongAndLati.getxy(args); //设置过滤器 List filters =new ArrayList(); //大于或等于最小的经纬度 SingleColumnValueFilter filter1 =new SingleColumnValueFilter(Bytes.toBytes("FilterFamily"), Bytes.toBytes("longitude"), CompareOp.GREATER_OR_EQUAL,new BinaryComparator(Bytes.toBytes(args[0]))); SingleColumnValueFilter filter2 =new SingleColumnValueFilter(Bytes.toBytes("FilterFamily"), Bytes.toBytes("latitude"), CompareOp.GREATER_OR_EQUAL,new BinaryComparator(Bytes.toBytes(args[1]))); //小于或等于最大的经纬度 SingleColumnValueFilter filter3 =new SingleColumnValueFilter(Bytes.toBytes("FilterFamily"), Bytes.toBytes("longitude"), CompareOp.LESS_OR_EQUAL,new BinaryComparator(Bytes.toBytes(args[2]))); SingleColumnValueFilter filter4 =new SingleColumnValueFilter(Bytes.toBytes("FilterFamily"), Bytes.toBytes("latitude"), CompareOp.LESS_OR_EQUAL,new BinaryComparator(Bytes.toBytes(args[3]))); System.out.println("longitude="+args[0]+"latitude="+args[1]); //添加到过滤器list中 filters.add(filter1); filters.add(filter2); filters.add(filter3); filters.add(filter4); FilterList fl = new FilterList(filters); scan.setFilter(fl); scan.setCaching(30); Job job=Job.getInstance(conf); job.setJarByClass(ImageHBase.class); job.setNumReduceTasks(0); //配置mapper函数 TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(tableName), scan, RangeMapper.class, Text.class, Text.class, job); //配置输出路径 String outpath="hdfs://master:9000/tim/geodata/out2"; FileOutputFormat.setOutputPath(job, new Path(outpath)); System.exit(job.waitForCompletion(true) ? 0:1);}
mapper 函数:
- /** * 并行查询区域查询Mapper函数 * @author tim * */
- static class RangeMapper extends TableMapper {
- String columnFamily;
- String columnName;@Overrideprotected void setup(Mapper.Context context) throws IOException,
- InterruptedException { // TODO Auto-generated method stub //得到想要查询的 列族和 列名 columnFamily=conf.get("columnFamily"); columnName=conf.get("columnName");super.setup(context);} @Override protected void map(ImmutableBytesWritable rowkey, Result rs,Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //得到想要的行, String rowvalue=Bytes.toString(rs.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName))); context.write(new Text(rowkey.toString()), new Text(rowvalue)); } }
注意: 不要在一开始,就声明自己想要的列
来源: http://www.92to.com/bangong/2017/06-20/23666877.html