Executor Framework
2018-10-03 17:57:38来源:博客园 阅读 ()
Why?
look at the following 2 pieces of code for implementing a simple web server based on socket, can you point out the problems(I put them in the comments)?
/*
* Single thread. bad response time and throughout(CPU idle). Think about how it will block(read from/ write to socket, perform I/O operation
* like File system , DB, Network...)
*
*/
class SingleThreadWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { Socket connection = socket.accept(); handleRequest(connection); } } } /////////////////////////////////////////////////////////////
/*
* Multiple threads. will get good response time and throughout.
* Problems:
* 1. Creating a thread for each request. Creating thread will consume resource(cpu time / memory).
* 2. one Thead is only for one request.(not reused).
* 3. in a heavy concurrent load environment, JVM will have pressure for GC or even get OOM exception.
*/
class ThreadPerTaskWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable task = new Runnable() { public void run() { handleRequest(connection); } }; new Thread(task).start(); } } }
Java Executor Framework
The framework has a flexible design to run tasks asynchronously, decouple the task submission and execution based on producer/consumer pattern, provide mechanisms to manage the lifecycle of the service and tasks.
The Executor Interface
It is simple with only one method signature below:
public interface Executor { void execute(Runnable command); }
Task/command can run in a new thread, in a thread pool or in the calling thread depending on the implementations of the Executor. For example:
public class MyExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); // run in a new thread r.run(); //run in the calling thread. }; }
Runnable & Callable
They both are interfaces in different packages. But Callable's call method has a return value and throws a checked Exception. See checked exception and runtime exception here which is interesting. Task usually needs a return value like query database, perform a calculation. The Executor interface provides a very basic task submission where a task is a Runnable implementation. So, if you do not care about the result and exception handling, just use Executor.execute(Runnable). Regarding how Callable is called in a thread, we will talk later in this page.
The ExecutorService Interface
JVM cannot exit until all the nondaemon threads have terminated. So, failing to shut down the executor could prevent JVM from exiting.
Executor runs task asynchronously, at any given time the state of previously submitted tasks is not immediately obvious. Some may have finished, some may be concurrently running and some may be in the queue awaiting execution. Since Executors(those implements) provide a service to the application, they should be able to be shut down as well, both gracefully(like do not accept new tasks) and abruptly(like power off) and feedback application with information about the status of the tasks that affected by the shutdown.
The interface of this mechanism is ExecutorService which extends the Executor interface, adding some new methods to provide lifecycle management for the executor/service and methods to facilitate the task submission. For example, you can use ExecutorService.submit(Runnable) if you do not want a return result or use ExecutorService.submit(Runnable,T) to get a result if the runnable is completed successfully.
"An Executor that provides methods to manage termination and methods that can produce a Future for tracking the progress of one or more asynchronous tasks."
The shutdown
method will allow previously submitted tasks to execute before terminating, while the shutdownNow
method prevents waiting tasks from starting and attempts to stop currently executing tasks. The executing tasks can be terminated/stopped if it wants to be by properly handling the InterruptedException which is caused by the(another) executor who interrupts/cancels it(invoking the the interrupt method of the target thread).
Upon termination, an executor has no tasks actively executing, no tasks awaiting execution, and no new tasks can be submitted. An unused ExecutorService
should be shut down to allow reclamation of its resources.
Method submit
extends base method Executor.execute(Runnable)
by creating and returning a Future
that can be used to cancel execution and/or wait for completion. Methods invokeAny
and invokeAll
perform the most commonly useful forms of bulk execution, executing a collection of tasks and then waiting for at least one, or all, to complete. (Class ExecutorCompletionService
can be used to write customized variants of these methods.)
Future, FutureTask
Future as an interface represents the lifecycle of a task and provides methods to test whether the task has completed or been canceled, retrieve its result, and cancel the task.
cancel():
Attempts to cancel execution of this task. This attempt will fail if the task has already completed, has already been cancelled, or could not be cancelled for some other reason. If successful, and this task has not started when cancel
is called, this task should never run(the task, in this case, is not picked from the queue). If the task has already started, then the mayInterruptIfRunning
parameter determines whether the thread executing this task should be interrupted in an attempt to stop the task. If it is true and the task properly handles the InterruptException for cancelling, the cancel will be sucessful. If it is false, the in-progress tasks are allowed to complete.
After this method returns, subsequent calls to isDone
will always return true
. Subsequent calls to isCancelled
will always return true
if this method returned true
.
In the Executor framework, tasks that have been submitted but not yet started can always be cancelled, and tasks that have started can sometimes be cancelled if they are responsive to interruption.Cancelling a task that has already completed has no effect.
get():
throws InterruptedException, ExecutionException,CancellationException. The another version also throws TimeoutException
Waits(blocked) if necessary for the computation to complete, and then retrieves its result. It throws checked exception.
ExecutionException is the wrapper of all other exceptions including runtime exception, customized exception, program error..., use e.getCause to get the specific exception.
IsCanclelled:
Returns true
if this task was cancelled before it completed normally.
IsDone:
Returns true if this task completed.Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
1 public interface Callable < V > { 2 V call() throws Exception; 3 } 4 public interface Future < V > { 5 boolean cancel(boolean mayInterruptIfRunning); 6 boolean isCancelled(); 7 boolean isDone(); 8 V get() throws InterruptedException,ExecutionException,CancellationException; 11 V get(long timeout, TimeUnit unit) throws InterruptedException,ExecutionException,CancellationException,TimeoutException; 16 }
Here is a question, How Callable as a task run in thread(consumer) in ExecutorService ? As we know, the java Thread Model needs a Runnable (target) implementing the run method, how this work? When submitting a task by ExecutorService.submit(Callable<T>), in the code(AbstractExecutorService), it wrapper the Callable into a FutureTask class in newTaskFor(Callable) method. FutureTask implements the RunnableFuture. As the name tells us, RunnableFuture implements both Runnable and Future. So, in the run method of FutureTask, it invokes the Callable.call() method and set the result of the task(FutureTask).
run method in class FutureTask and the wrapper
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); }
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
Execution policy and Thread pool
Because the framework decouples the task submission and execution, it support the execution policy which specify the "what, where, when and how" of task execution,including:
? In what thread will tasks be executed?
? In what order should tasks be executed (FIFO, LIFO, priority order)?
? How many tasks may execute concurrently?
? How many tasks may be queued pending execution?
? If a task has to be rejected because the system is overloaded, which task should be selected as the victim, and how should the application be notified?
? What actions should be taken before or after executing a task?
You can use execution policy to match your system resource.
Thread pool, uses a work queue to hold tasks waiting to be executed. The work thread takes next task from the queue, execute it, then back to the pool for another task.
Below is the new version of the web server demo using thread pool. What's the benefits?
1) Reused threads can save the time to create/destroy thread repeatedly, increasing response time.
2) You can have the proper size of the threads to keep cpu busy but not too many of them.
1 class TaskExecutionWebServer { 2 private static final int NTHREADS = 100; 3 private static final Executor exec = Executors.newFixedThreadPool(NTHREADS); 4 public static void main(String[] args) throws IOException { 5 ServerSocket socket = new ServerSocket(80); 6 while (true) { 7 final Socket connection = socket.accept(); 8 Runnable task = new Runnable() { 9 public void run() { 10 handleRequest(connection); 11 } 12 }; 13 exec.execute(task); 14 } 15 } 16 }
You can create a thread pool by calling one of the static factory methods in Executors provide by the java library:
- newFixedThreadPool
"Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks.If additional tasks are submitted when all threads are active,they will wait in the queue until a thread is available.If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown." - newCachedThreadPool
Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. Calls to execute will reuse previously constructed threads if available. If no existing thread is available, a new thread will be created and added to the pool. Threads that have not been used for sixty seconds are terminated and removed from the cache. Thus, a pool that remains idle for long enough will not consume any resources. Note that pools with similar properties but different details (for example, timeout parameters)may be created using ThreadPoolExecutor constructors. -
newSingleThreadExecutor
A single-threaded executor creates a single worker thread to process tasks, replacing it if it dies unexpectedly. Tasks are guaranteed to be processed sequentially according to the order imposed by the task queue (FIFO, LIFO, priority order). -
newScheduledThreadPool
Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically, similar to Timer.
You can also create your custom pool with work queue to implement FIFO,LIFO, priority order. For the web server example, using the pool-based policy will no longer fail under heavy load as it does not create too many threads. But as the work queue is unbounded, if the speed of task execution is much slower than the task comes(producer is much faster than consumer), it can still fail. We can add a bounded work queue to the pool if needed.
Examples/Applications
- Shutdown the ExecutorService. Below is the logic of in what situation task will be rejected
1 class LifecycleWebServer { 2 private final ExecutorService exec = ...; 3 public void start() throws IOException { 4 ServerSocket socket = new ServerSocket(80); 5 while (!exec.isShutdown()) { 6 try { 7 final Socket conn = socket.accept(); 8 exec.execute(new Runnable() { 9 public void run() { 10 handleRequest(conn); 11 } 12 }); 13 } catch (RejectedExecutionException e) { 14 //when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity,
//and is saturated. 15 if (!exec.isShutdown()) 16 log("task submission rejected", e); 17 } 18 } 19 } 20 public void stop() { 21 exec.shutdown(); 22 } 23 void handleRequest(Socket connection) { 24 Request req = readRequest(connection); 25 if (isShutdownRequest(req)) 26 stop(); 27 else 28 dispatchRequest(req); 29 } 30 }
- Delayed and periodic tasks
Use ScheduledThreadPoolExecutor replace Time/TimerTask class. If you need to build your own scheduling service, you may still be able to take advantage of the library by using a DelayQueue, a BlockingQueue implementation that provides the scheduling functionality of ScheduledThreadPoolExecutor. A DelayQueue manages a collection of Delayed objects. A Delayed has a delay time associated with it: DelayQueue lets you take an element only if its delay has expired. Objects are returned from a DelayQueue ordered by the time associated with their delay
See demo here - ExecutorCompletionService
What if you submit many tasks and need to get the result immediately if available. Using get method, to repeatedly poll the result with timeout zero is not a good idea. CompletionService combines the Executor and BlockingQueue. It delegates the task execution to Executor, put the Future to the BlockingQueue once it is done by overriding the done() method of Future/FutureTask. You can use the service.take() method to get the Future(finished) from the queue once there is one available and then call get method for the result. So, in CompletionService, it will wrapper task as QueueingFuture.
1 for (int t = 0, n = info.size(); t < n; t++) { 2 Future < ImageData > f = completionService.take(); 3 ImageData imageData = f.get(); 4 renderImage(imageData); 5 }
By calling completionService.take(), you get the result soon once any of the tasks is completed.
- Placing time limits on tasks. Sometimes, we want to cancel tasks due to a timeout setting.
1 Page renderPageWithAd() throws InterruptedException { 2 long endNanos = System.nanoTime() + TIME_BUDGET; 3 Future < Ad > f = exec.submit(new FetchAdTask()); 4 // Render the page while waiting for the ad 5 Page page = renderPageBody(); 6 Ad ad; 7 try { 8 // Only wait for the remaining time budget 9 long timeLeft = endNanos - System.nanoTime(); 10 ad = f.get(timeLeft, NANOSECONDS); 11 } catch (ExecutionException e) { 12 ad = DEFAULT_AD; 13 } catch (TimeoutException e) { 14 ad = DEFAULT_AD; 15 f.cancel(true); 16 } 17 page.setAd(ad); 18 return page; 19 }
If you have a list of tasks, you can use a more convenient way of following:
1 private class QuoteTask implements Callable < TravelQuote > { 2 private final TravelCompany company; 3 private final TravelInfo travelInfo; 4 ... 5 public TravelQuote call() throws Exception { 6 return company.solicitQuote(travelInfo); 7 } 8 } 9 public List < TravelQuote > getRankedTravelQuotes(TravelInfo travelInfo, Set < TravelCompany > companies, 11 Comparator < TravelQuote > ranking, long time, TimeUnit unit) 12 throws InterruptedException { 13 List < QuoteTask > tasks = new ArrayList < QuoteTask > (); 14 for (TravelCompany company: companies) tasks.add(new QuoteTask(company, travelInfo)); 16 List < Future < TravelQuote >> futures = exec.invokeAll(tasks, time, unit); 18 List < TravelQuote > quotes = new ArrayList < TravelQuote > (tasks.size()); 20 Iterator < QuoteTask > taskIter = tasks.iterator(); 21 for (Future < TravelQuote > f: futures) { 22 QuoteTask task = taskIter.next(); 23 try { 24 quotes.add(f.get()); 25 } catch (ExecutionException e) { 26 quotes.add(task.getFailureQuote(e.getCause())); 27 } catch (CancellationException e) { 28 quotes.add(task.getTimeoutQuote(e)); 29 } 30 } 31 Collections.sort(quotes, ranking); 32 return quotes; 33 }
Reference:
Java 8 API doc
Book JCIP
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- 这也太难了,2020年只有外包公司给面试机会,Why? 2020-06-02
- 阿里面试官鬼得很,问我为什么他们要禁用Executors创建线程 2020-05-18
- IDEA 创建Spring项目后org.springframework.boot报错 2020-05-14
- Java—线程池ThreadPoolExecutor案例详解,高薪必备 2020-05-04
- 如何让ThreadPoolExecutor更早地创建非核心线程 2020-04-28
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash