简介
Apache Camel 是一个基于规则的路由引擎,即用户可以自定义规则,完成从“来源”到“目标”的工作。
Apache Camel 的路由语法很简单:
from(src).to(dst)
具体示例:
from("file:path").to("ftp://url")
示例一:将文件夹 inbox 中的文件移动到文件夹 outbox
import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; public class MainMoveFileWithCamel { public static void main(String args[]) throws Exception { CamelContext context = new DefaultCamelContext(); context.addRoutes(new RouteBuilder() { public void configure() { from("file:/tmp/inbox?delay=5000").to("file:/tmp/outbox"); } }); context.start(); boolean loop = true; while (loop) { Thread.sleep(25000); } context.stop(); } }
会每 5 秒钟,则/tmp/inbox 中的文件,移动到/tmp/outbox 中。
基本概念
Endpoint
Endpoint 指资源的位置,如上例中的:file:/tmp/inbox?delay=5000
Component
Component 用以提供对某种协议的资源访问的支持,如上例中的 file 即为 component,提供了对本地文件系统访问的支持
单次路由转换
from(src).to(dst)
多次路由转换
from(src).to(dst1).to(dst2)
组合用法:其中的 log, process 都是接口的实现
from().log().process().log().to()
Routing Language
通过 Routing Language,来自定义我们自己的业务逻辑。
Camel 支持三种 Routing Language,分别为
- Java DSL
- Spring XML
- Scala DSL
这里对 2,3 不做介绍,只给出 1 的示例
示例二
自定义 process 的示例
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.file.GenericFile; import org.apache.camel.impl.DefaultCamelContext; public class MainMoveFileWithCamel { public static class FileConvertProcessor implements Processor { @Override public void process(Exchange exchange) throws Exception { try { // 读入数据 InputStream body = exchange.getIn().getBody(InputStream.class); // 中间处理 StringBuilder builder = new StringBuilder(); BufferedReader in = new BufferedReader(new InputStreamReader(body)); String str = in.readLine(); while (str != null) { System.out.println(str); builder.append("Hello " + str + "\n"); str = in.readLine(); } // 修改输出的文件名 GenericFile file = exchange.getProperty("CamelFileExchangeFile", GenericFile.class); exchange.getOut().setHeader(Exchange.FILE_NAME, file.getFileName()); // 写入结果 exchange.getOut().setBody(builder.toString()); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String args[]) throws Exception { CamelContext context = new DefaultCamelContext(); context.addRoutes(new RouteBuilder() { public void configure() { Processor processor = new FileConvertProcessor(); from("file:/tmp/inbox?delay=5000").process(processor).to("file:/tmp/outbox"); } }); context.start(); boolean loop = true; while (loop) { Thread.sleep(25000); } context.stop(); } }
Camel 支持的 components 列表
Camel 支持哪些第三方组件了呢?官方 github 上有一个列表,支持的还是非常广泛的。
这里介绍下对 ES 的支持
- 在 2.11 版本,支持 elasticsearch
- 在 2.19 版本,支持 elasticsearch5.x
官方文档
ES 示例
目前运行有问题.
import java.util.HashMap; import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; public class MainElasticSearchCamel { public static void main(String[] args) throws Exception { CamelContext context = new DefaultCamelContext(); context.addRoutes(new RouteBuilder() { public void configure() { from("direct:index") .to("elasticsearch5://elasticsearch?ip=127.0.0.1&operation=INDEX&indexName=twitter&indexType=tweet"); } }); context.start(); boolean loop = true; // 输入模板 ProducerTemplate template = context.createProducerTemplate(); Map<String, String> map = new HashMap<String, String>(); map.put("content", "test"); String indexId = template.requestBody("direct:index", map, String.class); System.out.println(indexId); while (loop) { Thread.sleep(25000); } context.stop(); } }
参考
- Apache Camel 框架入门示例
- Apache Camel 框架之 HTTP 路由
- Camel 的数据转换,本来想写的,结果原作者已经写过了,就不再写了.
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于