1.1.1 map 端连接 - DistributedCache 分布式缓存小数据集
当一个数据集非常小时, 可以将小数据集发送到每个节点, 节点缓存到内存中, 这个数据集称为边数据. 用 map 函数将小数据集中的数据按键聚合到大的数据集中, 输出连接数据集, 进行连接操作.
(1)分布式缓存指定缓存文件
执行命令行时, 采用 hadoop jar hadoop-example.jar MapSideJoinMain -files input/cityfile/tb_dim_city.dat input/data/all output
-files input/cityfile/tb_dim_city.dat 指定需要缓存的文件, 会被复制到各个节任务点.
(2)指定缓存文件的三种类型
Hadoop 命令行选项中, 有三个命令可以实现文件复制分发到任务的各个节点. 用户启动一个作业, Hadoop 会把由 -files,-archives, 和 -libjars 等选项所指定的文件复制到分布式文件系统之中, 任务运行前, 节点管理器从分布式文件系统中复制文件到本地.
1) -files 选项指定待分发的文件, 文件内包含以逗号隔开的 URL 列表. 文件可以存放在本地文件系统, HDFS, 或其它 Hadoop 可读文件系统之中. 如果尚未指定文件系统, 则这些文件被默认是本地的. 即使默认文件系统并非本地文件系统, 这也是成立的.
2) -archives 选项向自己的任务中复制存档 (压缩) 文件, 比如 JAR 文件, ZIP 文件, tar 文件和 gzipped tar 文件, 这些文件会被解档到任务节点.
3) -libjars 选项把 JAR 文件添加到 mapper 和 reducer 任务的类路径中. 如果作业 JAR 文件并非包含很多库 JAR 文件, 这点会很有用.
(3)缓存文件删除机制
节点管理器为缓存中的文件各维护一个计数器, 任务运行时, 文件计数器加 1, 任务完成后, 计数器减 1, 计数器为 0 时才能删除文件, 当节点缓存容量大于一定值(yarn.nodemanger.localizer.cache.target-size-mb 设置, 默认 10GB), 才会删除最近最少使用的文件.
(4)Job 的分布式缓存 API
除了可以用命令行参数指定缓存文件外, 还以通过 Job 的 API 指定缓存文件; 即通过 job 对象调用下面的函数设置缓存文件.
- // 以下两组方法将文件或存档添加到分布式缓存
- public void addCacheFile(URI uri);
- public void addCacheArchive(URI uri);
- // 以下两组方法将一次性向分布式缓存中添加一组文件或存档
- public void setCacheFiles(URI[] files);
- public void setCacheArchives(URI[] archives);
- // 以下两组方法将文件或存档添加到 MapReduce 任务的类路径
- public void addFileToClassPath(Path file);
- public void addArchiveToClassPath(Path archive);
- public void createSymlink();
(6)DistributedCache 缓存小数据集实现 hadoop map 端连接实例
下面的实例是将城市名称的数据集和用户信息的数据集进行连接, 城市名称的数据集很小, 而用户信息的数据集很大, 所以可以采用缓存文件的方式, 将城市信息数据集发送到任务, map 任务通过 setup 方法从缓存中读取小数据集文件 tb_dim_city.dat, 在内存中形成 map 映射, map 函数处理用户信息数据, 根据用户信息中的城市 id 去 map 映射中找到城市名称, 然后合并输出.
- package Temperature;
- import java.io.BufferedReader;
- import java.io.FileReader;
- import java.io.IOException;
- import java.util.HashMap;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- *
- * 用途说明:
- * Map side join 中的 left outer join
- * 左连接, 两个文件分别代表 2 个表, 连接字段 table1 的 id 字段和 table2 的 cityID 字段
- * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),
- * 假设 tb_dim_city 文件记录数很少, tb_dim_city.dat 文件内容, 分隔符为 "|":
- * id name orderid city_code is_show
- * 0 其他 9999 9999 0
- * 1 长春 1 901 1
- * 2 吉林 2 902 1
- * 3 四平 3 903 1
- * 4 松原 4 904 1
- * 5 通化 5 905 1
- * 6 辽源 6 906 1
- * 7 白城 7 907 1
- * 8 白山 8 908 1
- * 9 延吉 9 909 1
- * ------------------------- 风骚的分割线 -------------------------------
- * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
- * tb_user_profiles.dat 文件内容, 分隔符为 "|":
- * userID network flow cityID
- * 1 2G 123 1
- * 2 3G 333 2
- * 3 3G 555 1
- * 4 2G 777 3
- * 5 3G 666 4
- * ------------------------- 风骚的分割线 -------------------------------
- * 结果:
- * 1 长春 1 901 1 1 2G 123
- * 1 长春 1 901 1 3 3G 555
- * 2 吉林 2 902 1 2 3G 333
- * 3 四平 3 903 1 4 2G 777
- * 4 松原 4 904 1 5 3G 666
- */
- public class MapSideJoinMain extends Configured implements Tool {
- private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);
- public static class LeftOutJoinMapper extends Mapper {
- private HashMap city_info = new HashMap < String,
- String > ();
- private Text outPutKey = new Text();
- private Text outPutValue = new Text();
- private String mapInputStr = null;
- private String mapInputSpit[] = null;
- private String city_secondPart = null;
- /**
- * 此方法在每个 task 开始之前执行, 这里主要用作从 DistributedCache
- * 中取到 tb_dim_city 文件, 并将里边记录取出放到内存中.
- */
- @Override protected void setup(Context context) throws IOException,
- InterruptedException {
- BufferedReader br = null;
- // 获得当前作业的 DistributedCache 相关文件
- Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
- String cityInfo = null;
- for (Path p: distributePaths) {
- if (p.toString().endsWith("tb_dim_city.dat")) {
- // 读缓存文件, 并放到 mem 中
- br = new BufferedReader(new FileReader(p.toString()));
- while (null != (cityInfo = br.readLine())) {
- String[] cityPart = cityInfo.split("\\|", 5);
- if (cityPart.length == 5) {
- city_info.put(cityPart[0], cityPart[1] + "\t" + cityPart[2] + "\t" + cityPart[3] + "\t" + cityPart[4]);
- }
- }
- }
- }
- }
- /**
- * Map 端的实现相当简单, 直接判断 tb_user_profiles.dat 中的
- * cityID 是否存在我的 map 中就 ok 了, 这样就可以实现 Map Join 了
- */
- protected void map(Object key, Text value, Context context) throws IOException,
- InterruptedException {
- // 排掉空行
- if (value == null || value.toString().equals("")) {
- return;
- }
- mapInputStr = value.toString();
- mapInputSpit = mapInputStr.split("\\|", 4);
- // 过滤非法记录
- if (mapInputSpit.length != 4) {
- return;
- }
- // 判断链接字段是否在 map 中存在
- city_secondPart = (String) city_info.get((Object) mapInputSpit[3]);
- if (city_secondPart != null) {
- this.outPutKey.set(mapInputSpit[3]);
- this.outPutValue.set(city_secondPart + "\t" + mapInputSpit[0] + "\t" + mapInputSpit[1] + "\t" + mapInputSpit[2]);
- context.write(outPutKey, outPutValue);
- }
- }
- }
- public int run(String[] args) throws Exception {
- Configuration conf = getConf(); // 获得配置文件对象
- DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf); // 为该 job 添加缓存文件
- Job job = new Job(conf, "MapJoinMR");
- job.setNumReduceTasks(0);
- FileInputFormat.addInputPath(job, new Path(args[0])); // 设置 map 输入文件路径
- FileOutputFormat.setOutputPath(job, new Path(args[2])); // 设置 reduce 输出文件路径
- job.setJarByClass(MapSideJoinMain.class);
- job.setMapperClass(LeftOutJoinMapper.class);
- job.setInputFormatClass(TextInputFormat.class); // 设置文件输入格式
- job.setOutputFormatClass(TextOutputFormat.class); // 使用默认的 output 格式
- // 设置 map 的输出 key 和 value 类型
- job.setMapOutputKeyClass(Text.class);
- // 设置 reduce 的输出 key 和 value 类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.waitForCompletion(true);
- return job.isSuccessful() ? 0 : 1;
- }
- public static void main(String[] args) throws IOException,
- ClassNotFoundException,
- InterruptedException {
- try {
- int returnCode = ToolRunner.run(new MapSideJoinMain(), args);
- System.exit(returnCode);
- } catch(Exception e) {
- // TODO Auto-generated catch block
- logger.error(e.getMessage());
- }
- }
- }
实例参考文献:
https://www.cnblogs.com/CSSdongl/p/6018806.html
自己开发了一个股票智能分析软件, 功能很强大, 需要的点击下面的链接获取:
https://www.cnblogs.com/bclshuai/p/11380657.html
来源: https://www.cnblogs.com/bclshuai/p/12319471.html