MATSIM
ParallelPersonAlgorithmUtils.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * ParallelPersonAlgorithmRunner.java
4  * *
5  * *********************************************************************** *
6  * *
7  * copyright : (C) 2008 by the members listed in the COPYING, *
8  * LICENSE and WARRANTY file. *
9  * email : info at matsim dot org *
10  * *
11  * *********************************************************************** *
12  * *
13  * This program is free software; you can redistribute it and/or modify *
14  * it under the terms of the GNU General Public License as published by *
15  * the Free Software Foundation; either version 2 of the License, or *
16  * (at your option) any later version. *
17  * See also COPYING, LICENSE and WARRANTY file *
18  * *
19  * *********************************************************************** */
20 
21 package org.matsim.core.population.algorithms;
22 
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 
28 import org.apache.logging.log4j.LogManager;
29 import org.apache.logging.log4j.Logger;
33 
39 public final class ParallelPersonAlgorithmUtils {
40  private ParallelPersonAlgorithmUtils(){} // do not instantiate
41 
42  private final static Logger log = LogManager.getLogger(ParallelPersonAlgorithmUtils.class);
43 
44  public interface PersonAlgorithmProvider {
46  }
47 
57  public static void run(final Population population, final int numberOfThreads, final PersonAlgorithm algorithm) {
58  run(population, numberOfThreads, new PersonAlgorithmProvider() {
59  @Override public PersonAlgorithm getPersonAlgorithm() {
60  return algorithm;
61  }
62  });
63  }
64 
75  public static void run(final Population population, final int numberOfThreads, final PersonAlgorithmProvider algoProvider) {
76  int numOfThreads = Math.max(numberOfThreads, 1); // it should be at least 1 here; we allow 0 in other places for "no threads"
77  PersonAlgoThread[] algoThreads = new PersonAlgoThread[numOfThreads];
78  Thread[] threads = new Thread[numOfThreads];
79  String name = null;
80  Counter counter = null;
81 
82  final AtomicBoolean hadException = new AtomicBoolean(false);
83  final ExceptionHandler uncaughtExceptionHandler = new ExceptionHandler(hadException);
84 
85  // setup threads
86  for (int i = 0; i < numOfThreads; i++) {
87  PersonAlgorithm algo = algoProvider.getPersonAlgorithm();
88  if (i == 0) {
89  name = algo.getClass().getSimpleName();
90  counter = new Counter("[" + name + "] handled person # ");
91  }
92  PersonAlgoThread algothread = new PersonAlgoThread(algo, counter);
93  Thread thread = new Thread(algothread, name + "." + i);
94  thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
95  threads[i] = thread;
96  algoThreads[i] = algothread;
97  }
98 
99  // distribute workload between threads, as long as threads are not yet started, so we don't need synchronized data structures
100  int i = 0;
101  for (Person person : population.getPersons().values()) {
102  algoThreads[i % numOfThreads].handlePerson(person);
103  i++;
104  }
105 
106  // start the threads
107  for (Thread thread : threads) {
108  thread.start();
109  }
110 
111  // wait for the threads to finish
112  try {
113  for (Thread thread : threads) {
114  thread.join();
115  }
116  counter.printCounter();
117  } catch (InterruptedException e) {
118  throw new RuntimeException(e);
119  }
120  if (hadException.get()) {
121  throw new RuntimeException("Exception while processing persons. Cannot guarantee that all persons have been fully processed.");
122  }
123  }
124 
128  private static class PersonAlgoThread implements Runnable {
129 
131  private final List<Person> persons = new LinkedList<Person>();
132  private final Counter counter;
133 
134  public PersonAlgoThread(final PersonAlgorithm algo, final Counter counter) {
135  this.personAlgo = algo;
136  this.counter = counter;
137  }
138 
139  public void handlePerson(final Person person) {
140  this.persons.add(person);
141  }
142 
143  @Override
144  public void run() {
145  for (Person person : this.persons) {
146  this.personAlgo.run(person);
147  counter.incCounter();
148  }
149  }
150  }
151 
155  private static class ExceptionHandler implements UncaughtExceptionHandler {
156 
157  private final AtomicBoolean hadException;
158 
159  public ExceptionHandler(final AtomicBoolean hadException) {
160  this.hadException = hadException;
161  }
162 
163  @Override
164  public void uncaughtException(Thread t, Throwable e) {
165  log.error("Thread " + t.getName() + " died with exception while handling events.", e);
166  this.hadException.set(true);
167  }
168 
169  }
170 
171 }
Map< Id< Person >,? extends Person > getPersons()
static void run(final Population population, final int numberOfThreads, final PersonAlgorithmProvider algoProvider)
static void run(final Population population, final int numberOfThreads, final PersonAlgorithm algorithm)