MATSIM
ParallelReplanner.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * ParallelReplanner.java
4  * *
5  * *********************************************************************** *
6  * *
7  * copyright : (C) 2008 by the members listed in the COPYING, *
8  * LICENSE and WARRANTY file. *
9  * email : info at matsim dot org *
10  * *
11  * *********************************************************************** *
12  * *
13  * This program is free software; you can redistribute it and/or modify *
14  * it under the terms of the GNU General Public License as published by *
15  * the Free Software Foundation; either version 2 of the License, or *
16  * (at your option) any later version. *
17  * See also COPYING, LICENSE and WARRANTY file *
18  * *
19  * *********************************************************************** */
20 
21 package org.matsim.withinday.replanning.parallel;
22 
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.util.Collections;
25 import java.util.LinkedHashSet;
26 import java.util.LinkedList;
27 import java.util.Queue;
28 import java.util.Set;
29 import java.util.concurrent.BrokenBarrierException;
30 import java.util.concurrent.CyclicBarrier;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 
34 import org.apache.logging.log4j.LogManager;
35 import org.apache.logging.log4j.Logger;
41 
42 /*
43  * Abstract class that contains the basic elements that are needed
44  * to do parallel replanning within the QSim.
45  *
46  * Features like the creation of parallel running threads and the
47  * split up of the replanning actions have to be implemented in
48  * the subclasses.
49  */
50 public abstract class ParallelReplanner<T extends WithinDayReplannerFactory<? extends AgentSelector>> {
51 
52  private final static Logger log = LogManager.getLogger(ParallelReplanner.class);
53 
54  /*
55  * All replanners from the same type can either share one queue that contains all
56  * ReplanningTasks or use a separate queue per replanner object. A shared queue
57  * should result in a better load balancing but also might become a bottleneck when
58  * many threads are accessing it at the same time. When using a shared queue, a
59  * LinkedBlockingQueue is used. Otherwise, each replanner uses a LinkedList.
60  * Both approaches should produce the same simulation results.
61  */
62  private final boolean shareReplannerQueue = true;
63 
64  protected final EventsManager eventsManager;
65  protected int numOfThreads;
66 
67  protected Set<T> replannerFactories = new LinkedHashSet<T>();
69  protected String replannerName;
70  protected int roundRobin = 0;
71  private int lastRoundRobin = 0;
72  protected AtomicBoolean hadException;
73  protected ExceptionHandler uncaughtExceptionHandler;
74  protected CyclicBarrier timeStepStartBarrier;
75  protected CyclicBarrier betweenReplannerBarrier;
76  protected CyclicBarrier timeStepEndBarrier;
77 
78  protected boolean simIsRunning = false;
79 
80  public ParallelReplanner(int numOfThreads, EventsManager eventsManager) {
81  this.setNumberOfThreads(numOfThreads);
82  this.eventsManager = eventsManager;
83  }
84 
85  public final void init(String replannerName) {
86 
87  this.replannerName = replannerName;
88 
89  replanningRunnables = new InternalReplanningRunnable[numOfThreads];
90 
91  this.timeStepStartBarrier = new CyclicBarrier(numOfThreads + 1);
92  this.betweenReplannerBarrier = new CyclicBarrier(numOfThreads);
93  this.timeStepEndBarrier = new CyclicBarrier(numOfThreads + 1);
94 
95  // Do initial Setup of the Runnables
96  for (int i = 0; i < numOfThreads; i++) {
97  ReplanningRunnable replanningRunnable = new InternalReplanningRunnable(replannerName + " Thread" + i + " replanned plans: ");
98  replanningRunnable.setCyclicTimeStepStartBarrier(this.timeStepStartBarrier);
99  replanningRunnable.setBetweenReplannerBarrier(betweenReplannerBarrier);
100  replanningRunnable.setCyclicTimeStepEndBarrier(this.timeStepEndBarrier);
101  replanningRunnable.setEventsManager(eventsManager);
102 
103  replanningRunnables[i] = replanningRunnable;
104  }
105  }
106 
107  public final void onPrepareSim() {
108 
109  /*
110  * Moved this here from addWithinDayReplannerFactory(...).
111  * By doing so, the Replanners are created after the mobsim has been initialized.
112  * Moreover, the Replanners are now re-created from scratch for each iteration.
113  * cdobler, jul'13
114  */
115  for (T factory : this.replannerFactories) {
116  if (shareReplannerQueue) {
117  Queue<ReplanningTask> queue = new LinkedBlockingQueue<ReplanningTask>();
118  for (ReplanningRunnable replanningRunnable : this.replanningRunnables) {
119  WithinDayReplanner<? extends AgentSelector> newInstance = factory.createReplanner();
120  replanningRunnable.addWithinDayReplanner(newInstance, queue);
121  }
122  } else {
123  for (ReplanningRunnable replanningRunnable : this.replanningRunnables) {
124  WithinDayReplanner<? extends AgentSelector> newInstance = factory.createReplanner();
125  replanningRunnable.addWithinDayReplanner(newInstance, new LinkedList<ReplanningTask>());
126  }
127  }
128  }
129 
130  this.hadException = new AtomicBoolean(false);
131  this.uncaughtExceptionHandler = new ExceptionHandler(this.hadException, this.timeStepStartBarrier,
132  this.betweenReplannerBarrier, this.timeStepEndBarrier);
133 
134  Thread[] replanningThreads = new Thread[numOfThreads];
135 
136  // initialize threads
137  for (int i = 0; i < numOfThreads; i++) {
138  Thread replanningThread = new Thread(replanningRunnables[i]);
139  Thread.setDefaultUncaughtExceptionHandler(this.uncaughtExceptionHandler);
140  replanningThread.setName(replannerName + i);
141  replanningThreads[i] = replanningThread;
142  }
143 
144  // finalize thread setup and start them
145  for (int i = 0; i < numOfThreads; i++) {
146  replanningRunnables[i].beforeSim();
147  Thread replanningThread = replanningThreads[i];
148  replanningThread.setDaemon(true);
149  replanningThread.start();
150  }
151 
152  this.simIsRunning = true;
153 
154  /*
155  * After initialization the threads are waiting at the
156  * TimeStepEndBarrier. We trigger this Barrier once so
157  * they wait at the TimeStepStartBarrier what has to be
158  * their state if the run() method is called.
159  */
160  try {
161  this.timeStepEndBarrier.await();
162  } catch (InterruptedException e) {
163  throw new RuntimeException(e);
164  } catch (BrokenBarrierException e) {
165  throw new RuntimeException(e);
166  }
167  }
168 
169  /*
170  * Typical Implementations should be able to use this Method
171  * "as it is"...
172  */
173  public final void run(double time) {
174  // no Agents to Replan
175  if (lastRoundRobin == roundRobin) return;
176  else lastRoundRobin = roundRobin;
177 
178  /*
179  * If an exception occurred, at least one of the events replanning threads
180  * has crashed. Therefore the remaining threads would get stuck at the
181  * CyclicBarrier.
182  */
183  if (hadException.get()) {
184  return;
185  }
186 
187  try {
188  // set current time
189  for (ReplanningRunnable replanningRunnable : replanningRunnables) {
190  replanningRunnable.setTime(time);
191  }
192 
193  this.timeStepStartBarrier.await();
194 
195  this.timeStepEndBarrier.await();
196 
197  } catch (InterruptedException e) {
198  throw new RuntimeException(e);
199  } catch (BrokenBarrierException e) {
200  throw new RuntimeException(e);
201  }
202  }
203 
204  public final void afterSim() {
205 
206  this.simIsRunning = false;
207 
208  if (this.hadException.get()) {
209  throw new RuntimeException("Exception while replanning. " +
210  "Cannot guarantee that all replanning operations have been fully processed.");
211  }
212 
213  // reset counters
214  roundRobin = 0;
215  lastRoundRobin = 0;
216 
217  /*
218  * Calling the afterSim Method of the QSimEngineThreads
219  * will set their simulationRunning flag to false.
220  */
221  for (ReplanningRunnable runnable : this.replanningRunnables) {
222  runnable.afterSim();
223 
224  /*
225  * Remove replanners from the runnables - now they are re-created from scratch
226  * for each iteration.
227  * cdobler, jul'13
228  */
229  for (T factory : this.replannerFactories) {
230  runnable.removeWithinDayReplanner(factory.getId());
231  }
232  }
233 
234  /*
235  * Triggering the startBarrier of the QSimEngineThreads.
236  * They will check whether the Simulation is still running.
237  * It is not, so the Threads will stop running.
238  */
239  try {
240  this.timeStepStartBarrier.await();
241  } catch (InterruptedException e) {
242  throw new RuntimeException(e);
243  } catch (BrokenBarrierException e) {
244  throw new RuntimeException(e);
245  }
246  }
247 
248  public final void addWithinDayReplannerFactory(T factory) {
249  this.replannerFactories.add(factory);
250 
251  /*
252  * This is necessary for timed within-day replanners. They are added while the
253  * simulation is already running. Theirfore, now Queue<ReplanningTask> is created
254  * in the onPrepare() method.
255  * cdobler, dec'13
256  */
257  if (simIsRunning) {
258  if (shareReplannerQueue) {
259  Queue<ReplanningTask> queue = new LinkedBlockingQueue<ReplanningTask>();
260  for (ReplanningRunnable replanningRunnable : this.replanningRunnables) {
261  WithinDayReplanner<? extends AgentSelector> newInstance = factory.createReplanner();
262  replanningRunnable.addWithinDayReplanner(newInstance, queue);
263  }
264  } else {
265  for (ReplanningRunnable replanningRunnable : this.replanningRunnables) {
266  WithinDayReplanner<? extends AgentSelector> newInstance = factory.createReplanner();
267  replanningRunnable.addWithinDayReplanner(newInstance, new LinkedList<ReplanningTask>());
268  }
269  }
270  }
271  }
272 
273  public final void removeWithinDayReplannerFactory(T factory) {
274  this.replannerFactories.remove(factory);
275 
276  for (ReplanningRunnable replanningRunnable : this.replanningRunnables) {
277  replanningRunnable.removeWithinDayReplanner(factory.getId());
278  }
279  }
280 
281  public final void resetReplanners() {
282  for (ReplanningRunnable replanningRunnable : this.replanningRunnables) {
283  replanningRunnable.resetReplanners();
284  }
285  }
286 
287  public final Set<T> getWithinDayReplannerFactories() {
288  return Collections.unmodifiableSet(this.replannerFactories);
289  }
290 
291  public final void addReplanningTask(ReplanningTask replanningTask) {
292  this.replanningRunnables[this.roundRobin % this.numOfThreads].addReplanningTask(replanningTask);
293  this.roundRobin++;
294  }
295 
296  private final void setNumberOfThreads(int numberOfThreads) {
297  numOfThreads = Math.max(numberOfThreads, 1); // it should be at least 1 here; we allow 0 in other places for "no threads"
298 
299  log.info("Using " + numOfThreads + " threads for parallel within-day replanning.");
300 
301  /*
302  * Throw error message if the number of threads is bigger than the number of available CPUs.
303  * This should not speed up calculation anymore.
304  */
305  if (numOfThreads > Runtime.getRuntime().availableProcessors()) {
306  log.warn("The number of parallel running replanning threads is bigger than the number of available CPUs/Cores!");
307  }
308  }
309 
310  /*
311  * The thread class that really handles the replanning.
312  */
313  /*package*/ static final class InternalReplanningRunnable extends ReplanningRunnable {
314 
315  public InternalReplanningRunnable(String counterText) {
316  super(counterText);
317  }
318 
319  } // InternalReplanningThread
320 
324  private static class ExceptionHandler implements UncaughtExceptionHandler {
325 
326  private final AtomicBoolean hadException;
327  private final CyclicBarrier timeStepStartBarrier;
328  private final CyclicBarrier betweenReplannerBarrier;
329  private final CyclicBarrier timeStepEndBarrier;
330 
331  public ExceptionHandler(final AtomicBoolean hadException, CyclicBarrier timeStepStartBarrier,
332  CyclicBarrier betweenReplannerBarrier, CyclicBarrier timeStepEndBarrier) {
333  this.hadException = hadException;
334  this.timeStepStartBarrier = timeStepStartBarrier;
335  this.betweenReplannerBarrier = betweenReplannerBarrier;
336  this.timeStepEndBarrier = timeStepEndBarrier;
337  }
338 
339  @Override
340  public void uncaughtException(Thread t, Throwable e) {
341  this.hadException.set(true);
342  log.error("Thread " + t.getName() + " died with exception while replanning.", e);
343 
344  /*
345  * By reseting the barriers, they will throw a BrokenBarrierException
346  * which again will stop the events processing threads.
347  */
348  this.timeStepStartBarrier.reset();
349  this.betweenReplannerBarrier.reset();
350  this.timeStepEndBarrier.reset();
351  }
352 
353  }
354 }
final void setEventsManager(EventsManager eventsManager)
ParallelReplanner(int numOfThreads, EventsManager eventsManager)
ExceptionHandler(final AtomicBoolean hadException, CyclicBarrier timeStepStartBarrier, CyclicBarrier betweenReplannerBarrier, CyclicBarrier timeStepEndBarrier)
final void addReplanningTask(ReplanningTask replanningTask)
final void addReplanningTask(ReplanningTask replanningTask)