1 package org.matsim.core.population.io;
23 import org.apache.logging.log4j.LogManager;
24 import org.apache.logging.log4j.Logger;
32 import org.xml.sax.Attributes;
33 import org.xml.sax.helpers.AttributesImpl;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.CompletableFuture;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.LinkedBlockingQueue;
// Population reader that buffers the XML tags of each person into a list and
// puts that list on a queue (see startTag/endTag), while worker threads are
// created from ParallelPopulationReaderMatsimV6Runner instances.
// NOTE(review): the runners presumably consume tagQueue; their constructor
// arguments are not visible here - confirm.
50 class ParallelPopulationReaderMatsimV6
extends PopulationReaderMatsimV6 {
52 static final Logger log = LogManager.getLogger(ParallelPopulationReaderMatsimV6.class);
// Upper bound for the number of worker threads (applied via Math.min in the constructor).
53 private static final int THREADS_LIMIT = 4;
// True when the scenario's population is a StreamingPopulationReader.StreamingPopulation.
54 private final boolean isPopulationStreaming;
// Number of worker threads: the configured global thread count capped at THREADS_LIMIT, or 1.
55 private final int numThreads;
// One list of tags per person; stopThreads() additionally adds one end-of-processing list per worker.
56 private final BlockingQueue<List<Tag>> tagQueue;
// Worker threads; null until the thread start-up code has run.
57 private Thread[] threads;
// Tags collected so far for the person currently being parsed.
58 private List<Tag> currentPersonXmlData;
// Not final: endTag() re-reads it from the population once the population attributes are parsed.
60 private String inputCRS;
61 private final String targetCRS;
// Becomes true at the first <person> start tag; before that all tags are delegated to the superclass.
63 private boolean reachedPersons =
false;
// Futures of persons in document order; only used when the population is streaming.
65 private final BlockingQueue<CompletableFuture<Person>> personInsertionQueue =
new LinkedBlockingQueue<>();
// Thread running the PersonInserter; only created when the population is streaming.
66 private Thread personInsertionThread;
// First failure reported by a worker thread through catchReaderException().
// volatile: it is written on the failing worker thread (uncaught-exception
// handler) and read from the code that drives startTag()/stopThreads(), so the
// write must be visible across threads without further synchronization.
67 private volatile Throwable exception = null;
// Creates the reader. Passes both CRS values and the scenario to the
// superclass, keeps the CRS values locally, creates the tag queue and derives
// the streaming flag and the worker-thread count from the scenario.
69 public ParallelPopulationReaderMatsimV6(
70 final String inputCRS,
71 final String targetCRS,
72 final Scenario scenario) {
73 super(inputCRS, targetCRS, scenario);
74 this.inputCRS = inputCRS;
75 this.targetCRS = targetCRS;
76 this.tagQueue =
new LinkedBlockingQueue<>();
// Streaming mode is detected from the concrete type of the scenario's population.
82 this.isPopulationStreaming = scenario.getPopulation() instanceof StreamingPopulationReader.StreamingPopulation;
// Use the globally configured thread count, capped at THREADS_LIMIT;
// a non-positive configuration value falls back to a single thread.
85 if (scenario.getConfig().global().getNumberOfThreads() > 0) {
86 this.numThreads = Math.min(THREADS_LIMIT,scenario.getConfig().global().getNumberOfThreads());
87 }
else this.numThreads = 1;
// Copies every attribute converter registered on the given converter into the
// converter map of the runner's own ObjectAttributesConverter.
90 private static void initObjectAttributeConverters(ParallelPopulationReaderMatsimV6Runner runner, ObjectAttributesConverter converter)
92 Map<String, AttributeConverter<?>> targetConverter = runner.getObjectAttributesConverter().getConverters();
93 targetConverter.putAll(converter.getConverters());
// Worker start-up: creates numThreads runner threads and, for a streaming
// population, the person-insertion thread.
// NOTE(review): the enclosing method header and the start of the worker
// threads are not visible in this view - confirm where thread.start() happens.
97 threads =
new Thread[numThreads];
98 for (
int i = 0; i < numThreads; i++) {
100 ParallelPopulationReaderMatsimV6Runner runner =
101 new ParallelPopulationReaderMatsimV6Runner(
106 this.isPopulationStreaming);
// Give the runner the same attribute converters as this reader.
107 initObjectAttributeConverters(runner, this.getObjectAttributesConverter());
109 Thread thread =
new Thread(runner);
110 thread.setDaemon(
true);
// Note: Class.toString() yields "class <fully qualified name>", so that prefix is part of the thread name.
111 thread.setName(ParallelPopulationReaderMatsimV6Runner.class.toString() + i);
// Failures in a worker are routed to catchReaderException(), which records them in this.exception.
112 thread.setUncaughtExceptionHandler(this::catchReaderException);
// Same condition as the isPopulationStreaming field computed in the constructor.
117 if (this.scenario.getPopulation() instanceof StreamingPopulationReader.StreamingPopulation) {
118 this.personInsertionThread =
new Thread(
new PersonInserter(this.scenario.getPopulation(), this.personInsertionQueue));
119 this.personInsertionThread.start();
// Signals the end of input to the workers and the person inserter, then waits
// for them and checks whether a worker reported a failure.
123 private void stopThreads() {
// One end-of-processing list per worker thread.
125 for (
int i = 0; i < this.numThreads; i++) {
126 this.tagQueue.add(List.of(
new EndProcessingTag()));
// Streaming only: enqueue a future already completed with null. The
// PersonInserter tests for a null person; presumably that is its stop
// signal - the branch body is not visible here.
129 if (isPopulationStreaming) {
130 CompletableFuture<Person> finishPerson =
new CompletableFuture<>();
131 finishPerson.complete(null);
133 this.personInsertionQueue.put(finishPerson);
134 }
catch (InterruptedException e) {
// NOTE(review): the loop body (presumably a join on each worker) is not visible.
141 for (Thread thread : threads) {
144 if(this.isPopulationStreaming) {
145 this.personInsertionThread.join();
147 }
catch (InterruptedException e) {
// NOTE(review): handling of a recorded worker failure is not visible here.
151 if (this.exception != null) {
// Uncaught-exception handler installed on the worker threads: logs the
// failure and stores it in this.exception so other code can check for it.
156 private void catchReaderException(Thread thread, Throwable throwable) {
157 log.error(
"Error parsing XML", throwable);
158 this.exception = throwable;
// Handles an XML start tag. Tags seen before the first <person> are passed to
// the superclass; from the first <person> on, tags are recorded into the
// current person's tag list instead.
162 public void startTag(String name, Attributes atts, Stack<String> context) {
// First <person> element: switch to buffering mode.
164 if (PERSON.equals(name) && !this.reachedPersons) {
165 this.reachedPersons =
true;
// NOTE(review): presumably the worker threads are started here; the call is not visible.
167 if (this.threads == null) {
168 log.info(
"Start parallel population reading...");
174 if (!this.reachedPersons) {
175 super.startTag(name, atts, context);
181 if (PERSON.equals(name)) {
// A worker has already reported a failure; the reaction is not visible in this view.
182 if (this.exception != null) {
// Create the Person here and begin a fresh tag list headed by a PersonTag that carries it.
188 Person person = this.plans.getFactory().createPerson(Id.create(atts.getValue(ATTR_PERSON_ID), Person.class));
189 currentPersonXmlData =
new ArrayList<>();
190 PersonTag personTag =
new PersonTag();
191 personTag.person = person;
192 currentPersonXmlData.add(personTag);
// Streaming: register a future for this person in the insertion queue.
195 if (isPopulationStreaming) {
196 personTag.futurePerson =
new CompletableFuture<>();
198 this.personInsertionQueue.put(personTag.futurePerson);
199 }
catch (InterruptedException e) {
// NOTE(review): presumably the non-streaming branch; the enclosing else is not visible.
205 this.plans.addPerson(person);
// Record the tag with its own copies of the context stack and the attributes.
209 Stack<String> contextCopy =
new Stack<>();
210 contextCopy.addAll(context);
211 StartTag tag =
new StartTag();
213 tag.context = contextCopy;
214 tag.atts =
new AttributesImpl(atts);
215 currentPersonXmlData.add(tag);
// Handles an XML end tag. Before the first <person> it delegates to the
// superclass; afterwards it records the tag, and at </person> it puts the
// finished tag list on tagQueue.
220 public void endTag(String name, String content, Stack<String> context) {
// End of the population-level attributes element: re-read the input CRS from the population.
221 if(ATTRIBUTES.equals(name)&&context.peek().equals(POPULATION))
223 this.inputCRS = ProjectionUtils.getCRS(scenario.getPopulation());
228 if (!this.reachedPersons) {
229 super.endTag(name, content, context);
// End of the whole population.
// NOTE(review): presumably stopThreads() is called here; the call is not visible.
234 if (POPULATION.equals(name)) {
237 super.endTag(name, content, context);
238 log.info(
"Finished parallel population reading...");
// Record the end tag with its text content and a copy of the context stack.
242 Stack<String> contextCopy =
new Stack<>();
243 contextCopy.addAll(context);
244 EndTag tag =
new EndTag();
246 tag.content = content;
247 tag.context = contextCopy;
248 currentPersonXmlData.add(tag);
// The person is complete: put its tag list on the queue.
251 if (PERSON.equals(name)) {
252 tagQueue.add(currentPersonXmlData);
// Base type of the entries stored in the per-person tag lists.
257 public abstract static class Tag {
// Element context stack; startTag()/endTag() assign a copy of the parser's stack.
259 Stack<String> context;
// Future for the person, assigned in startTag() when the population is streaming.
// NOTE(review): belongs to a subclass (PersonTag, judging by its use in startTag()) whose header is not visible.
268 CompletableFuture<Person> futurePerson;
// Fragments of the PersonInserter, which runs on personInsertionThread.
// Queue of person futures, passed in by the reader.
285 BlockingQueue<CompletableFuture<Person>> personInsertionQueue;
289 this.population = population;
290 this.personInsertionQueue = personInsertionQueue;
// Take the next future from the queue, then block until its person is available.
297 CompletableFuture<Person> finishedPerson = this.personInsertionQueue.take();
298 Person person = finishedPerson.get();
// stopThreads() enqueues a future completed with null.
// NOTE(review): presumably null ends the loop; the branch body is not visible.
299 if (person == null) {
304 }
catch (InterruptedException | ExecutionException e) {
abstract void startTag(String name, Attributes atts, Stack< String > context)
void addPerson(final Person p)