001/* *********************************************************************** * 002 * project: org.matsim.* 003 * *********************************************************************** * 004 * * 005 * copyright : (C) 2018 by the members listed in the COPYING, * 006 * LICENSE and WARRANTY file. * 007 * email : info at matsim dot org * 008 * * 009 * *********************************************************************** * 010 * * 011 * This program is free software; you can redistribute it and/or modify * 012 * it under the terms of the GNU General Public License as published by * 013 * the Free Software Foundation; either version 2 of the License, or * 014 * (at your option) any later version. * 015 * See also COPYING, LICENSE and WARRANTY file * 016 * * 017 * *********************************************************************** */ 018 019package org.matsim.contrib.util; 020 021import java.util.Collection; 022import java.util.List; 023import java.util.concurrent.BlockingQueue; 024import java.util.concurrent.ExecutorService; 025import java.util.concurrent.Executors; 026import java.util.concurrent.Future; 027import java.util.concurrent.LinkedBlockingQueue; 028import java.util.stream.Collectors; 029import java.util.stream.Stream; 030 031import com.google.common.util.concurrent.Futures; 032 033/** 034 * @author michalm 035 */ 036public class ExecutorServiceWithResource<R> { 037 public interface CallableWithResource<V, R> { 038 V call(R resource) throws Exception; 039 } 040 041 public interface RunnableWithResource<R> { 042 void run(R resource); 043 } 044 045 private final BlockingQueue<R> resourceQueue = new LinkedBlockingQueue<>(); 046 private final ExecutorService executorService; 047 048 public ExecutorServiceWithResource(Collection<R> resources) { 049 resourceQueue.addAll(resources); 050 executorService = Executors.newFixedThreadPool(resourceQueue.size()); 051 } 052 053 public <V> Future<V> submitCallable(CallableWithResource<V, R> task) { 054 return executorService.submit(() -> { 055 R resource = resourceQueue.remove(); 056 V value = task.call(resource); 057 resourceQueue.add(resource); 058 return value; 059 }); 060 } 061 062 public <V> List<Future<V>> submitCallables(Stream<CallableWithResource<V, R>> tasks) { 063 return tasks.map(t -> submitCallable(t)).collect(Collectors.toList()); 064 } 065 066 public <V> List<V> submitCallablesAndGetResults(Stream<CallableWithResource<V, R>> tasks) { 067 return submitCallables(tasks).stream().map(Futures::getUnchecked).collect(Collectors.toList()); 068 } 069 070 public Future<?> submitRunnable(RunnableWithResource<R> task) { 071 return executorService.submit(() -> { 072 R resource = resourceQueue.remove(); 073 task.run(resource); 074 resourceQueue.add(resource); 075 }); 076 } 077 078 public List<Future<?>> submitRunnables(Stream<RunnableWithResource<R>> tasks) { 079 return tasks.map(t -> submitRunnable(t)).collect(Collectors.toList()); 080 } 081 082 public void submitRunnablesAndWait(Stream<RunnableWithResource<R>> tasks) { 083 submitRunnables(tasks).forEach(Futures::getUnchecked); 084 } 085 086 public void shutdown() { 087 executorService.shutdown(); 088 } 089}