NIO高并发编程

之前http://blog.csdn.net/sunmenggmail/article/details/8638480

已经整理过,这次是2.0版

参考:

http://daizuan.iteye.com/blog/1112909

http://daizuan.iteye.com/blog/1113471

http://www.cnblogs.com/pingh/archive/2013/07/30/3224990.html

http://www.cnblogs.com/ajian005/archive/2012/09/27/2753662.html(相当好,总结了开源框架)

陷阱1:处理事件忘记移除key
在select返回值大于0的情况下,循环处理
Selector.selectedKeys集合,每处理一个必须从Set中移除

复制代码
Iterator<SelectionKey> it=set.iterator();
    While(it.hasNext()){
    SelectionKey key=it.next();
    it.remove(); //切记移除
    „„处理事件
}
复制代码

不移除的后果是本次的就绪的key集合下次会再次返回,导致无限循环,CPU消耗100%

陷阱2:Selector返回的key集合非线程安全

Selector.selectedKeys/keys 返回的集合都是非线程安全的
Selector.selectedKeys返回的可移除
Selector.keys 不可变
对selected keys的处理必须单线程处理或者适当同步

陷阱3:正确注册Channel和更新interest
直接注册不可吗?
channel.register(selector, ops, attachment);
不是不可以,效率问题
至少加两次锁,锁竞争激烈
Channel本身的regLock,竞争几乎没有
Selector内部的key集合,竞争激烈
更好的方式:加入缓冲队列,等待注册,reactor单线程处理

复制代码
If(isReactorThread()){
    channel.register(selector,ops,attachment);
}
else{
    register.offer(newEvent(channel,ops,attachment));
    selector.wakeup();
}
复制代码

同样,SelectionKey.interest(ops)
Linux上会阻塞,需要获取selector内部锁做同步
在win32上不会阻塞
屏蔽平台差异,避免锁的激烈竞争,采用类似注册channel的方式:

复制代码
if (this.isReactorThread()) {
    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
} 
else {
    this.register.offer(new Event(key,SelectionKey.OP_READ));
    selector.wakeup();
}
复制代码

 

陷阱4:正确处理OP_WRITE
OP_WRITE处理不当很容易导致CPU 100%
OP_WRITE触发条件:
前提:interest了OP_WRITE
触发条件:
socket发送缓冲区可写
远端关闭
有错误发生
正确的处理方式:
仅在已经连接的channel上注册
仅在有数据可写的时候才注册
触发之后立即取消注册,否则会继续触发导致循环
处理完成后视情况决定是否继续注册
没有完全写入,继续注册
全部写入,无需注册

陷阱5:正确取消注册channel
SelectableChannel一旦注册将一直有效直到明确取消
怎么取消注册?
channel.close(),内部会调用key.cancel()
key.cancel();
中断channel的读写所在线程引起的channel关闭
但是这样还不够!
key.cancel()仅仅是将key加入cancelledKeys
直到下一次select才真正处理
并且channel的socketfd只有在真正取消注册后才会close(fd)

后果是什么?
服务端,问题不大,select调用频繁
客户端,通常只有一个连接,关闭channel之后,没有调用select就关闭了selector
sockfd没有关闭,停留在CLOSE_WAIT状态
正确的处理方式,取消注册也应当作为事件交给reactor处理,及时wakeup做select
适当的时候调用selector.selectNow()
Netty在超过256连接关闭的时候主动调用一次selectNow

复制代码
static final int CLEANUP_INTERVAL=256;
private boolean cleanUpCancelledKeys()throws IOException{
    if(cancelledKeys>=CLEANUP_INTERVAL){
        cancelledKeys=0;
        selector.selectNow();
        returntrue;
    }
    returnfalse;
}
//channel关闭的时候
channel.socket.close();
cancelledKeys++;
复制代码

陷阱6:同时注册OP_ACCPET和OP_READ,同时注册OP_CONNECT和OP_WRITE
在底层来说,只有两种事件:read和write
Java NIO还引入了OP_ACCEPT和OP_CONNECT
OP_ACCEPT、OP_READ == Read
OP_CONNECT、OP_WRITE == Write
同时注册OP_ACCEPT和OP_READ ,或者同时注册OP_CONNECT和OP_WRITE在不同平台上产生错误的行为,避免这样做!

陷阱7:正确处理connect
SocketChannel.connect方法在非阻塞模式下可能返回false,切记判断返回值
如果是loopback连接,可能直接返回true,表示连接成功
返回false,后续处理
注册channel到selector,监听OP_CONNECT事件
在OP_CONNECT触发后,调用SocketChannel.finishConnect成功后,连接才真正建立
陷阱:
没有判断connect返回值
没有调用finishConnect
在OP_CONNECT触发后,没有移除OP_CONNECT,导致SelectionKey一直处于就绪状态,空耗CPU
OP_CONNECT只能在还没有连接的channel上注册

忠告

尽量不要尝试实现自己的nio框架,除非有经验丰富的工程师
尽量使用经过广泛实践的开源NIO框架Mina、Netty3、xSocket
尽量使用最新稳定版JDK
遇到问题的时候,也许你可以先看下Java的bug database

 

elector自身是线程安全的,而他的key set却不是。在一次选择发生的过程中,对于key的关心事件的修改要等到下一次select的时候才会生效。 另外,key和其代表的channel有可能在任何时候被cancel和close。因此存在于key set中的key并不代表其key是有效的,也不代表其channel是open的。如果key有可能被其他的线程取消或关闭channel,程序必须小 心的同步检查这些条件。

阻塞了的select可以通过调用selector的wakeup方法来唤醒。

http://blog.csdn.NET/cutesource/article/details/6192016

如何正确使用NIO来构架网络服务器一直是最近思考的一个问题,于是乎分析了一下Jetty、Tomcat和Mina有关NIO的源码,发现大伙都基于类似的方式,我感觉这应该算是NIO构架网络服务器的经典模式,并基于这种模式写了个小小网络服务器,压力测试了一下,效果还不错。废话不多说,先看看三者是如何使用NIO的。

Jetty Connector的实现

先看看有关类图:

其中:

SelectChannelConnector负责组装各组件

SelectSet负责侦听客户端请求

SelectChannelEndPoint负责IO的读和写

HttpConnection负责逻辑处理

在整个服务端处理请求的过程可以分为三个阶段,时序图如下所示:

阶段一:监听并建立连接

这一过程主要是启动一个线程负责accept新连接,监听到后分配给相应的SelectSet,分配的策略就是轮询。

阶段二:监听客户端的请求

这一过程主要是启动多个线程(线程数一般为服务器CPU的个数),让SelectSet监听所管辖的channel队列,每个SelectSet维护一个Selector,这个Selector监听队列里所有的channel,一旦有读事件,从线程池里拿线程去做处理请求

阶段三:处理请求

这一过程就是每次客户端请求的数据处理过程,值得注意的是为了不让后端的业务处理阻碍Selector监听新的请求,就多线程来分隔开监听请求和处理请求两个阶段。

由此可以大致总结出Jetty有关NIO使用的模式,如下图所示:

最核心就是把三件不同的事情隔离开,并用不同规模的线程去处理,最大限度地利用NIO的异步和通知特性

下面再来看看Tomcat是如何使用NIO来构架Connector这块

先看看Tomcat Connector这块的类图:

其中:

NioEndpoint负责组装各部件

Acceptor负责监听新连接,并把连接交给Poller

Poller负责监听所管辖的channel队列,并把请求交给SocketProcessor处理

SocketProcessor负责数据处理,并把请求传递给后端业务处理模块

在整个服务端处理请求的过程可以分为三个阶段,时序图如下所示:

阶段一:监听并建立连接

这一阶段主要是Acceptor监听新连接,并轮询取一个Poller ,把连接交付给Poller

阶段二:监听客户端的请求

这一过程主要是让每个Poller监听所管辖的channel队列,select到新请求后交付给SocketProcessor处理

阶段三:处理请求

这一过程就是从多线程执行SocketProcessor,做数据和业务处理

于是乎我们发现抛开具体代码细节,Tomcat和Jetty在NIO的使用方面是非常一致的,采用的模式依然是下图:

Mina框架

最后我们再看看NIO方面最著名的框架Mina,抛开Mina有关session和处理链条等方面的设计,单单挑出前端网络层处理来看,也采用的是与Jetty和Tomcat类似的模式,只不过它做了些简化,它没有隔开请求侦听和请求处理两个阶段,因此,宏观上看它只分为两个阶段。

先看看它的类图:

其中:

SocketAcceptor起线程调用SocketAcceptor.Work负责新连接侦听,并交给SocketIoProcessor处理

SocketIoProcessor起线程调用SocketIoProcessor.Work负责侦听所管辖的channel队列, select到新请求后交给IoFilterChain处理

IoFilterChain组装了mina的处理链条

在整个服务端处理请求的过程可以分为两个阶段,时序图如下所示:

阶段一:监听并建立连接

阶段二:监听并处理客户端的请求

 

总结来看Jetty、tomcat和Mina,我们也大概清楚了该如何基于NIO来构架网络服务器,通过这个提炼出来的模式,我写了个很简单的NIO Server,在保持连接的情况下,可以很轻松的保持6万连接(由于有65535连接限制),并能在负载只有3左右的情况下(4核),承担3到4万的TPS请求(当然做的事情很简单,仅仅是把buffer转化为自定义协议的包,然后再把包转为buffer写到客户端)。因此简单地实践一下可以证明这个模式的有效性,不妨再看看这个图,希望对大伙以后写server有用:

安装这个架构,写了个粗略的版本,以后有机会一定要看看jetty等是怎么优雅的实现的

//server

[java] view plain copy

  1. import java.io.IOException;
  2. import java.net.InetSocketAddress;
  3. import java.net.ServerSocket;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.nio.charset.Charset;
  10. import java.util.*;
  11. import java.util.concurrent.*;
  12. public class Server {
  13.     private ConcurrentLinkedQueue<SelectionKey> m_conn = new ConcurrentLinkedQueue<SelectionKey>();
  14.     private ConcurrentLinkedQueue<SelectionKey> m_req = new ConcurrentLinkedQueue<SelectionKey>();
  15.     private final int m_processNum = 3;
  16.     private final int m_worksNum = 3;
  17.     private final int m_port = 3562;
  18.     private ServerSocketChannel channel ;
  19.     private boolean connQuEpt = true;
  20.     private boolean reqQuEpt = true;
  21.     private Selector selector;//for connection
  22.     private List<Selector> m_reqSelector = new ArrayList<Selector>();
  23.     public void listen() throws IOException{
  24.         selector = Selector.open();
  25.         channel = ServerSocketChannel.open();
  26.         channel.configureBlocking(false);
  27.         channel.socket().bind(new InetSocketAddress(m_port));
  28.         channel.register(selector, SelectionKey.OP_ACCEPT);
  29.         new Thread(new ConnectionHander()).start();
  30.         //new Thread(new RequestManager()).start();
  31.         creatRequestHanders();
  32.         new Thread(new ProcessManager()).start();
  33.     }
  34.     /*class RequestManager implements Runnable {
  35.         private ExecutorService m_reqPool;
  36.         public RequestManager() {
  37.             m_reqPool = Executors.newFixedThreadPool(m_processNum, new RequestThreadFactor());
  38.         }
  39.         public void run() {
  40.             while (true) {
  41.                 
  42.             }
  43.         }
  44.     }*/
  45.     void creatRequestHanders() {
  46.         try {
  47.         for (int i = 0; i < m_processNum; ++i) {
  48.             Selector slt = Selector.open();
  49.             m_reqSelector.add(slt);
  50.             RequestHander req = new RequestHander();
  51.             req.setSelector(slt);
  52.             new Thread(req).start();
  53.         }
  54.         }
  55.         catch(IOException e) {
  56.             e.printStackTrace();
  57.         }
  58.     }
  59.     class ProcessManager implements Runnable {
  60.         private ExecutorService m_workPool;
  61.         public ProcessManager() {
  62.             m_workPool = Executors.newFixedThreadPool(m_worksNum);
  63.         }
  64.         public void run() {
  65.             SelectionKey key;
  66.             while(true) {
  67.                 //太消耗cpu//应该要加一个wait,但是这样就有锁了
  68.                 while((key = m_req.poll()) !=null) {
  69.                     ProcessRequest preq = new ProcessRequest();
  70.                     preq.setKey(key);
  71.                     m_workPool.execute(preq);
  72.                 }
  73.             }
  74.         }
  75.     }
  76.     /*class RequestThread extends Thread {
  77.         private Selector selector;
  78.         public  RequestThread(Runnable r) {
  79.             super(r);
  80.             try {
  81.             selector = Selector.open();
  82.             }
  83.             catch(IOException e) {
  84.                 e.printStackTrace();
  85.                 //todo
  86.             }
  87.         }
  88.     }
  89.     
  90.     class RequestThreadFactor implements ThreadFactory {
  91.         public Thread newThread(Runnable r) {
  92.             return new RequestThread(r);
  93.         }
  94.     }*/
  95.     //监视请求连接
  96.     class ConnectionHander implements Runnable {
  97.         int idx = 0;
  98.         @Override
  99.         public void run() {
  100.             System.out.println(“listenning to connection”);
  101.             while (true) {
  102.                 try {
  103.                     selector.select();
  104.                     Set<SelectionKey> selectKeys = selector.selectedKeys();
  105.                     Iterator<SelectionKey> it = selectKeys.iterator();
  106.                     while (it.hasNext()) {
  107.                         SelectionKey key = it.next();
  108.                         it.remove();
  109.                         m_conn.add(key);
  110.                         int num = m_reqSelector.size();
  111.                         m_reqSelector.get(idx).wakeup();//防止监听request的进程都在堵塞中
  112.                         idx =(idx + 1)%num;
  113.                     }
  114.                 }
  115.                 catch(IOException e) {
  116.                     e.printStackTrace();
  117.                 }
  118.             }
  119.         }
  120.     }
  121.     //监视读操作
  122.     class RequestHander implements Runnable {
  123.         private Selector selector;
  124.         public void setSelector(Selector slt) {
  125.             selector = slt;
  126.         }
  127.         public void run() {
  128.             try {
  129.             SelectionKey key;
  130.             System.out.println(Thread.currentThread() + “listenning to request”);
  131.             while (true) {
  132.                 selector.select();
  133.                 while((key = m_conn.poll()) != null) {
  134.                     ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  135.                     SocketChannel sc = ssc.accept();//接受一个连接
  136.                     sc.configureBlocking(false);
  137.                     sc.register(selector, SelectionKey.OP_READ);
  138.                     System.out.println(Thread.currentThread() + “a connected line”);
  139.                 }
  140.                 Set<SelectionKey> keys = selector.selectedKeys();
  141.                 Iterator<SelectionKey> it = keys.iterator();
  142.                 while(it.hasNext()) {
  143.                     SelectionKey keytmp = it.next();
  144.                     it.remove();
  145.                     if (keytmp.isReadable()) {
  146.                         m_req.add(keytmp);
  147.                     }
  148.                 }
  149.             }
  150.             }
  151.             catch(IOException e) {
  152.                 e.printStackTrace();
  153.             }
  154.         }
  155.     }
  156.     //读数据并进行处理和发送返回
  157.     class ProcessRequest implements Runnable {
  158.         SelectionKey key;
  159.         public void setKey(SelectionKey key) {
  160.             this.key = key;
  161.         }
  162.         public void run() {
  163.             ByteBuffer buffer = ByteBuffer.allocate(1024);
  164.             SocketChannel sc = (SocketChannel) key.channel();
  165.             String msg = null;
  166.             try{
  167.                 int readBytes = 0;
  168.                 int ret;
  169.                 try{
  170.                 while((ret = sc.read(buffer)) > 0) {
  171.                 }
  172.                 }
  173.                 catch(IOException e) {
  174.                 }
  175.                 finally {
  176.                     buffer.flip();
  177.                 }
  178.                 if (readBytes > 0) {
  179.                     msg = Charset.forName(“utf-8”).decode(buffer).toString();
  180.                     buffer = null;
  181.                 }
  182.             }
  183.             finally {
  184.                 if(buffer != null)
  185.                     buffer.clear();
  186.             }
  187.             try {
  188.             System.out.println(“server received [ “ + msg +“] from client address : “ + sc.getRemoteAddress());
  189.             Thread.sleep(2000);
  190.             sc.write(ByteBuffer.wrap((msg + ” server response “).getBytes(Charset.forName(“utf-8”))));
  191.             }
  192.             catch(Exception e) {
  193.             }
  194.         }
  195.     }
  196.     public static void main(String[] args) {
  197.         // TODO Auto-generated method stub
  198.         Server server = new Server();
  199.         try {
  200.         server.listen();
  201.         }
  202.         catch(IOException e) {
  203.         }
  204.     }
  205. }

 

//client

[java] view plain copy

  1. package javatest;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.net.ServerSocket;
  5. import java.nio.ByteBuffer;
  6. import java.nio.channels.ClosedChannelException;
  7. import java.nio.channels.SelectionKey;
  8. import java.nio.channels.Selector;
  9. import java.nio.channels.ServerSocketChannel;
  10. import java.nio.channels.SocketChannel;
  11. import java.nio.charset.Charset;
  12. import java.util.*;
  13. public class Client implements Runnable {
  14.     // 空闲计数器,如果空闲超过10次,将检测server是否中断连接.
  15.     private static int idleCounter = 0;
  16.     private Selector selector;
  17.     private SocketChannel socketChannel;
  18.     private ByteBuffer temp = ByteBuffer.allocate(1024);
  19.     public static void main(String[] args) throws IOException {
  20.         Client client= new Client();
  21.         new Thread(client).start();
  22.         //client.sendFirstMsg();
  23.     }
  24.     public Client() throws IOException {
  25.         // 同样的,注册闹钟.
  26.         this.selector = Selector.open();
  27.         // 连接远程server
  28.         socketChannel = SocketChannel.open();
  29.         // 如果快速的建立了连接,返回true.如果没有建立,则返回false,并在连接后出发Connect事件.
  30.         Boolean isConnected = socketChannel.connect(new InetSocketAddress(“localhost”3562));
  31.         socketChannel.configureBlocking(false);
  32.         SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
  33.         if (isConnected) {
  34.             this.sendFirstMsg();
  35.         } else {
  36.             // 如果连接还在尝试中,则注册connect事件的监听. connect成功以后会出发connect事件.
  37.             key.interestOps(SelectionKey.OP_CONNECT);
  38.         }
  39.     }
  40.     public void sendFirstMsg() throws IOException {
  41.         String msg = “Hello NIO.”;
  42.         socketChannel.write(ByteBuffer.wrap(msg.getBytes(Charset.forName(“UTF-8”))));
  43.     }
  44.     @Override
  45.     public void run() {
  46.         while (true) {
  47.             try {
  48.                 // 阻塞,等待事件发生,或者1秒超时. num为发生事件的数量.
  49.                 int num = this.selector.select(1000);
  50.                 if (num ==0) {
  51.                     idleCounter ++;
  52.                     if(idleCounter >10) {
  53.                         // 如果server断开了连接,发送消息将失败.
  54.                         try {
  55.                             this.sendFirstMsg();
  56.                         } catch(ClosedChannelException e) {
  57.                             e.printStackTrace();
  58.                             this.socketChannel.close();
  59.                             return;
  60.                         }
  61.                     }
  62.                     continue;
  63.                 } else {
  64.                     idleCounter = 0;
  65.                 }
  66.                 Set<SelectionKey> keys = this.selector.selectedKeys();
  67.                 Iterator<SelectionKey> it = keys.iterator();
  68.                 while (it.hasNext()) {
  69.                     SelectionKey key = it.next();
  70.                     it.remove();
  71.                     if (key.isConnectable()) {
  72.                         // socket connected
  73.                         SocketChannel sc = (SocketChannel)key.channel();
  74.                         if (sc.isConnectionPending()) {
  75.                             sc.finishConnect();
  76.                         }
  77.                         // send first message;
  78.                         this.sendFirstMsg();
  79.                     }
  80.                     if (key.isReadable()) {
  81.                         // msg received.
  82.                         SocketChannel sc = (SocketChannel)key.channel();
  83.                         this.temp = ByteBuffer.allocate(1024);
  84.                         int count = sc.read(temp);
  85.                         if (count<0) {
  86.                             sc.close();
  87.                             continue;
  88.                         }
  89.                         // 切换buffer到读状态,内部指针归位.
  90.                         temp.flip();
  91.                         String msg = Charset.forName(“UTF-8”).decode(temp).toString();
  92.                         System.out.println(“Client received [“+msg+“] from server address:” + sc.getRemoteAddress());
  93.                         Thread.sleep(1000);
  94.                         // echo back.
  95.                         sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName(“UTF-8”))));
  96.                         // 清空buffer
  97.                         temp.clear();
  98.                     }
  99.                 }
  100.             } catch (IOException e) {
  101.                 e.printStackTrace();
  102.             } catch (InterruptedException e) {
  103.                 e.printStackTrace();
  104.             }
  105.         }
  106.     }
  107. }

发表评论

电子邮件地址不会被公开。 必填项已用*标注