| 
 
| 
| 
| | « | October 2025 | » |  | 日 | 一 | 二 | 三 | 四 | 五 | 六 |  |  |  |  | 1 | 2 | 3 | 4 |  5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 | 19 | 20 | 21 | 22 | 23 | 24 | 25 | 26 | 27 | 28 | 29 | 30 | 31 |  | |  | 
 |  公告
 |  
| 本博客在此声明所有文章均为转摘,只做资料收集使用。并无其他商业用途。 |  
 
 
 
 
 
 
| Blog信息 |  
| blog名称:日志总数:210
 评论数量:205
 留言数量:-19
 访问次数:925423
 建立时间:2007年5月10日
 |   
 
 |  | 
 
|  [Apache Mina]转:Apache Mina使用手记(四) 文章收藏,  网上资源,  软件技术,  电脑与网络
 李小白 发表于 2009/8/21 16:18:57  |  
| 
上一篇中,我们介绍了如何在mina中编写自己的日志过滤器,这一篇我们自己实现一个编解码器。
在实际应用当中,很多应用系统使用的都不是标准的web service或XML等,比如像中国移动/联通/电信的短信网关程序,都有自己不同的协议实现,并且都是基于TCP/IP的字节流。Mina自带的编解码器实现了TextLineEncoder和TextLineDecoder,可以进行按行的字符串处理,对于像短信网关这样的程序,就要自己实现编解码过滤器了。
我们定义一个简单的基于TCP/IP字节流的协议,实现在客户端和服务端之间的数据包传输。数据包MyProtocalPack由消息头和消息体组成,消息头包括:length(消息包的总长度,数据类型int),flag(消息包标志位,数据类型byte),消息体content是一个字符串,实际实现的时候按byte流处理。源代码如下:
view plaincopy to clipboardprint?
package com.gftech.mytool.mina;  
import com.gftech.util.GFCommon;  
/**
 * One packet of the custom protocol: a header made of an int {@code length}
 * and a byte {@code flag}, followed by a string {@code content}.
 * <p>
 * The two-argument constructor sets {@code length} to 5 plus the number of
 * bytes of {@code content} in the platform default charset. The setters do
 * not recompute {@code length}.
 */
public class MyProtocalPack {
    private int length;
    private byte flag;
    private String content;

    /** Creates an empty packet; all fields keep their Java default values. */
    public MyProtocalPack() {
    }

    /**
     * Creates a packet from a flag and a body.
     *
     * @param flag    the packet flag byte
     * @param content the body text, may be {@code null} (counted as zero bytes)
     */
    public MyProtocalPack(byte flag, String content) {
        int bodySize = 0;
        if (content != null) {
            bodySize = content.getBytes().length;
        }
        this.flag = flag;
        this.content = content;
        this.length = 5 + bodySize;
    }

    /**
     * Creates a packet from raw bytes. Nothing is assigned when {@code bs} is
     * {@code null} or shorter than 5 bytes.
     *
     * @param bs raw bytes; the first 4 bytes are handed to
     *           {@code GFCommon.bytes2int} for the length and byte 4 is the flag
     */
    public MyProtocalPack(byte[] bs) {
        if (bs == null || bs.length < 5) {
            return;
        }
        length = GFCommon.bytes2int(GFCommon.bytesCopy(bs, 0, 4));
        flag = bs[4];
        // NOTE(review): assumes bytesCopy(src, offset, count) -- confirm against GFCommon.
        content = new String(GFCommon.bytesCopy(bs, 5, length - 5));
    }

    /** @return the stored packet length */
    public int getLength() {
        return this.length;
    }

    /** @param length the packet length to store */
    public void setLength(int length) {
        this.length = length;
    }

    /** @return the flag byte */
    public byte getFlag() {
        return this.flag;
    }

    /** @param flag the flag byte to store */
    public void setFlag(byte flag) {
        this.flag = flag;
    }

    /** @return the body text, possibly {@code null} */
    public String getContent() {
        return this.content;
    }

    /** @param content the body text to store; {@code length} is left unchanged */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return a one-line description listing length, flag and content */
    public String toString() {
        return " Len:" + length + " flag:" + flag + " content:" + content;
    }
}
回过头来,我们先看一下在MinaTimeServer中,如何使用一个文本的编解码过滤器,它是在过滤器链中添加了一个叫ProtocolCodecFilter的类,其中它调用了工厂类TextLineCodecFactory,创建具体的TextLineEncoder和TextLineDecoder编码和解码器。我们看一下具体的源代码:
view plaincopy to clipboardprint?
// Append a ProtocolCodecFilter named "codec" to the acceptor's filter chain, backed by a TextLineCodecFactory created with the GBK charset.
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("GBK"))));  
view plaincopy to clipboardprint?
package org.apache.mina.filter.codec.textline;  
import java.nio.charset.Charset;  
import org.apache.mina.core.buffer.BufferDataException;  
import org.apache.mina.core.session.IoSession;  
import org.apache.mina.filter.codec.ProtocolCodecFactory;  
import org.apache.mina.filter.codec.ProtocolDecoder;  
import org.apache.mina.filter.codec.ProtocolEncoder;  
/** 
 * A {@link ProtocolCodecFactory} that performs encoding and decoding between 
 * a text line data and a Java string object.  This codec is useful especially 
 * when you work with a text-based protocols such as SMTP and IMAP. 
 * 
 * @author The Apache MINA Project (dev@mina.apache.org) 
 * @version $Rev$, $Date$ 
 */  
/**
 * A {@link ProtocolCodecFactory} whose encoder and decoder convert between
 * lines of text and Java strings. One {@link TextLineEncoder} and one
 * {@link TextLineDecoder} are created per factory and returned for every
 * session.
 */
public class TextLineCodecFactory implements ProtocolCodecFactory {
    private final TextLineEncoder encoder;
    private final TextLineDecoder decoder;

    /**
     * Builds a factory that uses the current default {@link Charset}.
     */
    public TextLineCodecFactory() {
        this(Charset.defaultCharset());
    }

    /**
     * Builds a factory for the given {@link Charset}. The encoder is given
     * {@link LineDelimiter#UNIX} and the decoder {@link LineDelimiter#AUTO}.
     *
     * @param charset
     *  The charset to use in the encoding and decoding
     */
    public TextLineCodecFactory(Charset charset) {
        this(charset, LineDelimiter.UNIX, LineDelimiter.AUTO);
    }

    /**
     * Builds a factory with delimiters given as strings.
     *
     * @param charset
     *  The charset to use in the encoding and decoding
     * @param encodingDelimiter
     *  The line delimiter for the encoder
     * @param decodingDelimiter
     *  The line delimiter for the decoder
     */
    public TextLineCodecFactory(Charset charset,
            String encodingDelimiter, String decodingDelimiter) {
        this.encoder = new TextLineEncoder(charset, encodingDelimiter);
        this.decoder = new TextLineDecoder(charset, decodingDelimiter);
    }

    /**
     * Builds a factory with delimiters given as {@link LineDelimiter} values.
     *
     * @param charset
     *  The charset to use in the encoding and decoding
     * @param encodingDelimiter
     *  The line delimiter for the encoder
     * @param decodingDelimiter
     *  The line delimiter for the decoder
     */
    public TextLineCodecFactory(Charset charset,
            LineDelimiter encodingDelimiter, LineDelimiter decodingDelimiter) {
        this.encoder = new TextLineEncoder(charset, encodingDelimiter);
        this.decoder = new TextLineDecoder(charset, decodingDelimiter);
    }

    /** Returns the encoder held by this factory; the session is not consulted. */
    public ProtocolEncoder getEncoder(IoSession session) {
        return this.encoder;
    }

    /** Returns the decoder held by this factory; the session is not consulted. */
    public ProtocolDecoder getDecoder(IoSession session) {
        return this.decoder;
    }

    /**
     * Returns the maximum allowed size of an encoded line, as reported by
     * {@link TextLineEncoder#getMaxLineLength()}.
     */
    public int getEncoderMaxLineLength() {
        return this.encoder.getMaxLineLength();
    }

    /**
     * Sets the maximum allowed size of an encoded line by delegating to
     * {@link TextLineEncoder#setMaxLineLength(int)}.
     */
    public void setEncoderMaxLineLength(int maxLineLength) {
        this.encoder.setMaxLineLength(maxLineLength);
    }

    /**
     * Returns the maximum allowed size of a line to be decoded, as reported by
     * {@link TextLineDecoder#getMaxLineLength()}.
     */
    public int getDecoderMaxLineLength() {
        return this.decoder.getMaxLineLength();
    }

    /**
     * Sets the maximum allowed size of a line to be decoded by delegating to
     * {@link TextLineDecoder#setMaxLineLength(int)}.
     */
    public void setDecoderMaxLineLength(int maxLineLength) {
        this.decoder.setMaxLineLength(maxLineLength);
    }
}
TextLineCodecFactory实现了ProtocolCodecFactory接口,该接口主要有一个获取编码器的方法getEncoder()和一个获取解码器的方法getDecoder():
view plaincopy to clipboardprint?
package org.apache.mina.filter.codec;  
import org.apache.mina.core.session.IoSession;  
/** 
 * Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates 
 * binary or protocol specific data into message object and vice versa. 
 * <p> 
 * Please refer to 
 * <a href="../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html" mce_href="xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html"><code>ReverserProtocolProvider</code></a> 
 * example. 
 * 
 * @author The Apache MINA Project (dev@mina.apache.org) 
 * @version $Rev$, $Date$ 
 */  
public interface ProtocolCodecFactory {  
    /** 
     * Returns a new (or reusable) instance of {@link ProtocolEncoder} which 
     * encodes message objects into binary or protocol-specific data. 
     * 
     * @param session the session the encoder is requested for 
     * @return the encoder to use for the given session 
     * @throws Exception if the encoder cannot be provided 
     */  
    ProtocolEncoder getEncoder(IoSession session) throws Exception;  
    /** 
     * Returns a new (or reusable) instance of {@link ProtocolDecoder} which 
     * decodes binary or protocol-specific data into message objects. 
     * 
     * @param session the session the decoder is requested for 
     * @return the decoder to use for the given session 
     * @throws Exception if the decoder cannot be provided 
     */  
    ProtocolDecoder getDecoder(IoSession session) throws Exception;  
}  
我们主要是仿照TextLineEncoder实现其中的encode()方法,仿照TextLineDecoder实现其中的decode()方法即可,它们分别实现了ProtocolEncoder和ProtocolDecoder接口。我们要编写三个类,分别是:MyProtocalCodecFactory,MyProtocalEncoder,MyProtocalDecoder,对应TextLineCodecFactory,TextLineEncoder,TextLineDecoder。
view plaincopy to clipboardprint?
package com.gftech.mytool.mina;  
import java.nio.charset.Charset;  
import org.apache.mina.core.session.IoSession;  
import org.apache.mina.filter.codec.ProtocolCodecFactory;  
import org.apache.mina.filter.codec.ProtocolDecoder;  
import org.apache.mina.filter.codec.ProtocolEncoder;  
/**
 * {@link ProtocolCodecFactory} for the custom packet protocol. It creates one
 * {@link MyProtocalEncoder} and one {@link MyProtocalDecoder} at construction
 * time and returns those same instances for every session.
 */
public class MyProtocalCodecFactory implements ProtocolCodecFactory {
    private final MyProtocalEncoder encoder;
    private final MyProtocalDecoder decoder;

    /**
     * @param charset the charset handed to both the encoder and the decoder
     */
    public MyProtocalCodecFactory(Charset charset) {
        this.encoder = new MyProtocalEncoder(charset);
        this.decoder = new MyProtocalDecoder(charset);
    }

    /** Returns the decoder held by this factory; the session is not consulted. */
    public ProtocolDecoder getDecoder(IoSession session) {
        return this.decoder;
    }

    /** Returns the encoder held by this factory; the session is not consulted. */
    public ProtocolEncoder getEncoder(IoSession session) {
        return this.encoder;
    }
}
view plaincopy to clipboardprint?
package com.gftech.mytool.mina;  
import java.nio.charset.Charset;  
import org.apache.mina.core.buffer.IoBuffer;  
import org.apache.mina.core.session.IoSession;  
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;  
import org.apache.mina.filter.codec.ProtocolEncoderOutput;  
/**
 * Encodes {@link MyProtocalPack} messages into the wire format
 * {@code [int length][byte flag][content bytes]}, where {@code length} counts
 * the whole packet including the 5 header bytes.
 * <p>
 * The content is encoded with the charset given to the constructor, and the
 * length field is computed from the bytes actually written, so header and
 * body always agree.
 */
public class MyProtocalEncoder extends ProtocolEncoderAdapter {
    /** Header size in bytes: a 4-byte int length plus a 1-byte flag. */
    private static final int HEADER_LENGTH = 5;

    private final Charset charset;

    /**
     * @param charset charset used to turn the packet content into bytes; when
     *                {@code null} the platform default charset is used
     */
    public MyProtocalEncoder(Charset charset) {
        this.charset = charset;
    }

    /**
     * Encodes one {@link MyProtocalPack} and writes the resulting buffer to
     * {@code out}.
     *
     * @param session the session the message belongs to (not used here)
     * @param message the message to encode; must be a {@link MyProtocalPack}
     * @param out     receives the encoded buffer
     * @throws ClassCastException if {@code message} is not a {@link MyProtocalPack}
     */
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        MyProtocalPack value = (MyProtocalPack) message;
        byte[] body = encodeContent(value.getContent());
        // Derive the length from the encoded body rather than from the pack:
        // the pack's own length was computed with the platform default charset.
        int totalLength = HEADER_LENGTH + body.length;
        IoBuffer buf = IoBuffer.allocate(totalLength);
        buf.putInt(totalLength);
        buf.put(value.getFlag());
        buf.put(body);
        buf.flip();
        out.write(buf);
    }

    /** Converts the content to bytes in the configured charset; {@code null} becomes an empty body. */
    private byte[] encodeContent(String content) {
        if (content == null) {
            return new byte[0];
        }
        return charset == null ? content.getBytes() : content.getBytes(charset);
    }

    /** No resources are held, so there is nothing to release. */
    public void dispose() throws Exception {
    }
}
view plaincopy to clipboardprint?
package com.gftech.mytool.mina;  
import java.nio.charset.Charset;  
import java.nio.charset.CharsetDecoder;  
import org.apache.mina.core.buffer.IoBuffer;  
import org.apache.mina.core.session.AttributeKey;  
import org.apache.mina.core.session.IoSession;  
import org.apache.mina.filter.codec.ProtocolDecoder;  
import org.apache.mina.filter.codec.ProtocolDecoderOutput;  
/**
 * Decodes the byte stream of the custom protocol
 * ({@code [int length][byte flag][content bytes]}, {@code length} counting
 * the whole packet) into {@link MyProtocalPack} messages.
 * <p>
 * Bytes received so far are accumulated in a per-session {@link Context}
 * because a single read may deliver only part of a packet, or several
 * packets at once.
 */
public class MyProtocalDecoder implements ProtocolDecoder {
    /** Header size in bytes: a 4-byte int length plus a 1-byte flag. */
    private static final int PACK_HEAD_LENGTH = 5;

    private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");
    private final Charset charset;
    private int maxPackLength = 100;

    /** Creates a decoder that uses the platform default charset. */
    public MyProtocalDecoder() {
        this(Charset.defaultCharset());
    }

    /**
     * @param charset charset used to decode the packet content
     */
    public MyProtocalDecoder(Charset charset) {
        this.charset = charset;
    }

    /** @return the largest total packet length (header included) that is accepted */
    public int getMaxLineLength() {
        return maxPackLength;
    }

    /**
     * Sets the largest total packet length (header included) that is accepted.
     *
     * @param maxLineLength the new limit; must be positive
     * @throws IllegalArgumentException if {@code maxLineLength} is not positive
     */
    public void setMaxLineLength(int maxLineLength) {
        if (maxLineLength <= 0) {
            throw new IllegalArgumentException("maxLineLength: " + maxLineLength);
        }
        this.maxPackLength = maxLineLength;
    }

    /** Returns the decoding context stored on the session, creating it on first use. */
    private Context getContext(IoSession session) {
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx == null) {
            ctx = new Context();
            session.setAttribute(CONTEXT, ctx);
        }
        return ctx;
    }

    /**
     * Appends {@code in} to the session's pending data and emits every complete
     * packet found there. An incomplete trailing packet is kept for the next
     * call. If a header carries an impossible length (smaller than the header
     * itself or larger than the configured maximum) all pending data is
     * discarded, since the stream can no longer be framed reliably.
     *
     * @param session the session the data belongs to
     * @param in      newly received bytes
     * @param out     receives one {@link MyProtocalPack} per decoded packet
     */
    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        // Fetch the context of the previous call; it may hold unprocessed data.
        Context ctx = getContext(session);
        // Append the new data to the context buffer, then switch it to read mode.
        ctx.append(in);
        IoBuffer buf = ctx.getBuffer();
        buf.flip();
        try {
            while (buf.remaining() >= PACK_HEAD_LENGTH) {
                int packStart = buf.position();
                int length = buf.getInt();
                byte flag = buf.get();

                if (length < PACK_HEAD_LENGTH || length > maxPackLength) {
                    // Corrupt header: consume everything so that compact() below
                    // leaves the buffer empty instead of keeping the bad bytes.
                    buf.position(buf.limit());
                    break;
                }

                int bodyLength = length - PACK_HEAD_LENGTH;
                if (bodyLength > buf.remaining()) {
                    // Packet not complete yet: rewind to the start of its header.
                    buf.position(packStart);
                    break;
                }

                String content = readContent(buf, bodyLength, ctx.getDecoder());
                out.write(new MyProtocalPack(flag, content));
            }
        } finally {
            // Move unread bytes to the front and return the buffer to write mode,
            // even when decoding failed, so the next append starts from a sane state.
            buf.compact();
        }
    }

    /**
     * Decodes exactly {@code bodyLength} bytes starting at the current position.
     * The position is always advanced to the end of the body afterwards, because
     * {@code getString} may stop early (for example at a NUL byte), which would
     * otherwise misalign the following packet.
     */
    private static String readContent(IoBuffer buf, int bodyLength, CharsetDecoder decoder) throws Exception {
        int bodyEnd = buf.position() + bodyLength;
        int savedLimit = buf.limit();
        buf.limit(bodyEnd);
        try {
            return buf.getString(decoder);
        } finally {
            buf.limit(savedLimit);
            buf.position(bodyEnd);
        }
    }

    /** Nothing to flush: incomplete data left at session end is ignored. */
    public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
    }

    /** Removes the decoding context from the session. */
    public void dispose(IoSession session) throws Exception {
        Context ctx = (Context) session.getAttribute(CONTEXT);
        if (ctx != null) {
            session.removeAttribute(CONTEXT);
        }
    }

    /**
     * Per-session decoding state. Data does not arrive aligned to packet
     * boundaries, so received bytes are collected here until a whole packet
     * is available.
     */
    private class Context {
        private final CharsetDecoder decoder;
        private final IoBuffer buf;

        private Context() {
            decoder = charset.newDecoder();
            buf = IoBuffer.allocate(80).setAutoExpand(true);
        }

        public CharsetDecoder getDecoder() {
            return decoder;
        }

        public IoBuffer getBuffer() {
            return buf;
        }

        /** Copies the remaining bytes of {@code in} to the end of the pending data. */
        public void append(IoBuffer in) {
            getBuffer().put(in);
        }
    }
}
在MyProtocalServer中,添加自己实现的Log4jFilter和编解码过滤器:
view plaincopy to clipboardprint?
package com.gftech.mytool.mina;  
import java.io.IOException;  
import java.net.InetSocketAddress;  
import java.nio.charset.Charset;  
import org.apache.log4j.Logger;  
import org.apache.log4j.PropertyConfigurator;  
import org.apache.mina.core.service.IoAcceptor;  
import org.apache.mina.core.service.IoHandlerAdapter;  
import org.apache.mina.core.session.IdleStatus;  
import org.apache.mina.core.session.IoSession;  
import org.apache.mina.filter.codec.ProtocolCodecFilter;  
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;  
/**
 * Test server for the custom protocol: listens on {@link #PORT}, logs traffic
 * through a {@code Log4jFilter} and decodes packets with
 * {@link MyProtocalCodecFactory} using the GBK charset.
 */
public class MyProtocalServer {
    private static final int PORT = 2500;
    static Logger logger = Logger.getLogger(MyProtocalServer.class);

    /**
     * Configures log4j, builds the filter chain and binds the acceptor.
     *
     * @param args not used
     * @throws IOException if the acceptor cannot be bound
     */
    public static void main(String[] args) throws IOException {
        // A forward slash is accepted as a path separator on Windows as well,
        // whereas a backslash is an ordinary file-name character elsewhere.
        PropertyConfigurator.configure("conf/log4j.properties");
        IoAcceptor acceptor = new NioSocketAcceptor();
        Log4jFilter lf = new Log4jFilter(logger);
        acceptor.getFilterChain().addLast("logger", lf);

        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK"))));
        acceptor.getSessionConfig().setReadBufferSize(1024);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        acceptor.setHandler(new MyHandler());
        acceptor.bind(new InetSocketAddress(PORT));
        System.out.println("start server ...");
    }
}
/**
 * Server-side handler: logs every received {@link MyProtocalPack}, every idle
 * event and every exception through the class logger.
 */
class MyHandler extends IoHandlerAdapter {
    static Logger logger = Logger.getLogger(MyHandler.class);

    /** Logs the failure with its stack trace instead of printing it to stderr. */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        logger.error("Exception caught on session " + session, cause);
    }

    /**
     * Logs the received packet.
     *
     * @throws ClassCastException if {@code message} is not a {@link MyProtocalPack}
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        MyProtocalPack pack = (MyProtocalPack) message;
        logger.debug("Rec:" + pack);
    }

    /** Logs the session's idle count for the reported idle status. */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        logger.debug("IDLE " + session.getIdleCount(status));
    }
}
编写一个客户端程序进行测试:
view plaincopy to clipboardprint?
package com.gftech.mytool.mina;  
import java.io.DataOutputStream;  
import java.net.Socket;  
/**
 * Plain-socket test client: connects to 127.0.0.1:2500 and sends 1000 packets
 * in the wire format {@code [int length][byte flag][content bytes]}.
 */
public class MyProtocalClient {

    /**
     * Sends the test packets, waits one second and closes the socket.
     *
     * @param args not used
     */
    public static void main(String[] args) {
        Socket socket = null;
        try {
            socket = new Socket("127.0.0.1", 2500);
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            for (int i = 0; i < 1000; i++) {
                MyProtocalPack pack = new MyProtocalPack((byte) i, i + "测试MyProtocalaaaaaaaaaaaaaa");
                // Encode explicitly as GBK, the charset the server codec is created
                // with, and derive the length field from the bytes actually sent.
                byte[] body = pack.getContent().getBytes("GBK");
                out.writeInt(5 + body.length);
                out.write(pack.getFlag());
                out.write(body);
                out.flush();
                System.out.println(i + " sended");
            }
            // Give the server time to read before the connection is closed.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever runs this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (java.io.IOException ignored) {
                    // Nothing useful can be done if closing fails at exit.
                }
            }
        }
    }
}
也可以用IoConnector实现自己的客户端:
view plaincopy to clipboardprint?
package com.gftech.mytool.mina;  
import java.io.IOException;  
import java.net.InetSocketAddress;  
import java.nio.charset.Charset;  
import org.apache.mina.core.future.ConnectFuture;  
import org.apache.mina.core.future.IoFutureListener;  
import org.apache.mina.core.service.IoConnector;  
import org.apache.mina.core.service.IoHandlerAdapter;  
import org.apache.mina.core.session.IdleStatus;  
import org.apache.mina.core.session.IoSession;  
import org.apache.mina.filter.codec.ProtocolCodecFilter;  
import org.apache.mina.transport.socket.nio.NioSocketConnector;  
/**
 * MINA-based test client: connects to {@code HOST:PORT} with the custom codec
 * installed and, once connected, writes {@code FC1} packets to the session.
 */
public class MyProtocalClient2 {
    private static final String HOST = "192.168.10.8";
    private static final int PORT = 2500;
    static long counter = 0;
    final static int FC1 = 100;
    static long start = 0;

    /**
     * Sets up the connector, starts the connection attempt and registers a
     * listener that sends the test data when the attempt completes.
     *
     * @param args not used
     */
    public static void main(String[] args) throws IOException {
        start = System.currentTimeMillis();

        IoConnector connector = new NioSocketConnector();
        ProtocolCodecFilter codecFilter =
                new ProtocolCodecFilter(new MyProtocalCodecFactory(Charset.forName("GBK")));
        connector.getFilterChain().addLast("codec", codecFilter);
        connector.setHandler(new TimeClientHandler2());
        connector.getSessionConfig().setReadBufferSize(100);
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

        IoFutureListener<ConnectFuture> onConnect = new IoFutureListener<ConnectFuture>() {
            public void operationComplete(ConnectFuture future) {
                try {
                    if (!future.isConnected()) {
                        System.out.println("连接不存在 ");
                        return;
                    }
                    sendData(future.getSession());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        connector.connect(new InetSocketAddress(HOST, PORT)).addListener(onConnect);

        System.out.println("start client ...");
    }

    /**
     * Writes {@code FC1} packets to the session, printing each one after it
     * has been handed to {@code session.write}.
     *
     * @param session the connected session to write to
     */
    public static void sendData(IoSession session) throws IOException {
        int seq = 0;
        while (seq < FC1) {
            MyProtocalPack pack = new MyProtocalPack((byte) seq, "afdjkdafk张新波测试" + seq);
            session.write(pack);
            System.out.println("send data:" + pack);
            seq++;
        }
    }
}
class TimeClientHandler2 extends IoHandlerAdapter {  
    @Override  
    public void sessionOpened(IoSession session) {  
        // Set reader idle time to 10 seconds.  
        // sessionIdle(...) method will be invoked when no data is read  
        // for 10 seconds.  
        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, 60);  
    }  
    @Override  
    public void sessionClosed(IoSession session) {  
        // Print out total number of bytes read from the remote peer.  
        System.err.println("Total " + session.getReadBytes() + " byte(s)");  
    }  
    @Override  
    public void sessionIdle(IoSession session, IdleStatus status) {  
        // Close the connection if reader is idle.  
        if (status == IdleStatus.READER_IDLE) {  
            session.close(true);  
        }  
    }  
    @Override  
    public void messageReceived(IoSession session, Object message) {  
        MyProtocalPack pack = (MyProtocalPack) message;  
        System.out.println("rec:" + pack);  
    }  
} |  
 |  
 
 
 
 |