- <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-ftp -->
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-ftp</artifactId>
- <version>2.13.1</version>
- </dependency>
这个是我们项目中用的Camel版本,比较老了,最新的是2.19.2
- <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-ftp -->
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-ftp</artifactId>
- <version>2.19.2</version>
- </dependency>
ftp://[username@]hostname[:port]/directoryname[?options]
sftp://[username@]hostname[:port]/directoryname[?options]
ftps://[username@]hostname[:port]/directoryname[?options]
下面这个链接是我本地测试使用的,可以参考下,具体参数的意义请往下看。
- ftp://173.5.206.53:2121/MHE/?username=ftpAdmin&password=123456&binary=true&passiveMode=true&delete=true&delay=60000
其中directoryname表示文件夹目录。目录名称是一个相对路径。不支持绝对路径。相对路径可以包含嵌套文件夹,例如 /inbox/us。
对于Camel2.16之前的版本,directoryName必须是已经存在的,这个配置不支持autoCreate选项(自动创建哪个文件夹)。原因是FTP服务器上有权限限制。
对于Camel2.16,是支持autoCreate选项的。当消费者启动,在执行轮询计划之前,会自动到FTP服务器创建对应的文件目录。默认值autoCreate是true.
如果没有提供用户名,会使用anonymous匿名登录,密码会随机尝试。
如果没有提供端口号,Camel将根据协议提供缺省值(ftp sftp = ftp = 21日22日= 2222)。
你可以添加以下格式的URI查询配置, ?option=value&option=value&... 如上文我的测试链接。&是我在pom文件中转译使用的。
这个配置使用两个不同的库操作FTP。FTP和FTPS使用Apache Commons Net,而SFTP使用JCraft JSCH。
FTPS组件仅可在Camel2.2及更新的版本。
FTPS(也称为安全FTP)是一个扩展添加支持FTP传输层安全性(TLS)和安全套接字层(SSL)加密协议。
ps:因为我目前使用的是2.13.1,关于新版本的特性我也不是太了解。
Name |
Default Value |
Description |
---|---|---|
|
|
指定要使用的用户名登录远程服务器。 |
|
|
指定的密码用来登录到远程服务器。 |
|
|
Camel 2.15.2: 指定的帐户用于登录到远程FTP服务器(只对FTP和FTPS) |
|
|
指定文件传输模式,二进制或ASCII。默认是ASCII(false)。 |
|
|
Camel 2.2: 使用后是否要断开远程FTP服务器。可用于消费者和生产者。只会断开断开当前连接到FTP服务器。如果你有一个你想要停止消费,那么你需要停止消费者路由。 |
|
|
在使用时,可以使用本地工作目录将远程文件内容直接存储在本地文件中,以避免将内容加载到内存中。这个是有好处的,如果你下载一个非常大的远程文件,可以节省内存。详情见下文。 |
|
|
FTP and FTPS only: 指定是否使用被动模式连接。默认是false。 |
|
|
FTPS only: 设置底层安全协议。定义以下值: : Transport Layer Security : Secure Sockets Layer |
|
|
Camel 2.4: FTPS only: 当使用安全的数据传输时,是否要禁用使用默认值execPbsz和execProt。如果你想完全控制这些东西,你可以使用execpbsz和execprot。 ps:本段翻译我没看懂,我也没用过。 |
|
|
Camel 2.11: FTP消费者是否应该下载该文件。如果将此选项设置为false,那么消息体将null,但消费者仍将触发一个Camel Exchange获得文件的详细信息,如文件名,文件大小,等等。只是不能下载的文件。 |
streamDownload |
false |
Camel 2.11: 消费者是否应该下载整个文件前,默认的行为,或者是否应该通过InputStream从路由远程资源读取而不是从内存中的Camel Exchange数组获取。如果下载失败的或是本地目录提供,这个选项可以忽略。此选项对于处理大型远程文件非常有用。 |
|
|
Camel 2.4: FTPS only: 默认情况下,如果没有禁用安全数据通道默认值,则使用选项p。可能的值是: : Clear : Safe (SSL protocol only) : Confidential (SSL protocol only) : Private |
|
|
Camel 2.4: FTPS only: 此选项指定安全数据通道的缓冲区大小。如果选择useSecureDataChannel,但是没有被显式设置,就没有使用价值了。 |
|
|
FTPS only: 设置安全模式(implicit/explicit)隐式/显式。默认是explicit (false) |
|
|
SFTP only: 设置known_hosts文件,以便SFTP端点可以做主机密钥验证。 |
|
true | SFTP onlly: Camel 2.18: 如果knownhostfile没有配置,然后使用系统的host file。System.getProperty(user.home)/.ssh/known_hosts |
|
|
SFTP only: Camel 2.11.1: 设置known_hosts文件(从classpath默认加载),这样可以做主机密钥验证SFTP端点。 |
|
|
SFTP only: Camel 2.12.0: 设置Java为SSH密钥对的公钥认证,它支持DSA或RSA密钥 |
|
|
SFTP only: 设置私钥文件,SFTP端点可以做私人密钥验证。 |
|
|
SFTP only: Camel 2.11.1: 设置私钥文件(默认从类路径加载)SFTP端点可以做私钥验证。 |
|
|
SFTP only: Camel 2.11.1: 设置私钥byte[]SFTP端点可以做私钥验证 |
|
|
SFTP only: Deprecated: 使用 代替。设置私有密钥文件密码SFTP端点可以做私钥验证。 |
|
|
SFTP only: Camel 2.11.1: 设置私有密钥文件密码SFTP端点可以做私钥验证。 |
|
|
SFTP only: Camel 2.10.7, 2.11.2,2.12.0: set the preferred authentications which SFTP endpoint will used. Some example include:password,publickey. If not specified the default list from JSCH will be used. |
|
|
Camel 2.8.2, 2.9: SFTP only Set a comma separated list of ciphers that will be used in order of preference. Possible cipher names are defined by JCraft JSCH. Some examples include: aes128-ctr,aes128-cbc,3des-ctr,3des-cbc,blowfish-cbc,aes192-cbc,aes256-cbc. If not specified the default list from JSCH will be used. |
|
|
Camel 2.8.2, 2.9: 如果将此选项设置为true,camel-ftp将直接使用列表文件检查文件是否存在。由于一些FTP服务器可能不支持直接列出文件,如果选项是错误的,camel-ftp将使用旧的方法列出目录并检查文件是否存在。注意从Camel 2.10.1起这个选项也影响readLock=changed控制是否执行一个快速检查更新文件信息。如果FTP服务器有大量文件,则可以使用它来加快进程。 |
|
|
SFTP only: Camel 2.2: Sets whether to use strict host key checking. Possible values are: , and . does not make sense to use as Camel cannot answer the question for you as its meant for human intervention. Note: The default in Camel 2.1 and below was . |
|
|
Specifies the maximum reconnect attempts Camel performs when it tries to connect to the remote FTP server. Use 0 to disable this behavior. |
|
|
Delay in millis Camel will wait before performing a reconnect attempt. |
|
|
Camel 2.4: Is the connect timeout in millis. This corresponds to using for the FTP/FTPS. For SFTP this option is also used when attempting to connect. |
|
|
FTP and FTPS Only: Camel 2.4: Is the value in millis. A good idea is to configure this to a value such as 300000 (5 minutes) to not hang a connection. On SFTP this option is set as timeout on the JSCH Session instance. Also SFTP from Camel 2.14.3/2.15.3/2.16 onwards. From Camel 2.16 onwards the default is 300000 (300 sec). |
|
|
FTP and FTPS Only: Camel 2.4: Is the data timeout in millis. This corresponds to using for the FTP/FTPS. For SFTP there is no data timeout. |
|
|
Camel 2.5: Whether or not to thrown an exception if a successful connection and login could not be establish. This allows a custom to deal with the exception, for example to stop the consumer or the likes. |
|
|
FTP and FTPS Only: Camel 2.5: To execute site commands after successful login. Multiple site commands can be separated using a new line character (\n). Use to see which site commands your FTP server supports. |
|
|
Camel 2.6: Whether or not stepwise traversing directories should be used or not. Stepwise means that it will CD one directory at a time. See more details below. You can disable this in case you can't use this approach. |
|
|
Camel 2.6: Dictates what path separator char to use when uploading files. = Use the path provided without altering it. = Use unix style path separators. = Use Windows style path separators. Since Camel 2.15.2: The default value is changed to UNIX style path, before Camel 2.15.2: The default value is
|
|
|
SFTP Producer Only: Camel 2.9: 允许你设置chmod存储文件。例如 . |
|
0 |
SFTP Only: Camel 2.8.3/2.9: To use compression. Specify a level from 1 to 10. Important:You must manually add the needed JSCH zlib JAR to the classpath for compression support. |
|
|
FTP/FTPS Only: Camel 2.15.1: 下载文件的缓冲区大小。默认大小是32 kb。 |
|
|
FTP and FTPS Only: Camel 2.1: Allows you to use a custom instance. |
|
|
FTP and FTPS Only: Camel 2.1: Allows you to use a custom instance. |
|
FTP and FTPS Only: To configure various options on the FTPClient instance from the uri. For example:
|
|
|
|
SFTP Only: Camel 2.8 Allows you to set the serverAliveInterval of the sftp session |
|
|
SFTP Only: Camel 2.8 Allows you to set the serverAliveCountMax of the sftp session |
|
|
FTPS Only: Sets the trust store file, so that the FTPS client can look up for trusted certificates. |
|
|
FTPS Only: Sets the trust store type. |
|
|
FTPS Only: Sets the trust store algorithm. |
|
|
FTPS Only: Sets the trust store password. |
|
|
FTPS Only: 设置密钥存储文件,FTPS 客户端可以为私有证书。 |
|
|
FTPS Only: 设置密钥存储类型。 |
|
|
FTPS Only: 设置密钥存储算法。 |
|
|
FTPS Only: 设置密钥存储密码。 |
|
|
FTPS Only: 设置私钥密码。 |
|
|
FTPS Only: Camel 2.9: 参考在注册表中的org.apache.camel.util.jsse.SSLContextParameters。这个引用重写任何SSL配置相关选项ftpClient以及securityProtocol(SSL,TLS等)FtpsConfiguration。 |
|
|
SFTP Only: Camel 2.10.7, 2.11.1: 参考在注册表中的一com.jcraft.jsch.proxy。这个代理是用来消费/发送来自目标SFTP主机信息。 |
|
|
FTP/FTPS Only: Camel 2.12.1: 用户是否应该使用FTP列表命令检索目录列表以查看哪些文件存在。如果此选项设置为false,thenstepwise=false必须配置,而且文件名必须配置一个固定的名称,让消费者知道该文件的名称来检索。执行此操作时,只能检索单个文件。详情请看下文。 |
|
|
Camel 2.12.1: 当试图检索某个文件但不存在(出于某种原因)或由于文件权限不足导致的错误时,用户是否应该忽略该错误。. Camel 2.14.2: 这个选项现在也适用于目录。 |
|
|
Camel 2.16: 生产者。是否要发送一个等待命令作为pre-write检查之前上传文件到FTP服务器。这是默认启用的验证连接仍然有效,它允许静默重新连接才能上传文件。但是如果这导致问题,您可以关闭这个选项。 |
|
|
SFTP Only: Camel 2.15.3/2.16: 使用JSCH活动记录的日志级别。如JSCH十分冗长,在默认情况下在信息层面上默认记录WARN警告级别。 |
|
SFTP Only: Camel 2.17.1: 指定一次可能有多少个请求未完成。增加这个值可能会略微提高文件传输速度,但会增加内存使用量。 | |
|
false | Camel 2.18: 是否在批处理完成后从远程FTP服务器断开连接。可用于消费者和生产者。断开连接只会断开与FTP服务器的当前连接。如果你有一个想要停止的消费者,那么你需要停止消费者路由 |
|
Camel 2.18: 在主动模式设置客户端端口范围。语法是:minPort-maxPort。端口号都是包括的,如10000 - 19999包括所有xxxx端口。 |
FTP消费者在默认情况下将消耗远程FTP服务器上的文件。你必须显式地配置读取后如何处理这个文件,如果你想要删除的文件或移动到另一个位置。例如你可以使用delete=true删除的文件,或者使用move=.done将文件移动到一个隐藏的子目录。
常规的文件消费是不同的,因为它将默认移动文件.camel子目录。Camel不做FTP默认的处理行为是它可能缺少权限,默认情况下无法移动或删除文件。
先放一个小DEMO
- /**
- * Apache Camel FTP Demo
- * @author 小卖铺的老爷爷
- */
- public class HelloWorld extends RouteBuilder {
- //启动FTP路由,实际项目中初始化应该是单独的一个类
- public static void main(String[] args) throws Exception {
- // 这是camel上下文对象,整个路由的驱动全靠它了。
- ModelCamelContext camelContext = new DefaultCamelContext();
- // 启动route
- camelContext.start();
- // 将我们的路由处理加入到上下文中
- camelContext.addRoutes(new HelloWorld());
- }
- @Override
- public void configure() throws Exception {
- //从FTP上下载文件到本地目录,相关参数的意义,参考我上文贴出的API,实际项目中这些地址一般写在配置文件中
- from("ftp://173.5.206.53:2121/MHE/?username=ftpAdmin&password=123456&binary=true&passiveMode=true&delete=true&delay=60000")
- //自定义的处理器,可以做各种逻辑处理,如文件名匹配下载等
- .process(new HttpProcessor())
- .to("file:d:/wms-fe/inFile");
- }
- }
上文的demo是每隔一分钟扫描ftp服务器目录上是否有新文件,如果有匹配文件名复核条件的下载到本地,并将服务器上的文件删除。当然我们也可以不删除,将文件移到一个固定目录备份,这里可以用move=XXX参数。.process(new HttpProcessor()) 其中这句是可以删除的,删除后就是只有有文件,Camel就会下载到本地了。
可能小Demo中代码不是很齐全,因为Processor的处理逻辑的代码可能有点问题,被我删掉了。
下面看下我实际项目里面的处理逻辑吧,
- /**
- *
- * 此类描述的是:FTP下载、加压、解析处理
- *
- */
- public class FTPMheRoute extends RouteBuilder {
- private String ftpDownURI;
- private String downDir;
- private String unpackDir;
- private String ftpFileCharset;
- private void initialize() {
- ftpDownURI = WmsFEUtil.getSysConfigValue("camel.ftp.download.uri").trim();
- downDir = WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.dir").trim();
- unpackDir = WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.unpack.dir").trim();
- ftpFileCharset = WmsFEUtil.getSysConfigValue("camel.ftp.file.charset");
- }
- //
- @Override
- public void configure() throws Exception {
- initialize();
- downRoute();
- unpackRoute();
- parseRoute();
- }
- /**
- * 此方法描述的是:下载路由
- */
- private void downRoute() {
- from(ftpDownURI).to(downDir).process(new MHEDownloadedProcessor());
- // from(ftpDownURI)
- // .choice()
- // .when(new FTPFilter(ftpFileCharset, FTPFilter.TYPE_FTP_FILE_NORMAL))
- // .process(new FTPProcessor(ftpFileCharset))
- // .to(downDir);
- //.process(new MHEDownloadedProcessor());
- OutUtil.log(
- FTPMheRoute.class,"Register Route :"+
- " when find the zip file it will download to "+downDir
- );
- }
- /**
- * 此方法描述的是:解压路由
- */
- private void unpackRoute() {
- from(downDir).process(new MHEDownUnpackProcessor());
- OutUtil.log(
- FTPMheRoute.class,"Register Route :"+
- "camel listner @ "+ downDir ,
- " when find the zip file it will unpack to "+unpackDir
- );
- }
- /**
- * 此方法描述的是:解析路由
- */
- private void parseRoute() {
- from(unpackDir).process(new MHEDownExecuteProcessor());
- OutUtil.log(
- FTPMheRoute.class,"Register Route :"+
- "camel listner @ "+ unpackDir ,
- " when find the txt file it will parse to DB."
- );
- }
- }
上面的代码中,我使用三个路由连接,上一个路由中由最后一个元素处理完的Exchange对象,将被发送至由Direct连接的下一个路由起始位置(http://camel.apache.org/direct.html)。注意,两个被连接的路由一定要是可用的,并且存在于同一个Camel服务中。
最后说一下Process,它用于接收从控制端点、路由选择条件又或者另一个处理器的Exchange中传来的消息信息,并进行处理。这里process(Exchange exchange)方法是必须进行实现的。
因为某些原因,这里我只贴下我解压路由的处理逻辑。
- /**
- *
- * 此类描述的是:FTP解压处理
- */
- public class MHEDownUnpackProcessor implements Processor {
- private static final String TXT_PATTERN = ".*?\\.txt";
- private static final String ZIP_PATTERN = ".*?\\.zip";
- private static final String BUSINESSTYPE_FTPMHE = "FTPMHE";
- @Override public void process(Exchange exchange) throws Exception {
- Message message = exchange.getIn();
- GenericFile < ?>gf = (GenericFile < ?>) message.getBody();
- File zipFile = (File) gf.getFile();
- unzip(zipFile);
- }
- /**
- * 此方法描述的是:解压缩文件
- * @throws IOException
- */
- private List < File > unzip(File zipFile) throws IOException {
- String zipName = zipFile.getName();
- if (Pattern.matches(ZIP_PATTERN, zipName)) {
- List < File > list = ZipUtil.unzip(zipFile, getDownUnpackDir(), TXT_PATTERN);
- if (list == null) {
- BaseLogUtil.addMsgLog(BUSINESSTYPE_FTPMHE, zipName, "", BaseLogUtil.SOURCE_OTHER, BaseLogUtil.TO_SYS_OTHER, BaseLogUtil.CHANNEL_FTP, BaseLogUtil.STATUS_FAIL, "", "解压失败,文件内容为空");
- ParseFileTools.move2Dir(zipFile, "fail");
- } else {
- //FileUtil.deleteFile(zipFile);
- ParseFileTools.move2Dir(zipFile, "success");
- }
- return list;
- } else {
- BaseLogUtil.addMsgLog(BUSINESSTYPE_FTPMHE, zipName, "", BaseLogUtil.SOURCE_OTHER, BaseLogUtil.TO_SYS_OTHER, BaseLogUtil.CHANNEL_FTP, BaseLogUtil.STATUS_FAIL, "", "解压失败,压缩包为非法格式");
- ParseFileTools.move2Dir(zipFile, "fail");
- }
- return null;
- }
- /**
- * 此方法描述的是:获得解压临时目录
- */
- public static String getDownUnpackDir() {
- return getURI(WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.unpack.dir"));
- }
- /**
- * 此方法描述的是:获得文件路径
- */
- public static String getURI(String fileURI) {
- String uri = fileURI.replaceFirst("file://", "");
- int lastParam = uri.lastIndexOf("?");
- if (lastParam != -1) {
- uri = uri.substring(0, lastParam);
- }
- return uri;
- }
- }
- public class ZipUtil {
- /**
- * 此方法描述的是:ZIP压缩
- * @param source 源文件
- * @param dest 压缩文件
- * @version: 2015年3月2日 上午9:17:26
- */
- public static String zip(File source, File dest) {
- ZipOutputStream out = null;
- BufferedOutputStream bo = null;
- try {
- File zipParent = dest.getParentFile();
- if (!zipParent.exists()) {
- zipParent.mkdirs();
- }
- out = new ZipOutputStream(new FileOutputStream(dest));
- bo = new BufferedOutputStream(out);
- zip(out, source, source.getName(), bo);
- return dest.getAbsolutePath();
- } catch(Exception e) {
- OutUtil.error(e, e.getMessage());
- } finally {
- if (bo != null) {
- try {
- bo.close();
- } catch(IOException e) {}
- }
- if (out != null) {
- try {
- out.close(); // 输出流关闭
- } catch(IOException e) {}
- }
- }
- return null;
- }
- public static void zip(ZipOutputStream out, File f, String base, BufferedOutputStream bo) throws Exception { // 方法重载
- out.putNextEntry(new ZipEntry(base)); // 创建zip压缩进入点base
- FileInputStream in =new FileInputStream(f);
- BufferedInputStream bi = new BufferedInputStream( in );
- int b;
- try {
- while ((b = bi.read()) != -1) {
- bo.write(b); // 将字节流写入当前zip目录
- }
- } finally {
- bi.close(); in .close(); // 输入流关闭
- bo.close();
- }
- }
- /** 解压缩(压缩文件中包含多个文件)可代替上面的方法使用。
- * ZipInputStream类
- * 当我们需要解压缩多个文件的时候,ZipEntry就无法使用了,
- * 如果想操作更加复杂的压缩文件,我们就必须使用ZipInputStream类
- * */
- public static List < File > unzip(File sourceFile, String outPath, String fileNameRegexp) {
- List < File > listDestFile = new ArrayList < File > ();
- File outFile = null;
- ZipFile zipFile = null;
- FileInputStream sourceInput = null;
- ZipInputStream zipInput = null;
- ZipEntry entry = null;
- InputStream input = null;
- OutputStream output = null;
- String sourceName = sourceFile.getName();
- try {
- zipFile = new ZipFile(sourceFile);
- sourceInput = new FileInputStream(sourceFile);
- zipInput = new ZipInputStream(sourceInput);
- while ((entry = zipInput.getNextEntry()) != null) {
- String entryName = entry.getName();
- if (!Pattern.matches(fileNameRegexp, entryName)) {
- continue;
- }
- outFile = new File(outPath + File.separator + sourceName + "-" + entryName);
- if (!outFile.getParentFile().exists()) {
- outFile.getParentFile().mkdir();
- }
- if (!outFile.exists()) {
- outFile.createNewFile();
- }
- input = zipFile.getInputStream(entry);
- output = new FileOutputStream(outFile);
- int temp = 0;
- while ((temp = input.read()) != -1) {
- output.write(temp);
- }
- input.close();
- output.close();
- listDestFile.add(outFile);
- }
- return listDestFile;
- } catch(Exception e) {
- e.printStackTrace();
- } finally {
- try {
- if (input != null) input.close();
- if (output != null) output.close();
- if (zipInput != null) zipInput.close();
- if (sourceInput != null) sourceInput.close();
- if (zipFile != null) zipFile.close();
- } catch(IOException e) {}
- }
- return null;
- }
- /**
- * 此方法描述的是:获得ZIP文件同名解压目录
- * @version: 2015年3月5日 下午2:10:41
- */
- public static String getUnpackForder(File zipFile) {
- String filePath = zipFile.getAbsolutePath();
- return filePath.substring(0, filePath.lastIndexOf("."));
- }
- /**
- * 此方法描述的是:获得ZIP文件指定解压目录
- * @version: 2015年3月6日 下午10:28:36
- */
- public static String getUnpackForder(File zipFile, String subDir) {
- return zipFile.getParent() + File.separator + subDir;
- }
- }
基本上就是上面这些,贴出的代码只具有参考意义。最后目前还有一个问题,在解压和解析处理的时候,我做了备份,成功的会移到succ文件夹,失败的会移到fail文件夹,但是.camel默认生成的文件夹虽然没什么用还是一直存在,不知道该如何禁止这个文件夹的生成。还请玩过Camel的大牛指导下。谢谢。
参考:
http://camel.apache.org
http://camel.apache.org/ftp.html
http://blog.csdn.net/column/details/sys-communication.html
来源: http://www.cnblogs.com/laoyeye/p/7491873.html