21 package org.matsim.withinday.replanning.parallel;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.util.Collections;
25 import java.util.LinkedHashSet;
26 import java.util.LinkedList;
27 import java.util.Queue;
29 import java.util.concurrent.BrokenBarrierException;
30 import java.util.concurrent.CyclicBarrier;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.atomic.AtomicBoolean;
34 import org.apache.logging.log4j.LogManager;
35 import org.apache.logging.log4j.Logger;
85 public final void init(String replannerName) {
89 replanningRunnables =
new InternalReplanningRunnable[
numOfThreads];
91 this.timeStepStartBarrier =
new CyclicBarrier(numOfThreads + 1);
92 this.betweenReplannerBarrier =
new CyclicBarrier(numOfThreads);
93 this.timeStepEndBarrier =
new CyclicBarrier(numOfThreads + 1);
97 ReplanningRunnable replanningRunnable =
new InternalReplanningRunnable(replannerName +
" Thread" + i +
" replanned plans: ");
103 replanningRunnables[i] = replanningRunnable;
115 for (T factory : this.replannerFactories) {
116 if (shareReplannerQueue) {
117 Queue<ReplanningTask> queue =
new LinkedBlockingQueue<ReplanningTask>();
120 replanningRunnable.addWithinDayReplanner(newInstance, queue);
125 replanningRunnable.addWithinDayReplanner(newInstance,
new LinkedList<ReplanningTask>());
130 this.hadException =
new AtomicBoolean(
false);
131 this.uncaughtExceptionHandler =
new ExceptionHandler(this.hadException, this.timeStepStartBarrier,
132 this.betweenReplannerBarrier, this.timeStepEndBarrier);
138 Thread replanningThread =
new Thread(replanningRunnables[i]);
139 Thread.setDefaultUncaughtExceptionHandler(this.uncaughtExceptionHandler);
140 replanningThread.setName(replannerName + i);
141 replanningThreads[i] = replanningThread;
147 Thread replanningThread = replanningThreads[i];
148 replanningThread.setDaemon(
true);
149 replanningThread.start();
152 this.simIsRunning =
true;
161 this.timeStepEndBarrier.await();
162 }
catch (InterruptedException e) {
164 }
catch (BrokenBarrierException e) {
173 public final void run(
double time) {
175 if (lastRoundRobin == roundRobin)
return;
183 if (hadException.get()) {
190 replanningRunnable.setTime(time);
193 this.timeStepStartBarrier.await();
195 this.timeStepEndBarrier.await();
197 }
catch (InterruptedException e) {
199 }
catch (BrokenBarrierException e) {
206 this.simIsRunning =
false;
208 if (this.hadException.get()) {
210 "Cannot guarantee that all replanning operations have been fully processed.");
229 for (T factory : this.replannerFactories) {
230 runnable.removeWithinDayReplanner(factory.getId());
240 this.timeStepStartBarrier.await();
241 }
catch (InterruptedException e) {
243 }
catch (BrokenBarrierException e) {
249 this.replannerFactories.add(factory);
258 if (shareReplannerQueue) {
259 Queue<ReplanningTask> queue =
new LinkedBlockingQueue<ReplanningTask>();
262 replanningRunnable.addWithinDayReplanner(newInstance, queue);
267 replanningRunnable.addWithinDayReplanner(newInstance,
new LinkedList<ReplanningTask>());
274 this.replannerFactories.remove(factory);
277 replanningRunnable.removeWithinDayReplanner(factory.getId());
283 replanningRunnable.resetReplanners();
288 return Collections.unmodifiableSet(this.replannerFactories);
297 numOfThreads = Math.max(numberOfThreads, 1);
299 log.info(
"Using " + numOfThreads +
" threads for parallel within-day replanning.");
305 if (numOfThreads > Runtime.getRuntime().availableProcessors()) {
306 log.warn(
"The number of parallel running replanning threads is bigger than the number of available CPUs/Cores!");
315 public InternalReplanningRunnable(String counterText) {
331 public ExceptionHandler(
final AtomicBoolean hadException, CyclicBarrier timeStepStartBarrier,
332 CyclicBarrier betweenReplannerBarrier, CyclicBarrier timeStepEndBarrier) {
341 this.hadException.set(
true);
342 log.error(
"Thread " + t.getName() +
" died with exception while replanning.", e);
348 this.timeStepStartBarrier.reset();
349 this.betweenReplannerBarrier.reset();
350 this.timeStepEndBarrier.reset();
final CyclicBarrier betweenReplannerBarrier
final CyclicBarrier timeStepEndBarrier
final void removeWithinDayReplannerFactory(T factory)
final AtomicBoolean hadException
final void setEventsManager(EventsManager eventsManager)
ParallelReplanner(int numOfThreads, EventsManager eventsManager)
final void resetReplanners()
Set< T > replannerFactories
final void onPrepareSim()
final Set< T > getWithinDayReplannerFactories()
AtomicBoolean hadException
ExceptionHandler uncaughtExceptionHandler
final CyclicBarrier timeStepStartBarrier
CyclicBarrier timeStepEndBarrier
ExceptionHandler(final AtomicBoolean hadException, CyclicBarrier timeStepStartBarrier, CyclicBarrier betweenReplannerBarrier, CyclicBarrier timeStepEndBarrier)
final void run(double time)
void uncaughtException(Thread t, Throwable e)
CyclicBarrier timeStepStartBarrier
final EventsManager eventsManager
final void init(String replannerName)
CyclicBarrier betweenReplannerBarrier
final void setBetweenReplannerBarrier(CyclicBarrier barrier)
final void setNumberOfThreads(int numberOfThreads)
final void setCyclicTimeStepStartBarrier(CyclicBarrier barrier)
final void setCyclicTimeStepEndBarrier(CyclicBarrier barrier)
final void addReplanningTask(ReplanningTask replanningTask)
final void addWithinDayReplannerFactory(T factory)
final boolean shareReplannerQueue
final void addReplanningTask(ReplanningTask replanningTask)
ReplanningRunnable [] replanningRunnables