MaxCompute 里的 MR 作业,很少是只要跑一次就好了的。如果需要周期性调度,目前 MaxCompute(原名 ODPS)只提供了计算引擎,任务调度可以使用来实现。这篇帖子从基础开始,介绍了 3 种周期性调度的方法。同时还介绍了如何使用资源文件。
代码以文档里的 作为例子。
在这个基础上,增加资源文件的读取方法,修改 Reduce 类。主要的逻辑是读取资源文件,资源文件里的数据格式是字符串 1, 字符串 2。代码逻辑是如果 word count 里的 word 如果有在字符串 1 里出现的话,就替换成字符串 2。
- public static class SumReducer extends ReducerBase {
- private Record result = null;
- private Map<String,String> maps = null;
- @Override
- public void setup(TaskContext context) throws IOException {
- result = context.createOutputRecord();
- maps = new HashMap<String,String>();
- StringBuilder importdata = new StringBuilder();
- BufferedInputStream bufferedInput = null;
- try {
- byte[] buffer = new byte[1024];
- int bytesRead = 0;
- //读取资源文件的内容
- bufferedInput = context.readResourceFileAsStream("resource.txt");
- while ((bytesRead = bufferedInput.read(buffer)) != -1) {
- String chunk = new String(buffer, 0, bytesRead);
- importdata.append(chunk);
- }
- //解析资源文件的内容,把替换前,替换后的数据放到map里
- String lines[] = importdata.toString().split("\n");
- for (int i = 0; i < lines.length; i++) {
- String[] ss = lines[i].split(",");
- maps.put(ss[0].trim(), ss[1].trim());
- System.out.println(ss[0]+"->"+ss[1]);
- }
- } catch (FileNotFoundException ex) {
- throw new IOException(ex);
- } catch (IOException ex) {
- throw new IOException(ex);
- } finally {
- }
- }
- @Override
- public void reduce(Record key, Iterator<Record> values,
- TaskContext context) throws IOException {
- long count = 0;
- while (values.hasNext()) {
- Record val = values.next();
- count += (Long) val.get(0);
- }
- String value = key.get(0).toString();
- if(maps.containsKey(value)){
- System.out.println(value+"->"+maps.get(value));
- value = maps.get(value);
- }
- result.set(0, value);
- result.set(1, count);
- context.write(result);
- }
- }
具体资源文件的用法可以参考 ,这里就不再多解释了。
对于测试数据,源文件的内容为
- odps,MaxCompute
- hello,Hello
我们先用手工调度来跑这个 MR,这里跑通了后后面的所有的配置就很容易明白了。
首先需要把代码打出的 jar 包,和这个 resource.txt 文件上传到服务器上
- >add jar D:\cx_word_count.jar -f;
- OK: Resource 'cx_word_count.jar' have been updated.
- >add file D:\resource.txt -f;
- OK: Resource 'resource.txt' have been updated.
然后通过命令行来调用
- jar -resources cx_word_count.jar,resource.txt -classpath D:\cx_word_count.jar com.aliyun.odps.mr.WordCount;
这里的 - resources 引用的是跑在服务器上的,-classpath 是用来找到 main 方法的。理解这个对后面配置同步任务很有帮助。可以参阅
odpscmd 客户端有一个参数,是 - e,可以在 shell 里直接调用 jar 命令来跑 MR,当然也可以使用 odpscmd -f 来再调用一个脚本文件,但是这样有点麻烦了。这里就直接用 - e 来做。
你可以先用
- /odps/cmd/bin/odpscmd -e "jar -resources cx_word_count.jar,resource.txt -classpath /odps/cx_word_count.jar com.aliyun.odps.mr.WordCount;"
在 Linux 服务器上运行任务。注意安装 odpscmd 配置前需要先配置好 java 环境。然后后面的 Crontab 的配置就不展开了。
配置 DataIDE 的 MR 作业的界面,很容易就让人想到 MR 任务的 main 方法。其实就是 DataIDE 会根据配置自己生成 main 方法,然后去调用 MaxCompute 上的任务。具体的配置可以参考这个截图:
可以在右边看到可以配置任务的调度周期和上下游依赖,从而实现每天的定时调度,而且还能是保证上游的数据导入、预处理完成后才开始做 MR 操作,非常好用。
上述的 MR 任务简单方便,但是 DataIDE 出于安全考虑,不让用户自己写 main 方法。如果需要用到诸如传参数之类的功能,可以自己写 Shell 任务,但是调度让 DataIDE 来做。这样就集上面两个方法之长了。
Shell 任务需要先参考 先配置调度的 ECS 信息,这里不再展开。完成后写一个 Shell 脚本,内容为
- ##@resource_reference{"cx_word_count.jar,resource.txt"}
- /opt/taobao/tbdpapp/odpswrapper/odpsconsole/bin/odpscmd -u testid -p testkey --project=testproject --endpoint=http://service.odps.aliyun.com/api -e "jar -resources cx_word_count.jar,resource.txt -classpath /odps/cx_word_count.jar com.aliyun.odps.mr.WordCount"
要把里面的 Access id/key,Project 替换成你自己的,然后开始测试代码。需要特别注意的是,** Shell 任务是在机器上的 admin 账号下运行的 ** ,如果发现各种奇怪的错误,比如明明存在的文件找不到一类的错误,可以先 su - admin,调试下 Shell 命令,或者访问下对应的文件,看看是否是环境变量,文件目录权限的问题。另外也可以把错误日志重定向到某个文件里,比如 / tmp 文件夹下的某个临时日志文件里,方便事后调试。大家可以在 admin 账号下把 shell 调试通过后再放到数加上去调用。
另外 Shell 任务可以调整调度的机器,可以参考
来源: