JAVA中的NIO (New IO)

2019-10-29 16:07:10来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

JAVA中的NIO (New IO)

简介

标准的IO是基于字节流和字符流进行操作的,而JAVA中的NIO是基于Channel和Buffer进行操作的。

传统IO

graph TB; 字节流 --> InputStream; 字节流 --> OutputStream; 字符流 --> Reader; 字符流 --> Writer;

NIO

graph TB; A[Channel] --> B[Buffer..]; C[Channel] --> D[Buffer..]; E[Channel] --> F[Buffer..];

核心模块

NIO主要有三个核心部分:Selector、Channel、Buffer

数据总是从Channel读取到Buffer或者从Buffer写入到Channel中。

Selector可以监听多个Channel的多个事件。

graph TB; Selector --> A[Channel]; Selector --> B[Channel]; Selector --> C[Channel]; A --> E1[Event...]; B --> E2[Event...]; C --> E3[Event...];

传统的IO与Channel的区别

1.传统的IO是BIO的,而Channel是NIO的。

*当流调用了read()、write()方法后会一直阻塞线程直到数据被读取或写入完毕。

2.传统IO流是单向的,而Channel是双向的。


Channel

FileChannel:从文件中进行读取

DatagramChannel:可以通过UDP协议在网络中进行数据的传输

SocketChannel:可以通过TCP协议在网络中进行数据的传输

ServerSocketChannel:可以作为一个服务器监听连接

Channel通用API:

read(buffer):将数据从Channel读取到Buffer中,读取完毕返回-1。

read(buffer []):将数据从Channel读取到多个Buffer中,仅当第一个Buffer被写满后往第二个Buffer中进行写入。

write(buffer):将Buffer中的数据写入到Channel中。

write(buffer[]):将多个Buffer中的数据写入到Channel中,仅当第一个Buffer中的数据被读取完毕后再从第二个Buffer中进行读取。

register(selector,interest):将Channel注册到Selector中,同时需要向Selector传递要监听此Channel的事件类型(注册到Selector中的Channel一定要非阻塞的)

configureBlocking(boolean):设置Channel是否为阻塞。

transferFrom(position,count,channel):将其他Channel中的数据传输到当前Channel中。

transferTo(position,count,channel):将当前Channel中的数据传输到其他Channel中。

SocketChannel API

open()静态方法:创建SocketChannel。

connect(new InetSocketAddress(port))方法:连接服务器。

finishConnect()方法:判断是否已经与服务器建立连接。

ServerSocketChannel API

open()静态方法:创建ServerSocketChannel。

accept()方法:该方法会一直阻塞线程直到有新连接到达。

阻塞式与非阻塞式Channel

正常情况下Channel都是阻塞的,只有当调用了configureBlocking(false)方法时Channel才为非阻塞。

阻塞式Channel的connect()、accept()、read()、write()方法都会阻塞线程,直到处理完毕。

非阻塞式Channel的connect()、accept()、read()、write()方法都是异步的。

*当调用了非阻塞式Channel的connect()方法后,需要使用finishConnect()方法判断是否已经与服务器建立连接。

*当调用了非阻塞式Channel的accept()方法后,需要根据方法的返回值是否为NULL判断是否接收到新的连接。

*当调用了非阻塞式Channel的read()方法后,需要根据方法的返回值是否大于0判断是否有读取到数据。

*在使用非阻塞式Channel的write()方法时,需要借助while循环与hasRemaining()方法保证buffer中的内容被全部写入。

*FileChannel一定是阻塞的。

示例

public void testFileChannel() throws IOException {
    RandomAccessFile randomAccessFile = new RandomAccessFile(new File("F:\\笔记\\nginx.txt"), "rw");
    FileChannel fileChannel = randomAccessFile.getChannel();
    ByteBuffer byteBuffer = ByteBuffer.allocate(64);
    int count = fileChannel.read(byteBuffer);
    while (count != -1) {
        byteBuffer.flip();
        System.out.println(new String(Arrays.copyOfRange(byteBuffer.array(),0,byteBuffer.limit()),Charset.forName("UTF-8")));
        byteBuffer.clear();
        count = fileChannel.read(byteBuffer);
    }
}

Buffer

Buffer是一块可以进行读写操作的内存(顺序存储结构)

ByteBuffer:基于Byte类型进行存储

CharBuffer:基于Char类型进行存储

DoubleBuffer:基于Double类型进行存储

FloatBuffer:基于Float类型进行存储

IntBuffer:基于Int类型进行存储

LongBuffer:基于Long类型进行存储

ShortBuffer:基于Short类型进行存储

Buffer的内部结构

1.capacity:表示buffer的容量

2.position:表示当前的位置(从0开始,最大值为capacity-1)

3.limit:在写模式中表示可以写入的个数(与capacity一样),在读模式中表示可以读取的个数。

从写模式转换成读模式

limit设置为position+1,position设置为0。

从读模式转换成写模式

limit设置为capacity,position设置为0。

往Buffer中写数据

1.将数据从Channel读取到Buffer中。

2.使用Buffer的put()方法。

从Buffer中读数据

1.将Buffer中的数据写入到Channel中。

2.使用Buffer的get()方法

Buffer通用API:

allocate(size)静态静态:初始化一个Buffer。

flip():将buffer从写模式转换成读模式。

array():将Buffer中的内容转换成数组(不受limit控制)

get():获取Buffer中的内容。

hasRemaining():判断Buffer中是否还有未读的元素(limit - (postion+1) )

rewind():将positon设置为0。

clear():将limit设置为capacity,position设置为0。

compact():将所有未读的元素移动到Buffer的起始处,position指向最后一个未读的元素的下一位,limit设置为capacity。

*clear()和compact()方法都可以理解成将Buffer从读模式转换成写模式,区别在于compact()方法会保留未读取的元素。

mark():在当前position处打一个标记。

reset():将position恢复到标记处。

Selector

Selector用于监听多个Channel的多个事件(单线程)

graph TB; Selector --> A[Channel]; Selector --> B[Channel]; Selector --> C[Channel]; A --> E1[connect]; B --> E2[accept]; C --> E3[connect]; C --> E4[read];

Channel的事件类型

1.连接就绪:当SocketChannel、DatagramChannel成功与服务器建立连接时将会触发连接就绪事件。

2.接收就绪:当ServerSocketChannel有新连接到达时将会触发接收就绪事件。

3.读就绪:当SocketChannel、DatagramChannel有数据可读时将会触发读就绪事件。

4.写就绪:当SocketChannel、DatagramChannel可以进行数据写入时将会触发写就绪事件。

SelectionKey

SelectionKey用于存储Selector与Channel之间的相关信息。

SelectionKey中提供了四个常量分别代表Channel的事件类型。

SelectionKey.OP_CONNECT

SelectionKey.OP_ACCEPT

SelectionKey.OP_READ

SelectionKey.OP_WRITE

SelectableChannel提供的register(selector,interest)方法用于将Channel注册到Selector中,同时需要向Selector传递要监听此Channel的事件类型,当要监听的事件类型不止一个时可以使用或运算,当将Channel注册到Selector后会返回SelectionKey实例,用于存储Selector与此Channel之间的相关信息。

SelectionKey API:

interestOps()方法:返回Selector监听此Channel的事件类型。

readyOps()方法:返回此Channel目前就绪的事件。

isAcceptable():判断Channel是否接收就绪。

isConnectable():判断Channel是否连接就绪。

isReadable():判断Channel是否读就绪。

isWriteable():判断Channel是否写就绪。

channel():返回具体的Channel实例。

selector():返回Selector实例。

attach():往SelectionKey中添加一个附加对象。

attachment():返回保存在SelectionKey中的附加对象。

Selector API:

open()静态方法:创建一个Selector。

select()方法:该方法会一直阻塞线程直到所监听的Channel有事件就绪,返回就绪的Channel个数(只会返回新就绪的Channel个数)

selectedKeys()方法:返回就绪的Channel对应的SelectionKey。

*当Channel就绪的事件处理完毕后,需要手动删除SelectionKey集合中该Channel对应的SelectionKey,当该Channel再次有事件就绪时会自动加入到Selectionkey集合中。

非阻塞式Channel与Selector

非阻塞式Channel一般与Selector配合使用

当Selector监听到ServerSocketChannel接收就绪时,那么此时可以立即调用ServerSocketChannel的accept()方法获取新连接。

当Selector监听到SocketChannel读就绪时,那么此时可以立即调用SocketChannel的read()方法进行数据的读取。

非阻塞式服务器

服务器

package com.novellatonyatt.nio;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Auther: Zhuang HaoTang
 * @Date: 2019/10/26 16:35
 * @Description:
 */
public class Server {

    private ServerSocketChannel createNIOServerSocketChannel() throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
        serverSocketChannel.configureBlocking(false);
        return serverSocketChannel;
    }

    private void acceptHandler(SelectionKey selectionKey) throws IOException {
        Selector selector = selectionKey.selector();
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        System.out.println("accept client connection " + socketChannel.getLocalAddress());
    }

    private void readHandler(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(100);
        int num = socketChannel.read(byteBuffer);
        if(num == -1){ // 连接已断开
            System.out.println("client "+socketChannel.getLocalAddress() + " disconnection");
            socketChannel.close();
            return;
        }
        byteBuffer.flip();
        while (byteBuffer.hasRemaining()) {
            byte b = byteBuffer.get();
            System.out.println((char) b);
        }
    }

    public void open() throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = createNIOServerSocketChannel();
        System.out.println("start nio server and bind port 8888");
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        int ready = selector.select();
        while (ready > 0) {
            System.out.println("ready channel count " + ready);
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            for (Iterator<SelectionKey> iterator = selectionKeySet.iterator(); iterator.hasNext(); ) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {
                    System.out.println("acceptable");
                    acceptHandler(selectionKey);
                } else if (selectionKey.isReadable()) {
                    System.out.println("readable");
                    readHandler(selectionKey);
                }
                iterator.remove();
            }
            ready = selector.select();
        }
    }

    public static void main(String[] args) throws IOException {
        Server server = new Server();
        server.open();
    }


}

*一个Channel不会同时有多个事件就绪,以事件为单位。

*当客户端断开连接,那么将会触发读就绪,并且channel的read()方法返回-1,表示连接已断开,服务器应该要做出处理,关闭这个连接。

客户端

package com.novellatonyatt.nio;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @Auther: Zhuang HaoTang
 * @Date: 2019/10/26 16:36
 * @Description:
 */
public class Client {

    public static void main(String[] args) throws IOException, InterruptedException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(),8888));

        String message = "today is sunday";
        ByteBuffer byteBuffer = ByteBuffer.allocate(message.getBytes().length);
        byteBuffer.put(message.getBytes());
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
        Thread.sleep(5000);
    }

}

运行结果


Reactor模式

1.单线程的事件分发器。

2.具体事件类型的Handler线程池。

3.业务线程池。

graph TB; A[单线程的事件分发器] --> B[接收就绪]; A --> C[读就绪]; A --> D[写就绪]; C --> F[Handler线程池]; D --> G[Handler线程池]; F --> H[业务线程池]; G --> I[业务线程池];

NIO指的是事件分化器,当应用程序发起IO操作后,可以利用等待的时间做一些处理,JAVA NIO使用了IO多路复用中的Select模型。

*主线程不需要等待具体事件类型的Handler处理完毕,直接异步返回,那么将会导致事件重复就绪,程序做出相应的控制即可。

*具体事件类型的Handler是异步的(注意这并不是AIO)

服务器

package com.novellatonyatt.nio;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author: Zhuang HaoTang
 * @create: 2019-10-28 17:00
 * @description:
 */
public class ReactorServer {

    private ThreadPoolExecutor eventHandlerPool = new ThreadPoolExecutor(10, 50, 2, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(200), new ThreadPoolExecutor.CallerRunsPolicy());

    private ServerSocketChannel createNIOServerSocketChannel() throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
        serverSocketChannel.configureBlocking(false);
        return serverSocketChannel;
    }

    private void readHandler(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(100);
        try {
            int num = socketChannel.read(byteBuffer);
            if (num == -1) {
                System.out.println("client " + socketChannel.getLocalAddress() + " disconnection");
                socketChannel.close(); // 底层有些逻辑
                return;
            }
            byteBuffer.flip();
            while (byteBuffer.hasRemaining()) {
                byte b = byteBuffer.get();
                System.out.println((char) b);
            }
        } catch (Exception e) {
            System.out.println("由于连接关闭导致并发线程读取异常");
        }
    }

    private void open() throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = createNIOServerSocketChannel();
        System.out.println("start nio server and bind port 8888");
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        int ready = selector.select();
        while (ready > 0) {
            System.out.println("ready channel count " + ready);
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            for (Iterator<SelectionKey> iterator = selectionKeySet.iterator(); iterator.hasNext(); ) {
                final SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {
                    System.out.println("acceptable");
                    serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    System.out.println("accept client connection " + socketChannel.getLocalAddress());
                } else if (selectionKey.isReadable()) {
                    System.out.println("readable");
                    eventHandlerPool.submit(new Runnable() {
                        @Override
                        public void run() {
                            readHandler(selectionKey);
                        }
                    });
                }
                iterator.remove();
            }
            ready = selector.select();
        }
    }

    public static void main(String[] args) throws IOException {
        ReactorServer reactorServer = new ReactorServer();
        reactorServer.open();
    }

}

客户端

package com.novellatonyatt.nio;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @Auther: Zhuang HaoTang
 * @Date: 2019/10/26 16:36
 * @Description:
 */
public class Client {

    public static void main(String[] args) throws IOException, InterruptedException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8888));

        String message = "today is sunday";
        ByteBuffer byteBuffer = ByteBuffer.allocate(message.getBytes().length);
        byteBuffer.put(message.getBytes());
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
        Thread.sleep(5000);
    }

}

运行结果

*当channel有数据可读时,将会触发读就绪,那么主线程将会不停的向线程池提交任务,直到某个线程读取完毕,此时将会停止读就绪,其他线程读取到的个数为0。

*当客户端断开连接时,将会触发读就绪,那么主线程将会不停的向线程池提交任务,直到某个线程关闭连接,此时将会停止读就绪,其他线程要做相应的异常处理。

一般不会这么去使用JAVA NIO,只是通过JAVA NIO学习他的设计思想,如果要想搭建NIO服务器那么应该使用Netty等NIO框架。


原文链接:https://www.cnblogs.com/funyoung/p/11758969.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:基于IDEA的SSM配置文件整合基础(有步骤)

下一篇:Spring