001/* *********************************************************************** *
002 * project: org.matsim.*
003 * *********************************************************************** *
004 *                                                                         *
005 * copyright       : (C) 2018 by the members listed in the COPYING,        *
006 *                   LICENSE and WARRANTY file.                            *
007 * email           : info at matsim dot org                                *
008 *                                                                         *
009 * *********************************************************************** *
010 *                                                                         *
011 *   This program is free software; you can redistribute it and/or modify  *
012 *   it under the terms of the GNU General Public License as published by  *
013 *   the Free Software Foundation; either version 2 of the License, or     *
014 *   (at your option) any later version.                                   *
015 *   See also COPYING, LICENSE and WARRANTY file                           *
016 *                                                                         *
017 * *********************************************************************** */
018
019package org.matsim.contrib.util;
020
021import java.util.Collection;
022import java.util.List;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.Executors;
026import java.util.concurrent.Future;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.stream.Collectors;
029import java.util.stream.Stream;
030
031import com.google.common.util.concurrent.Futures;
032
033/**
034 * @author michalm
035 */
036public class ExecutorServiceWithResource<R> {
037        public interface CallableWithResource<V, R> {
038                V call(R resource) throws Exception;
039        }
040
041        public interface RunnableWithResource<R> {
042                void run(R resource);
043        }
044
045        private final BlockingQueue<R> resourceQueue = new LinkedBlockingQueue<>();
046        private final ExecutorService executorService;
047
048        public ExecutorServiceWithResource(Collection<R> resources) {
049                resourceQueue.addAll(resources);
050                executorService = Executors.newFixedThreadPool(resourceQueue.size());
051        }
052
053        public <V> Future<V> submitCallable(CallableWithResource<V, R> task) {
054                return executorService.submit(() -> {
055                        R resource = resourceQueue.remove();
056                        V value = task.call(resource);
057                        resourceQueue.add(resource);
058                        return value;
059                });
060        }
061
062        public <V> List<Future<V>> submitCallables(Stream<CallableWithResource<V, R>> tasks) {
063                return tasks.map(t -> submitCallable(t)).collect(Collectors.toList());
064        }
065
066        public <V> List<V> submitCallablesAndGetResults(Stream<CallableWithResource<V, R>> tasks) {
067                return submitCallables(tasks).stream().map(Futures::getUnchecked).collect(Collectors.toList());
068        }
069
070        public Future<?> submitRunnable(RunnableWithResource<R> task) {
071                return executorService.submit(() -> {
072                        R resource = resourceQueue.remove();
073                        task.run(resource);
074                        resourceQueue.add(resource);
075                });
076        }
077
078        public List<Future<?>> submitRunnables(Stream<RunnableWithResource<R>> tasks) {
079                return tasks.map(t -> submitRunnable(t)).collect(Collectors.toList());
080        }
081
082        public void submitRunnablesAndWait(Stream<RunnableWithResource<R>> tasks) {
083                submitRunnables(tasks).forEach(Futures::getUnchecked);
084        }
085
086        public void shutdown() {
087                executorService.shutdown();
088        }
089}