MINA CodecFilter(编解码过滤器)

本贴最后更新于 2166 天前,其中的信息可能已经事过景迁


本教程尝试揭露为什么以及怎么使用 ProtocolCodecFilter。

为什么使用 ProtocolCodecFilter?

  • TCP 包装所有包的投递在正确的顺序里。但是不保证在发送端的写入操作,导致一次接收端的读取事件。查看 MINA 术语集里的 http://en.wikipedia.org/wiki/IPv4#Fragmentation_and_reassemblyhttp://en.wikipedia.org/wiki/Nagle%27s_algorithm:没有 ProtocolCodecFilter,一次 IoSession.write(Object message)的调用可以导致接收端多次 messageReceived(IoSession session,Object message)事件,以及 IoSession.write(Object message)的多次调用可能导致单个 messageReceived 事件。当客户端和服务端在同一主机上运行(或本地网络)时,可能不会碰到这个问题,但是你的应用应该能够处理这个问题。
  • 大多数的网络应用需要找到当前消息结尾的方法,以及下一个消息的开头。
  • 你可以在你的 IoHandler 里实现这个所有的逻辑,但是添加 ProtocolCodecFilter 将使你的代码更整洁、更易于维护。
  • 它可让你从业务逻辑中分离你的协议逻辑。

怎么用?

你的应用基本上只接受一串字节,并且你需要转换这些字节为消息(高级对象)。

对于拆分字节流为消息,有三个通用的技巧:

  • 使用固定长度的消息;
  • 使用固定长度头,代表内容长度;
  • 使用分隔符,例如很多基于文本的协议在每个消息后面加一个换行符。

在本教程中,我们将使用第一种和第二种方法,因为他们更容易实现。然后我们看一下使用分隔符。

实例

我们将开发一个图形生成服务,阐明怎么实现你自己的协议编解码器(ProtocolEncoder、ProtocolDecoder、以及 ProtocolCodecFactory)。这个协议非常简单。这是请求消息的设计:

| 4 bytes | 4 bytes | 4 bytes |
| width | height | numchars |

  • width:请求的图片宽度(整型)
  • height:请求图片的高度(整型);
  • numchars:产生图形的数量;

服务器响应请求尺寸的两个图片,打印字符串的请求数量。这是响应消息的设计:

| 4 bytes | variable length body | 4 bytes | variable length body |
| length1 | image1 | length2 | image2 |

类的概览需要编码和解码请求和响应:

  • ImageRequest:简单的 POJO,代表请求我们的 ImageServer。
  • ImageRequestEncoder:编码 ImageRequest 对象为特殊协议的数据(客户端使用)
  • ImageRequestDecoder:解码特殊协议的数据为 ImageRequest 对象(服务端使用)
  • ImageResponse:简单的 POJO,代表我们 ImageServer 的响应。
  • ImageResponseEncoder:服务端用于编码 ImageResponse 对象。
  • ImageResponseDecoder:客户端用于解码 ImageResponse 对象。
  • ImageCodecFactory:这个类创建必须的编码器和解码器。

下面是 ImageRequest 类:

public class ImageRequest {
   private int width;
 private int height;
 private int numberOfCharacters;
 public ImageRequest(int width, int height, int numberOfCharacters) {
       this.width = width;
 this.height = height;
 this.numberOfCharacters = numberOfCharacters;
  }
   public int getWidth() {
       return width;
  }
   public int getHeight() {
       return height;
  }
   public int getNumberOfCharacters() {
       return numberOfCharacters;
  }
}

编码通常比解码简单,所以我们从 ImageRequestEncoder 开始:

public class ImageRequestEncoder implements ProtocolEncoder {
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        ImageRequest request = (ImageRequest) message;
  IoBuffer buffer = IoBuffer.allocate(12, false);
  buffer.putInt(request.getWidth());
  buffer.putInt(request.getHeight());
  buffer.putInt(request.getNumberOfCharacters());
  buffer.flip();
  out.write(buffer);
  }
    public void dispose(IoSession session) throws Exception {
        // nothing to dispose
  }
}

备注:

  • MINA 将会在 IoSession 的 wirte 队列里的所有消息调用 encode 方法。因为我们的客户端只写 ImageRequest 对象,我们可以安全的转换消息为 ImageRequest。
  • 我们从堆里分配一个新 IoBuffer。最好避免使用直接的缓冲区,因为一般堆缓冲区性能更好。
  • 你没有必要释放 buffer,MINA 会为你做。
  • 在 dispose()方法里,你应该释放在指定 session 编码期间获取的所有资源。如果没有东西释放,你可以让你的编码器继承 ProtocolEncoderAdapter。

现在我们看一下解码器。CumulativeProtocolDecoder 非常有助于编写你自己的解码器:它会缓冲所有进来的数据,直到你的编码器决定它可以用它做事。在本案例中,消息有一个固定大小,所以很容易的等待所有数据可用:

public class ImageResponse {
    private BufferedImage image1;
 private BufferedImage image2;
 public ImageResponse(BufferedImage image1, BufferedImage image2) {
        this.image1 = image1;
 this.image2 = image2;
  }
    public BufferedImage getImage1() {
        return image1;
  }
    public BufferedImage getImage2() {
        return image2;
  }
}

备注:

  • 每次解码了一个完整消息,你应该将它写入到 ProtocolDecoderOutput;这些消息将通过过滤器链,最终到达你的 IoHandler.messageReceived 方法。
  • 你不用负责释放 IoBuffer。
  • 没有足够可用数据解码消息时,返回 false 即可。

响应也是一个简单的 POJO:

public class ImageResponse { private BufferedImage image1; private BufferedImage image2; public ImageResponse(BufferedImage image1, BufferedImage image2) { this.image1 = image1; this.image2 = image2; } public BufferedImage getImage1() { return image1; } public BufferedImage getImage2() { return image2; } }

编码响应也很简单:

public class ImageResponseEncoder extends ProtocolEncoderAdapter {
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        ImageResponse imageResponse = (ImageResponse) message;
 byte[] bytes1 = getBytes(imageResponse.getImage1());
 byte[] bytes2 = getBytes(imageResponse.getImage2());
 int capacity = bytes1.length + bytes2.length + 8;
  IoBuffer buffer = IoBuffer.allocate(capacity, false);
  buffer.setAutoExpand(true);
  buffer.putInt(bytes1.length);
  buffer.put(bytes1);
  buffer.putInt(bytes2.length);
  buffer.put(bytes2);
  buffer.flip();
  out.write(buffer);
  }
    private byte[] getBytes(BufferedImage image) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
  ImageIO.write(image, "PNG", baos);
 return baos.toByteArray();
  }
}

备注:

  • 当不能提前计算 IoBuffer 的长度时,你可以通过调用 buffer.setAutoExpand(true)使用自动扩展的 buffer。

现在我们看一下响应的解码:

public class ImageResponseDecoder extends CumulativeProtocolDecoder {
    private static final String DECODER_STATE_KEY = ImageResponseDecoder.class.getName() + ".STATE";
 public static final int MAX_IMAGE_SIZE = 5 * 1024 * 1024;
 private static class DecoderState {
        BufferedImage image1;
  }
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE_KEY);
 if (decoderState == null) {
            decoderState = new DecoderState();
  session.setAttribute(DECODER_STATE_KEY, decoderState);
  }
        if (decoderState.image1 == null) {
            // try to read first image
  if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
                decoderState.image1 = readImage(in);
  } else {
                // not enough data available to read first image
  return false;
  }
        }
        if (decoderState.image1 != null) {
            // try to read second image
  if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
                BufferedImage image2 = readImage(in);
  ImageResponse imageResponse = new ImageResponse(decoderState.image1, image2);
  out.write(imageResponse);
  decoderState.image1 = null;
 return true;  } else {
                // not enough data available to read second image
  return false;
  }
        }
        return false;
  }
    private BufferedImage readImage(IoBuffer in) throws IOException {
        int length = in.getInt();
 byte[] bytes = new byte[length];
  in.get(bytes);
  ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
 return ImageIO.read(bais);
  }
}

备注:

  • 我们在 session 属性里存储了解码过程的状态。它也将可能在解码器对象自身存储状态,这里有几个缺点:
    • 每个 IoSession 需要自己的解码器实例;
    • MINA 保证对于相同的 IoSession 绝不同时存在一个线程,执行 decode()方法,但是它不保证总是相同的线程。假设第一块数据由线程 1 处理,它来决定不能解码,写一块数据到达时,它可以被另个线程处理。为了避免能见度问题,你必须正确的同步访问这个解码器状态(IoSession 属性存储进 ConcurrenthashMap,所有他们自动的可见其他线程)。
    • 邮件列表里的讨论已经有了结论:在 IoSession 或解码器实例中选择存储状态,是更重要的尝试。确保没有两个线程运行相同 IoSession 的 decode 方法,MINA 需要做一些同步的形式=> 这个同步将确保你不能获得上面所述的能见度问题。
  • IoBuffer.prefixedDataAvailable()非常方便,当你的协议使用长度前缀时;它支持 1、2、4 字节的前缀。
  • 不要忘了重置 decoder 状态,当你解码响应之后。

如果响应由单个图片组成,我们将不需要存储 decoder 状态:

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        if (in.prefixedDataAvailable(4)) {
        int length = in.getInt();
 byte[] bytes = new byte[length];
  in.get(bytes);
  ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
  BufferedImage image = ImageIO.read(bais);
  out.write(image);
 return true;  } else {
        return false;
  }
        }
		

现在我们将它粘在一起:

public class ImageCodecFactory implements ProtocolCodecFactory {
   private ProtocolEncoder encoder;
 private ProtocolDecoder decoder;
 public ImageCodecFactory(boolean client) {
       if (client) {
           encoder = new ImageRequestEncoder();
  decoder = new ImageResponseDecoder();
  } else {
           encoder = new ImageResponseEncoder();
  decoder = new ImageRequestDecoder();
  }
   }
   public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
       return encoder;
  }
   public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
       return decoder;
  }
}

备注:

  • 对于每个新 session,MINA 会访问 ImageCodecFactory 的 encoder 和 decoder。
  • 因为我们的编码器和解码器存储没有会话状态,非常安全的使所有 session 共享单个实例。

这是服务器怎么使用 ProtocolCodecFactory:

public class ImageServer {
    public static final int PORT = 33789;
 public static void main(String[] args) throws IOException {
        ImageServerIoHandler handler = new ImageServerIoHandler();
  NioSocketAcceptor acceptor = new NioSocketAcceptor();
  acceptor.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new ImageCodecFactory(false)));
  acceptor.setLocalAddress(new InetSocketAddress(PORT));
  acceptor.setHandler(handler);
  acceptor.bind();
  System.out.println("server is listenig at port " + PORT);
  }
}

客户端的用法完全相同:

public class ImageClient extends IoHandlerAdapter {
    public static final int CONNECT_TIMEOUT = 3000;
 private String host;
 private int port;
 private SocketConnector connector;
 private IoSession session;
 private ImageListener imageListener;
 public ImageClient(String host, int port, ImageListener imageListener) {
        this.host = host;
 this.port = port;
 this.imageListener = imageListener;
  connector = new NioSocketConnector();
  connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ImageCodecFactory(true)));
  connector.setHandler(this);
  }
    public void messageReceived(IoSession session, Object message) throws Exception {
        ImageResponse response = (ImageResponse) message;
  imageListener.onImages(response.getImage1(), response.getImage2());
  }

出于完备性,我将为服务端 IoHandler 添加代码:

public class ImageServerIoHandler extends IoHandlerAdapter {
   private final static String characters = "mina rocks abcdefghijklmnopqrstuvwxyz0123456789";
 public static final String INDEX_KEY = ImageServerIoHandler.class.getName() + ".INDEX";
 private Logger logger = LoggerFactory.getLogger(this.getClass());
 public void sessionOpened(IoSession session) throws Exception {
       session.setAttribute(INDEX_KEY, 0);
  }
   public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
       IoSessionLogger sessionLogger = IoSessionLogger.getLogger(session, logger);
  sessionLogger.warn(cause.getMessage(), cause);
  }
   public void messageReceived(IoSession session, Object message) throws Exception {
       ImageRequest request = (ImageRequest) message;
  String text1 = generateString(session, request.getNumberOfCharacters());
  String text2 = generateString(session, request.getNumberOfCharacters());
  BufferedImage image1 = createImage(request, text1);
  BufferedImage image2 = createImage(request, text2);
  ImageResponse response = new ImageResponse(image1, image2);
  session.write(response);
  }
   private BufferedImage createImage(ImageRequest request, String text) {
       BufferedImage image = new BufferedImage(request.getWidth(), request.getHeight(), BufferedImage.TYPE_BYTE_INDEXED);
  Graphics graphics = image.createGraphics();
  graphics.setColor(Color.YELLOW);
  graphics.fillRect(0, 0, image.getWidth(), image.getHeight());
  Font serif = new Font("serif", Font.PLAIN, 30);
  graphics.setFont(serif);
  graphics.setColor(Color.BLUE);
  graphics.drawString(text, 10, 50);
 return image;
  }
   private String generateString(IoSession session, int length) {
       Integer index = (Integer) session.getAttribute(INDEX_KEY);
  StringBuffer buffer = new StringBuffer(length);
 while (buffer.length() < length) {
           buffer.append(characters.charAt(index));
  index++;
 if (index >= characters.length()) {
               index = 0;
  }
       }
       session.setAttribute(INDEX_KEY, index);
 return buffer.toString();
  }
}

总结

关于编码和解码的讨论还有很多。我希望本教程已经让你入门了。我将尝试在不久的将来对 DemuxingProtocolCodecFactory 添加一些东西。并且稍后我们还要看下如何使用分隔符代替前缀长度。

  • MINA
    3 引用
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3187 引用 • 8213 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...