本文共 9127 字,大约阅读时间需要 30 分钟。
最近学习了非阻塞IO(NIO),因为厌烦了在开发并行处理时候,阻塞IO所导致的肥服务端,因为对于每个客户连接都要产生一个线程对此进行处理,当然你可以不这样实现,但我的前提是开发并行处理,下面是我的源码,因为是在dos命令行测试的,所以要是编写为GUI的时候,还要很多要改的东西,这也是我下个征服的对象,当然我已经迫不及待了,下面是我花了三个晚上学习并编写的非阻塞聊天室:(供交流学习用)
客户端:
import java.net.InetSocketAddress;import java.nio.channels.SocketChannel;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.CharBuffer;import java.io.BufferedReader;import java.io.*;import java.lang.Thread;import java.nio.charset.*;import java.nio.charset.CharsetDecoder;public class ChatClient{
private InetSocketAddress address = new InetSocketAddress("localhost",13); private SocketChannel client = null; private String user = null; private String pass = null; private BufferedReader in = null; private Thread t = null; public ChatClient(){ try{ client = SocketChannel.open(); System.out.println("connecting..."); client.connect(address); System.out.println("connected with "+address.getHostName()); client.configureBlocking(false); }catch(IOException ex){ ex.printStackTrace(); System.exit(-1); } this.start(); } public void start(){ this.receiveMessage(); this.sendMessage(); } public void sendMessage(){ try{ in = new BufferedReader(new InputStreamReader(System.in)); System.out.println("Input the Info then check it out on the server"); System.out.print("Your Name:"); user = in.readLine(); System.out.println("Password:"); pass = in.readLine(); ByteBuffer buffer = ByteBuffer.allocate(50); String message= new String("LOGIN:"+user+"&"+pass); buffer = ByteBuffer.wrap(message.getBytes()); while(buffer.hasRemaining()&client.write(buffer)!=-1); System.out.println(message+" has been send"); buffer.flip(); Charset charset = Charset.forName("gb2312"); CharsetDecoder decoder = charset.newDecoder(); CharBuffer charBuffer = decoder.decode(buffer); //System.out.println("receive:"+charBuffer+" length:"+charBuffer.limit()); }catch(IOException ex){ ex.printStackTrace(); } this.waitFor(2000); System.out.println("WELCOME TO THE KING 'S CHAT ROOM!"); System.out.println("Input the Info(exit is to leave out)"); while(true){ System.out.print(">"); ByteBuffer buffer = ByteBuffer.allocate(100); in = new BufferedReader(new InputStreamReader(System.in)); try{ String read=in.readLine(); if(read.equals("exit")){ break; } String message1="SENTO:"+read; buffer = ByteBuffer.wrap(message1.getBytes()); // buffer.flip(); System.out.println("before"); while(buffer.hasRemaining()&client.write(buffer)!=-1); // buffer.flip(); System.out.println(message1+" has been send"); this.waitFor(500); }catch(IOException ex){ ex.printStackTrace(); } } System.out.println("Welcome to use this soft!---King"); System.exit(-1); } public void waitFor(long time){ try{ Thread.sleep(time); }catch(Exception ex){ ex.printStackTrace(); } } public void receiveMessage(){ t=new ReceiveThread(client); t.start(); } public static void main(String[]args){ ChatClient cc=new ChatClient(); } class ReceiveThread extends Thread{ SocketChannel client =null; ByteBuffer buffer=ByteBuffer.allocate(50); private boolean val=true; public ReceiveThread(SocketChannel client){ this.client = client; } public void run(){ while(val){ try{ while (client.read(buffer) > 0){ buffer.flip(); String result = decode(buffer); System.out.println(">(back)"+result); buffer.flip(); } }catch(IOException ex){ ex.printStackTrace(); return; } } } } public String decode(ByteBuffer buffer){ Charset charset=null; CharsetDecoder decoder=null; CharBuffer charBuffer=null; try{ charset= Charset.forName("gb2312"); decoder= charset.newDecoder(); charBuffer= decoder.decode(buffer); return charBuffer.toString(); }catch(Exception ex){ ex.printStackTrace(); return ""; } }}注意:可以多个客户进行交流,程序要求输入验证信息,但由于时间原因后台我都以合法用户给予回馈服务端:import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;import java.net.ServerSocket;import java.net.InetSocketAddress;import java.nio.channels.Selector;import java.nio.channels.SelectionKey;import java.io.IOException;import java.util.Iterator;import java.nio.ByteBuffer;import java.util.ArrayList;import java.nio.charset.*;import java.nio.*;public class ChatServer {
private int port = 13; private Selector selector; private ServerSocketChannel ssc; private ServerSocket server; private InetSocketAddress address; private ArrayList connectKey=new ArrayList(); public ChatServer(){ //initServer try{ ssc=ServerSocketChannel.open(); server=ssc.socket(); address = new InetSocketAddress(port); server.bind(address); selector=Selector.open(); ssc.configureBlocking(false); ssc.register(selector,SelectionKey.OP_ACCEPT); System.out.println("============================================================"); System.out.println("= ="); System.out.println("= ="); System.out.println("= 水底沙聊天室-version1.0 ="); System.out.println("= ="); System.out.println("= QQ:247095340(交流) ="); System.out.println("============================================================"); System.out.println("Listening the port 13..."); }catch(IOException ex){ ex.printStackTrace(); System.exit(-1); } } public void startServer() throws IOException{ while(true){ int i=selector.select(); //System.out.print(i); Iterator keys = selector.selectedKeys().iterator(); while(keys.hasNext()){ SelectionKey key = (SelectionKey)keys.next(); keys.remove(); try{ if(key.isAcceptable()){ ServerSocketChannel ssc=(ServerSocketChannel)key.channel(); SocketChannel channel = ssc.accept();//return null if there's no request System.out.println(channel+" has accepted"); channel.configureBlocking(false); SelectionKey clientKey=channel.register(selector,SelectionKey.OP_READ); }//else if(key.isWritable()){ SocketChannel channel = (SocketChannel)key.channel(); ByteBuffer buffer = (ByteBuffer)key.attachment(); if(buffer!=null){ key.attach(null);//avoid the return twice //check info:the login or the message //buffer.flip(); String checkBuffer = this.decode(buffer); System.out.println("write:"+checkBuffer); if(checkBuffer.equals("LOGIN:OK")){ //return LOGIN:OK then add into the connectKey array! System.out.println("ok"+buffer); buffer.flip(); //while(buffer.hasRemaining()&channel.write(buffer)!=-1); channel.write(buffer); key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE); connectKey.add(key);//add to the connectKey array System.out.println("here"); }else if(checkBuffer.equals("LOGIN:ERROR")){ //return LOGIN:ERROR the client should close the channel //warning:method:key.channel(); //Returns the channel for which this key was created. // This method will continue to return the channel even after the key is cancelled. while(buffer.hasRemaining()&channel.write(buffer)!=-1); key.cancel(); }else //if(checkBuffer.indexOf("SENTO:")!=-1){ { //return the message to everyone // while(buffer.hasRemaining()&channel.write(buffer)!=-1); System.out.println("sento"+buffer); buffer.flip(); channel.write(buffer); System.out.println("send over"); } } }//else if(key.isReadable()){ SocketChannel channel = (SocketChannel)key.channel(); ByteBuffer buffer=ByteBuffer.allocate(50); System.out.println("read..."); channel.read(buffer); buffer.flip(); String checkBuffer = this.decode(buffer); System.out.println("read:"+checkBuffer); //while(buffer.hasRemaining()&&channel.read(buffer)!=-1); //check the buffer //buffer.flip(); //String checkBuffer = this.decode(buffer); // System.out.println("read:"+checkBuffer); if(checkBuffer.startsWith("LOGIN:")){ //get info of the user & pass then check for it,return feedback! //the format is LOGIN:user&pass int p1=checkBuffer.length(); int p2=checkBuffer.indexOf("&"); String user=checkBuffer.substring(6,p2); String pass=checkBuffer.substring(p2+1,p1); System.out.println(user+pass); //todo check from the database!!! //assume the user is legal ByteBuffer feedback = ByteBuffer.allocate(20); feedback=ByteBuffer.wrap("LOGIN:OK".getBytes()); key.interestOps(SelectionKey.OP_WRITE); key.attach(feedback); }else if(checkBuffer.startsWith("SENTO:")){ String message = checkBuffer.substring(6); System.out.println("sentto:"+message); ByteBuffer buffer1 = ByteBuffer.allocate(50); buffer1=ByteBuffer.wrap(message.getBytes()); Iterator it = connectKey.iterator(); //key.interestOps(SelectionKey.OP_WRITE); while(it.hasNext()){ ((SelectionKey)it.next()).attach(buffer1.duplicate()); } System.out.println("here1"); //for(int i=0;i<connectKey.add.;i++){ //connectKey[i].attach(buffer.duplicate()); //} } } }catch(IOException ex){ key.cancel(); //System.exit(-1); try{ key.channel().close(); }catch(IOException cex){ } } } } } public String decode(ByteBuffer buffer){ Charset charset=null; CharsetDecoder decoder=null; CharBuffer charBuffer=null; try{ charset= Charset.forName("gb2312"); decoder= charset.newDecoder(); charBuffer= decoder.decode(buffer); return charBuffer.toString(); }catch(Exception ex){ ex.printStackTrace(); return ""; } } public static void main(String []args){ ChatServer cs = new ChatServer(); try{ cs.startServer(); }catch(IOException ex){ ex.printStackTrace(); System.exit(-1); } }}注意:假如客户强制登出服务端时候,服务器里面的登录用户列表还是保存他注册的SelectionKey地址,这是存在问题,其实解决很简单,在对通道进行写入时候,如果通道已经被关闭的话,可以用try/catch语句进行处理上面就是总的程序,其实之前我都是用阻塞socket去完成这类工作的,由于在用swt基于实现时候遇到很多swt线程问题,后期我会以GUI界面共享给大家,当然自己也在不断学习中!King
转载地址:http://zlkni.baihongyu.com/