xrootd
XrdClAsyncMsgReader.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
20#define SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
21
22#include "XrdCl/XrdClMessage.hh"
25#include "XrdCl/XrdClSocket.hh"
27#include "XrdCl/XrdClStream.hh"
28
29#include <memory>
30
31namespace XrdCl
32{
33 //----------------------------------------------------------------------------
35 //----------------------------------------------------------------------------
37 {
38 public:
39 //------------------------------------------------------------------------
47 //------------------------------------------------------------------------
50 const std::string &strmname,
51 Stream &strm,
52 uint16_t substrmnb) : readstage( ReadStart ),
54 socket( socket ),
56 strm( strm ),
58 inmsgsize( 0 ),
59 inhandler( nullptr )
60 {
61 }
62
63 //------------------------------------------------------------------------
65 //------------------------------------------------------------------------
66 virtual ~AsyncMsgReader(){ }
67
68 //------------------------------------------------------------------------
70 //------------------------------------------------------------------------
71 inline void Reset()
72 {
74 inmsg.reset();
75 inmsgsize = 0;
76 inhandler = nullptr;
77 }
78
79 //------------------------------------------------------------------------
81 //------------------------------------------------------------------------
83 {
84 Log *log = DefaultEnv::GetLog();
85
86 while( true )
87 {
88 switch( readstage )
89 {
90 //------------------------------------------------------------------
91 // There is no incoming message currently being processed so we
92 // create a new one
93 //------------------------------------------------------------------
94 case ReadStart:
95 {
96 inmsg = std::make_shared<Message>();
97 //----------------------------------------------------------------
98 // The next step is to read the header
99 //----------------------------------------------------------------
101 continue;
102 }
103 //------------------------------------------------------------------
104 // We need to read the header
105 //------------------------------------------------------------------
106 case ReadHeader:
107 {
109 if( !st.IsOK() || st.code == suRetry )
110 return st;
111
112 log->Dump( AsyncSockMsg, "[%s] Received message header for 0x%x size: %d",
113 strmname.c_str(), inmsg.get(), inmsg->GetCursor() );
114
115 ServerResponse *rsp = (ServerResponse*)inmsg->GetBuffer();
116 if( rsp->hdr.status == kXR_attn )
117 {
118 log->Dump( AsyncSockMsg, "[%s] Will readout the attn action code "
119 "of message 0x%x", strmname.c_str(), inmsg.get() );
120 inmsg->ReAllocate( 16 ); // header (8 bytes) + action code (8 bytes)
122 continue;
123 }
124
125 inmsgsize = inmsg->GetCursor();
127
128 if( inhandler )
129 {
130 log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
131 "of message 0x%x", strmname.c_str(), inmsg.get() );
132 //--------------------------------------------------------------
133 // The next step is to read raw data
134 //--------------------------------------------------------------
136 continue;
137 }
138 //----------------------------------------------------------------
139 // The next step is to read the message body
140 //----------------------------------------------------------------
142 continue;
143 }
144 //------------------------------------------------------------------
145 // Before proceeding we need to figure out the attn action code
146 //------------------------------------------------------------------
147 case ReadAttn:
148 {
150 if( !st.IsOK() || st.code == suRetry )
151 return st;
152
153 //----------------------------------------------------------------
154 // There is an embedded response, overwrite the message with that
155 //----------------------------------------------------------------
156 if( HasEmbeddedRsp() )
157 {
158 inmsg->Free();
160 continue;
161 }
162
163 //----------------------------------------------------------------
164 // Readout the rest of the body
165 //----------------------------------------------------------------
166 inmsgsize = inmsg->GetCursor();
168 continue;
169 }
170 //------------------------------------------------------------------
171 // We need to call a raw message handler to get the data from the
172 // socket
173 //------------------------------------------------------------------
174 case ReadRawData:
175 {
176 uint32_t bytesRead = 0;
177 XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
178 if( !st.IsOK() )
179 return st;
180 inmsgsize += bytesRead;
181 if( st.code == suRetry )
182 return st;
183 //----------------------------------------------------------------
184 // The next step is to finalize the read
185 //----------------------------------------------------------------
187 continue;
188 }
189 //------------------------------------------------------------------
190 // No raw handler, so we read the message to the buffer
191 //------------------------------------------------------------------
192 case ReadMsgBody:
193 {
195 if( !st.IsOK() || st.code == suRetry )
196 return st;
197 inmsgsize = inmsg->GetCursor();
198
199 //----------------------------------------------------------------
200 // Now check if there are some additional raw data to be read
201 //----------------------------------------------------------------
202 if( inhandler )
203 {
204 //--------------------------------------------------------------
205 // The next step is to finalize the read
206 //--------------------------------------------------------------
208 continue;
209 }
210
211 uint16_t action = strm.InspectStatusRsp( substrmnb,
212 inhandler );
213
214 if( action & MsgHandler::Corrupted )
216
217 if( action & MsgHandler::Raw )
218 {
219 //--------------------------------------------------------------
220 // The next step is to read the raw data
221 //--------------------------------------------------------------
223 continue;
224 }
225
226 if( action & MsgHandler::More )
227 {
228 //--------------------------------------------------------------
229 // The next step is to read the additional data in the message
230 // body
231 //--------------------------------------------------------------
233 continue;
234 }
235
236 //----------------------------------------------------------------
237 // Unless we've got a kXR_status message and no handler the
238 // read is done
239 //----------------------------------------------------------------
240 ServerResponse *rsphdr = (ServerResponse *)inmsg->GetBuffer();
241 if( !( action & MsgHandler::RemoveHandler ) ||
242 rsphdr->hdr.status != kXR_status ||
243 inmsg->GetSize() < sizeof( ServerResponseStatus ) )
244 {
246 continue;
247 }
248
249 //----------------------------------------------------------------
250 // There is no handler and we have a kXR_status message. If we
251 // have already read all the message then we're done.
252 //----------------------------------------------------------------
253 ServerResponseStatus *rspst = (ServerResponseStatus*)inmsg->GetBuffer();
254 const uint32_t hdrSize = rspst->hdr.dlen;
255 if( inmsg->GetSize() != hdrSize + 8 )
256 {
258 continue;
259 }
260
261 //----------------------------------------------------------------
262 // Only the header of kXR_status has been read. Unmarshall the
263 // header and if there is more body data call GetBody() again.
264 //----------------------------------------------------------------
265 const uint16_t reqType = rspst->bdy.requestid + kXR_1stRequest;
267
268 if( !st.IsOK() && st.code == errDataError )
269 {
270 log->Error( AsyncSockMsg, "[%s] Failed to unmarshall "
271 "corrupted status body in message 0x%x.",
272 strmname.c_str(), inmsg.get() );
274 }
275 if( !st.IsOK() )
276 {
277 log->Error( AsyncSockMsg, "[%s] Failed to unmarshall "
278 "status body of message 0x%x.",
279 strmname.c_str(), inmsg.get() );
281 continue;
282 }
283 if ( rspst->bdy.dlen != 0 )
284 {
286 continue;
287 }
288
289 //----------------------------------------------------------------
290 // The next step is to finalize the read
291 //----------------------------------------------------------------
293 continue;
294 }
295
296 case ReadDone:
297 {
298 //----------------------------------------------------------------
299 // Report the incoming message
300 //----------------------------------------------------------------
301 log->Dump( AsyncSockMsg, "[%s] Received message 0x%x of %d bytes",
302 strmname.c_str(), inmsg.get(), inmsgsize );
303
304 strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
305 }
306 }
307 // just in case
308 break;
309 }
310
311 //----------------------------------------------------------------------
312 // We are done
313 //----------------------------------------------------------------------
314 return XRootDStatus();
315 }
316
317 private:
318
320 {
321 //----------------------------------------------------------------------
322 // Readout the action code from the socket. We are reading out 8 bytes
323 // into the message, the 8 byte header is already there.
324 //----------------------------------------------------------------------
325 size_t btsleft = 8 - ( inmsg->GetCursor() - 8 );
326 while( btsleft > 0 )
327 {
328 int btsrd = 0;
329 XRootDStatus st = socket.Read( inmsg->GetBufferAtCursor(), btsleft, btsrd );
330 if( !st.IsOK() || st.code == suRetry )
331 return st;
332 btsleft -= btsrd;
333 inmsg->AdvanceCursor( btsrd );
334 }
335
336 //----------------------------------------------------------------------
337 // Unmarshal the action code (network to host byte order)
338 //----------------------------------------------------------------------
339 ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
340 attn->actnum = ntohl( attn->actnum );
341
342 return XRootDStatus();
343 }
344
345 inline bool HasEmbeddedRsp()
346 {
347 ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
348 return ( attn->actnum == kXR_asynresp );
349 }
350
351 //------------------------------------------------------------------------
353 //------------------------------------------------------------------------
354 enum Stage
355 {
356 ReadStart, //< the next step is to initialize the read
357 ReadHeader, //< the next step is to read the header
358 ReadAttn, //< the next step is to read attn action code
359 ReadMsgBody, //< the next step is to read the body
360 ReadRawData, //< the next step is to read the raw data
361 ReadDone //< the next step is to finalize the read
362 };
363
364 //------------------------------------------------------------------------
365 // Current read stage
366 //------------------------------------------------------------------------
368
369 //------------------------------------------------------------------------
370 // The context of the read operation
371 //------------------------------------------------------------------------
374 const std::string &strmname;
376 uint16_t substrmnb;
377
378
379 //------------------------------------------------------------------------
380 // The internal state of the reader
381 //------------------------------------------------------------------------
382 std::shared_ptr<Message> inmsg; //< the ownership is shared with MsgHandler
383 uint32_t inmsgsize;
385
386 };
387
388} /* namespace XrdCl */
389
390#endif /* SRC_XRDCL_XRDCLASYNCMSGREADER_HH_ */
@ kXR_asynresp
Definition: XProtocol.hh:936
@ kXR_status
Definition: XProtocol.hh:905
@ kXR_attn
Definition: XProtocol.hh:899
@ kXR_1stRequest
Definition: XProtocol.hh:111
Utility class encapsulating reading response message logic.
Definition: XrdClAsyncMsgReader.hh:37
void Reset()
Reset the state of the object (makes it ready to read out next msg)
Definition: XrdClAsyncMsgReader.hh:71
Socket & socket
Definition: XrdClAsyncMsgReader.hh:373
TransportHandler & xrdTransport
Definition: XrdClAsyncMsgReader.hh:372
const std::string & strmname
Definition: XrdClAsyncMsgReader.hh:374
std::shared_ptr< Message > inmsg
Definition: XrdClAsyncMsgReader.hh:382
uint32_t inmsgsize
Definition: XrdClAsyncMsgReader.hh:383
Stage readstage
Definition: XrdClAsyncMsgReader.hh:367
Stage
Stages of reading out a response from the socket.
Definition: XrdClAsyncMsgReader.hh:355
@ ReadAttn
Definition: XrdClAsyncMsgReader.hh:358
@ ReadRawData
Definition: XrdClAsyncMsgReader.hh:360
@ ReadMsgBody
Definition: XrdClAsyncMsgReader.hh:359
@ ReadHeader
Definition: XrdClAsyncMsgReader.hh:357
@ ReadStart
Definition: XrdClAsyncMsgReader.hh:356
@ ReadDone
Definition: XrdClAsyncMsgReader.hh:361
bool HasEmbeddedRsp()
Definition: XrdClAsyncMsgReader.hh:345
uint16_t substrmnb
Definition: XrdClAsyncMsgReader.hh:376
XRootDStatus Read()
Read out the response from the socket.
Definition: XrdClAsyncMsgReader.hh:82
virtual ~AsyncMsgReader()
Destructor.
Definition: XrdClAsyncMsgReader.hh:66
MsgHandler * inhandler
Definition: XrdClAsyncMsgReader.hh:384
XRootDStatus ReadAttnActnum()
Definition: XrdClAsyncMsgReader.hh:319
AsyncMsgReader(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb)
Definition: XrdClAsyncMsgReader.hh:48
Stream & strm
Definition: XrdClAsyncMsgReader.hh:375
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Message handler.
Definition: XrdClPostMasterInterfaces.hh:51
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
Definition: XrdClPostMasterInterfaces.hh:138
@ Raw
Definition: XrdClPostMasterInterfaces.hh:63
@ RemoveHandler
Definition: XrdClPostMasterInterfaces.hh:61
@ More
there are more (non-raw) data to be read
Definition: XrdClPostMasterInterfaces.hh:72
@ Corrupted
Definition: XrdClPostMasterInterfaces.hh:69
A network socket.
Definition: XrdClSocket.hh:43
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
Stream.
Definition: XrdClStream.hh:52
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:310
virtual XRootDStatus GetBody(Message &message, Socket *socket)=0
virtual XRootDStatus GetHeader(Message &message, Socket *socket)=0
Request status.
Definition: XrdClXRootDResponses.hh:219
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
Definition: XrdClAction.hh:34
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint64_t AsyncSockMsg
Definition: XrdClConstants.hh:41
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
Definition: XProtocol.hh:939
kXR_int32 actnum
Definition: XProtocol.hh:940
kXR_int32 dlen
Definition: XProtocol.hh:1236
kXR_char requestid
Definition: XProtocol.hh:1233
kXR_unt16 status
Definition: XProtocol.hh:913
kXR_int32 dlen
Definition: XProtocol.hh:914
Definition: XProtocol.hh:1255
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1257
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1256
Definition: XProtocol.hh:1282
ServerResponseHeader hdr
Definition: XProtocol.hh:1283
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124