博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
NIO框架之MINA源代码解析(一):背景
阅读量:6449 次
发布时间:2019-06-23

本文共 4527 字,大约阅读时间需要 15 分钟。



“你们的agent占了好多系统的port。把我们的非常多业务系统都给整死了,给我们造成了非常大的损失。要求你们的相关领导下周过来道歉”   --   来自我们的一个客户。

 怎么可能呢,我们都不相信,我们的agent仅仅占一个port啊!

事实胜过雄辩。经过查证。确实是因为我们的agent占了好多系统的port。我看了一下日志。基本把系统可用的port占完了!

为什么呢?MINA框架私自开的!

因为我们的agent端使用了NIO通信框架MINA,但并没有使用好,造成了这一差点儿毁灭行的灾难。

还是先看代码吧。

/** * 异步发送消息 * @param agent * @param request */public void sendMessageToAgent(Agent agent, HyRequest request) {	IoSession session = null;	IoConnector connector=null;	long startTime = System.currentTimeMillis();	try {		// 创建一个非堵塞的客户端程序		 connector = new NioSocketConnector();		// 设置链接超时时间		connector.setConnectTimeoutMillis(connectTimeoutMillis);		ObjectSerializationCodecFactory objsCodec = new ObjectSerializationCodecFactory();		objsCodec.setDecoderMaxObjectSize(DEFAULTDECODER);		objsCodec.setEncoderMaxObjectSize(DEFAULTDECODER);		ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(				objsCodec);		// 数据转换。编码设置		connector.getFilterChain()				.addLast("codec", codecFilter);		// 消息		connector.setHandler(clientHandler);				SocketAddress socketAddress = new InetSocketAddress(				agent.getIpAddr(), agent.getAgentPort());		ConnectFuture future = connector.connect(socketAddress);		future.awaitUninterruptibly();		session = future.getSession();		String json = mapper.writeValueAsString(request);		session.write(json);				long endTime = System.currentTimeMillis();				logerr.debug("send-time:" + (endTime - startTime));			} catch (Exception e) {		logerr.error("host:" + agent.getIpAddr() + ", AgentPORT:" + agent.getAgentPort()				+ ", 连接异常..."+e.getMessage());		clientHandler.handlerConnectError(agent, request);			}}

public class MinaClientHandler extends IoHandlerAdapter {	// 日志	private Logger log = Logger.getLogger(getClass());		private MinaResponseProcesser minaResponseProcesser;		ObjectMapper mapper=null;		@Override	public void messageReceived(IoSession session, Object message)			throws Exception {		String msg = message.toString();		log.info("receive message from " + session.getRemoteAddress().toString() + ",message:" + message);		if(null == mapper){			 mapper = new ObjectMapper();		}		//请求消息转换为HyResponse对象		HyResponse response = mapper.readValue(msg, HyResponse.class);				String remoteIp= ((InetSocketAddress)session.getRemoteAddress()).getAddress().getHostAddress();				response.setRemoteIp(remoteIp);		HyRequest request = minaResponseProcesser.processResponse(response);		if(request == null){			//关闭当前session			closeSessionByServer(session,response);		}else{			session.write(mapper.writeValueAsString(request));		}	}}

上面的逻辑就是,当要发送一个消息时,创建一个新的connector,并获取一个session发送消息后直接返回,在MinaClientHandler类的messageReceived里面处理接受到的响应数据,并进行业务处理。最后假设不须要再次发送请求,则关闭当前session。

事实上出现本文一開始的问题就是在这里造成的。

在出现我们的agent占用大量port后,我们这边的project人员就迅速定位到了这个问题,并非常快修复了。但修复并不理想,但修复过后的代码。

/** * 异步发送消息 * @param agent * @param request */public void sendMessageToAgent(Agent agent, HyRequest request) {	IoSession session = null;	IoConnector connector=null;	long startTime = System.currentTimeMillis();	try {		// 创建一个非堵塞的客户端程序		 connector = new NioSocketConnector();		// 设置链接超时时间		connector.setConnectTimeoutMillis(connectTimeoutMillis);		ObjectSerializationCodecFactory objsCodec = new ObjectSerializationCodecFactory();		objsCodec.setDecoderMaxObjectSize(DEFAULTDECODER);		objsCodec.setEncoderMaxObjectSize(DEFAULTDECODER);		ProtocolCodecFilter codecFilter = new ProtocolCodecFilter(				objsCodec);		// 数据转换,编码设置		connector.getFilterChain()				.addLast("codec", codecFilter);		// 消息		connector.setHandler(clientHandler);				SocketAddress socketAddress = new InetSocketAddress(				agent.getIpAddr(), agent.getAgentPort());		ConnectFuture future = connector.connect(socketAddress);		future.awaitUninterruptibly();		session = future.getSession();		String json = mapper.writeValueAsString(request);		session.write(json);		// 等待断开连接		session.getCloseFuture().awaitUninterruptibly();		long endTime = System.currentTimeMillis();				logerr.debug("send-time:" + (endTime - startTime));		//connector.dispose();	} catch (Exception e) {		logerr.error("host:" + agent.getIpAddr() + ", AgentPORT:" + agent.getAgentPort()				+ ", 连接异常..."+e.getMessage());		clientHandler.handlerConnectError(agent, request);			}finally{		if(null!=session){			session.close(true);			session=null;		}		if(null !=connector){			connector.dispose();		}	}}

仅仅改了一个地方。就是在发送完消息后,加了一个等待断开连接语句和finally语句块-关闭session和connector。

尽管不会出现程序占用大量的系统port这个问题。但会造成另外一个问题-当有一个消息队列须要异步调用上面语句发送消息时,有原来的异步(发送完直接返回,相当于高速并发发送)变成伪异步(发送完消息后并等待消息返回处理后返回,相当于顺序处理队列里面的消息)。

上面的改动并非我们想要的结果,但至少修复了占用大量port的问题。

因为怀着想彻底修复这个问题的想法,我想还是深入了解一下MINA源代码吧。

你可能感兴趣的文章
北航 2012 秋季 现代软件工程 团队项目要求
查看>>
获取通讯组属性Get-DistributionGroup
查看>>
"知识管理夏季论坛",免费,欢迎你来!
查看>>
常用DOS命令
查看>>
能上QQ上不了网的解决办法
查看>>
flask + Python3 实现的的API自动化测试平台---- IAPTest接口测试平台
查看>>
【翻译】将Ext JS Grid转换为Excel表格
查看>>
关于人工智能的几个问题
查看>>
个人阅读作业2
查看>>
解决百度上传WebUploader在IE浏览器下点击无反应的问题
查看>>
Oracle常用函数 - 字符函数
查看>>
Linux shell脚本的字符串截取
查看>>
Zendstudio导入项目报错:overlaps the location of another
查看>>
Shell 标准输入、输出和错误
查看>>
Cisco设备配置AAA认证!
查看>>
UDP怎么会返回Connection refused错误
查看>>
上海i虹桥机场点烟器与UNIX哲学
查看>>
3.1-find命令详解
查看>>
清算/报表/日终跑批程序之性能优化案例(一)
查看>>
线上svn快速服务器搭建
查看>>