MATSIM
ParallelPopulationReaderMatsimV6Runner.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * ParallelPopulationReaderMatsimV6Runner.java
4  * *
5  * *********************************************************************** *
6  * *
7  * copyright : (C) 2023 by the members listed in the COPYING, *
8  * LICENSE and WARRANTY file. *
9  * email : info at matsim dot org *
10  * *
11  * *********************************************************************** *
12  * *
13  * This program is free software; you can redistribute it and/or modify *
14  * it under the terms of the GNU General Public License as published by *
15  * the Free Software Foundation; either version 2 of the License, or *
16  * (at your option) any later version. *
17  * See also COPYING, LICENSE and WARRANTY file *
18  * *
19  * *********************************************************************** */
20 
21 package org.matsim.core.population.io;
22 
23 import org.matsim.api.core.v01.Scenario;
26 
27 import java.util.List;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.CompletableFuture;
30 
38 /* deliberately package */ class ParallelPopulationReaderMatsimV6Runner extends PopulationReaderMatsimV6 implements Runnable {
39 
40  private final BlockingQueue<List<Tag>> queue;
41  private boolean isStreaming;
42 
43  public ParallelPopulationReaderMatsimV6Runner(
44  final String inputCRS,
45  final String targetCRS,
46  final Scenario scenario,
47  final BlockingQueue<List<Tag>> tagQueue,
48  boolean isStreaming) {
49  super(inputCRS ,targetCRS, scenario);
50  this.queue = tagQueue;
51  this.isStreaming = isStreaming;
52  }
53 
54  @Override
55  public void run() {
56  /*
57  * The thread will go on with the parsing until an EndProcessingTag is found,
58  * which calls "return".
59  */
60  while (true) {
61  try {
62  List<Tag> tags;
63  tags = queue.take();
64  PersonTag currentPersonTag = null;
65  for (Tag tag : tags) {
66  if (tag instanceof PersonTag personTag) {
67  currentPersonTag = personTag;
68  this.currperson = personTag.person;
69  } else if (tag instanceof StartTag startTag) {
70  this.startTag(tag.name, startTag.atts, tag.context);
71  } else if (tag instanceof EndTag endTag) {
72  /*
73  * If its is a person tag, we reset the current person. We do not hand the
74  * tag over to the superclass because the person has already been added
75  * to the population.
76  */
77  if (PERSON.equals(tag.name)) {
78  if(isStreaming)
79  {
80  CompletableFuture<Person> cf = currentPersonTag.futurePerson;
81  cf.complete(currentPersonTag.person);
82  }
83 
84  this.currperson = null;
85  currentPersonTag = null;
86  }
87  // otherwise hand the tag over to the super class
88  else {
89  this.endTag(tag.name, endTag.content, tag.context);
90  }
91  } else if (tag instanceof EndProcessingTag) {
92  return;
93  }
94  }
95  } catch (InterruptedException e) {
96  throw new RuntimeException(e);
97  }
98  }
99  }
100 }