愛悠閑 > JAVA 非阻塞式IO

JAVA 非阻塞式IO

分類: java  |  作者: smithdoudou88 相關  |  發布日期 : 2014-08-07  |  熱度 : 1402°

JAVA 非阻塞式IO 編程學習

使用Java的Socket API編寫一個簡單的TCP Echo Server。其阻塞式IO的處理方式雖然簡單,但每個客戶端都需要一個單獨的Thread來處理,當服務器需要同時處理大量客戶端時,這種做法不再可行。使用NIO API可以讓一個或有限的幾個Thread同時處理連接到服務器上的所有客戶端。

NIO API允許一個線程通過Selector對象同時監控多個SelectableChannel來處理多路IO,NIO應用程序一般按下圖所示工作:

 

Figure 1

 

 

Figure 1 所示,Client一直在循環地進行select操作,每次select()返回以后,通過selectedKeys()可以得到需要處理的SelectableChannel并對其一一處理。 這樣做雖然簡單但也有個問題,當有不同類型的SelectableChannel需要做不同的IO處理時,在圖中Client的代碼就需要判斷channel的類型然后再作相應的操作,這往往意味著一連串的if else。更糟糕的是,每增加一種新的channel,不但需要增加相應的處理代碼,還需要對這一串if else進行維護。(在本文的這個例子中,我們有ServerSocketChannel和SocketChannel這兩種channel需要分別被處理。)

 

如果考慮將channel及其需要的IO處理進行封裝,抽象出一個統一的接口,就可以解決這一問題。在Listing 1中的NioSession就是這個接口。

 

NioSession的channel()方法返回其封裝的SelectableChannel對象,interestOps()返回用于這個channel注冊的interestOps。registered()是當SelectableChannel被注冊后調用的回調函數,通過這個回調函數,NioSession可以得到channel注冊后的SelectionKey。process()函數則是NioSession接口的核心,這個方法抽象了封裝的SelectableChannel所需的IO處理邏輯。

Listing 1:

復制代碼
public   interface  NioSession {

    
public  SelectableChannel channel();

    
public   int  interestOps();

    
public   void  registered(SelectionKey key);

    
public   void  process();   

}
復制代碼

和NioSession一起工作的是NioWorker這個類(Listing 2),它是NioSession的調用者,封裝了一個Selector對象和Figure 1中循環select操作的邏輯。理解這個類可以幫助我們了解該如何使用NioSession這個接口。NioWorker實現了Runnable接口,循環select操作的邏輯就在run()方法中。在NioWorker – NioSession這個框架中,NioSession在channel注冊的時候會被作為attachment送入register函數,這樣,在每次select()操作的循環中,對于selectedKeys()中的每一個SelectionKey,我們都可以通過attachment拿到其相對應的NioSession然后調用其process()方法。 每次select()循環還有一個任務,就是將通過add()方法加入到這個NioWorker的NioSession注冊到Selector上。在Listing 2的代碼中可以看出,NioSession中的channel()被取出并注冊在Selector上,注冊所需的interestOps從NioSession中取出,NioSession本身則作為attachment送入register()函數。注冊成功后,NioSession的registered()回調函數會被調用。 NioWorker的add()方法的作用是將一個NioSession加入到該NioWorker中,并wakeup當前的select操作,這樣在下一次的select()調用之前,這個NioSession會被注冊。stop()方法則是讓一個正在run()的NioWorker停止。closeAllChannels()會關閉當前注冊的所有channel,這個方法可在NioWorker不再使用時用來釋放IO資源。 Listing 2:

 

復制代碼
public   class  NioWorker  implements  Runnable {
    
public  NioWorker(Selector sel) {

       _sel 
=  sel;
  _added 
=   new  HashSet(); 

    }

    
public   void  run() {
       
try  {

           
try  {

              
while  (_run) {

                  _sel.select();

                  Set selected 
=  _sel.selectedKeys();

                  
for  (Iterator itr  =  selected.iterator(); itr.hasNext();) {

                     SelectionKey key 
=  (SelectionKey) itr.next();

                   NioSession s 
=  (NioSession) key.attachment();

                     s.process();

                     itr.remove();

                  }

                  
synchronized  (_added) {

                     
for  (Iterator itr  =  _added.iterator(); itr.hasNext();) {

                         NioSession s 
=  (NioSession) itr.next();

                         SelectionKey key 
=  s.channel().register(_sel, s.interestOps(), s);

                         s.registered(key);

                         itr.remove();

                     }

                  }

              }


           } 
finally  {

              _sel.close();

           }

       } 
catch  (IOException ex) {

           
throw   new  Error(ex);
       }
    }

    
public   void  add(NioSession s) {

       
synchronized  (_added) {

           _added.add(s);

       }

       _sel.wakeup();

    }

    
public   synchronized   void  stop() {

       _run 
=   false ;

       _sel.wakeup();

    }


    
public   void  closeAllChannels() {

       
for  (Iterator itr  =  _sel.keys().iterator(); itr.hasNext();) {

           SelectionKey key 
=  (SelectionKey) itr.next();


           
try  {         

              key.channel().close();

           } 
catch  (IOException ex) {}

       }
    }



    
protected  Selector _sel  =   null ;

    
protected  Collection _added  =   null ;

    
protected   volatile   boolean  _run  =   true ;
}
復制代碼

 

在Echo Server這個例子中,我們需要一個ServerSocketChannel來接受新的TCP連接,對于每個TCP連接,我們還需要一個SocketChannel來處理這個TCP連接上的IO操作。把這兩種channel和上面的NioWorker – NioSession結構整合在一起,可以得到NioServerSession和NioEchoSession這兩個類,它們分別封裝了ServerSocketChannel和SocketChannel及其對應的IO操作。下面這個UML類圖描述了這4個類的關系:

 

Figure 2

 

可以看到NioWorker和NioSession對新加入的兩個類沒有任何依賴性,NioServerSession和NioEchoSession通過實現NioSession這個接口為系統加入了新的功能。這樣的一個體系架構符合了Open-Close原則,新的功能可以通過實現NioSession被加入而無需對原有的模塊進行修改,這體現了面向對象設計的強大威力。 NioServerSession的實現(Listing 3)相對比較簡單,其封裝了一個ServerSocketChannel以及從這個channel上接受新的TCP連接的邏輯。NioServerSession還需要一個NioWorker的引用,這樣每接受一個新的TCP連接,NioServerSession就為其創建一個NioEchoSession的對象,并將這個對象加入到NioWorker中。 Listing 3:

復制代碼
public   class  NioServerSession  implements  NioSession {
    
public  NioServerSession(ServerSocketChannel channel, NioWorker worker) {

       _channel 
=  channel;

       _worker 
=  worker;
    }

    
public   void  registered(SelectionKey key) {}

    
public   void  process() {

       
try  {

           SocketChannel c 
=  _channel.accept();

           
if  (c  !=   null ) {

              c.configureBlocking(
false );

              NioEchoSession s 
=   new  NioEchoSession(c);

              _worker.add(s);
           }

       } 
catch  (IOException ex) {

           
throw   new  Error(ex);
       }
    }

  
public  SelectableChannel channel() {
       
return  _channel;

    }

    
public   int  interestOps(){

       
return  SelectionKey.OP_ACCEPT;

    } 

    
protected  ServerSocketChannel _channel;

    
protected  NioWorker _worker;

}
復制代碼

NioEchoSession的行為要復雜一些,NioEchoSession會先從TCP連接中讀取數據,再將這些數據用同一個連接寫回去,并重復這個步驟直到客戶端把連接關閉為止。我們可以把“Reading”和“Writing”看作NioEchoSession的兩個狀態,這樣可以用一個有限狀態機來描述它的行為,如下圖所示:

 

Figure 3

 

接下來的工作就是如何實現這個有限狀態機了。在這個例子中,我們使用State模式來實現它。下面這張UML類圖描述了NioEchoSession的設計細節。

 

Figure 4

 

NioEchoSession所處的狀態由EchoState這個抽象類來表現,其兩個子類分別對應了“Reading”和“Writing”這兩個狀態。NioEchoSession會將process()和interestOps()這兩個方法delegate給EchoState來處理,這樣,當NioEchoSession處于不同的狀態時,就會有不同的行為。 Listing 4是EchoState的實現。EchoState定義了process()和interestOps()這兩個抽象的方法來讓子類實現。NioEchoSession中的process()方法會被delegate到其當前EchoState的process()方法,NioEchoSession本身也會作為一個描述context的參數被送入EchoState的process()方法中。EchoState定義的interestOps()方法則會在NioEchoSession注冊和轉變State的時候被用到。

 

EchoState還定義了兩個靜態的方法來返回預先創建好的ReadState和WriteState,這樣做的好處是可以避免在NioEchoSession轉換state的時候創建一些不必要的對象從而影響性能。然而,這樣做要求state類必須是無狀態的,狀態需要保存在context類,也就是NioEchoSession中。

 

Listing 4:

 

復制代碼
public   abstract   class  EchoState {

    
public   abstract   void  process(NioEchoSession s)  throws  IOException;

    
public   abstract   int  interestOps();

    
public   static  EchoState readState() {

       
return  _read;

    }

    
public   static  EchoState writeState() {
       
return  _write;
    }

    
protected   static  EchoState _read  =   new  ReadState();
    
protected   static  EchoState _write  =   new  WriteState();
}
復制代碼

 

Listing 5是NioEchoSession的實現。NioEchoSession包含有一個SocketChannel,這個channel注冊后得到的SelectionKey,一個用于存放數據的ByteBuffer和一個記錄當前state的EchoState對象。在初始化時,EchoState被初始化為一個ReadState。NioEchoSession把process()方法和interestOps()方法都delegate到當前的EchoState中。其setState()方法用于切換當前state,在切換state后,NioEchoSession會通過SelectionKey更新注冊的interestOps。close()方法用于關閉這個NioEchoSession對象。

 

Listing 5:

 

 

復制代碼
public   class  NioEchoSession  implements  NioSession {


    
public  NioEchoSession(SocketChannel c) {

       _channel 
=  c;

       _buf 
=  ByteBuffer.allocate( 128 );

       _state 
=  EchoState.readState();
    }

    
public   void  registered(SelectionKey key) {

       _key 
=  key;

    }

    
public   void  process() {

       
try  {

           _state.process(
this );

       } 
catch  (IOException ex) {

           close();

           
throw   new  Error(ex);
       }
    }

    
public  SelectableChannel channel() {

       
return  _channel;

    }

    
public   int  interestOps() {

       
return  _state.interestOps();

    }


    
public   void  setState(EchoState state) {
     _state 
=  state;

       _key.interestOps(interestOps());
    }

    
public   void  close() {

       
try  {

      _channel.close();

       } 
catch  (IOException ex) {

           
throw   new  Error(ex);

       }

    }

    
protected  SocketChannel _channel  =   null ;

    
protected  SelectionKey _key;

    
protected  ByteBuffer _buf  =   null ;

    
protected  EchoState _state  =   null ;

}
復制代碼

 

Listing 6和Listing 7分別是ReadState和WriteState的實現。ReadState在process()中會先從NioEchoSession的channel中讀取數據,如果未能讀到數據,NioEchoSession會繼續留在ReadState;如果讀取出錯,NioEchoSession會被關閉;如果讀取成功,NioEchoSession會被切換到WriteState。WriteState則負責將NioEchoSession中已經讀取的數據寫回到channel中,全部寫完后,NioEchoSession會被切換回ReadState。

Listing 6:

public class ReadState extends EchoState {

    public void process(NioEchoSession s)

       throws IOException

    {

       SocketChannel channel = s._channel;

       ByteBuffer buf = s._buf;

       int count = channel.read(buf);

       if (count == 0) {

           return;

       }

       if (count == -1) {

        s.close();

           return;

       }


       buf.flip();

       s.setState(EchoState.writeState());

    }

    public int interestOps() {

       return SelectionKey.OP_READ;

    }

}

 


Listing 7:

 

 

 

public class WriteState extends EchoState {

 

    public void process(NioEchoSession s)

       throws IOException

    {

       SocketChannel channel = s._channel;

       ByteBuffer buf = s._buf;

       channel.write(buf);

       if (buf.remaining() == 0) {

           buf.clear();

           s.setState(EchoState.readState());

       }

    }

    public int interestOps() {

       return SelectionKey.OP_WRITE;

    }

}

 

 

 

 

NioEchoServer(Listing 8)被用來啟動和關閉一個TCP Echo Server,這個類實現了Runnable接口,調用其run()方法就啟動了Echo Server。其shutdown()方法被用來關閉這個Echo Server,注意shutdown()和run()的finally block中的同步代碼確保了只有當Echo Server被關閉后,shutdown()方法才會返回。

 

Listing 8:

 

public class NioEchoServer implements Runnable {

    public void run() {

       try {

          ServerSocketChannel serv = ServerSocketChannel.open();

           try {

              serv.socket().bind(new InetSocketAddress(7));

              serv.configureBlocking(false);

              _worker = new NioWorker(Selector.open());

              NioServerSession s = new NioServerSession(serv, _worker);

              _worker.add(s);

              _worker.run();

           } finally {

              _worker.closeAllChannels();

              synchronized (this) {

                  notify();

              }

           }

       } catch (IOException ex) {

           throw new Error(ex);

       }

    }

 

    public synchronized void shutdown() {

       _worker.stop();

       try {

           wait();

       } catch (InterruptedException ex) {

           throw new Error(ex);

       }

    }

    protected NioWorker _worker = null;

}

 

 

 

 

最后,通過一個簡單的main()函數(Listing 9),我們就可以運行這個Echo Server了。

 

 

 

 

 

 

Listing 9:

 

 

 

    public static void main(String [] args) {

 

 

       new NioEchoServer().run();

 

 

    }

 

 

 

 

我們可以通過telnet程序來檢驗這個程序的運行狀況:

 

 

 

1. 打開一個命令行,輸入 telnet localhost 7 來運行一個telnet程序并連接到Echo Server上。

 

 

 

2. 在telnet程序中輸入字符,可以看到輸入的字符被顯示在屏幕上。(這是因為Echo Server將收到的字符寫回到客戶端)

 

 

 

3. 多打開幾個telnet程序進行測試,可以看到Echo Server能通過NIO API用一個Thread服務多個客戶端。



快乐彩中奖说明