MATSIM
QNetsimEngineWithThreadpool.java
Go to the documentation of this file.
1 /* *********************************************************************** *
2  * project: org.matsim.*
3  * QNetsimEngine.java
4  * *
5  * *********************************************************************** *
6  * *
7  * copyright : (C) 2009 by the members listed in the COPYING, *
8  * LICENSE and WARRANTY file. *
9  * email : info at matsim dot org *
10  * *
11  * *********************************************************************** *
12  * *
13  * This program is free software; you can redistribute it and/or modify *
14  * it under the terms of the GNU General Public License as published by *
15  * the Free Software Foundation; either version 2 of the License, or *
16  * (at your option) any later version. *
17  * See also COPYING, LICENSE and WARRANTY file *
18  * *
19  * *********************************************************************** */
20 
21 package org.matsim.core.mobsim.qsim.qnetsimengine;
22 
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.ThreadFactory;
30 
31 import jakarta.inject.Inject;
32 
33 import org.matsim.core.mobsim.qsim.QSim;
34 
46 final class QNetsimEngineWithThreadpool extends AbstractQNetsimEngine<QNetsimEngineRunnerForThreadpool> {
47 
48  private final int numOfRunners;
49  private ExecutorService pool;
50 
51 // public QNetsimEngineWithThreadpool(final QSim sim) {
52 // this(sim, null);
53 // }
54 
55  @Inject QNetsimEngineWithThreadpool(final QSim sim, QNetworkFactory netsimNetworkFactory, NetworkModeDepartureHandler networkModeDepartureHandler) {
56  super(sim, netsimNetworkFactory, networkModeDepartureHandler);
57  this.numOfRunners = this.numOfThreads;
58  }
59 
60  @Override public void finishMultiThreading() {
61  this.pool.shutdown();
62  }
63 
64  protected void run(double time) {
65  // yy Acceleration options to try out (kai, jan'15):
66 
67  // (a) Try to do without barriers. With our
68  // message-based experiments a decade ago, it was better to let each runner decide locally when to proceed. For intuition, imagine that
69  // one runner is slowest on the links, and some other runner slowest on the nodes. With the barriers, this cannot overlap.
70  // With message passing, this was achieved by waiting for all necessary messages. Here, it could (for example) be achieved with runner-local
71  // clocks:
72  // for ( all runners that own incoming links to my nodes ) { // (*)
73  // wait until runner.getTime() == myTime ;
74  // }
75  // processLocalNodes() ;
76  // mytime += 0.5 ;
77  // for ( all runners that own toNodes of my links ) { // (**)
78  // wait until runner.getTime() == myTime ;
79  // }
80  // processLocalLinks() ;
81  // myTime += 0.5 ;
82 
83  // (b) Do deliberate domain decomposition rather than round robin (fewer runners to wait for at (*) and (**)).
84 
85  // (c) One thread that is much faster than all others is much more efficient than one thread that is much slower than all others.
86  // So make sure that no thread sticks out in terms of slowness. Difficult to achieve, though. A decade back, we used a "typical" run
87  // as input for the domain decomposition under (b).
88 
89  // set current Time
90  for (AbstractQNetsimEngineRunner engine : this.getQnetsimEngineRunner()) {
91  engine.setTime(time);
92  }
93 
94  try {
95  for (AbstractQNetsimEngineRunner engine : this.getQnetsimEngineRunner()) {
96  ((QNetsimEngineRunnerForThreadpool) engine).setMovingNodes(true);
97  }
98  for (Future<Boolean> future : pool.invokeAll(this.getQnetsimEngineRunner())) {
99  future.get();
100  }
101  for (AbstractQNetsimEngineRunner engine : this.getQnetsimEngineRunner()) {
102  ((QNetsimEngineRunnerForThreadpool) engine).setMovingNodes(false);
103  }
104  for (Future<Boolean> future : pool.invokeAll(this.getQnetsimEngineRunner())) {
105  future.get();
106  }
107  } catch (InterruptedException e) {
108  throw new RuntimeException(e) ;
109  } catch (ExecutionException e) {
110  throw new RuntimeException(e.getCause());
111  }
112  }
113 
114  private static class NamedThreadFactory implements ThreadFactory {
115  private int count = 0;
116 
117  @Override
118  public Thread newThread(Runnable r) {
119  return new Thread( r , "QNetsimEngine_PooledThread_" + count++);
120  }
121  }
122 
123  @Override
124  protected List<QNetsimEngineRunnerForThreadpool> initQSimEngineRunners() {
125  List<QNetsimEngineRunnerForThreadpool> engines = new ArrayList<>();
126  for (int i = 0; i < numOfRunners; i++) {
127  QNetsimEngineRunnerForThreadpool engine = new QNetsimEngineRunnerForThreadpool();
128  engines.add(engine);
129  }
130  return engines;
131  }
132 
133  @Override
134  protected void initMultiThreading() {
135  this.pool = Executors.newFixedThreadPool(
136  this.numOfThreads,
137  new NamedThreadFactory());
138  }
139 }