博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java.nio
阅读量:4064 次
发布时间:2019-05-25

本文共 13211 字,大约阅读时间需要 44 分钟。

附件里为阻塞模式、非阻塞模式、阻塞和非阻塞的混合模式代码。
下面为非阻塞的一段客户端和服务器的代码:
服务器端代码:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; /**  * 非阻塞模式服务器  * */ public class EchoNoblockServer {
private Selector selector = null; private ServerSocketChannel serverSocketChannel = null; private int port = 8000; private Charset charset = Charset.forName("GBK"); public EchoNoblockServer() throws IOException{
selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); System.out.println("服务器启动"); } public void service() throws IOException{
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0){
Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); while (it.hasNext()){
SelectionKey key = null; try{
key = (SelectionKey) it.next(); it.remove(); if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = (SocketChannel) ssc.accept(); System.out.println("接收到客户连接,来自:" + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort()); socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(1024); socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); } if (key.isReadable()) {
receive(key); } if (key.isWritable()) {
send(key); } }catch(IOException e){
e.printStackTrace(); try{
if(key != null){
key.cancel(); key.channel().close(); } }catch(Exception ex){
e.printStackTrace(); } } }//#while }//#while } public void send(SelectionKey key) throws IOException{
ByteBuffer buffer = (ByteBuffer)key.attachment(); SocketChannel socketChannel = (SocketChannel)key.channel(); buffer.flip(); //把极限设为位置,把位置设为0 String data = decode(buffer); if(data.indexOf("\r\n") == -1) return; String outputData = data.substring(0, data.indexOf("\n")+1); System.out.print(outputData); ByteBuffer outputBuffer = encode("echo:"+outputData); while(outputBuffer.hasRemaining()) socketChannel.write(outputBuffer); ByteBuffer temp = encode(outputData); buffer.position(temp.limit()); buffer.compact(); if(outputData.equals("bye\r\n")){
key.cancel(); socketChannel.close(); System.out.println("关闭与客户的连接"); } } public void receive(SelectionKey key) throws IOException{
ByteBuffer buffer = (ByteBuffer)key.attachment(); SocketChannel socketChannel = (SocketChannel)key.channel(); ByteBuffer readBuff = ByteBuffer.allocate(32); socketChannel.read(readBuff); readBuff.flip(); buffer.limit(buffer.capacity()); buffer.put(readBuff); } public String decode(ByteBuffer buffer){ //解码 CharBuffer charBuffer = charset.decode(buffer); return charBuffer.toString(); } public ByteBuffer encode(String str){ //编码 return charset.encode(str); } /** * @param args */ public static void main(String[] args) {
EchoNoblockServer server; try {
server = new EchoNoblockServer(); server.service(); } catch (IOException e) {
e.printStackTrace(); } } }
客户端代码:
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; /**  * 非阻塞模式客户端  * */ public class EchoNoblockClient {
private SocketChannel socketChannel = null; private ByteBuffer sendBuffer = ByteBuffer.allocate(1024); private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024); private Charset charset = Charset.forName("GBK"); private Selector selector; public EchoNoblockClient() throws IOException {
socketChannel = SocketChannel.open(); InetAddress ia = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(ia, 8000); socketChannel.connect(isa); socketChannel.configureBlocking(false); System.out.println("与服务器的连接建立成功"); selector=Selector.open(); } public void receiveFromUser(){
try{
BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in)); String msg = null; while((msg=localReader.readLine()) != null){
synchronized(sendBuffer){
sendBuffer.put(encode(msg + "\r\n")); } if(msg.equals("bye")) break; } }catch(IOException e){
e.printStackTrace(); } } public void talk() throws IOException {
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); while (selector.select() > 0){
Set readyKeys = selector.selectedKeys(); Iterator it = readyKeys.iterator(); while (it.hasNext()){
SelectionKey key = null; try{
key = (SelectionKey) it.next(); it.remove(); if (key.isReadable()) {
receive(key); } if (key.isWritable()) {
send(key); } }catch(IOException e){
e.printStackTrace(); try{
if(key != null){
key.cancel(); key.channel().close(); } }catch(Exception ex){
e.printStackTrace(); } } }//#while }//#while } public void send(SelectionKey key) throws IOException{
SocketChannel socketChannel = (SocketChannel)key.channel(); synchronized(sendBuffer){
sendBuffer.flip(); //把极限设为位置 socketChannel.write(sendBuffer); sendBuffer.compact(); } } public void receive(SelectionKey key) throws IOException{
SocketChannel socketChannel = (SocketChannel)key.channel(); socketChannel.read(receiveBuffer); receiveBuffer.flip(); String receiveData = decode(receiveBuffer); if(receiveData.indexOf("\n") == -1) return; String outputData = receiveData.substring(0, receiveData.indexOf("\n")+1); System.out.print(outputData); if(outputData.equals("echo:bye\r\n")){
key.cancel(); socketChannel.close(); System.out.println("关闭与服务器的连接"); selector.close(); System.exit(0); } ByteBuffer temp = encode(outputData); receiveBuffer.position(temp.limit()); receiveBuffer.compact(); } public String decode(ByteBuffer buffer){ //解码 CharBuffer charBuffer = charset.decode(buffer); return charBuffer.toString(); } public ByteBuffer encode(String str){ //编码 return charset.encode(str); } /** * @param args */ public static void main(String[] args) {
final EchoNoblockClient client; try {
client = new EchoNoblockClient(); Thread receiver = new Thread(){
public void run(){
client.receiveFromUser(); } }; receiver.start(); client.talk(); } catch (IOException e) {
e.printStackTrace(); } } }
Ping客户端代码:
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.LinkedList; /**  * 非阻塞模式Ping客户端  * */ public class PingNoblockClient {
private Selector selector; //存放用户新提交的任务 private LinkedList targets = new LinkedList(); //存放已经完成的需要打印的任务 private LinkedList finishedTargets = new LinkedList(); public PingNoblockClient() throws IOException{
selector=Selector.open(); Connector connector = new Connector(); Printer printer = new Printer(); connector.start(); printer.start(); receiveTarget(); } public void addTarget(Target target) {
//向targets队列中加入一个任务 SocketChannel socketChannel = null; try {
socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(target.address); target.channel = socketChannel; target.connectStart = System.currentTimeMillis(); synchronized (targets) {
targets.add(target); } selector.wakeup(); } catch (Exception x) {
if (socketChannel != null) {
try {
socketChannel.close(); } catch (IOException xx) { } } target.failure = x; addFinishedTarget(target); } } public void addFinishedTarget(Target target) {
//向finishedTargets队列中加入一个任务 synchronized (finishedTargets) {
finishedTargets.notify(); finishedTargets.add(target); } } public void printFinishedTargets() {
//打印finisedTargets队列中的任务 try {
for (;;) {
Target target = null; synchronized (finishedTargets) {
while (finishedTargets.size() == 0) finishedTargets.wait(); target = (Target)finishedTargets.removeFirst(); } target.show(); } } catch (InterruptedException x) {
return; } } public void registerTargets(){
//取出targets队列中的任务,向Selector注册连接就绪事件 synchronized (targets) {
while (targets.size() > 0) {
Target target = (Target)targets.removeFirst(); try {
target.channel.register(selector, SelectionKey.OP_CONNECT, target); } catch (IOException x) {
try{
target.channel.close(); }catch(IOException e){
e.printStackTrace(); } target.failure = x; addFinishedTarget(target); } } } } public void processSelectedKeys() throws IOException {
//处理连接就绪事件 for (Iterator it = selector.selectedKeys().iterator(); it.hasNext();) {
SelectionKey selectionKey = (SelectionKey)it.next(); it.remove(); Target target = (Target)selectionKey.attachment(); SocketChannel socketChannel = (SocketChannel)selectionKey.channel(); try {
if (socketChannel.finishConnect()) {
selectionKey.cancel(); target.connectFinish = System.currentTimeMillis(); socketChannel.close(); addFinishedTarget(target); } } catch (IOException x) {
socketChannel.close(); target.failure = x; addFinishedTarget(target); } } } public void receiveTarget(){
//接收用户输入的地址,向targets队列中加入任务 try{
BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in)); String msg=null; while((msg=localReader.readLine()) != null){
if(!msg.equals("bye")){
Target target = new Target(msg); addTarget(target); }else{
shutdown = true; selector.wakeup(); break; } } }catch(IOException e){
e.printStackTrace(); } } /** * @param args */ public static void main(String[] args) {
try {
new PingNoblockClient(); } catch (IOException e) {
e.printStackTrace(); } } boolean shutdown = false; public class Printer extends Thread{
public Printer(){
setDaemon(true); } public void run(){
printFinishedTargets(); } } public class Connector extends Thread{
public void run(){
while (!shutdown) {
try {
registerTargets(); if (selector.select() > 0) {
processSelectedKeys(); } } catch (Exception e) {
e.printStackTrace(); } } try{
selector.close(); }catch(IOException e){e.printStackTrace();} } } } class Target { //表示一项任务 InetSocketAddress address; SocketChannel channel; Exception failure; long connectStart; //开始连接时的时间 long connectFinish = 0; //连接成功时的时间 boolean shown = false; //该任务是否已经打印 Target(String host) {
try {
address = new InetSocketAddress(InetAddress.getByName(host), 80); } catch (IOException x) {
failure = x; } } void show() { //打印任务执行的结果 String result; if (connectFinish != 0) result = Long.toString(connectFinish - connectStart) + "ms"; else if (failure != null) result = failure.toString(); else result = "Timed out"; System.out.println(address + " : " + result); shown = true; } }

转载地址:http://ehvji.baihongyu.com/

你可能感兴趣的文章
my read_university
查看>>
network manager
查看>>
searchServer IBM OminiFind / WebSphere Commerce SOLR
查看>>
OS + Linux Disk disk lvm / disk partition / disk mount / disk io
查看>>
my read_Country
查看>>
RedHat + OS CPU、MEM、DISK
查看>>
project bbs_discuz
查看>>
net TCP/IP / TIME_WAIT / tcpip / iperf / cain
查看>>
Unix + OS books
查看>>
script webshell jspWebShell / pythonWebShell / phpWebShell
查看>>
project site_dns
查看>>
webServer kzserver/1.0.0
查看>>
hd printer lexmark / dazifuyin / dayin / fuyin
查看>>
OS + Unix IBM Aix basic / topas / nmon / filemon / vmstat / iostat / sysstat/sar
查看>>
monitorServer nagios / cacti / tivoli / zabbix / SaltStack
查看>>
my ReadMap subway / metro / map / ditie / gaotie / traffic / jiaotong
查看>>
OS + Linux DNS Server Bind
查看>>
web test flow
查看>>
web test LoadRunner SAP / java / Java Vuser / web_set_max_html_param_len
查看>>
OS + UNIX AIX command
查看>>