【Java】NIO中Channel的注册源码分析
2019-05-18 07:08:00来源:博客园 阅读 ()
Channel的注册是在SelectableChannel中定义的:
1 public abstract SelectionKey register(Selector sel, int ops, Object att) 2 throws ClosedChannelException; 3 4 public final SelectionKey register(Selector sel, int ops) 5 throws ClosedChannelException { 6 return register(sel, ops, null); 7 }
而其具体实现是在AbstractSelectableChannel中:
1 public final SelectionKey register(Selector sel, int ops, 2 Object att) 3 throws ClosedChannelException { 4 synchronized (regLock) { 5 if (!isOpen()) 6 throw new ClosedChannelException(); 7 if ((ops & ~validOps()) != 0) 8 throw new IllegalArgumentException(); 9 if (blocking) 10 throw new IllegalBlockingModeException(); 11 SelectionKey k = findKey(sel); 12 if (k != null) { 13 k.interestOps(ops); 14 k.attach(att); 15 } 16 if (k == null) { 17 // New registration 18 synchronized (keyLock) { 19 if (!isOpen()) 20 throw new ClosedChannelException(); 21 k = ((AbstractSelector)sel).register(this, ops, att); 22 addKey(k); 23 } 24 } 25 return k; 26 } 27 }
其中regLock和keyLock是两个对象,分别用来做注册锁和key集合锁
1 // Lock for key set and count 2 private final Object keyLock = new Object(); 3 4 // Lock for registration and configureBlocking operations 5 private final Object regLock = new Object();
isOpen判断Channel是否关闭,只有在Channel关闭后才会令isOpen返回false;接着检验传入进来的ops(SelectionKey的状态,包括OP_READ、OP_WRITE、OP_CONNECT、OP_ACCEPT四种)是否满足条件,validOps方法在不同的Channel子类中有不同的实现:
SocketChannel中:
1 public final int validOps() { 2 return (SelectionKey.OP_READ 3 | SelectionKey.OP_WRITE 4 | SelectionKey.OP_CONNECT); 5 }
那么ops只要是上面三种状态的任意一种或者一种以上,再和validOps的结果运算都为0,若是其他值则抛出IllegalArgumentException异常;
ServerSocketChannel中:
1 public final int validOps() { 2 return SelectionKey.OP_ACCEPT; 3 }
和上面同理ServerSocketChannel在注册时,只能传入OP_ACCEPT状态。
回到AbstractSelectableChannel的register方法,接下来是对blocking成员的判断,
1 boolean blocking = true;
这是很重要的一步,因为NIO是既支持阻塞模式也支持非阻塞模式,但是若使用非阻塞模式,那么必然需要Selector的轮询,若是在注册Selector之前没有通过Channel调用configureBlocking方法设置为非阻塞模式,那么就会在此时注册时抛出IllegalBlockingModeException异常。
configureBlocking方法的实现也是在AbstractSelectableChannel中:
1 public final SelectableChannel configureBlocking(boolean block) 2 throws IOException { 3 synchronized (regLock) { 4 if (!isOpen()) 5 throw new ClosedChannelException(); 6 if (blocking == block) 7 return this; 8 if (block && haveValidKeys()) 9 throw new IllegalBlockingModeException(); 10 implConfigureBlocking(block); 11 blocking = block; 12 } 13 return this; 14 }
前两个判断逻辑都很简单,在Channel打开的情况下根据参数block设置阻塞或者非阻塞模式,注意到第二个判断说明重复设置相同的阻塞模式直接返回,而第三个判断则表明block 和blocking不相等,那么就是在之前设置为了非阻塞模式,而haveValidKeys则间接表明已经完成了注册,并且已经拥有了自己的SelectionKey集合,此时再设置为非阻塞模式就会引起IllegalBlockingModeException异常。
haveValidKeys方法:
1 private SelectionKey[] keys = null; 2 private int keyCount = 0; 3 4 private boolean haveValidKeys() { 5 synchronized (keyLock) { 6 if (keyCount == 0) 7 return false; 8 for (int i = 0; i < keys.length; i++) { 9 if ((keys[i] != null) && keys[i].isValid()) 10 return true; 11 } 12 return false; 13 } 14 }
逻辑比较简单,先检查keys的个数,为0直接返回没有可用的SelectionKey,接着遍历keys集合,找到一个可用的就返回true,其中isValid方法在AbstractSelectionKey中实现:
1 private volatile boolean valid = true; 2 3 public final boolean isValid() { 4 return valid; 5 }
可以看到在初始化时valid = true就代表自身是可用状态,当SelectionKey执行cancel方法撤销时或者在Channel关闭时的撤销都会改变:
1 public final void cancel() { 2 // Synchronizing "this" to prevent this key from getting canceled 3 // multiple times by different threads, which might cause race 4 // condition between selector's select() and channel's close(). 5 synchronized (this) { 6 if (valid) { 7 valid = false; 8 ((AbstractSelector)selector()).cancel(this); 9 } 10 } 11 }
Channel关闭时的撤销在后续的博客给出,这里先不讨论。
在configureBlocking中的implConfigureBlocking是一个抽象方法,具体的实现和使用的Channel有关,ServerSocketChannel和SocketChannel的实现分别是在ServerSocketChannelImpl和
SocketChannelImpl中,这两个的实现方式也是完全一样:
1 protected void implConfigureBlocking(boolean var1) throws IOException { 2 IOUtil.configureBlocking(this.fd, var1); 3 }
而IOUtil的configureBlocking方法是一个native方法,主要是对底层的操作,这里就不讨论了。
继续回到AbstractSelectableChannel的register方法,在对阻塞模式判断完毕后,调用findKey方法:
1 private SelectionKey findKey(Selector sel) { 2 synchronized (keyLock) { 3 if (keys == null) 4 return null; 5 for (int i = 0; i < keys.length; i++) 6 if ((keys[i] != null) && (keys[i].selector() == sel)) 7 return keys[i]; 8 return null; 9 } 10 }
在同步块中,首先判断keys是否初始化过,如果是第一次注册,那么keys必定为null,直接就返回null结束;否则已经注册过,则遍历keys这个SelectionKey集合,找的传入的Selector 持有的SelectionKey后直接返回该SelectionKey对象,若没找到则返回null;
接着对findKey方法的返回值k判断
若k不为null,则说明注册过这个Selector ,先调用interestOps方法,该方法是在SelectionKeyImpl中实现的:
1 public SelectionKey interestOps(int var1) { 2 this.ensureValid(); 3 return this.nioInterestOps(var1); 4 }
首先通过ensureValid检验当前的SelectionKey是否可用(没有被撤销,调用cancel方法会撤销):
1 private void ensureValid() { 2 if (!this.isValid()) { 3 throw new CancelledKeyException(); 4 } 5 }
比较简单,使用之前说过的isValid方法,检查当前SelectionKey是否可用
nioInterestOps方法:
1 public SelectionKey nioInterestOps(int var1) { 2 if ((var1 & ~this.channel().validOps()) != 0) { 3 throw new IllegalArgumentException(); 4 } else { 5 this.channel.translateAndSetInterestOps(var1, this); 6 this.interestOps = var1; 7 return this; 8 } 9 }
这个判断和一开始的register中的检查ops状态是否合法一样,若是合法需要调用Channel的translateAndSetInterestOps方法,同样不同的Channel有不同的实现:
SocketChannel是在SocketChannelImpl中实现的:
1 public void translateAndSetInterestOps(int var1, SelectionKeyImpl var2) { 2 int var3 = 0; 3 if ((var1 & 1) != 0) { 4 var3 |= Net.POLLIN; 5 } 6 7 if ((var1 & 4) != 0) { 8 var3 |= Net.POLLOUT; 9 } 10 11 if ((var1 & 8) != 0) { 12 var3 |= Net.POLLCONN; 13 } 14 15 var2.selector.putEventOps(var2, var3); 16 }
之前说过SelectionKey有四种状态:
1 public static final int OP_READ = 1 << 0; // 0 2 public static final int OP_WRITE = 1 << 2; // 4 3 public static final int OP_CONNECT = 1 << 3; // 8 4 public static final int OP_ACCEPT = 1 << 4; // 16
正如之前所说的SocketChannel只允许存在OP_READ、OP_WRITE 、OP_CONNECT 这三种状态,所以上面就根据这三种状态得到对应的POLL事件,最后给SelectionKey绑定的Selector设置POLL事件响应。
putEventOps的实现是在WindowsSelectorImpl中:
1 public void putEventOps(SelectionKeyImpl var1, int var2) { 2 Object var3 = this.closeLock; 3 synchronized(this.closeLock) { 4 if (this.pollWrapper == null) { 5 throw new ClosedSelectorException(); 6 } else { 7 int var4 = var1.getIndex(); 8 if (var4 == -1) { 9 throw new CancelledKeyException(); 10 } else { 11 this.pollWrapper.putEventOps(var4, var2); 12 } 13 } 14 } 15 }
还是一样若是Selector关闭则抛出异常,否则得到SelectionKey的index(在Selector中存储的SelectionKey数组的下标),判断下标的合法性,然后给pollWrapper设置事件响应,而pollWrapper的putEventOps方法是一个native方法,这里就不仔细讨论了。
pollWrapper是存放socket句柄fdVal和事件响应events的,用八个位来存储一对。
而ServerSocketChannel的translateAndSetInterestOps实现和上面一样,只不过只负责OP_ACCEPT 状态:
1 public void translateAndSetInterestOps(int var1, SelectionKeyImpl var2) { 2 int var3 = 0; 3 if ((var1 & 16) != 0) { 4 var3 |= Net.POLLIN; 5 } 6 7 var2.selector.putEventOps(var2, var3); 8 }
还是回到AbstractSelectableChannel的register方法中,interestOps调用结束后调用SelectionKey的attach方法:
1 private volatile Object attachment = null; 2 3 private static final AtomicReferenceFieldUpdater<SelectionKey,Object> 4 attachmentUpdater = AtomicReferenceFieldUpdater.newUpdater( 5 SelectionKey.class, Object.class, "attachment" 6 ); 7 8 public final Object attach(Object ob) { 9 return attachmentUpdater.getAndSet(this, ob); 10 }
这里直接使用了原子更新器对象来更新attachment 。
k不为null的情况解决了,接下来就是解决k为null的情况,即第一次注册,或者是再次注册没有找到和Selector对应的的SelectionKey。
首先在同步块内还是先检查Channel是否关闭,若没有关闭,调用AbstractSelector的register方法完成Selector对SelectionKey的注册:
而这个register方法的实现是在SelectorImpl中:
1 protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) { 2 if (!(var1 instanceof SelChImpl)) { 3 throw new IllegalSelectorException(); 4 } else { 5 SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this); 6 var4.attach(var3); 7 Set var5 = this.publicKeys; 8 synchronized(this.publicKeys) { 9 this.implRegister(var4); 10 } 11 12 var4.interestOps(var2); 13 return var4; 14 } 15 }
检查Channel类型是否符合,然后直接创建一个SelectionKeyImpl对象:
1 final SelChImpl channel; 2 public final SelectorImpl selector; 3 4 SelectionKeyImpl(SelChImpl var1, SelectorImpl var2) { 5 this.channel = var1; 6 this.selector = var2; 7 }
SelectionKeyImpl构造很简单,直接给两个成员赋值;然后调用SelectionKeyImpl对象的attach方法更新附件,接着在同步块中调用抽象方法implRegister
implRegister方法是在WindowsSelectorImpl中实现的:
1 protected void implRegister(SelectionKeyImpl var1) { 2 Object var2 = this.closeLock; 3 synchronized(this.closeLock) { 4 if (this.pollWrapper == null) { 5 throw new ClosedSelectorException(); 6 } else { 7 this.growIfNeeded(); 8 this.channelArray[this.totalChannels] = var1; 9 var1.setIndex(this.totalChannels); 10 this.fdMap.put(var1); 11 this.keys.add(var1); 12 this.pollWrapper.addEntry(this.totalChannels, var1); 13 ++this.totalChannels; 14 } 15 } 16 }
首先调用growIfNeeded方法,因为Selector选择器解决非阻塞,就是使用轮询的方式,它存储了一个SelectionKeyImpl数组,而SelectionKeyImpl记录了channel以及SelectionKey的状态,那么就是根据SelectionKey的状态和channel来完成通信。由于在服务端的时候需要和多个客户端连接,那么这个数组必定是动态维持的,所以就考虑到扩容。
1 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[8]; 2 private int totalChannels = 1;
可以看到这个channelArray一开始固定初始化大小是8,而totalChannels 一开始就是1,这是为了方便后面的操作,channelArray 中下标为0的元素没用使用,直接从下标为1开始。
growIfNeeded方法:
1 private void growIfNeeded() { 2 if (this.channelArray.length == this.totalChannels) { 3 int var1 = this.totalChannels * 2; 4 SelectionKeyImpl[] var2 = new SelectionKeyImpl[var1]; 5 System.arraycopy(this.channelArray, 1, var2, 1, this.totalChannels - 1); 6 this.channelArray = var2; 7 this.pollWrapper.grow(var1); 8 } 9 10 if (this.totalChannels % 1024 == 0) { 11 this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, this.totalChannels); 12 ++this.totalChannels; 13 ++this.threadsCount; 14 } 15 16 }
因为totalChannels 是从1开始,所以直接判断totalChannels是否达到了数组长度,若已达到就需要扩容,可以看到每次扩容都是原来两倍,从原数组下标为1的地方开始一直到最后一个元素,拷贝到新数组下标为1的位置上,再更新channelArray,同时还要给pollWrapper扩容。
pollWrapper的grow方法:
1 void grow(int var1) { 2 PollArrayWrapper var2 = new PollArrayWrapper(var1); 3 4 for(int var3 = 0; var3 < this.size; ++var3) { 5 this.replaceEntry(this, var3, var2, var3); 6 } 7 8 this.pollArray.free(); 9 this.pollArray = var2.pollArray; 10 this.size = var2.size; 11 this.pollArrayAddress = this.pollArray.address(); 12 } 13 14 void replaceEntry(PollArrayWrapper var1, int var2, PollArrayWrapper var3, int var4) { 15 var3.putDescriptor(var4, var1.getDescriptor(var2)); 16 var3.putEventOps(var4, var1.getEventOps(var2)); 17 }
逻辑很简单,就是把原来的socket句柄fdVal和事件响应events复制到新的PollArrayWrapper对象中,且位置不变。
再回到growIfNeeded,可以看到第二个判断是检查totalChannels是否达到了1024的整数次方(totalChannels初始是1,排除0),若是则需要pollWrapper.addWakeupSocket(this.wakeupSourceFd, this.totalChannels)这个操作在WindowsSelectorImpl构造方法时也被调用:
1 WindowsSelectorImpl(SelectorProvider var1) throws IOException { 2 super(var1); 3 this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal(); 4 SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink(); 5 var2.sc.socket().setTcpNoDelay(true); 6 this.wakeupSinkFd = var2.getFDVal(); 7 this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0); 8 }
addWakeupSocket方法:
1 void addWakeupSocket(int var1, int var2) { 2 this.putDescriptor(var2, var1); 3 this.putEventOps(var2, Net.POLLIN); 4 }
可以看到设置的事件响应是Net.POLLIN,其实就对应OP_READ,而这个wakeupSourceFd是初始化时就设置的wakeupPipe的source的描述符fdVal,即一开始建立的ServerSocketChannel端的SocketChannel(SourceChannel)的描述符fdVal,之前说过Selector的select方法是一个阻塞的操作,调用select方法时只有注册在Selector上的Channel有事件就绪时才会被唤醒;如果说有很多Channel注册了,但是只有一个Channel事件就绪,那么岂不是要做很多无用的轮询,而fdVal就是解决这个问题,实际上交给操作系统的轮询的是wakeupSourceFd,操作系统在轮询pollWrapper中的这些wakeupSourceFd描述符后就能知道哪些wakeupSourceFd上有事件就绪。
可以看到在growIfNeeded后面有一个++this.threadsCount操作,实际上Channel事件的轮询是交给线程来做的,WindowsSelectorImpl中有如下成员:
1 private int threadsCount = 0; 2 private final List<WindowsSelectorImpl.SelectThread> threads = new ArrayList();
SelectThread是Thread的子类,threadsCount记录轮询线程个数。
那么就有这种关系,在pollWrapper中,总是以wakeupSourceFd描述符开头,后面跟着1024个Channel的描述,再往后就又是这种形式;那么操作系统在轮询pollWrapper中的这些wakeupSourceFd知道哪些wakeupSourceFd上有事件就绪,进而得到pollWrapper中的wakeupSourceFd起始的偏移地址,每个线程只负责轮询1024个Channel的描述,哪个wakeupSourceFd上有事件就绪,就让负责的线程去轮询,这样就减少了不必要的轮询。
所以在totalChannels达到1024的整数次方时,需要增加新的轮询线程。
growIfNeeded方法结束,channelArray中增添新的SelectionKeyImpl,并且设置下标(呼应前面获取下标的操作),然后将SelectionKeyImpl存放在fdMap
fdMap保存的时Channel的描述符和SelectionKeyImpl的映射关系:
1 private static final class MapEntry { 2 SelectionKeyImpl ski; 3 long updateCount = 0L; 4 long clearedCount = 0L; 5 6 MapEntry(SelectionKeyImpl var1) { 7 this.ski = var1; 8 } 9 } 10 11 private static final class FdMap extends HashMap<Integer, WindowsSelectorImpl.MapEntry> { 12 static final long serialVersionUID = 0L; 13 14 private FdMap() { 15 } 16 17 private WindowsSelectorImpl.MapEntry get(int var1) { 18 return (WindowsSelectorImpl.MapEntry)this.get(new Integer(var1)); 19 } 20 21 private WindowsSelectorImpl.MapEntry put(SelectionKeyImpl var1) { 22 return (WindowsSelectorImpl.MapEntry)this.put(new Integer(var1.channel.getFDVal()), new WindowsSelectorImpl.MapEntry(var1)); 23 } 24 25 private WindowsSelectorImpl.MapEntry remove(SelectionKeyImpl var1) { 26 Integer var2 = new Integer(var1.channel.getFDVal()); 27 WindowsSelectorImpl.MapEntry var3 = (WindowsSelectorImpl.MapEntry)this.get(var2); 28 return var3 != null && var3.ski.channel == var1.channel ? (WindowsSelectorImpl.MapEntry)this.remove(var2) : null; 29 } 30 }
代码逻辑都很简单,就不详细介绍了。
接着调用keys的add方法,keys是父类SelectorImpl的成员:
1 protected HashSet<SelectionKey> keys = new HashSet();
接着调用pollWrapper的addEntry方法:
1 void addEntry(int var1, SelectionKeyImpl var2) { 2 this.putDescriptor(var1, var2.channel.getFDVal()); 3 }
可以看到仅仅是添加了channel的描述符fdVal,还没有设置事件响应,最后totalChannels自增implRegister方法结束。
回到SelectorImpl的register方法,在implRegister方法结束后,调用SelectionKeyImpl的interestOps方法,前面说过的,在此时设置了事件响应,最后返回SelectionKeyImpl对象赋给AbstractSelectableChannel方法中的k,之后调用addKey方法,返回k,register方法调用全部结束。
addKey方法:
1 private void addKey(SelectionKey k) { 2 assert Thread.holdsLock(keyLock); 3 int i = 0; 4 if ((keys != null) && (keyCount < keys.length)) { 5 // Find empty element of key array 6 for (i = 0; i < keys.length; i++) 7 if (keys[i] == null) 8 break; 9 } else if (keys == null) { 10 keys = new SelectionKey[3]; 11 } else { 12 // Grow key array 13 int n = keys.length * 2; 14 SelectionKey[] ks = new SelectionKey[n]; 15 for (i = 0; i < keys.length; i++) 16 ks[i] = keys[i]; 17 keys = ks; 18 i = keyCount; 19 } 20 keys[i] = k; 21 keyCount++; 22 }
逻辑很清晰,首先检查有没有没有使用的key,若存在,直接用k覆盖结束;若keys没有初始化大小为3的数组,先初始化keys,再将k放在下标为0的位置结束;若是keys已经初始化且keyCount == keys.length,就需要给keys扩容,并将原来的元素拷贝,最后将k放在新keys下标为keyCount的位置。
Channel的注册到此全部结束。
原文链接:https://www.cnblogs.com/a526583280/p/10880786.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- 国外程序员整理的Java资源大全(全部是干货) 2020-06-12
- 2020年深圳中国平安各部门Java中级面试真题合集(附答案) 2020-06-11
- 2020年java就业前景 2020-06-11
- 04.Java基础语法 2020-06-11
- Java--反射(框架设计的灵魂)案例 2020-06-11
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash