Recounting the Digits of Pi with Java Streams

Hi there,

A while back, I used CyclicBarrier to count a billion digits of Pi: https://tonyyan.wordpress.com/2017/09/28/mimic-mapreduce-using-cyclicbarrier-to-count-a-billion-digits-of-pi/ Today, let’s do the same job in a much simpler way, with Java Streams. Here is the code. The idea is to map each chunk of bytes read from the file to a Map that counts each digit from 0 to 9, and then reduce all of those maps into a single Map of the same shape. The Stream API makes the code very concise and easy to understand.
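To make the map/reduce idea concrete, here's a minimal sketch that runs on a few tiny in-memory chunks instead of the real file. The class name MapReduceSketch and the sample chunks are made up for illustration; countDigit and mergeMap follow the same idea as the methods of the same name in CountPi, but this is a sketch, not the code behind the timings.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class MapReduceSketch {

    // Map step: count each digit character in one chunk, skipping '.', newlines, etc.
    static Map<Byte, Integer> countDigit(byte[] bytes) {
        Map<Byte, Integer> map = new HashMap<>();
        for (byte b : bytes) {
            if (b >= '0' && b <= '9') {
                map.merge(b, 1, Integer::sum);
            }
        }
        return map;
    }

    // Reduce step: combine two partial counts into a new map (inputs are not modified).
    static Map<Byte, Integer> mergeMap(Map<Byte, Integer> m, Map<Byte, Integer> n) {
        Map<Byte, Integer> merged = new HashMap<>(m);
        n.forEach((digit, count) -> merged.merge(digit, count, Integer::sum));
        return merged;
    }

    public static void main(String[] args) {
        // Three small "chunks" standing in for the 1 MiB buffers read from the file.
        List<byte[]> chunks = List.of(
                "3.14159".getBytes(StandardCharsets.US_ASCII),
                "26535".getBytes(StandardCharsets.US_ASCII),
                "89793".getBytes(StandardCharsets.US_ASCII));

        Optional<Map<Byte, Integer>> counts = chunks.stream()
                .map(MapReduceSketch::countDigit)   // one partial count per chunk
                .reduce(MapReduceSketch::mergeMap); // fold the partial counts together

        System.out.println(counts.orElse(Map.of()));
    }
}
```

Running it should report 16 digits in total, spread over the keys 49 to 57 (the ASCII codes of '1' to '9'), since this tiny sample contains no zero.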

Another key benefit is that we can run the parallel stream inside a custom ForkJoinPool and tune the number of threads. All of the multi-threading happens under the hood.
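A quick way to check that the work really stays inside the custom pool is to record which threads touch the stream elements. This is a sketch with a hypothetical CustomPoolSketch class. It relies on ForkJoinTask.fork() placing subtasks in the pool of the worker thread that forks them; that part is documented, but the fact that parallel streams are built on fork() is an implementation detail rather than a stated guarantee.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CustomPoolSketch {

    // Runs a parallel stream as a task in the given pool and returns the names
    // of all threads that processed at least one element.
    static Set<String> workerNames(ForkJoinPool pool) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        pool.submit(() -> IntStream.range(0, 10_000)
                        .parallel()
                        .forEach(i -> names.add(Thread.currentThread().getName())))
                .join(); // join() rethrows failures unchecked, so no checked exceptions here
        return names;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println(workerNames(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With default thread naming, the printed names typically look like ForkJoinPool-1-worker-1 rather than ForkJoinPool.commonPool-worker-1.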

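import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

public class CountPi {

    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {

        long start = System.currentTimeMillis();
        List<byte[]> piList = readPi(new File("src/main/resources/pi-billion.txt"));

        System.out.println("read time " + (System.currentTimeMillis() - start));
        start = System.currentTimeMillis();

        // In practice, a parallel stream started from a task running in this pool forks
        // its subtasks into this pool instead of the common pool, so the thread count
        // (32 here) can be tuned.
        ForkJoinPool customThreadPool = new ForkJoinPool(32);

        try {
            // Map each chunk to its digit counts, then reduce the partial counts into one map.
            Optional<Map<Byte, Integer>> c = customThreadPool.submit(
                    () -> piList.parallelStream().map(CountPi::countDigit).reduce(CountPi::mergeMap))
                    .get();

            System.out.println(c);
            System.out.println("stream time " + (System.currentTimeMillis() - start));
        } finally {
            customThreadPool.shutdown();
        }
    }

    // Reads the file into a list of chunks of up to 1 MiB each.
    private static List<byte[]> readPi(File f) throws IOException {
        List<byte[]> piList = new ArrayList<>();
        try (InputStream input = new FileInputStream(f)) {
            byte[] buffer = new byte[1024 * 1024];
            int data = input.read(buffer);
            while (data != -1) {
                // Copy only the bytes actually read: the buffer is reused by the next read,
                // so adding it directly would make every entry the same array.
                piList.add(Arrays.copyOf(buffer, data));
                data = input.read(buffer);
            }
        }
        return piList;
    }

    // Map step: count each digit character ('0'..'9') in one chunk, skipping '.', newlines, etc.
    private static Map<Byte, Integer> countDigit(byte[] bytes) {
        Map<Byte, Integer> map = new HashMap<>();
        for (byte b : bytes) {
            if (b >= '0' && b <= '9') map.merge(b, 1, Integer::sum);
        }
        return map;
    }

    // Reduce step: combine two partial counts into a new map, leaving both inputs unmodified.
    private static Map<Byte, Integer> mergeMap(Map<Byte, Integer> m, Map<Byte, Integer> n) {
        Map<Byte, Integer> merged = new HashMap<>(m);
        n.forEach((digit, count) -> merged.merge(digit, count, Integer::sum));
        return merged;
    }
}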
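One pitfall when splitting a file into a list of chunks: InputStream.read(byte[]) fills the same array on every call, so adding that array to the list directly leaves every entry pointing at one buffer that only holds the most recent read, and the final read may fill just part of it. Here's a minimal sketch (hypothetical ChunkReadSketch class, with a tiny in-memory stream standing in for pi-billion.txt) that copies exactly the bytes read each time.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkReadSketch {

    // Splits a stream into chunks of at most chunkSize bytes, each one an independent copy.
    static List<byte[]> readChunks(InputStream input, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        byte[] buffer = new byte[chunkSize];
        try {
            int n;
            while ((n = input.read(buffer)) != -1) {
                // Copy exactly n bytes: the next read overwrites buffer.
                chunks.add(Arrays.copyOf(buffer, n));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] data = "3.14159265358979".getBytes(StandardCharsets.US_ASCII);
        for (byte[] chunk : readChunks(new ByteArrayInputStream(data), 4)) {
            System.out.println(new String(chunk, StandardCharsets.US_ASCII));
        }
    }
}
```

With a chunk size of 4, this prints 3.14, 1592, 6535 and 8979 on separate lines, each backed by its own array.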

Sample output (times are in milliseconds; the map keys are ASCII codes, so 48 is '0' and 57 is '9'):

read time 1315
Optional[{48=99304722, 49=100393236, 50=100106082, 51=99930546, 52=100190988, 53=99815112, 54=99920052, 55=100076508, 56=100064106, 57=100540152}]
stream time 6891
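Because the keys are byte values, the counts are easier to read after converting the keys back to characters, and summing the counts gives a quick sanity check against the number of digits in the input file. A small sketch with a hypothetical PrintCountsSketch class and made-up counts:

```java
import java.util.Map;
import java.util.TreeMap;

public class PrintCountsSketch {

    // Converts ASCII-code keys (48..57) into the digit characters they stand for, sorted by digit.
    static Map<Character, Integer> byDigit(Map<Byte, Integer> counts) {
        Map<Character, Integer> result = new TreeMap<>();
        counts.forEach((code, n) -> result.put((char) code.byteValue(), n));
        return result;
    }

    // Total number of digits counted, summed as a long to avoid int overflow on big files.
    static long total(Map<Byte, Integer> counts) {
        return counts.values().stream().mapToLong(Integer::longValue).sum();
    }

    public static void main(String[] args) {
        Map<Byte, Integer> counts = Map.of((byte) 48, 3, (byte) 49, 5, (byte) 57, 2);
        System.out.println(byDigit(counts)); // prints {0=3, 1=5, 9=2}
        System.out.println(total(counts));   // prints 10
    }
}
```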

It looks like the returns diminish once the thread count goes above 4, presumably because of the overhead of threading.
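To see where the returns flatten out on a particular machine, one option is to run the same reduction in pools of different sizes and time each run. Below is a rough sketch; the ThreadSweepSketch class, the list of pool sizes and the synthetic input (random digits, not actual Pi) are all assumptions for illustration. Single wall-clock runs like these are noisy, so a harness such as JMH would give more trustworthy numbers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;

public class ThreadSweepSketch {

    static Map<Byte, Integer> countDigit(byte[] bytes) {
        Map<Byte, Integer> map = new HashMap<>();
        for (byte b : bytes) {
            if (b >= '0' && b <= '9') map.merge(b, 1, Integer::sum);
        }
        return map;
    }

    // Non-mutating merge, so the shared empty identity map below is never modified.
    static Map<Byte, Integer> mergeMap(Map<Byte, Integer> m, Map<Byte, Integer> n) {
        Map<Byte, Integer> merged = new HashMap<>(m);
        n.forEach((digit, count) -> merged.merge(digit, count, Integer::sum));
        return merged;
    }

    // Runs the parallel count as a task in a pool with the given number of threads.
    static Map<Byte, Integer> countWith(int threads, List<byte[]> chunks) {
        ForkJoinPool pool = new ForkJoinPool(threads);
        try {
            return pool.submit(() -> chunks.parallelStream()
                            .map(ThreadSweepSketch::countDigit)
                            .reduce(new HashMap<>(), ThreadSweepSketch::mergeMap))
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Synthetic input: 32 chunks of 1 MiB of random ASCII digits (NOT real digits of Pi).
        Random random = new Random(42);
        List<byte[]> chunks = new ArrayList<>();
        for (int i = 0; i < 32; i++) {
            byte[] chunk = new byte[1 << 20];
            for (int j = 0; j < chunk.length; j++) chunk[j] = (byte) ('0' + random.nextInt(10));
            chunks.add(chunk);
        }

        countWith(4, chunks); // warm-up run so the JIT compiles the hot loop before timing

        for (int threads : new int[] {1, 2, 4, 8, 16, 32}) {
            long start = System.nanoTime();
            countWith(threads, chunks);
            long ms = (System.nanoTime() - start) / 1_000_000;
            System.out.println(threads + " threads: " + ms + " ms");
        }
    }
}
```

Creating a fresh pool per run keeps each measurement independent; pool start-up cost is included in the timings but is small next to counting tens of megabytes.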

Cheers!

~T
