MATSIM
ParallelPopulationReaderMatsimV6.java
Go to the documentation of this file.
1 package org.matsim.core.population.io;
2 
3 /* *********************************************************************** *
4  * project: org.matsim.*
5  * ParallelPopulationReaderMatsimV6.java
6  * *
7  * *********************************************************************** *
8  * *
9  * copyright : (C) 2023 by the members listed in the COPYING, *
10  * LICENSE and WARRANTY file. *
11  * email : info at matsim dot org *
12  * *
13  * *********************************************************************** *
14  * *
15  * This program is free software; you can redistribute it and/or modify *
16  * it under the terms of the GNU General Public License as published by *
17  * the Free Software Foundation; either version 2 of the License, or *
18  * (at your option) any later version. *
19  * See also COPYING, LICENSE and WARRANTY file *
20  * *
21  * *********************************************************************** */
22 
23 import org.apache.logging.log4j.LogManager;
24 import org.apache.logging.log4j.Logger;
25 import org.matsim.api.core.v01.Id;
26 import org.matsim.api.core.v01.Scenario;
32 import org.xml.sax.Attributes;
33 import org.xml.sax.helpers.AttributesImpl;
34 
35 import java.util.*;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.CompletableFuture;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.LinkedBlockingQueue;
40 
50 /* deliberately package */ class ParallelPopulationReaderMatsimV6 extends PopulationReaderMatsimV6 {
51 
52  static final Logger log = LogManager.getLogger(ParallelPopulationReaderMatsimV6.class);
53  private static final int THREADS_LIMIT = 4;
54  private final boolean isPopulationStreaming;
55  private final int numThreads;
56  private final BlockingQueue<List<Tag>> tagQueue;
57  private Thread[] threads;
58  private List<Tag> currentPersonXmlData;
59 
60  private String inputCRS;
61  private final String targetCRS;
62 
63  private boolean reachedPersons = false;
64 
65  private final BlockingQueue<CompletableFuture<Person>> personInsertionQueue = new LinkedBlockingQueue<>();
66  private Thread personInsertionThread;
67  private Throwable exception = null;
68 
69  public ParallelPopulationReaderMatsimV6(
70  final String inputCRS,
71  final String targetCRS,
72  final Scenario scenario) {
73  super(inputCRS, targetCRS, scenario);
74  this.inputCRS = inputCRS;
75  this.targetCRS = targetCRS;
76  this.tagQueue = new LinkedBlockingQueue<>();
77 
78  /*
79  * Check whether population streaming is activated
80  */
81 
82  this.isPopulationStreaming = scenario.getPopulation() instanceof StreamingPopulationReader.StreamingPopulation;
83 
84  // Set threads
85  if (scenario.getConfig().global().getNumberOfThreads() > 0) {
86  this.numThreads = Math.min(THREADS_LIMIT,scenario.getConfig().global().getNumberOfThreads());
87  } else this.numThreads = 1;
88  }
89 
90  private static void initObjectAttributeConverters(ParallelPopulationReaderMatsimV6Runner runner, ObjectAttributesConverter converter)
91  {
92  Map<String, AttributeConverter<?>> targetConverter = runner.getObjectAttributesConverter().getConverters();
93  targetConverter.putAll(converter.getConverters());
94  }
95 
96  private void initThreads() {
97  threads = new Thread[numThreads];
98  for (int i = 0; i < numThreads; i++) {
99 
100  ParallelPopulationReaderMatsimV6Runner runner =
101  new ParallelPopulationReaderMatsimV6Runner(
102  this.inputCRS,
103  this.targetCRS,
104  this.scenario,
105  this.tagQueue,
106  this.isPopulationStreaming);
107  initObjectAttributeConverters(runner, this.getObjectAttributesConverter());
108 
109  Thread thread = new Thread(runner);
110  thread.setDaemon(true);
111  thread.setName(ParallelPopulationReaderMatsimV6Runner.class.toString() + i);
112  thread.setUncaughtExceptionHandler(this::catchReaderException);
113  threads[i] = thread;
114  thread.start();
115  }
116 
117  if (this.scenario.getPopulation() instanceof StreamingPopulationReader.StreamingPopulation) {
118  this.personInsertionThread = new Thread(new PersonInserter(this.scenario.getPopulation(), this.personInsertionQueue));
119  this.personInsertionThread.start();
120  }
121  }
122 
123  private void stopThreads() {
124  // signal the threads that they should end parsing
125  for (int i = 0; i < this.numThreads; i++) {
126  this.tagQueue.add(List.of(new EndProcessingTag()));
127  }
128 
129  if (isPopulationStreaming) {
130  CompletableFuture<Person> finishPerson = new CompletableFuture<>();
131  finishPerson.complete(null);
132  try {
133  this.personInsertionQueue.put(finishPerson);
134  } catch (InterruptedException e) {
135  throw new RuntimeException(e);
136  }
137  }
138 
139  // wait for the threads to finish
140  try {
141  for (Thread thread : threads) {
142  thread.join();
143  }
144  if(this.isPopulationStreaming) {
145  this.personInsertionThread.join();
146  }
147  } catch (InterruptedException e) {
148  throw new RuntimeException(e);
149  }
150 
151  if (this.exception != null) {
152  throw new RuntimeException(this.exception);
153  }
154  }
155 
156  private void catchReaderException(Thread thread, Throwable throwable) {
157  log.error("Error parsing XML", throwable);
158  this.exception = throwable;
159  }
160 
161  @Override
162  public void startTag(String name, Attributes atts, Stack<String> context) {
163  //Reached first time a person
164  if (PERSON.equals(name) && !this.reachedPersons) {
165  this.reachedPersons = true;
166 
167  if (this.threads == null) {
168  log.info("Start parallel population reading...");
169  initThreads();
170  }
171  }
172 
173  // As long as we have not reached the persons in the xml, use super class
174  if (!this.reachedPersons) {
175  super.startTag(name, atts, context);
176  return;
177  }
178 
179 
180  // If it is a new person, create a new person and a list for its attributes.
181  if (PERSON.equals(name)) {
182  if (this.exception != null) {
183  this.stopThreads();
184  throw new RuntimeException(this.exception);
185  }
186 
187  // Just create a person, but do not add it here!
188  Person person = this.plans.getFactory().createPerson(Id.create(atts.getValue(ATTR_PERSON_ID), Person.class));
189  currentPersonXmlData = new ArrayList<>();
190  PersonTag personTag = new PersonTag();
191  personTag.person = person;
192  currentPersonXmlData.add(personTag);
193 
194  // If in streaming mode, we need later complete persons
195  if (isPopulationStreaming) {
196  personTag.futurePerson = new CompletableFuture<>();
197  try {
198  this.personInsertionQueue.put(personTag.futurePerson);
199  } catch (InterruptedException e) {
200  throw new RuntimeException(e);
201  }
202  } else {
203  // If not in streaming mode, we can work with a reference
204  // of an unfinished person and add it right now...
205  this.plans.addPerson(person);
206  }
207  } else {
208  // Create a new start tag and add it to the person data.
209  Stack<String> contextCopy = new Stack<>();
210  contextCopy.addAll(context);
211  StartTag tag = new StartTag();
212  tag.name = name;
213  tag.context = contextCopy;
214  tag.atts = new AttributesImpl(atts); // We have to create copies of the attributes because the object is re-used by the parser!
215  currentPersonXmlData.add(tag);
216  }
217  }
218 
219  @Override
220  public void endTag(String name, String content, Stack<String> context) {
221  if(ATTRIBUTES.equals(name)&&context.peek().equals(POPULATION))
222  {
223  this.inputCRS = ProjectionUtils.getCRS(scenario.getPopulation());
224  }
225 
226  // if population streaming is activated, use non-parallel reader
227  // or if not reached the persons in the xml
228  if (!this.reachedPersons) {
229  super.endTag(name, content, context);
230  return;
231  }
232 
233  // End of population reached
234  if (POPULATION.equals(name)) {
235  this.stopThreads();
236 
237  super.endTag(name, content, context);
238  log.info("Finished parallel population reading...");
239  // Till parsing population
240  } else {
241  // Create a new end tag and add it to the person data.
242  Stack<String> contextCopy = new Stack<>();
243  contextCopy.addAll(context);
244  EndTag tag = new EndTag();
245  tag.name = name;
246  tag.content = content;
247  tag.context = contextCopy;
248  currentPersonXmlData.add(tag);
249 
250  // if it's a person end tag, add the persons xml data to the queue.
251  if (PERSON.equals(name)) {
252  tagQueue.add(currentPersonXmlData);
253  }
254  }
255  }
256 
257  public abstract static class Tag {
258  String name;
259  Stack<String> context;
260  }
261 
262  public final static class StartTag extends Tag {
263  Attributes atts;
264  }
265 
266  public final static class PersonTag extends Tag {
267  Person person;
268  CompletableFuture<Person> futurePerson;
269  }
270 
271  public final static class EndTag extends Tag {
272  String content;
273  }
274 
275  /*
276  * Marker Tag to inform the threads that no further data has to be parsed.
277  */
278  public final static class EndProcessingTag extends Tag {
279  }
280 
281  // This class is used to feed the population step by step
282  // with new complete persons while being in streaming mode
283  public final static class PersonInserter implements Runnable {
284  Population population;
285  BlockingQueue<CompletableFuture<Person>> personInsertionQueue;
286 
287  PersonInserter(Population population, BlockingQueue<CompletableFuture<Person>> personInsertionQueue)
288  {
289  this.population = population;
290  this.personInsertionQueue = personInsertionQueue;
291  }
292 
293  @Override
294  public void run() {
295  while (true) {
296  try {
297  CompletableFuture<Person> finishedPerson = this.personInsertionQueue.take();
298  Person person = finishedPerson.get();
299  if (person == null) {
300  return;
301  }
302  this.population.addPerson(person);
303 
304  } catch (InterruptedException | ExecutionException e) {
305  throw new RuntimeException(e);
306  }
307  }
308  }
309  }
310 }
311 
abstract void startTag(String name, Attributes atts, Stack< String > context)