Eclipse SUMO - Simulation of Urban MObility
Connection.cpp
Go to the documentation of this file.
1/****************************************************************************/
2// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3// Copyright (C) 2012-2023 German Aerospace Center (DLR) and others.
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// https://www.eclipse.org/legal/epl-2.0/
7// This Source Code may also be made available under the following Secondary
8// Licenses when the conditions for such availability set forth in the Eclipse
9// Public License 2.0 are satisfied: GNU General Public License, version 2
10// or later which is available at
11// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13/****************************************************************************/
21// C++ TraCI client API implementation
22/****************************************************************************/
23#include <config.h>
24
25#include <thread>
26#include <chrono>
27#include <array>
29#include <libsumo/TraCIDefs.h>
30#include "Connection.h"
31
32
33namespace libtraci {
34// ===========================================================================
35// static member initializations
36// ===========================================================================
// Pointer to the active connection; nullptr until one is set. close() deletes the object it
// points to and resets it to nullptr.
37Connection* Connection::myActive = nullptr;
// Registry of connections keyed by a string; close() erases the entry stored under myLabel.
38std::map<const std::string, Connection*> Connection::myConnections;
39
40
41// ===========================================================================
42// member method definitions
43// ===========================================================================
/// Constructor: stores the label and the process pipe, constructs the socket for host:port and
/// runs the connection attempt up to numRetries + 1 times.
///
/// @param host        host name handed to the socket
/// @param port        port handed to the socket
/// @param numRetries  number of additional attempts after the first one failed
/// @param label       stored in myLabel
/// @param pipe        optional pipe of a started process; when non-null a reader thread running
///                    Connection::readOutput is started before the first attempt
/// @throws tcpip::SocketException rethrown after the last failed attempt (close() is called first)
///
/// NOTE(review): listing line 51 (the statement inside the try block, before `break`) is missing from
/// this extract; presumably it is the mySocket.connect() call - confirm against the upstream file.
44Connection::Connection(const std::string& host, int port, int numRetries, const std::string& label, FILE* const pipe) :
45 myLabel(label), myProcessPipe(pipe), myProcessReader(nullptr), mySocket(host, port) {
46 if (pipe != nullptr) {
47 myProcessReader = new std::thread(&Connection::readOutput, this);
48 }
49 for (int i = 0; i <= numRetries; i++) {
50 try {
52 break;
53 } catch (tcpip::SocketException& e) {
// last attempt failed: clean up via close() and hand the exception to the caller
54 if (i == numRetries) {
55 close();
56 throw;
57 }
// otherwise report the failure on std::cout and wait one second before the next attempt
58 std::cout << "Could not connect to TraCI server at " << host << ":" << port << " " << e.what() << std::endl;
59 std::cout << " Retrying in 1 second" << std::endl;
60 std::this_thread::sleep_for(std::chrono::seconds(1));
61 }
62 }
63}
64
65
/// Copies everything read from myProcessPipe to the console until fgets reports end of input.
///
/// Lines starting with "Error:" or "Warning:" are written to std::cerr. Lines that directly follow
/// such a line and are empty or start with a space are treated as its continuation and also go to
/// std::cerr. Every other line goes to std::cout and ends the continuation state.
///
/// NOTE(review): listing line 67 (the function header) is missing from this extract; presumably this is
/// Connection::readOutput(), which the constructor runs in a separate thread - confirm.
/// NOTE(review): fgets reads at most 255 characters per call, so a longer line is processed (and
/// classified, and terminated with std::endl) in several pieces.
66void
68 std::array<char, 256> buffer;
// true while the previously printed line was routed to std::cerr
69 bool errout = false;
70 while (fgets(buffer.data(), (int)buffer.size(), myProcessPipe) != nullptr) {
71 std::stringstream result;
72 result << buffer.data();
73 std::string line;
74 while (std::getline(result, line)) {
75 if ((errout && (line.empty() || line[0] == ' ')) || line.compare(0, 6, "Error:") == 0 || line.compare(0, 8, "Warning:") == 0) {
76 std::cerr << line << std::endl;
77 errout = true;
78 } else {
79 std::cout << line << std::endl;
80 errout = false;
81 }
82 }
83 }
84}
85
86
/// Ends the simulation and closes the connection (Connection::close).
///
/// Visible steps: under myMutex a message with length byte 2 is sent, the status response for
/// libsumo::CMD_CLOSE is validated and the socket is closed. Afterwards the process reader thread
/// (if any) is joined and deleted and the process pipe is closed with pclose / _pclose. Finally the
/// entry for myLabel is erased from myConnections and myActive is deleted and reset to nullptr.
///
/// NOTE(review): listing lines 88, 89 and 95 are missing from this extract; presumably they hold the
/// function header, the condition opening the block that ends at line 102 (likely a check that the
/// socket has a client connection) and the write of the command id - confirm against upstream.
/// NOTE(review): `delete myActive` assumes the active connection is the one being closed - verify
/// against the callers.
87void
90 std::unique_lock<std::mutex> lock{ myMutex };
91 tcpip::Storage outMsg;
92 // command length
93 outMsg.writeUnsignedByte(1 + 1);
94 // command id
96 mySocket.sendExact(outMsg);
97
98 tcpip::Storage inMsg;
// the acknowledgement text is filled in by check_resultState but not used afterwards
99 std::string acknowledgement;
100 check_resultState(inMsg, libsumo::CMD_CLOSE, false, &acknowledgement);
101 mySocket.close();
102 }
103 if (myProcessReader != nullptr) {
// wait for the reader thread to finish before closing the pipe it reads from
104 myProcessReader->join();
105 delete myProcessReader;
106 myProcessReader = nullptr;
107#ifdef WIN32
108 _pclose(myProcessPipe);
109#else
110 pclose(myProcessPipe);
111#endif
112 }
113 myConnections.erase(myLabel);
114 delete myActive;
115 myActive = nullptr;
116}
117
118
/// Sends a SimulationStep command for the given time and reads the subscription results it returns
/// (Connection::simulationStep(double time)).
///
/// Under myMutex a message with length byte 1 + 1 + 8 and the time as double is sent. The stored
/// variable subscription results are cleared, the number of subscription responses is read as an
/// int and each response is passed to readVariableSubscription or readContextSubscription.
///
/// NOTE(review): listing lines 120, 126, 132, 134, 138 and 139 are missing from this extract;
/// presumably they hold the function header, the command id write, the status check, the clearing
/// of the context subscription results and the condition choosing between variable and context
/// responses - confirm against upstream.
119void
121 std::unique_lock<std::mutex> lock{myMutex};
122 tcpip::Storage outMsg;
123 // command length
124 outMsg.writeUnsignedByte(1 + 1 + 8);
125 // command id
127 outMsg.writeDouble(time);
128 // send request message
129 mySocket.sendExact(outMsg);
130
131 tcpip::Storage inMsg;
133 mySubscriptionResults.clear();
// number of subscription responses contained in the answer
135 int numSubs = inMsg.readInt();
136 while (numSubs-- > 0) {
// the command id is not checked here (ignoreCommandId == true); it is only used for dispatching
137 const int responseID = check_commandGetResult(inMsg, 0, -1, true);
140 readVariableSubscription(responseID, inMsg);
141 } else {
142 readContextSubscription(responseID, inMsg);
143 }
144 }
145}
146
147
/// Sends a SetOrder command carrying the client index `order` (Connection::setOrder(int order)).
///
/// Under myMutex a message with length byte 1 + 1 + 4 and the order as int is sent.
///
/// NOTE(review): listing lines 149, 155 and 161 are missing from this extract; presumably they hold
/// the function header, the command id write and the status check of the response - confirm.
148void
150 std::unique_lock<std::mutex> lock{ myMutex };
151 tcpip::Storage outMsg;
152 // command length
153 outMsg.writeUnsignedByte(1 + 1 + 4);
154 // command id
156 // client index
157 outMsg.writeInt(order);
158 mySocket.sendExact(outMsg);
159
160 tcpip::Storage inMsg;
162}
163
164
/// Assembles a command message in myOutput (it is reset first).
///
/// The message length is computed as 2 bytes, plus 1 byte when varID >= 0, plus 4 bytes and the
/// string length when additionally objID is given, plus the size of `add` when it is given.
/// Lengths above 255 take the else branch, which writes length + 4 as an int.
///
/// @param cmdID  command id
/// @param varID  variable id; negative means neither variable nor object id contribute
/// @param objID  optional object id, only used when varID >= 0
/// @param add    optional storage with additional values
/// @throws libsumo::FatalTraCIError with "Not connected." (the condition is on a missing line)
///
/// NOTE(review): listing lines 167, 183, 185, 188, 190 and 197 are missing from this extract;
/// presumably they hold the connection check, the single-byte length write, the zero byte
/// announcing the extended length, and the writes of cmdID, varID and `add` - confirm.
165void
166Connection::createCommand(int cmdID, int varID, const std::string* const objID, tcpip::Storage* add) const {
168 throw libsumo::FatalTraCIError("Not connected.");
169 }
170 myOutput.reset();
171 // command length
172 int length = 1 + 1;
173 if (varID >= 0) {
174 length += 1;
175 if (objID != nullptr) {
// 4 bytes in addition to the characters of the object id
176 length += 4 + (int)objID->length();
177 }
178 }
179 if (add != nullptr) {
180 length += (int)add->size();
181 }
182 if (length <= 255) {
184 } else {
186 myOutput.writeInt(length + 4);
187 }
189 if (varID >= 0) {
191 if (objID != nullptr) {
192 myOutput.writeString(*objID);
193 }
194 }
195 // additional values
196 if (add != nullptr) {
198 }
199}
200
201
/// Sends a SubscribeContext or a SubscribeVariable request and stores the initial results.
///
/// The payload consists of the command id (domID), beginTime, endTime and objID; for context
/// subscriptions (domain != -1) the context domain byte and the range follow. The variable list is
/// either a default set (when vars is exactly {-1}) or the given ids, each followed by its
/// serialized parameter when params has an entry for it. The payload is wrapped into a message
/// starting with a zero byte and an int holding 5 + payload size.
///
/// myMutex is locked only after the message has been assembled. After the status check the
/// response is read only if vars is not empty.
///
/// @param domID      command id of the subscription
/// @param objID      id of the object to subscribe for
/// @param beginTime  written as double after the command id
/// @param endTime    written as double after beginTime
/// @param domain     context domain, -1 for a plain variable subscription
/// @param range      context range, only written for context subscriptions
/// @param vars       variable ids; a single -1 selects the defaults
/// @param params     optional parameters per variable id
/// @throws tcpip::SocketException "Socket is not initialised" (the condition is on a missing line)
///
/// NOTE(review): listing lines 205, 222, 223 and 228-232 are missing from this extract; presumably
/// they hold the socket check and the writes of the default variable ids - confirm against upstream.
202void
203Connection::subscribe(int domID, const std::string& objID, double beginTime, double endTime,
204 int domain, double range, const std::vector<int>& vars, const libsumo::TraCIResults& params) {
206 throw tcpip::SocketException("Socket is not initialised");
207 }
208 const bool isContext = domain != -1;
209 tcpip::Storage outMsg;
210 outMsg.writeUnsignedByte(domID); // command id
211 outMsg.writeDouble(beginTime);
212 outMsg.writeDouble(endTime);
213 outMsg.writeString(objID);
214 if (isContext) {
215 outMsg.writeUnsignedByte(domain);
216 outMsg.writeDouble(range);
217 }
// a single -1 requests the default variables of the domain
218 if (vars.size() == 1 && vars.front() == -1) {
219 if (domID == libsumo::CMD_SUBSCRIBE_VEHICLE_VARIABLE && !isContext) {
220 // default for vehicles is edge id and lane position
221 outMsg.writeUnsignedByte(2);
224 } else {
225 // default for detectors is vehicle number, for all others (and contexts) id list
226 outMsg.writeUnsignedByte(1);
227 const bool isDetector = domID == libsumo::CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
233 }
234 } else {
235 outMsg.writeUnsignedByte((int)vars.size());
236 for (const int v : vars) {
237 outMsg.writeUnsignedByte(v);
238 const auto& paramEntry = params.find(v);
239 if (paramEntry != params.end()) {
240 outMsg.writeStorage(*libsumo::StorageHelper::toStorage(*paramEntry->second));
241 }
242 }
243 }
// wrap the payload: zero byte, then the total length as int (1 + 4 bytes header + payload)
244 tcpip::Storage complete;
245 complete.writeUnsignedByte(0);
246 complete.writeInt(5 + (int)outMsg.size());
247 complete.writeStorage(outMsg);
248 std::unique_lock<std::mutex> lock{ myMutex };
249 // send message
250 mySocket.sendExact(complete);
251
252 tcpip::Storage inMsg;
253 check_resultState(inMsg, domID);
// with an empty variable list no response is read
254 if (!vars.empty()) {
255 const int responseID = check_commandGetResult(inMsg, domID);
256 if (isContext) {
257 readContextSubscription(responseID, inMsg);
258 } else {
259 readVariableSubscription(responseID, inMsg);
260 }
261 }
262}
263
264
/// Receives a message from the socket into inMsg and validates the status response at its start.
///
/// The status consists of a length byte, the command id byte, the result type byte and a
/// description string. Depending on the result type either an exception is thrown or (when
/// requested) an acknowledgement text is produced. Afterwards the command id is compared with the
/// expected one (unless ignoreCommandId is set) and the number of bytes consumed is compared with
/// the announced length.
///
/// @param inMsg            storage that receives the message; left positioned behind the status
/// @param command          id of the command the status is expected to answer
/// @param ignoreCommandId  skip the comparison of the received command id
/// @param acknowledgement  optional output for a textual acknowledgement
/// @throws libsumo::TraCIException on unreadable status, error / not implemented / unknown result
///         type, unexpected command id or wrong length
///
/// NOTE(review): listing lines 283, 285 and 287 (three case labels) are missing from this extract;
/// presumably RTYPE_ERR, RTYPE_NOTIMPLEMENTED and RTYPE_OK in that order - confirm against upstream.
265void
266Connection::check_resultState(tcpip::Storage& inMsg, int command, bool ignoreCommandId, std::string* acknowledgement) {
267 mySocket.receiveExact(inMsg);
268 int cmdLength;
269 int cmdId;
270 int resultType;
271 int cmdStart;
272 std::string msg;
273 try {
// remember where the status starts to be able to verify its length below
274 cmdStart = inMsg.position();
275 cmdLength = inMsg.readUnsignedByte();
276 cmdId = inMsg.readUnsignedByte();
277 resultType = inMsg.readUnsignedByte();
278 msg = inMsg.readString();
279 } catch (std::invalid_argument&) {
280 throw libsumo::TraCIException("#Error: an exception was thrown while reading result state message");
281 }
282 switch (resultType) {
284 throw libsumo::TraCIException(msg);
286 throw libsumo::TraCIException(".. Sent command is not implemented (" + toHex(command) + "), [description: " + msg + "]");
288 if (acknowledgement != nullptr) {
289 (*acknowledgement) = ".. Command acknowledged (" + toHex(command) + "), [description: " + msg + "]";
290 }
291 break;
292 default:
293 throw libsumo::TraCIException(".. Answered with unknown result code(" + toHex(resultType) + ") to command(" + toHex(command) + "), [description: " + msg + "]");
294 }
295 if (command != cmdId && !ignoreCommandId) {
296 throw libsumo::TraCIException("#Error: received status response to command: " + toHex(cmdId) + " but expected: " + toHex(command));
297 }
// the announced length must match the number of bytes actually consumed
298 if ((cmdStart + cmdLength) != (int) inMsg.position()) {
299 throw libsumo::TraCIException("#Error: command at position " + toHex(cmdStart) + " has wrong length");
300 }
301}
302
303
/// Reads and validates the header of a command response contained in inMsg.
///
/// The header starts with a one-byte length; if that byte is 0, the length follows as an int.
/// The length is only consumed, not evaluated. Next the command id byte is read and, unless
/// ignoreCommandId is set, compared with command + 0x10. When expectedType >= 0 the variable id,
/// the object id and the value type byte are consumed as well and the type is compared with
/// expectedType, leaving inMsg positioned at the value itself.
///
/// @param inMsg            storage positioned at the start of the response
/// @param command          id of the command that was sent
/// @param expectedType     expected type of the value, negative to stop after the command id
/// @param ignoreCommandId  skip the comparison of the received command id
/// @return the command id read from the response
/// @throws libsumo::TraCIException on unexpected command id or value type
304int
305Connection::check_commandGetResult(tcpip::Storage& inMsg, int command, int expectedType, bool ignoreCommandId) const {
306 int length = inMsg.readUnsignedByte();
// a zero length byte announces the extended form with the length stored as int
307 if (length == 0) {
308 length = inMsg.readInt();
309 }
310 int cmdId = inMsg.readUnsignedByte();
311 if (!ignoreCommandId && cmdId != (command + 0x10)) {
// the message needs a space before "but", otherwise the received id and the text run together
312 throw libsumo::TraCIException("#Error: received response with command id: " + toString(cmdId) + " but expected: " + toString(command + 0x10));
313 }
314 if (expectedType >= 0) {
315 // not called from the TraCITestClient but from within the Connection
316 inMsg.readUnsignedByte(); // variableID
317 inMsg.readString(); // objectID
318 int valueDataType = inMsg.readUnsignedByte();
319 if (valueDataType != expectedType) {
320 throw libsumo::TraCIException("Expected " + toString(expectedType) + " but got " + toString(valueDataType));
321 }
322 }
323 return cmdId;
324}
325
326
/// Assembles a command via createCommand, validates the status of the answer and, when
/// expectedType >= 0, also the response header and value type.
///
/// @param command       command id
/// @param var           variable id handed to createCommand
/// @param id            object id handed to createCommand
/// @param add           optional additional values handed to createCommand
/// @param expectedType  expected type of the returned value, negative to skip the response check
/// @return reference to myInput; it is reset on every call, so the content is only valid until
///         the next call that uses myInput
///
/// NOTE(review): listing lines 327 and 330 are missing from this extract; presumably they hold the
/// return type (tcpip::Storage&) and the statement sending myOutput over the socket - confirm.
/// NOTE(review): no lock on myMutex is taken in the visible lines - presumably the callers hold it;
/// verify.
328Connection::doCommand(int command, int var, const std::string& id, tcpip::Storage* add, int expectedType) {
329 createCommand(command, var, &id, add);
331 myInput.reset();
332 check_resultState(myInput, command);
333 if (expectedType >= 0) {
334 check_commandGetResult(myInput, command, expectedType);
335 }
336 return myInput;
337}
338
339
/// Connection::addFilter(int var, tcpip::Storage* add): in the visible lines myMutex is locked and
/// myInput is reset.
///
/// NOTE(review): listing lines 341, 343, 344 and 346 are missing from this extract; presumably they
/// hold the function header, the creation and sending of a CMD_ADD_SUBSCRIPTION_FILTER command and
/// the status check of the answer - confirm against upstream.
340void
342 std::unique_lock<std::mutex> lock{ myMutex };
345 myInput.reset();
347}
348
349
/// Reads variableCount subscription values for one object from inMsg and stores them in
/// into[objectID][variableID].
///
/// Every value starts with the variable id, a status byte and a type byte. Only values with
/// status libsumo::RTYPE_OK are accepted. The visible branches read a double, a string, a position
/// with two or three coordinates, a color (r, g, b, a), an int, a string list and a compound.
///
/// @param inMsg          storage positioned at the first value
/// @param objectID       id of the object the values belong to
/// @param variableCount  number of values to read
/// @param into           result container that receives the values
/// @throws libsumo::TraCIException on a status other than RTYPE_OK or an unimplemented type
///
/// NOTE(review): listing lines 360, 363, 366, 373, 390, 393 and 402 (case labels) are missing from
/// this extract; presumably TYPE_DOUBLE, TYPE_STRING, POSITION_2D, POSITION_3D, TYPE_INTEGER,
/// TYPE_STRINGLIST and TYPE_COMPOUND in that order - confirm against upstream.
/// NOTE(review): in the compound branch nothing is stored and the remaining bytes are not consumed
/// when the item count is not 2 or the second item is neither double nor string; the following
/// reads would then start inside that compound - check whether this can happen.
350void
351Connection::readVariables(tcpip::Storage& inMsg, const std::string& objectID, int variableCount, libsumo::SubscriptionResults& into) {
352 while (variableCount > 0) {
353
354 const int variableID = inMsg.readUnsignedByte();
355 const int status = inMsg.readUnsignedByte();
356 const int type = inMsg.readUnsignedByte();
357
358 if (status == libsumo::RTYPE_OK) {
359 switch (type) {
361 into[objectID][variableID] = std::make_shared<libsumo::TraCIDouble>(inMsg.readDouble());
362 break;
364 into[objectID][variableID] = std::make_shared<libsumo::TraCIString>(inMsg.readString());
365 break;
367 auto p = std::make_shared<libsumo::TraCIPosition>();
368 p->x = inMsg.readDouble();
369 p->y = inMsg.readDouble();
370 into[objectID][variableID] = p;
371 break;
372 }
374 auto p = std::make_shared<libsumo::TraCIPosition>();
375 p->x = inMsg.readDouble();
376 p->y = inMsg.readDouble();
377 p->z = inMsg.readDouble();
378 into[objectID][variableID] = p;
379 break;
380 }
381 case libsumo::TYPE_COLOR: {
382 auto c = std::make_shared<libsumo::TraCIColor>();
383 c->r = (unsigned char)inMsg.readUnsignedByte();
384 c->g = (unsigned char)inMsg.readUnsignedByte();
385 c->b = (unsigned char)inMsg.readUnsignedByte();
386 c->a = (unsigned char)inMsg.readUnsignedByte();
387 into[objectID][variableID] = c;
388 break;
389 }
391 into[objectID][variableID] = std::make_shared<libsumo::TraCIInt>(inMsg.readInt());
392 break;
394 auto sl = std::make_shared<libsumo::TraCIStringList>();
// the list is preceded by its element count
395 int n = inMsg.readInt();
396 for (int i = 0; i < n; ++i) {
397 sl->value.push_back(inMsg.readString());
398 }
399 into[objectID][variableID] = sl;
400 }
401 break;
403 int n = inMsg.readInt();
// only two-item compounds starting with a string are interpreted
404 if (n == 2) {
// type byte of the first item; its value is read as a string without checking the type
405 inMsg.readUnsignedByte();
406 const std::string s = inMsg.readString();
407 const int secondType = inMsg.readUnsignedByte();
408 if (secondType == libsumo::TYPE_DOUBLE) {
// string + double is stored as a road position (edge id and position)
409 auto r = std::make_shared<libsumo::TraCIRoadPosition>();
410 r->edgeID = s;
411 r->pos = inMsg.readDouble();
412 into[objectID][variableID] = r;
413 } else if (secondType == libsumo::TYPE_STRING) {
// string + string is stored as a string list with two entries
414 auto sl = std::make_shared<libsumo::TraCIStringList>();
415 sl->value.push_back(s);
416 sl->value.push_back(inMsg.readString());
417 into[objectID][variableID] = sl;
418 }
419 }
420 }
421 break;
422
423 // TODO Other data types
424
425 default:
426 throw libsumo::TraCIException("Unimplemented subscription type: " + toString(type));
427 }
428 } else {
429 throw libsumo::TraCIException("Subscription response error: variableID=" + toString(variableID) + " status=" + toString(status));
430 }
431
432 variableCount--;
433 }
434}
435
436
/// Connection::readVariableSubscription(int responseID, tcpip::Storage& inMsg): reads the object
/// id and the number of variables (one byte) from inMsg and stores the values that follow in
/// mySubscriptionResults[responseID] via readVariables.
///
/// NOTE(review): listing line 438 (the function header) is missing from this extract.
437void
439 const std::string objectID = inMsg.readString();
440 const int variableCount = inMsg.readUnsignedByte();
441 readVariables(inMsg, objectID, variableCount, mySubscriptionResults[responseID]);
442}
443
444
/// Connection::readContextSubscription(int responseID, tcpip::Storage& inMsg): reads a context
/// subscription response from inMsg and stores it in
/// myContextSubscriptionResults[responseID][contextID].
///
/// The response consists of the context object id, the context domain byte (consumed but not
/// used), the number of variables per object (one byte), the number of objects (int) and, per
/// object, its id followed by its values, which are read by readVariables.
///
/// NOTE(review): listing line 446 (the function header) is missing from this extract.
445void
447 const std::string contextID = inMsg.readString();
448 inMsg.readUnsignedByte(); // context domain
449 const int variableCount = inMsg.readUnsignedByte();
450 int numObjects = inMsg.readInt();
451 // the following also instantiates the empty map to get comparable results with libsumo
452 // see also https://github.com/eclipse/sumo/issues/7288
453 libsumo::SubscriptionResults& results = myContextSubscriptionResults[responseID][contextID];
454 while (numObjects-- > 0) {
455 std::string objectID = inMsg.readString();
456 readVariables(inMsg, objectID, variableCount, results);
457 }
458}
459
460
461}
462
463
464/****************************************************************************/
An error which is not recoverable.
Definition: TraCIDefs.h:155
static std::shared_ptr< tcpip::Storage > toStorage(const TraCIResult &v)
Definition: StorageHelper.h:33
A recoverable error, i.e. one after which processing can continue.
Definition: TraCIDefs.h:144
void simulationStep(double time)
Sends a SimulationStep command.
Definition: Connection.cpp:120
Connection(const std::string &host, int port, int numRetries, const std::string &label, FILE *const pipe)
Constructor, connects to the specified SUMO server.
Definition: Connection.cpp:44
void close()
ends the simulation and closes the connection
Definition: Connection.cpp:88
void createCommand(int cmdID, int varID, const std::string *const objID, tcpip::Storage *add=nullptr) const
Sends a GetVariable / SetVariable request if mySocket is connected. Otherwise writes to myOutput only...
Definition: Connection.cpp:166
int check_commandGetResult(tcpip::Storage &inMsg, int command, int expectedType=-1, bool ignoreCommandId=false) const
Validates the result state of a command.
Definition: Connection.cpp:305
void addFilter(int var, tcpip::Storage *add=nullptr)
Definition: Connection.cpp:341
void readVariableSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:438
tcpip::Socket mySocket
The socket.
Definition: Connection.h:177
std::map< int, libsumo::SubscriptionResults > mySubscriptionResults
Definition: Connection.h:185
void check_resultState(tcpip::Storage &inMsg, int command, bool ignoreCommandId=false, std::string *acknowledgement=0)
Validates the result state of a command.
Definition: Connection.cpp:266
tcpip::Storage myInput
The reusable input storage.
Definition: Connection.h:181
FILE *const myProcessPipe
Definition: Connection.h:174
void readVariables(tcpip::Storage &inMsg, const std::string &objectID, int variableCount, libsumo::SubscriptionResults &into)
Definition: Connection.cpp:351
std::mutex myMutex
Definition: Connection.h:183
std::map< int, libsumo::ContextSubscriptionResults > myContextSubscriptionResults
Definition: Connection.h:186
tcpip::Storage myOutput
The reusable output storage.
Definition: Connection.h:179
void setOrder(int order)
Sends a SetOrder command.
Definition: Connection.cpp:149
void subscribe(int domID, const std::string &objID, double beginTime, double endTime, int domain, double range, const std::vector< int > &vars, const libsumo::TraCIResults &params)
Sends a SubscribeContext or a SubscribeVariable request.
Definition: Connection.cpp:203
static std::map< const std::string, Connection * > myConnections
Definition: Connection.h:189
const std::string myLabel
Definition: Connection.h:173
void readContextSubscription(int responseID, tcpip::Storage &inMsg)
Definition: Connection.cpp:446
tcpip::Storage & doCommand(int command, int var=-1, const std::string &id="", tcpip::Storage *add=nullptr, int expectedType=-1)
Definition: Connection.cpp:328
static Connection * myActive
Definition: Connection.h:188
static std::string toString(const T &t, std::streamsize accuracy=PRECISION)
Definition: Connection.h:147
std::thread * myProcessReader
Definition: Connection.h:175
std::string toHex(const T i, std::streamsize numDigits=2)
Definition: Connection.h:156
bool receiveExact(Storage &)
Receive a complete TraCI message from Socket::socket_.
Definition: socket.cpp:536
void sendExact(const Storage &)
Definition: socket.cpp:439
bool has_client_connection() const
Definition: socket.cpp:568
void connect()
Connects to host_:port_.
Definition: socket.cpp:367
void close()
Definition: socket.cpp:391
virtual std::string readString()
Definition: storage.cpp:180
virtual void writeString(const std::string &s)
Definition: storage.cpp:197
virtual unsigned int position() const
Definition: storage.cpp:76
virtual void writeInt(int)
Definition: storage.cpp:321
virtual void writeDouble(double)
Definition: storage.cpp:354
virtual int readUnsignedByte()
Definition: storage.cpp:155
void reset()
Definition: storage.cpp:85
virtual void writeUnsignedByte(int)
Definition: storage.cpp:165
StorageType::size_type size() const
Definition: storage.h:119
virtual void writeStorage(tcpip::Storage &store)
Definition: storage.cpp:388
virtual double readDouble()
Definition: storage.cpp:362
virtual int readInt()
Definition: storage.cpp:311
TRACI_CONST int TYPE_COLOR
TRACI_CONST int LAST_STEP_VEHICLE_NUMBER
TRACI_CONST int POSITION_3D
TRACI_CONST int RTYPE_NOTIMPLEMENTED
TRACI_CONST int TRACI_ID_LIST
TRACI_CONST int VAR_ROAD_ID
TRACI_CONST int TYPE_COMPOUND
TRACI_CONST int RESPONSE_SUBSCRIBE_PARKINGAREA_VARIABLE
TRACI_CONST int RESPONSE_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int POSITION_2D
TRACI_CONST int RESPONSE_SUBSCRIBE_OVERHEADWIRE_VARIABLE
TRACI_CONST int CMD_CLOSE
TRACI_CONST int CMD_SETORDER
TRACI_CONST int TYPE_STRINGLIST
TRACI_CONST int TYPE_INTEGER
TRACI_CONST int RESPONSE_SUBSCRIBE_BUSSTOP_VARIABLE
TRACI_CONST int CMD_ADD_SUBSCRIPTION_FILTER
std::map< std::string, libsumo::TraCIResults > SubscriptionResults
{object->{variable->value}}
Definition: TraCIDefs.h:337
TRACI_CONST int VAR_LANEPOSITION
TRACI_CONST int CMD_SUBSCRIBE_VEHICLE_VARIABLE
TRACI_CONST int TYPE_DOUBLE
TRACI_CONST int CMD_SUBSCRIBE_LANEAREA_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_INDUCTIONLOOP_VARIABLE
TRACI_CONST int CMD_SUBSCRIBE_MULTIENTRYEXIT_VARIABLE
TRACI_CONST int RTYPE_ERR
TRACI_CONST int CMD_SIMSTEP
TRACI_CONST int CMD_SUBSCRIBE_LANE_VARIABLE
TRACI_CONST int RTYPE_OK
std::map< int, std::shared_ptr< libsumo::TraCIResult > > TraCIResults
{variable->value}
Definition: TraCIDefs.h:335
TRACI_CONST int CMD_SUBSCRIBE_EDGE_VARIABLE
TRACI_CONST int TYPE_STRING