MATSIM
Realm.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * *
4  * *********************************************************************** *
5  * *
6  * copyright : (C) 2014 by the members listed in the COPYING, *
7  * LICENSE and WARRANTY file. *
8  * email : info at matsim dot org *
9  * *
10  * *********************************************************************** *
11  * *
12  * This program is free software; you can redistribute it and/or modify *
13  * it under the terms of the GNU General Public License as published by *
14  * the Free Software Foundation; either version 2 of the License, or *
15  * (at your option) any later version. *
16  * See also COPYING, LICENSE and WARRANTY file *
17  * *
18  * *********************************************************************** */
19 package org.matsim.core.mobsim.hermes;
20 
21 import org.apache.logging.log4j.LogManager;
22 import org.apache.logging.log4j.Logger;
23 import org.matsim.api.core.v01.Id;
24 import org.matsim.api.core.v01.IdMap;
34 import org.matsim.core.utils.misc.Time;
36 import org.matsim.vehicles.Vehicle;
37 
38 import java.util.ArrayDeque;
39 import java.util.ArrayList;
40 
41 class Realm {
42  private final ScenarioImporter si;
43  // Global array of links.
44  // Note: the id of the link is its index in the array.
45  private final HLink[] links;
46  // Internal realm links on hold until a specific timestamp (in seconds).
47  // Internal means that the source and destination realm of are the same.
48  private final ArrayList<ArrayDeque<HLink>> delayedLinksByWakeupTime;
49  // Agents on hold until a specific timestamp (in seconds).
50  private final ArrayList<ArrayDeque<Agent>> delayedAgentsByWakeupTime;
51  // Agents waiting in pt stations. Should be used as follows:
52  // agent_stops.get(curr station id).get(line id) -> queue of agents
53  private final IdMap<TransitStopFacility, IntArrayMap<ArrayDeque<Agent>>> agent_stops;
54  // stop ids for each route: int[] stop_ids = route_stops_by_route_no[route_no]
55  protected int[][] route_stops_by_route_no;
56  // line id of a particular route
57  private final int[] line_of_route;
58  // queue of sorted events by time
59  private EventArray sortedEvents;
60  // MATSim event manager.
61  private final EventsManager eventsManager;
62  // Current timestamp
63  private int secs;
64  Logger log = LogManager.getLogger(Realm.class);
65 
66  public Realm(ScenarioImporter scenario, EventsManager eventsManager) {
67  this.si = scenario;
68  this.links = scenario.hermesLinks;
69  // The plus one is necessary because we peek into the next slot on each tick.
70  this.delayedLinksByWakeupTime = new ArrayList<>();
71  this.delayedAgentsByWakeupTime = new ArrayList<>();
72  this.agent_stops = scenario.agentStops;
73  this.route_stops_by_route_no = scenario.routeStopsByRouteNo;
74  this.line_of_route = scenario.lineOfRoute;
75  this.sortedEvents = new EventArray();
76  this.eventsManager = eventsManager;
77 
78  // the last position is to store events that will not happen...
79  for (int i = 0; i <= HermesConfigGroup.SIM_STEPS + 1; i++) {
80  delayedLinksByWakeupTime.add(new ArrayDeque<>());
81  delayedAgentsByWakeupTime.add(new ArrayDeque<>());
82  }
83  }
84 
85  public void log(int time, String s) {
86  if (HermesConfigGroup.DEBUG_REALMS) {
87  log.debug(String.format("Hermes [ time = %d ] %s", time, s));
88  }
89  }
90 
91  private void addDelayedAgent(Agent agent, int until) {
92  if (HermesConfigGroup.DEBUG_REALMS) log(secs, String.format("agent %d delayed until %d", agent.id, until));
93  delayedAgentsByWakeupTime.get(Math.min(until, HermesConfigGroup.SIM_STEPS + 1)).add(agent);
94  }
95 
96  private void addDelayedLink(HLink link, int until) {
97  if (HermesConfigGroup.DEBUG_REALMS)
98  log(secs, String.format("link %d delayed until %d size %d peek agent %d", link.id(), until, link.queue().size(), link.queue().peek().id));
99  delayedLinksByWakeupTime.get(Math.min(until, HermesConfigGroup.SIM_STEPS + 1)).add(link);
100  }
101 
102  private void advanceAgentandSetEventTime(Agent agent) {
103  advanceAgent(agent);
104  // set time in agent's event.
105  setEventTime(agent, Agent.getPlanEvent(agent.currPlan()), secs, false);
106  }
107 
108  private void advanceAgent(Agent agent) {
109  if (HermesConfigGroup.DEBUG_REALMS) {
110  long centry = agent.currPlan();
111  log(secs, String.format("agent %d finished %s (prev plan index is %d)", agent.id, Agent.toString(centry), agent.planIndex));
112  }
113  agent.planIndex++;
114  if (HermesConfigGroup.DEBUG_REALMS) {
115  long nentry = agent.currPlan();
116  log(secs, String.format("agent %d starting %s (new plan index is %d)", agent.id, Agent.toString(nentry), agent.planIndex));
117  }
118  }
119 
120  protected boolean processAgentLink(Agent agent, long planentry, int currLinkId) {
121  int linkid = Agent.getLinkPlanEntry(planentry);
122  double velocity = Agent.getVelocityPlanEntry(planentry);
123  HLink next = links[linkid];
124  int prev_finishtime = agent.linkFinishTime;
125  // this ensures that if no velocity is provided for the vehicle, we use the link
126  velocity = velocity == 0 ? next.velocity() : velocity;
127  // the max(1, ...) ensures that a link hop takes at least on step.
128  int traveltime = (HermesConfigGroup.LINK_ADVANCE_DELAY + (int) Math.round(Math.max(1, next.length() / Math.min(velocity, next.velocity()))));
129  agent.linkFinishTime = secs + traveltime;
130  float storageCapacityPCU = agent.getStorageCapacityPCUE();
131  if (next.push(agent,secs,storageCapacityPCU)) {
132  advanceAgentandSetEventTime(agent);
133  // If the agent we just added is the head, add to delayed links
134  if (currLinkId != next.id() && next.queue().peek() == agent) {
135  addDelayedLink(next, Math.max(agent.linkFinishTime, secs + 1));
136  }
137  return true;
138  } else {
139  agent.linkFinishTime = prev_finishtime;
140  return false;
141  }
142  }
143 
144  protected boolean processAgentSleepFor(Agent agent, long planentry) {
145  int sleep = Agent.getSleepPlanEntry(planentry);
146  return processAgentSleepUntil(agent, secs + Math.max(1, sleep));
147  }
148 
149  protected boolean processAgentSleepUntil(Agent agent, long planentry) {
150  int sleep = Agent.getSleepPlanEntry(planentry);
151  addDelayedAgent(agent, Math.max(sleep, secs + 1));
152  updateCapacities(agent);
153  advanceAgentandSetEventTime(agent);
154  return true;
155  }
156 
157  private void updateCapacities(Agent agent) {
158  if (agent.isTransitVehicle()) {
159  return;
160  //assures PT vehicles never update their PCUEs, as only they have a capacity > 0
161  //check is not strictly necessary in current code, adding it just in case
162  }
163  if (agent.plan.size < agent.planIndex + 3) {
164  return;
165  }
166  if (Agent.getPlanHeader(agent.plan.get(agent.planIndex + 2)) == Agent.LinkType) {
167  int category = Agent.getLinkPCEEntry(agent.nextPlan());
168  agent.setStorageCapacityPCUE(si.getStorageCapacityPCE(category));
169  agent.setFlowCapacityPCUE(si.getFlowCapacityPCE(category));
170  }
171  }
172 
173  protected boolean processAgentWait(Agent agent, long planentry) {
174  advanceAgentandSetEventTime(agent);
175  int routeNo = Agent.getRoutePlanEntry(planentry);
176  int accessStop = Agent.getStopPlanEntry(planentry);
177  // Note: getNextStop needs to be called after advanceAgent.
178  int lineid = line_of_route[routeNo];
179 
180  try {
181  agent_stops.get(accessStop)
182  .get(lineid)
183  .add(agent);
184  } catch (NullPointerException npe) {
185  log.error(String.format("Hermes NPE agent=%d routeNo=%d accessStop=%d lineid=%d", agent.id, routeNo, accessStop, lineid), npe);
186  }
187  return true;
188  }
189 
190  protected boolean processAgentStopArrive(Agent agent, long planentry) {
191  addDelayedAgent(agent, secs + 1);
192  advanceAgentandSetEventTime(agent);
193  // Although we want the agent to be processed in the next tick, we
194  // return true to remove the vehicle from the link that it is currently.
195  return true;
196  }
197 
198  protected boolean processAgentStopDelay(Agent agent, long planentry) {
199  int stopid = Agent.getStopPlanEntry(planentry);
200  int departure = Agent.getDeparture(planentry);
201 
202  // consume stop delay
203  addDelayedAgent(agent, Math.max(secs + 1, departure));
204  advanceAgent(agent);
205 
206  // drop agents
207  for (Agent out : agent.egress(stopid)) {
208  addDelayedAgent(out, secs + 1);
209  // consume access, activate egress
210  advanceAgentandSetEventTime(out);
211  // set driver in agent's event
212  setEventVehicle(out, Agent.getPlanEvent(out.currPlan()), agent.id);
213  }
214 
215  // True is returned as the agent is already in the delayed list.
216  return true;
217  }
218 
219  protected boolean processAgentStopDepart(Agent agent, long planentry) {
220  int routeNo = Agent.getRoutePlanEntry(planentry);
221  int stopid = Agent.getStopPlanEntry(planentry);
222  int lineid = line_of_route[routeNo];
223  ArrayDeque<Agent> waiting_agents = agent_stops.get(stopid).get(lineid);
224 
225  // take agents
226  if (waiting_agents != null) {
227  ArrayList<Agent> removed = new ArrayList<>();
228  for (Agent in : waiting_agents) {
229  try {
230  int egressStop = in.getNextStopPlanEntry();
231  if (agent.willServeStop(egressStop)) {
232  if (agent.access(egressStop, in)) {
233  removed.add(in);
234  // consume wait in stop, activate access
235  advanceAgentandSetEventTime(in);
236  // set driver in agent's event
237  setEventVehicle(in, Agent.getPlanEvent(in.currPlan()), agent.id);
238  } else {
239  // agent could not enter, likely the vehicle is full
240  break;
241  }
242  }
243  } catch (Exception e) {
244  throw new RuntimeException(e);
245  }
246  }
247  waiting_agents.removeAll(removed);
248  }
249  advanceAgentandSetEventTime(agent);
250  // False is returned to force this agent to be processed in the next tick.
251  // This will mean that the vehicle will be processed in the next tick.
252  return false;
253  }
254 
255  protected boolean processAgent(Agent agent, int currLinkId) {
256  // Peek the next plan element and try to execute it.
257  long planentry = agent.plan.get(agent.planIndex + 1);
258  int type = Agent.getPlanHeader(planentry);
259  switch (type) {
260  case Agent.LinkType: return processAgentLink(agent, planentry, currLinkId);
261  case Agent.SleepForType: return processAgentSleepFor(agent, planentry);
262  case Agent.SleepUntilType: return processAgentSleepUntil(agent, planentry);
263  case Agent.StopArriveType: return processAgentStopArrive(agent, planentry);
264  case Agent.StopDelayType: return processAgentStopDelay(agent, planentry);
265  case Agent.StopDepartType: return processAgentStopDepart(agent, planentry);
266  case Agent.WaitType: return processAgentWait(agent, planentry);
267  case Agent.AccessType: // The access event is consumed in the stop.
268  case Agent.EgressType: // The egress event is consumed in the stop.
269  default:
270  throw new RuntimeException(String.format(
271  "unknown plan element type %d, agent %d plan index %d",
272  type, agent.id, agent.planIndex + 1));
273  }
274  }
275 
276  protected int processAgentActivities(Agent agent) {
277  boolean finished = agent.finished();
278  // if finished, install times on last event.
279  if (finished) {
280  setEventTime(agent, agent.events().size() - 1, secs, true);
281  }
282  // -1 is used in the processAgent because the agent is not in a link currently.
283  if (!finished && !processAgent(agent, -1)) {
284  addDelayedAgent(agent, secs + 1);
285  return 0;
286  }
287  return 1;
288  }
289 
290  protected int processLinks(HLink link) {
291  int routed = 0;
292  Agent agent = link.queue().peek();
293  while (agent.linkFinishTime <= secs && link.flow(secs, agent.getFlowCapacityPCUE())) {
294  boolean finished = agent.finished();
295  // if finished, install times on last event.
296  if (finished) {
297  setEventTime(agent, agent.events().size() - 1, secs, true);
298  }
299  if (finished || processAgent(agent, link.id())) {
300  float storageCapacityPCE = agent.getStorageCapacityPCUE();
301  link.pop(storageCapacityPCE);
302  routed += 1;
303  if ((agent = link.queue().peek()) == null) {
304  break;
305  }
306  } else {
307  break;
308  }
309  }
310  // If there is at least one agent in the link that could not be processed
311  // In addition we check if this agent was not added in this tick.
312  if (agent != null) {
313  addDelayedLink(link, Math.max(agent.linkFinishTime, secs + 1));
314  }
315  return routed;
316  }
317 
318  public void run() throws Exception {
319  int routed = 0;
320  Agent agent;
321  HLink link;
322 
323  while (secs != HermesConfigGroup.SIM_STEPS) {
324  if (secs % 3600 == 0) {
325  log.info("Hermes running at " + Time.writeTime(secs));
326  }
327  while ((agent = delayedAgentsByWakeupTime.get(secs).poll()) != null) {
328  if (HermesConfigGroup.DEBUG_REALMS) {
329  log(secs, String.format("Processing agent %d", agent.id));
330  }
331  routed += processAgentActivities(agent);
332 
333  }
334  delayedAgentsByWakeupTime.set(secs, null);
335  if (si.isDeterministicPt()) {
336  for (Event e : si.getDeterministicPtEvents().get(secs)) {
337  sortedEvents.add(e);
338  }
339  si.getDeterministicPtEvents().get(secs).clear();
340  }
341 
342  while ((link = delayedLinksByWakeupTime.get(secs).poll()) != null) {
343  if (HermesConfigGroup.DEBUG_REALMS) {
344  log(secs, String.format("Processing link %d", link.id()));
345  }
346  routed += processLinks(link);
347  }
348  delayedLinksByWakeupTime.set(secs, null);
349  if (HermesConfigGroup.DEBUG_REALMS && routed > 0) {
350  log(secs, String.format("Processed %d agents", routed));
351  }
352  if (HermesConfigGroup.CONCURRENT_EVENT_PROCESSING && secs % 3600 == 0 && sortedEvents.size() > 0) {
353  eventsManager.processEvents(sortedEvents);
354  sortedEvents = new EventArray();
355  }
356 
357  routed = 0;
358  secs += 1;
359  }
360  }
361 
362  public void setEventTime(Agent agent, int agentId, int time, boolean lastEvent) {
363  if (agentId != 0) {
364  EventArray agentEvents = agent.events();
365  Event event = agentEvents.get(agentId);
366 
367  for (; agent.eventsIndex <= agentId; agent.eventsIndex++) {
368  agentEvents.get(agent.eventsIndex).setTime(time);
369  if (HermesConfigGroup.DEBUG_REALMS)
370  log(secs, String.format("agent %d setEventTime (eventsIndex=%d) %s", agent.id, agent.eventsIndex, agentEvents.get(agent.eventsIndex).toString()));
371  sortedEvents.add(agentEvents.get(agent.eventsIndex));
372  }
373 
374  // Fix delay for PT events.
375  if (event instanceof VehicleArrivesAtFacilityEvent vaafe) {
376  vaafe.setDelay(vaafe.getTime() - vaafe.getDelay());
377 
378  } else if (event instanceof VehicleDepartsAtFacilityEvent vdafe) {
379  vdafe.setDelay(vdafe.getTime() - vdafe.getDelay());
380  }
381  // This removes actend that is not issued by QSim.
382  else if (lastEvent && event instanceof ActivityEndEvent) {
383  sortedEvents.removeLast();
384  }
385  }
386  }
387 
388  public void setEventVehicle(Agent agent, int eventId, int vehicleId) {
389  if (eventId != 0) {
390  Event event = agent.events().get(eventId);
391  Id<Vehicle> vid = Id.get(si.matsim_id(vehicleId, true), Vehicle.class);
392  if (event instanceof PersonEntersVehicleEvent) {
393  ((PersonEntersVehicleEvent) event).setVehicleId(vid);
394  } else if (event instanceof PersonLeavesVehicleEvent) {
395  ((PersonLeavesVehicleEvent) event).setVehicleId(vid);
396  } else {
397  throw new RuntimeException(
398  String.format("vehicle id could not be set for event: %d", eventId));
399  }
400  }
401  }
402 
403  ArrayList<ArrayDeque<HLink>> delayedLinks() { return this.delayedLinksByWakeupTime; }
404 
405  ArrayList<ArrayDeque<Agent>> delayedAgents() {
406  return this.delayedAgentsByWakeupTime;
407  }
408 
409  EventArray getSortedEvents() {
410  return this.sortedEvents;
411  }
412 }