本教程尝试揭露为什么以及怎么使用 ProtocolCodecFilter。
为什么使用 ProtocolCodecFilter?
- TCP 包装所有包的投递在正确的顺序里。但是不保证在发送端的写入操作,导致一次接收端的读取事件。查看 MINA 术语集里的 http://en.wikipedia.org/wiki/IPv4#Fragmentation_and_reassembly 和 http://en.wikipedia.org/wiki/Nagle%27s_algorithm:没有 ProtocolCodecFilter,一次 IoSession.write(Object message)的调用可以导致接收端多次 messageReceived(IoSession session,Object message)事件,以及 IoSession.write(Object message)的多次调用可能导致单个 messageReceived 事件。当客户端和服务端在同一主机上运行(或本地网络)时,可能不会碰到这个问题,但是你的应用应该能够处理这个问题。
- 大多数的网络应用需要找到当前消息结尾的方法,以及下一个消息的开头。
- 你可以在你的 IoHandler 里实现这个所有的逻辑,但是添加 ProtocolCodecFilter 将使你的代码更整洁、更易于维护。
- 它可让你从业务逻辑中分离你的协议逻辑。
怎么用?
你的应用基本上只接受一串字节,并且你需要转换这些字节为消息(高级对象)。
对于拆分字节流为消息,有三个通用的技巧:
- 使用固定长度的消息;
- 使用固定长度头,代表内容长度;
- 使用分隔符,例如很多基于文本的协议在每个消息后面加一个换行符。
在本教程中,我们将使用第一种和第二种方法,因为他们更容易实现。然后我们看一下使用分隔符。
实例
我们将开发一个图形生成服务,阐明怎么实现你自己的协议编解码器(ProtocolEncoder、ProtocolDecoder、以及 ProtocolCodecFactory)。这个协议非常简单。这是请求消息的设计:
| 4 bytes | 4 bytes | 4 bytes |
| width | height | numchars |
- width:请求的图片宽度(整型)
- height:请求图片的高度(整型);
- numchars:产生图形的数量;
服务器响应请求尺寸的两个图片,打印字符串的请求数量。这是响应消息的设计:
| 4 bytes | variable length body | 4 bytes | variable length body |
| length1 | image1 | length2 | image2 |
类的概览需要编码和解码请求和响应:
- ImageRequest:简单的 POJO,代表请求我们的 ImageServer。
- ImageRequestEncoder:编码 ImageRequest 对象为特殊协议的数据(客户端使用)
- ImageRequestDecoder:解码特殊协议的数据为 ImageRequest 对象(服务端使用)
- ImageResponse:简单的 POJO,代表我们 ImageServer 的响应。
- ImageResponseEncoder:服务端用于编码 ImageResponse 对象。
- ImageResponseDecoder:客户端用于解码 ImageResponse 对象。
- ImageCodecFactory:这个类创建必须的编码器和解码器。
下面是 ImageRequest 类:
public class ImageRequest {
private int width;
private int height;
private int numberOfCharacters;
public ImageRequest(int width, int height, int numberOfCharacters) {
this.width = width;
this.height = height;
this.numberOfCharacters = numberOfCharacters;
}
public int getWidth() {
return width;
}
public int getHeight() {
return height;
}
public int getNumberOfCharacters() {
return numberOfCharacters;
}
}
编码通常比解码简单,所以我们从 ImageRequestEncoder 开始:
public class ImageRequestEncoder implements ProtocolEncoder {
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
ImageRequest request = (ImageRequest) message;
IoBuffer buffer = IoBuffer.allocate(12, false);
buffer.putInt(request.getWidth());
buffer.putInt(request.getHeight());
buffer.putInt(request.getNumberOfCharacters());
buffer.flip();
out.write(buffer);
}
public void dispose(IoSession session) throws Exception {
// nothing to dispose
}
}
备注:
- MINA 将会在 IoSession 的 wirte 队列里的所有消息调用 encode 方法。因为我们的客户端只写 ImageRequest 对象,我们可以安全的转换消息为 ImageRequest。
- 我们从堆里分配一个新 IoBuffer。最好避免使用直接的缓冲区,因为一般堆缓冲区性能更好。
- 你没有必要释放 buffer,MINA 会为你做。
- 在 dispose()方法里,你应该释放在指定 session 编码期间获取的所有资源。如果没有东西释放,你可以让你的编码器继承 ProtocolEncoderAdapter。
现在我们看一下解码器。CumulativeProtocolDecoder 非常有助于编写你自己的解码器:它会缓冲所有进来的数据,直到你的编码器决定它可以用它做事。在本案例中,消息有一个固定大小,所以很容易的等待所有数据可用:
public class ImageResponse {
private BufferedImage image1;
private BufferedImage image2;
public ImageResponse(BufferedImage image1, BufferedImage image2) {
this.image1 = image1;
this.image2 = image2;
}
public BufferedImage getImage1() {
return image1;
}
public BufferedImage getImage2() {
return image2;
}
}
备注:
- 每次解码了一个完整消息,你应该将它写入到 ProtocolDecoderOutput;这些消息将通过过滤器链,最终到达你的 IoHandler.messageReceived 方法。
- 你不用负责释放 IoBuffer。
- 没有足够可用数据解码消息时,返回 false 即可。
响应也是一个简单的 POJO:
public class ImageResponse { private BufferedImage image1; private BufferedImage image2; public ImageResponse(BufferedImage image1, BufferedImage image2) { this.image1 = image1; this.image2 = image2; } public BufferedImage getImage1() { return image1; } public BufferedImage getImage2() { return image2; } }
编码响应也很简单:
public class ImageResponseEncoder extends ProtocolEncoderAdapter {
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
ImageResponse imageResponse = (ImageResponse) message;
byte[] bytes1 = getBytes(imageResponse.getImage1());
byte[] bytes2 = getBytes(imageResponse.getImage2());
int capacity = bytes1.length + bytes2.length + 8;
IoBuffer buffer = IoBuffer.allocate(capacity, false);
buffer.setAutoExpand(true);
buffer.putInt(bytes1.length);
buffer.put(bytes1);
buffer.putInt(bytes2.length);
buffer.put(bytes2);
buffer.flip();
out.write(buffer);
}
private byte[] getBytes(BufferedImage image) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ImageIO.write(image, "PNG", baos);
return baos.toByteArray();
}
}
备注:
- 当不能提前计算 IoBuffer 的长度时,你可以通过调用 buffer.setAutoExpand(true)使用自动扩展的 buffer。
现在我们看一下响应的解码:
public class ImageResponseDecoder extends CumulativeProtocolDecoder {
private static final String DECODER_STATE_KEY = ImageResponseDecoder.class.getName() + ".STATE";
public static final int MAX_IMAGE_SIZE = 5 * 1024 * 1024;
private static class DecoderState {
BufferedImage image1;
}
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE_KEY);
if (decoderState == null) {
decoderState = new DecoderState();
session.setAttribute(DECODER_STATE_KEY, decoderState);
}
if (decoderState.image1 == null) {
// try to read first image
if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
decoderState.image1 = readImage(in);
} else {
// not enough data available to read first image
return false;
}
}
if (decoderState.image1 != null) {
// try to read second image
if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
BufferedImage image2 = readImage(in);
ImageResponse imageResponse = new ImageResponse(decoderState.image1, image2);
out.write(imageResponse);
decoderState.image1 = null;
return true; } else {
// not enough data available to read second image
return false;
}
}
return false;
}
private BufferedImage readImage(IoBuffer in) throws IOException {
int length = in.getInt();
byte[] bytes = new byte[length];
in.get(bytes);
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
return ImageIO.read(bais);
}
}
备注:
- 我们在 session 属性里存储了解码过程的状态。它也将可能在解码器对象自身存储状态,这里有几个缺点:
- 每个 IoSession 需要自己的解码器实例;
- MINA 保证对于相同的 IoSession 绝不同时存在一个线程,执行 decode()方法,但是它不保证总是相同的线程。假设第一块数据由线程 1 处理,它来决定不能解码,写一块数据到达时,它可以被另个线程处理。为了避免能见度问题,你必须正确的同步访问这个解码器状态(IoSession 属性存储进 ConcurrenthashMap,所有他们自动的可见其他线程)。
- 邮件列表里的讨论已经有了结论:在 IoSession 或解码器实例中选择存储状态,是更重要的尝试。确保没有两个线程运行相同 IoSession 的 decode 方法,MINA 需要做一些同步的形式=> 这个同步将确保你不能获得上面所述的能见度问题。
- IoBuffer.prefixedDataAvailable()非常方便,当你的协议使用长度前缀时;它支持 1、2、4 字节的前缀。
- 不要忘了重置 decoder 状态,当你解码响应之后。
如果响应由单个图片组成,我们将不需要存储 decoder 状态:
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
if (in.prefixedDataAvailable(4)) {
int length = in.getInt();
byte[] bytes = new byte[length];
in.get(bytes);
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
BufferedImage image = ImageIO.read(bais);
out.write(image);
return true; } else {
return false;
}
}
现在我们将它粘在一起:
public class ImageCodecFactory implements ProtocolCodecFactory {
private ProtocolEncoder encoder;
private ProtocolDecoder decoder;
public ImageCodecFactory(boolean client) {
if (client) {
encoder = new ImageRequestEncoder();
decoder = new ImageResponseDecoder();
} else {
encoder = new ImageResponseEncoder();
decoder = new ImageRequestDecoder();
}
}
public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
return encoder;
}
public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
return decoder;
}
}
备注:
- 对于每个新 session,MINA 会访问 ImageCodecFactory 的 encoder 和 decoder。
- 因为我们的编码器和解码器存储没有会话状态,非常安全的使所有 session 共享单个实例。
这是服务器怎么使用 ProtocolCodecFactory:
public class ImageServer {
public static final int PORT = 33789;
public static void main(String[] args) throws IOException {
ImageServerIoHandler handler = new ImageServerIoHandler();
NioSocketAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new ImageCodecFactory(false)));
acceptor.setLocalAddress(new InetSocketAddress(PORT));
acceptor.setHandler(handler);
acceptor.bind();
System.out.println("server is listenig at port " + PORT);
}
}
客户端的用法完全相同:
public class ImageClient extends IoHandlerAdapter {
public static final int CONNECT_TIMEOUT = 3000;
private String host;
private int port;
private SocketConnector connector;
private IoSession session;
private ImageListener imageListener;
public ImageClient(String host, int port, ImageListener imageListener) {
this.host = host;
this.port = port;
this.imageListener = imageListener;
connector = new NioSocketConnector();
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ImageCodecFactory(true)));
connector.setHandler(this);
}
public void messageReceived(IoSession session, Object message) throws Exception {
ImageResponse response = (ImageResponse) message;
imageListener.onImages(response.getImage1(), response.getImage2());
}
出于完备性,我将为服务端 IoHandler 添加代码:
public class ImageServerIoHandler extends IoHandlerAdapter {
private final static String characters = "mina rocks abcdefghijklmnopqrstuvwxyz0123456789";
public static final String INDEX_KEY = ImageServerIoHandler.class.getName() + ".INDEX";
private Logger logger = LoggerFactory.getLogger(this.getClass());
public void sessionOpened(IoSession session) throws Exception {
session.setAttribute(INDEX_KEY, 0);
}
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
IoSessionLogger sessionLogger = IoSessionLogger.getLogger(session, logger);
sessionLogger.warn(cause.getMessage(), cause);
}
public void messageReceived(IoSession session, Object message) throws Exception {
ImageRequest request = (ImageRequest) message;
String text1 = generateString(session, request.getNumberOfCharacters());
String text2 = generateString(session, request.getNumberOfCharacters());
BufferedImage image1 = createImage(request, text1);
BufferedImage image2 = createImage(request, text2);
ImageResponse response = new ImageResponse(image1, image2);
session.write(response);
}
private BufferedImage createImage(ImageRequest request, String text) {
BufferedImage image = new BufferedImage(request.getWidth(), request.getHeight(), BufferedImage.TYPE_BYTE_INDEXED);
Graphics graphics = image.createGraphics();
graphics.setColor(Color.YELLOW);
graphics.fillRect(0, 0, image.getWidth(), image.getHeight());
Font serif = new Font("serif", Font.PLAIN, 30);
graphics.setFont(serif);
graphics.setColor(Color.BLUE);
graphics.drawString(text, 10, 50);
return image;
}
private String generateString(IoSession session, int length) {
Integer index = (Integer) session.getAttribute(INDEX_KEY);
StringBuffer buffer = new StringBuffer(length);
while (buffer.length() < length) {
buffer.append(characters.charAt(index));
index++;
if (index >= characters.length()) {
index = 0;
}
}
session.setAttribute(INDEX_KEY, index);
return buffer.toString();
}
}
总结
关于编码和解码的讨论还有很多。我希望本教程已经让你入门了。我将尝试在不久的将来对 DemuxingProtocolCodecFactory 添加一些东西。并且稍后我们还要看下如何使用分隔符代替前缀长度。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于