一. 使用 JAVA API 的方式
// Cached Table handle; stays null until getHTable() assigns it.
private static Table table = null;
// Shared HBase connection, opened once by the static initializer below.
// This declaration was missing: conn was assigned and read but never declared.
// The type is fully qualified so that no additional import is required.
private static org.apache.hadoop.hbase.client.Connection conn = null;
// Static client configuration
static Configuration conf = null;
static {
    conf = HBaseConfiguration.create();
    // hbase.zookeeper.quorum: the list of machines in the ZooKeeper ensemble
    conf.set("hbase.zookeeper.quorum", HConfiguration.hbase_zookeeper_quorum);
    conf.set("hbase.zookeeper.property.clientPort", "2181");
    try {
        conn = ConnectionFactory.createConnection(conf);
    } catch (IOException e) {
        // The failure is only printed; conn remains null in that case.
        e.printStackTrace();
    }
}
// Obtains the Table handle for "tablename" once and caches it in the static field.
public static void getHTable() {
    if (table != null) {
        // Already initialised by an earlier call; nothing to do.
        return;
    }
    try {
        TableName name = TableName.valueOf("tablename");
        table = conn.getTable(name);
    } catch (IOException e) {
        // The failure is only printed; table stays null.
        e.printStackTrace();
    }
}
/**
 * Scans the HBase table "tablename" between two row keys and prints every
 * cell of every returned row to standard output.
 *
 * <p>An IOException raised while opening the table or scanning is caught,
 * reported on stdout/stderr, and not rethrown. An IOException raised while
 * closing the table does propagate to the caller.
 *
 * @param start_rowkey row key passed to {@code Scan.setStartRow}
 * @param stop_rowkey  row key passed to {@code Scan.setStopRow}
 * @throws IOException if closing the table fails
 */
public static void getResultScann(String start_rowkey, String stop_rowkey) throws IOException {
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes(start_rowkey));
    scan.setStopRow(Bytes.toBytes(stop_rowkey));
    // To fetch only the first key-value pair of each row, attach a filter:
    //   scan.setFilter(new FirstKeyOnlyFilter());
    Table tables = null;
    ResultScanner rs = null;
    try {
        tables = conn.getTable(TableName.valueOf("tablename"));
        rs = tables.getScanner(scan);
        int index = 0;
        for (Result result : rs) { // one iteration per row
            System.out.println("第" + (index++) + "行 ---- 列数::" + result.listCells().size());
            for (Cell kv : result.listCells()) { // one iteration per cell of the row
                // Read through the array/offset/length accessors instead of the
                // deprecated copying getters (getRow(), getFamily(), ...).
                System.out.println("row:" + Bytes.toString(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
                System.out.println("family:" + Bytes.toString(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()));
                System.out.println("qualifier:" + Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()));
                System.out.println("value:" + Bytes.toString(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
                System.out.println("timestamp:" + kv.getTimestamp());
                System.out.println("-------------------------------------------");
            }
        }
    } catch (IOException e) {
        System.out.println("error");
        e.printStackTrace();
    } finally {
        // Either handle may still be null if getTable/getScanner threw; closing
        // unconditionally would raise a NullPointerException that hides the real error.
        try {
            if (rs != null) {
                rs.close();
            }
        } finally {
            if (tables != null) {
                tables.close();
            }
        }
    }
}
二. 使用 MapReduce 方式
public class OnlyMapScan {
/**
* Mapper
*/
public static class MyMapper extends TableMapper < Text,
NullWritable > {
public void map(ImmutableBytesWritable rows, Result result, Context context) throws IOException,
InterruptedException {
// 把取到的值直接打印
for (Cell kv: result.listCells()) { // 遍历每一行的各列
// 假如我们当时插入 HBase 的时候没有把 intfloat 等类型的数据转换成 String, 这里就会乱码了
String row = new String(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(), "UTF-8");
String family = new String(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), "UTF-8");
String qualifier = new String(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), "UTF-8");
String value = new String(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(), "UTF-8");
System.out.println("row:" + row);
System.out.println("family:" + family);
System.out.println("qualifier:" + qualifier);
System.out.println("value:" + value);
System.out.println("timestamp:" + kv.getTimestamp());
System.out.println("-------------------------------------------");
}
}
}
/**
* Main
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Configuration configuration = HBaseConfiguration.create();
// 设置 zookeeper
configuration.set("hbase.zookeeper.quorum", HConfiguration.hbase_zookeeper_quorum);
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set(TableInputFormat.INPUT_TABLE, HConfiguration.tableName);
// 将该值改大, 防止 hbase 超时退出
configuration.set("dfs.socket.timeout", "18000");
Scan scan = new Scan();
scan.setCaching(1024);
scan.setCacheBlocks(false);
scan.setStartRow(Bytes.toBytes("73037041-AA"));
scan.setStopRow(Bytes.toBytes("73037045-AA"));
Job job = new Job(configuration, "ScanHbaseJob");
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("student"), scan, MyMapper.class, Text.class, NullWritable.class, job);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(1);
job.waitForCompletion(true);
}
}
百度搜索就爱阅读, 专业资料, 生活学习, 尽在就爱阅读网 92to.com, 您的在线图书馆!
来源: http://www.92to.com/bangong/2018/01-25/33173657.html