简介
Apache Camel 是一个基于规则的路由引擎,即用户可以自定义规则,完成从“来源”到“目标”的工作。
Apache Camel 的路由语法很简单:
from(src).to(dst)
具体示例:
from("file:path").to("ftp://url")
示例一:将文件夹 inbox 中的文件移动到文件夹 outbox
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class MainMoveFileWithCamel {
public static void main(String args[]) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
public void configure() {
from("file:/tmp/inbox?delay=5000").to("file:/tmp/outbox");
}
});
context.start();
boolean loop = true;
while (loop) {
Thread.sleep(25000);
}
context.stop();
}
}
会每 5 秒钟,则/tmp/inbox 中的文件,移动到/tmp/outbox 中。
基本概念
Endpoint
Endpoint 指资源的位置,如上例中的:file:/tmp/inbox?delay=5000
Component
Component 用以提供对某种协议的资源访问的支持,如上例中的 file 即为 component,提供了对本地文件系统访问的支持
单次路由转换
from(src).to(dst)
多次路由转换
from(src).to(dst1).to(dst2)
组合用法:其中的 log, process 都是接口的实现
from().log().process().log().to()
Routing Language
通过 Routing Language,来自定义我们自己的业务逻辑。
Camel 支持三种 Routing Language,分别为
- Java DSL
- Spring XML
- Scala DSL
这里对 2,3 不做介绍,只给出 1 的示例
示例二
自定义 process 的示例
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.impl.DefaultCamelContext;
public class MainMoveFileWithCamel {
public static class FileConvertProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
try {
// 读入数据
InputStream body = exchange.getIn().getBody(InputStream.class);
// 中间处理
StringBuilder builder = new StringBuilder();
BufferedReader in = new BufferedReader(new InputStreamReader(body));
String str = in.readLine();
while (str != null) {
System.out.println(str);
builder.append("Hello " + str + "\n");
str = in.readLine();
}
// 修改输出的文件名
GenericFile file = exchange.getProperty("CamelFileExchangeFile", GenericFile.class);
exchange.getOut().setHeader(Exchange.FILE_NAME, file.getFileName());
// 写入结果
exchange.getOut().setBody(builder.toString());
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
public void configure() {
Processor processor = new FileConvertProcessor();
from("file:/tmp/inbox?delay=5000").process(processor).to("file:/tmp/outbox");
}
});
context.start();
boolean loop = true;
while (loop) {
Thread.sleep(25000);
}
context.stop();
}
}
Camel 支持的 components 列表
Camel 支持哪些第三方组件了呢?官方 github 上有一个列表,支持的还是非常广泛的。
这里介绍下对 ES 的支持
- 在 2.11 版本,支持 elasticsearch
- 在 2.19 版本,支持 elasticsearch5.x
官方文档
ES 示例
目前运行有问题.
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class MainElasticSearchCamel {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
public void configure() {
from("direct:index")
.to("elasticsearch5://elasticsearch?ip=127.0.0.1&operation=INDEX&indexName=twitter&indexType=tweet");
}
});
context.start();
boolean loop = true;
// 输入模板
ProducerTemplate template = context.createProducerTemplate();
Map<String, String> map = new HashMap<String, String>();
map.put("content", "test");
String indexId = template.requestBody("direct:index", map, String.class);
System.out.println(indexId);
while (loop) {
Thread.sleep(25000);
}
context.stop();
}
}
参考
- Apache Camel 框架入门示例
- Apache Camel 框架之 HTTP 路由
- Camel 的数据转换,本来想写的,结果原作者已经写过了,就不再写了.
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于