netty之NioEventLoopGroup源码分析二
2019-04-28 08:22:41来源:博客园 阅读 ()
大家好,今天我准备死磕NioEventLoopGroup的源码,首先讲下概念,NioEventLoopGroup 它是一个线程池,存放NioEventLoop,一个数组,今天打算先看下这行代码的初始化
EventLoopGroup bossGroup = new NioEventLoopGroup();
一、初始化
1、时序图:
2.类的关系
3.源码说明:
步骤1:
1 EventLoopGroup bossGroup = new NioEventLoopGroup(); 2 //调用链1 3 public NioEventLoopGroup() { 4 this(0); 5 } 6 //调用链2 7 public NioEventLoopGroup(int nThreads) { 8 this(nThreads, (Executor) null); 9 } 10 //调用链3 11 public NioEventLoopGroup(int nThreads, Executor executor) { 12 this(nThreads, executor, SelectorProvider.provider()); 13 } 14 //调用链4 15 public NioEventLoopGroup( 16 int nThreads, Executor executor, final SelectorProvider selectorProvider) { 17 super(nThreads, executor, selectorProvider); 18 }
如上四个方法是NioEventLoopGroup类的四个构造函数,首先会执行第3行的无参构造函数,然后调用第7行的构造函数,再继续调用第11行的构造函数,这里需要获取一个SelectorProvider,见步骤2,再继续调用第15行的构造函数,这里内部实现调用父类MultithreadEventLoopGroup的构造函数。
步骤2:
1 public static SelectorProvider provider() { 2 synchronized (lock) { 3 if (provider != null) 4 return provider; 5 return AccessController.doPrivileged( 6 new PrivilegedAction<SelectorProvider>() { 7 public SelectorProvider run() { 8 if (loadProviderFromProperty()) 9 return provider; 10 if (loadProviderAsService()) 11 return provider; 12 provider = sun.nio.ch.DefaultSelectorProvider.create(); 13 return provider; 14 } 15 }); 16 } 17 }
这里SelectorProvider的静态方法provider获取一个SelectorProvider,全局唯一的,有加锁,依赖底层系统的实现provider不一样,后续另开专题分析;
步骤3:
1 protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { 2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); 3 }
如上是MultithreadEventLoopGroup的构造函数,内部又调用了父类MultithreadEventExecutorGroup的构造函数,继续往下看;
步骤4:
1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { 2 if (nThreads <= 0) { 3 throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); 4 } 5 6 if (executor == null) { 7 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); 8 } 9 10 children = new EventExecutor[nThreads]; 11 for (int i = 0; i < nThreads; i ++) { 12 boolean success = false; 13 try { 14 children[i] = newChild(executor, args); 15 success = true; 16 } catch (Exception e) { 17 // TODO: Think about if this is a good exception type 18 throw new IllegalStateException("failed to create a child event loop", e); 19 } finally { 20 if (!success) { 21 for (int j = 0; j < i; j ++) { 22 children[j].shutdownGracefully(); 23 } 24 25 for (int j = 0; j < i; j ++) { 26 EventExecutor e = children[j]; 27 try { 28 while (!e.isTerminated()) { 29 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); 30 } 31 } catch (InterruptedException interrupted) { 32 Thread.currentThread().interrupt(); 33 break; 34 } 35 } 36 } 37 } 38 } 39 40 final FutureListener<Object> terminationListener = new FutureListener<Object>() { 41 @Override 42 public void operationComplete(Future<Object> future) throws Exception { 43 if (terminatedChildren.incrementAndGet() == children.length) { 44 terminationFuture.setSuccess(null); 45 } 46 } 47 }; 48 49 for (EventExecutor e: children) { 50 e.terminationFuture().addListener(terminationListener); 51 } 52 53 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); 54 Collections.addAll(childrenSet, children); 55 readonlyChildren = Collections.unmodifiableSet(childrenSet); 56 }
如上是核心实现的构造函数,代码比较多,我们分段来看下都做了哪些事?
为了方便查看,把代码又粘贴了一遍
if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); }
判断线程数是否小于等于0,否则抛出异常。
if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }
第6-8行,会调用子类MultithreadEventLoopGroup的newDefaultThreadFactory方法生成ThreadFactory,这里涉及到java继承初始化的知识,不是调用MultithreadEventExecutorGroup的newDefaultThreadFactory方法,需要注意一下;
//调用链1,此方法是MultithreadEventLoopGroup类的
1 protected ThreadFactory newDefaultThreadFactory() { 2 return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY); 3 } 4 5 //调用链2 6 public DefaultThreadFactory(Class<?> poolType, int priority) { 7 this(poolType, false, priority); 8 } 9 //调用链3 10 public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) { 11 this(toPoolName(poolType), daemon, priority); 12 } 13 14 //调用链4 15 private static String toPoolName(Class<?> poolType) { 16 if (poolType == null) { 17 throw new NullPointerException("poolType"); 18 } 19 String poolName; 20 Package pkg = poolType.getPackage(); 21 if (pkg != null) { 22 poolName = poolType.getName().substring(pkg.getName().length() + 1); 23 } else { 24 poolName = poolType.getName(); 25 } 26 27 switch (poolName.length()) { 28 case 0: 29 return "unknown"; 30 case 1: 31 return poolName.toLowerCase(Locale.US); 32 default: 33 if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) { 34 return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1); 35 } else { 36 return poolName; 37 } 38 } 39 } 40 41 //调用链5 42 public DefaultThreadFactory(String poolName, boolean daemon, int priority) { 43 if (poolName == null) { 44 throw new NullPointerException("poolName"); 45 } 46 if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { 47 throw new IllegalArgumentException( 48 "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)"); 49 } 50 51 prefix = poolName + '-' + poolId.incrementAndGet() + '-'; 52 this.daemon = daemon; 53 this.priority = priority; 54 }
如上依次从调用链到调用链5,完成DefaultThreadFactory初始化;
再继续初始化ThreadPerTaskExecutor类
1 public ThreadPerTaskExecutor(ThreadFactory threadFactory) { 2 if (threadFactory == null) { 3 throw new NullPointerException("threadFactory"); 4 } 5 this.threadFactory = threadFactory; 6 }
这里就是new 一个线程工程;
1 children = new EventExecutor[nThreads]; 2 for (int i = 0; i < nThreads; i ++) { 3 boolean success = false; 4 try { 5 children[i] = newChild(executor, args); 6 success = true; 7 } catch (Exception e) { 8 // TODO: Think about if this is a good exception type 9 throw new IllegalStateException("failed to create a child event loop", e); 10 } finally { 11 if (!success) { 12 for (int j = 0; j < i; j ++) { 13 children[j].shutdownGracefully(); 14 } 15 16 for (int j = 0; j < i; j ++) { 17 EventExecutor e = children[j]; 18 try { 19 while (!e.isTerminated()) { 20 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); 21 } 22 } catch (InterruptedException interrupted) { 23 Thread.currentThread().interrupt(); 24 break; 25 } 26 } 27 } 28 } 29 }
上面初始化了8个(默认,可修改)NioEventLoop, NioEventLoop另外专题分析;
final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } };
创建了一个监听器
for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); }
这里循环调用增加这个监听器;
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
这里创建一个LinkedHashSet,在把children中的元素全部加进去,然后调用unmodifiableSet,返回一个只读的Chidren,readOnlyChildren;
好了,暂时先分析这么多。
原文链接:https://www.cnblogs.com/mythling/p/10778927.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
上一篇:SpringMVC框架
- 你说研究过Spring里面的源码,循环依赖你会么? 2020-06-09
- 这可能是目前最透彻的Netty讲解了... 2020-06-08
- 通俗理解spring源码(六)—— 默认标签(import、alias、be 2020-06-07
- 学习源码的第八个月,我成了Spring的开源贡献者 2020-06-02
- java 在线网络考试系统源码 springboot mybaits vue.js 前后 2020-05-31
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash