为大家分享一个Mina2.0实现TCP网关的例子.例子简单,但是包括的内容还是很全,包括Server,Client,Handler,Decoder,Encoder等常用组件.对于初次学习Mina的同学来说还是很不错的例子. 下面具体看代码. 1. Decoder--解码器 import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageDecoderResult; public class GatewayDecoder implements MessageDecoder { static final Logger logger = Logger.getLogger( GatewayDecoder.class.getName() ); public GatewayDecoder(){ } /* 自定义消息格式 * 1 消息长度 * 2 消息的总长度(字节数),从第一个字节到最后一个字节。 * 3 车载终端ID * 4 第一个字节是车载终端的厂家编号(从20H~FFH),后四个字节是SIM卡号码(去掉第一个数字1)。 * 5 协议版本号 1 版本号为2 * 6 命令序号 2 从0开始循环累加 * 7 命令ID 1 * 8信息内容 — 不定长 */ public MessageDecoderResult decodable(IoSession session, ByteBuffer in){ in.mark(); byte[] len = new byte[2]; in.get( len ); // 数据包长度 int datalen = this.getNonSign( len[0] ) * 256 + this.getNonSign( len[1] ); in.reset(); if (in.remaining() >= datalen) { // 接收到足够的长度 return MessageDecoderResult.OK; } // 其他的情况认为不完整,需要继续接收数据 return MessageDecoderResult.NEED_DATA; } public MessageDecoderResult decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception{ while (true){ in.mark(); byte[] len = new byte[2]; // 数据包长度 int datalen = 0; if (in.remaining() > 2) { in.get( len ); datalen = this.getNonSign( len[0] ) * 256 + this.getNonSign( len[1] ); in.reset(); } else if (in.remaining() == 0){ return MessageDecoderResult.OK; }else{ return MessageDecoderResult.NEED_DATA; } if (in.remaining() >= datalen){ // 接收到足够的长度 byte[] data = new byte[datalen]; in.get( data ); ByteBuffer bf = ByteBuffer.wrap( data ); out.write( bf ); }else{ return MessageDecoderResult.NEED_DATA; } } } protected int getNonSign(byte Sign) { if (Sign < 0){ return (Sign + 256); } else{ return Sign; } } public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception{ } } 2. Encoder---编码器 import java.util.Collections; import java.util.HashSet; import java.util.Set; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; public class GatewayEncoder implements MessageEncoder{ private static final Set<Class<?>> TYPES; static{ Set<Class<?>> types = new HashSet<Class<?>>(); types.add( ByteBuffer.class ); TYPES = Collections.unmodifiableSet( types ); } public Set<Class<?>> getMessageTypes() { return GatewayEncoder.TYPES; } public GatewayEncoder(){ } public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception{ ByteBuffer m = (ByteBuffer) message; if (m == null){ return; } out.write( m ); } } 3.ProtocolCodecFactory---编码工厂 import org.apache.mina.filter.codec.ProtocolCodecFactory; public class GatewayProtocolCodecFactory extends ProtocolCodecFactory{ public GatewayProtocolCodecFactory(){ super.register( GatewayDecoder.class ); super.register( GatewayEncoder.class ); } } 4.Handler---业务层类 import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.SocketException; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import com.tz.gps.tcp2udp.main.Main; public class TransferProtocolHandler extends IoHandlerAdapter { static Logger logger = Logger.getLogger(TransferProtocolHandler.class); private static DatagramSocket socket; public TransferProtocolHandler() { try { socket = new DatagramSocket(); } catch (SocketException e) { logger.error("创建UDP socket失败" + e.getMessage()); } } @Override public void messageReceived(IoSession arg0, Object arg1) throws Exception { ByteBuffer buffer = (ByteBuffer) arg1; buffer.clear(); byte[] b = buffer.array(); DatagramPacket data = new DatagramPacket(b, b.length, new InetSocketAddress(Main.GATEWAY_IP, Main.GATEWAY_PORT)); socket.send(data); logger.debug(buffer.duplicate()); } @Override public void exceptionCaught(IoSession arg0, Throwable arg1) throws Exception { logger.error("连接:" + arg0.getRemoteAddress() + " 发送数据时发生错误" + arg1.getMessage()); arg1.printStackTrace(); } } 5. Server---服务端监听连接 import java.io.File; import java.net.InetSocketAddress; import org.apache.log4j.Logger; import org.apache.log4j.xml.DOMConfigurator; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ExecutorThreadModel; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.SimpleByteBufferAllocator; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.SocketAcceptor; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.dom4j.Document; import org.dom4j.Node; import org.dom4j.io.SAXReader; import com.tz.gps.tcp2udp.codec.GatewayProtocolCodecFactory; import com.tz.gps.tcp2udp.handler.TransferProtocolHandler; public class Main { static Logger logger = Logger.getLogger(Main.class); private int tcpListenPort; public static String GATEWAY_IP; public static int GATEWAY_PORT; public Main(File configFile) { SAXReader reader = new SAXReader(); try { Document doc = reader.read(configFile); Node rootNode = doc.selectSingleNode("tcp2udp"); tcpListenPort = Integer.parseInt(rootNode .selectSingleNode("Server").valueOf("@ListenPort")); GATEWAY_IP = rootNode.selectSingleNode("Gateway").valueOf("@IP"); GATEWAY_PORT = Integer.parseInt(rootNode.selectSingleNode("Gateway").valueOf("@Port")); } catch (Exception e) { logger.error("读入配置文件错误:gwconfig.xml", e); System.exit(0); } } public static void main(String[] args) { DOMConfigurator.configureAndWatch("log4jcfg.xml", 10000); ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); ByteBuffer.setUseDirectBuffers(false); File configFile = new File("./config.xml"); new Main(configFile).start(); } public void start() { try { IoAcceptor acceptor = new SocketAcceptor(); SocketAcceptorConfig cfg = new SocketAcceptorConfig(); cfg.setReuseAddress(true); cfg.getFilterChain().addLast("protocolFilter", new ProtocolCodecFilter(new GatewayProtocolCodecFactory())); cfg.setThreadModel(ExecutorThreadModel.getInstance("tmPSListen")); acceptor.bind(new InetSocketAddress(tcpListenPort), new TransferProtocolHandler(), cfg); logger.info((new StringBuilder("启动TCP监听:")).append(tcpListenPort).toString()); } catch (Exception e) { logger.error("启动TCP监听时错误。", e); } } } 6.config.xml文件格式 <?xml version="1.0" encoding='UTF-8'?> <tcp2udp > <Server ListenPort="7890" /> <Gateway IP="10.20.1.38" Port="7896"/> </tcp2udp> |
行业聚焦 面试交流 职位推荐 开发视频 技术交流 腾讯微博 新浪微博
友情链接:课课家教育 阿里云 鲜果 W3Cfuns前端网 中国企业家 环球企业家 投资界 传媒梦工场 MSN中文网 Android开发者社区 cnbeta 投资中国网 又拍云存储 美通说传播 IT茶馆 网商在线 商业评论网 TechOrange IT时代周刊 3W创新传媒 开源中国社区 二维工坊 Iconfans 推酷 智能电视网 FreeBuf黑客与极客 财经网 DoNews 凤凰财经 新财富 eoe移动开发者社区 i黑马 网易科技 新浪科技 搜狐IT 创业家 创业邦 腾讯财经 福布斯中文网 天下网商 TechWeb 雷锋网 新浪创业 和讯科技 品途O2O 极客公园 艾瑞网 抽屉新热榜 卖家网 人民网通信频道 拉勾网 创新派 简单云主机
手机版|黑名单|守望者 成才网 在线教育 linux 高级程序设计 C/C++ 大数据
( 蜀ICP备14029946号 )
成都守望者科技有限公司 © 2013-2016 All Rights Reserved