Tuesday, October 2, 2018

Executors and Futures in Java

This is part of an experiment. It is "code as blog". This entire blog post is just documented Java code.

package software.coop.know.future;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;


/**
 * In this class, we will look at the most common way to interact with Futures -- via executors.
 */
public class FuturesWithExecutors {

    public static void main(String... args) throws Exception {
       doExecutorService();
       doFutures();
    }

    /** This is just a utility method to sleep without a checked exception.
     *
     * @param ms Number of milliseconds to sleep.
     */
    private static void sleepWithoutException(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
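            // (In production code you would typically also call Thread.currentThread().interrupt() here to
            // restore the interrupt flag before rethrowing; it is left out to keep the demo short.)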
            throw new RuntimeException(e);
        }
    }

    /** A look at the ExecutorService class and how it is used...
     *
     * @throws InterruptedException if interrupted while waiting for the executors to terminate.
     */
    private static void doExecutorService() throws InterruptedException {
        // An ExecutorService is a service that does work off the thread you call it from. They come in many forms, but
        // generally they have a pool of threads that pull units of work off a queue, execute them, then pull the next
        // one.
        //
        // There is also the Executors class, which has some utility methods for quickly creating executor services
        // (newSingleThreadExecutor, newFixedThreadPool, newCachedThreadPool, and so on).
        //
        // Let's start with the dead simple example...

        ExecutorService executorService = Executors.newSingleThreadExecutor();

        // This created an executor service with a single thread to do work. So if we do...

        System.out.println("Submitting Job 1 from " + Thread.currentThread().getName());
        executorService.submit(() -> {
            System.out.println("Starting Job 1 on " + Thread.currentThread().getName());
            sleepWithoutException(2000);
            System.out.println("Finishing Job 1");
        });
        System.out.println("Submitting Job 2 from " + Thread.currentThread().getName());
        executorService.submit(() -> {
            System.out.println("Starting Job 2 on " + Thread.currentThread().getName());
            sleepWithoutException(2000);
            System.out.println("Finishing Job 2");
        });
        System.out.println("Submitted Job 2");


        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println("---------------------------------------------------------------------------------");

        // This will give us output like the following:
        //
        // Submitting Job 1 from main          // main hands #1 to the executor's queue
        // Submitting Job 2 from main          // #1 has been submitted; main moves on and submits #2
        // Starting Job 1 on pool-1-thread-1   // #1 begins running on the pool's single thread
        // Submitted Job 2                     // submit() returns immediately (the queue is unbounded), so the
        //                                     // interleaving with "Starting Job 1" is just thread scheduling
        // Finishing Job 1                     // #1 finishes
        // Starting Job 2 on pool-1-thread-1   // #2 begins running on the same thread
        // Finishing Job 2                     // #2 finishes
        //
        // Now, the shutdown() and awaitTermination() calls at the end.
        //
        // shutdown() tells the Executor to stop accepting new work and lets its worker thread exit once the
        // queue drains. The default thread factory actually creates non-daemon threads, so without shutdown()
        // that idle worker would keep the JVM alive even after "main" returns.
        // awaitTermination() blocks, for up to the given amount of time, until all the jobs already in the
        // queue have finished -- which also keeps this example's output from running into the next one.
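        // As an aside, awaitTermination() returns a boolean telling you whether everything actually finished
        // within the timeout. In real code you would usually check it and fall back to shutdownNow(), which
        // interrupts whatever is still running. A sketch of that pattern (harmless here -- the pool is
        // already idle, so the call returns true straight away):
        if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
            List<Runnable> neverRan = executorService.shutdownNow(); // returns the tasks that never started
            System.out.println("Forced shutdown; " + neverRan.size() + " queued task(s) never ran");
        }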
        //
        // This was perhaps the simplest example possible. Now let's look at perhaps the most complex...

        final AtomicInteger index = new AtomicInteger(0);
        executorService = new ThreadPoolExecutor(
                1,                              // the core pool size (threads kept around even when idle)
                5,                              // the maximum pool size
                2, TimeUnit.SECONDS,            // how long idle threads above the core size are kept alive
                new ArrayBlockingQueue<>(10),   // a bounded queue of waiting tasks (capacity 10)
                r -> {                          // a custom ThreadFactory
                    int thread = index.getAndIncrement();
                    System.out.println("Creating thread "+thread);
                    Thread t = new Thread(r);
                    t.setName("Custom Thread " + thread);
                    t.setDaemon(true);
                    return t;
                },
                new ThreadPoolExecutor.CallerRunsPolicy() // A policy for jobs that are rejected from the queue
                                                          // "CallerRunsPolicy" means that if you can't accept a new
                                                          // task, run it immediately on the calling thread.

        );
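
        // (ThreadPoolExecutor ships with a few other rejection policies besides CallerRunsPolicy:
        // AbortPolicy, the default, throws RejectedExecutionException; DiscardPolicy silently drops the
        // task; DiscardOldestPolicy drops the oldest queued task and retries.)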

        for (int i = 0; i < 40; i++) {
            final int idx = i;
            executorService.submit(
                    () -> {
                        long runtime = 1900 + Math.round(Math.random() * 200D); // sleep for random time.
                        System.out.println("Starting " + idx + " for " + runtime + " ms on "
                                + Thread.currentThread().getName());
                        sleepWithoutException(runtime);
                        System.out.println("Finishing " + idx);
                    }
            );
            sleepWithoutException(100);
        }

        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("---------------------------------------------------------------------------------");

        // This gives us output akin to the following...
        //
        // Creating thread 0                           < create the first thread
        // Starting 0 for 1986 ms on Custom Thread 0   < start task
        // Creating thread 1                           < grow the thread pool.
        // Starting 11 for 1928 ms on Custom Thread 1  < start the next task. Notice this ISN'T #1, it is the first job
        //                                               that won't fit in the queue
        // Creating thread 2                           < grow again.
        // Starting 12 for 2010 ms on Custom Thread 2
        // Creating thread 3
        // Starting 13 for 2001 ms on Custom Thread 3
        // Creating thread 4                           < Max thread pool size
        // Starting 14 for 2074 ms on Custom Thread 4
        // Starting 15 for 1999 ms on main             < Since we are now at max threads, and the queue is full,
        //                                               job 15 executes on the "main" thread inline with our call
        //                                               to submit it.
        // Finishing 0
        // Starting 1 for 2042 ms on Custom Thread 0   < We just now pull the second job off the queue
        // Finishing 11
        // Starting 2 for 1993 ms on Custom Thread 1

        // As you can see, jobs are not necessarily executed in FIFO order, especially when the pool can grow:
        // newly submitted jobs that don't fit in the queue jump ahead of the ones already sitting in it.
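
        // A related gotcha: the pool only grows past its core size when the work queue is full. If you pass
        // an *unbounded* queue (which is what Executors.newFixedThreadPool does internally), the maximum
        // pool size is effectively ignored, because the queue never fills up. A sketch, just to make the
        // point -- we shut it down immediately rather than run anything on it:
        ExecutorService neverGrows = new ThreadPoolExecutor(
                1, 5,                                               // core 1, "max" 5...
                2, TimeUnit.SECONDS,
                new java.util.concurrent.LinkedBlockingQueue<>());  // ...but this queue never fills, so the pool stays at 1 thread
        neverGrows.shutdown();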
    }

    /** Using Futures with Executors
     *
     */
    private static void doFutures() throws InterruptedException {
        // In the previous example, we looked entirely at submitting "Runnables" to our ExecutorService. But sometimes,
        // you want to get a result back from a task running off thread. Let's look at that.
        System.out.println("doFutures() ---------------------------------------------------------------------");
        ExecutorService executorService = Executors.newFixedThreadPool(2);
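        // newFixedThreadPool(2) gives us a pool of exactly two threads backed by an unbounded queue.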

        List<Double> doubles = Arrays.asList( 0D, 1D, 2D, 3D, 4D, 5D);

        List<Future<String>> futures = doubles.stream()
                .map(d-> executorService.submit(()->{
                        long runtime =  Math.round(Math.random() * 2000D);
                        System.out.println("Running on "+Thread.currentThread().getName()+ " for "+runtime);
                        sleepWithoutException(runtime);
                        return Double.toString(d * Math.PI) +" from "+Thread.currentThread().getName();
                    })
                ).collect(Collectors.toList());
        futures.forEach(future -> {
            try {
                System.out.println(future.get());
            } catch (ExecutionException |InterruptedException e) {
                e.printStackTrace();
            }
        });

        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("---------------------------------------------------------------------------------");

        // This gives us the output:
        //
        // Running on pool-1-thread-2 for 695
        // Running on pool-1-thread-1 for 173
        // 0.0 from pool-1-thread-1
        // Running on pool-1-thread-1 for 1135
        // 3.141592653589793 from pool-1-thread-2
        // Running on pool-1-thread-2 for 915
        // 6.283185307179586 from pool-1-thread-1
        // Running on pool-1-thread-1 for 316
        // 9.42477796076938 from pool-1-thread-2
        // Running on pool-1-thread-2 for 409
        // 12.566370614359172 from pool-1-thread-1
        // 15.707963267948966 from pool-1-thread-2

        // You can see that the results print in the same order as our list of Doubles, yet the work itself
        // still runs "as fast as possible" across the two threads. Why? Because we iterate over the mapped
        // Futures in order, and future.get() blocks until that particular result is ready -- but the
        // ExecutorService keeps churning through the remaining tasks in the background the whole time. Each
        // Callable's return value is captured in its Future, so if a later task finishes before the one we
        // are currently waiting on, its result is simply sitting there ready for when we reach it. The total
        // wait is governed by the slowest task still ahead of us, not by the sum of all the runtimes.
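
        // A couple of quick asides on the Future API (output not shown in the transcript above): get() also
        // has a timed overload, get(timeout, unit), which throws TimeoutException rather than blocking
        // forever, and isDone() lets you poll without blocking at all. ExecutorService.invokeAll(...) is
        // another way to run a batch like this: it blocks until every task completes and returns the list
        // of Futures in one go. By this point all of our futures have completed:
        System.out.println("All futures done? " + futures.stream().allMatch(Future::isDone));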

        // In all of the examples so far, we have created an ExecutorService ourselves to control the number
        // of threads, the queue size, and so on. Java also ships with a default pool that has reasonable
        // defaults: the ForkJoinPool -- more specifically, its shared "common pool".

        // The common ForkJoinPool is what backs language-level parallelism such as parallel streams. For example:

        DoubleStream.of(1D, 2D, 3D, 4D, 5D).parallel()
                .forEach(d-> {
                    sleepWithoutException(100);
                    System.out.println( d + " from "+Thread.currentThread().getName());
                });
        System.out.println("---------------------------------------------------------------------------------");

        // This gives us something like:
        //
        // 5.0 from ForkJoinPool.commonPool-worker-2
        // 2.0 from ForkJoinPool.commonPool-worker-1
        // 4.0 from ForkJoinPool.commonPool-worker-4
        // 3.0 from main
        // 1.0 from ForkJoinPool.commonPool-worker-3

        // 3.0 on main? That's expected: with a parallel stream, the thread that kicks off the terminal
        // operation pitches in and runs part of the work itself, alongside the common pool's workers. The
        // common pool is sized from the number of cores on the machine (8 here), defaulting to one fewer
        // worker than available processors.
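
        // You can inspect that sizing directly (a quick aside; its output is not shown in the transcripts
        // here). The default can also be overridden with the
        // java.util.concurrent.ForkJoinPool.common.parallelism system property:
        System.out.println("Common pool parallelism: " + ForkJoinPool.commonPool().getParallelism()
                + ", available processors: " + Runtime.getRuntime().availableProcessors());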

        // The important thing here is that you can get at this "generic" executor service the same way .parallel() does...

        ForkJoinPool forkJoin = ForkJoinPool.commonPool();
        List<Future<Double>> futureDoubles = new ArrayList<>(20);
        for(double d = 0; d < 20D; d++) {
            double finalD = d;
            futureDoubles.add(forkJoin.submit(() -> {
                sleepWithoutException(2000);
                System.out.println("Computing on " + Thread.currentThread().getName());
                return finalD * Math.PI;
            }));
        }
        futureDoubles.forEach(f-> {
            try {
                System.out.println(f.get());
            } catch (ExecutionException|InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println("---------------------------------------------------------------------------------");

        // This gives us something like:

        // Computing on ForkJoinPool.commonPool-worker-5
        // Computing on ForkJoinPool.commonPool-worker-6
        // 0.0
        // Computing on ForkJoinPool.commonPool-worker-4
        // Computing on ForkJoinPool.commonPool-worker-2
        // Computing on ForkJoinPool.commonPool-worker-3
        // Computing on ForkJoinPool.commonPool-worker-1
        // Computing on ForkJoinPool.commonPool-worker-7
        // 3.141592653589793
        // 6.283185307179586
        // 9.42477796076938

    }
}
