XRootD
Loading...
Searching...
No Matches
XrdClOperations.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3// Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4// Michal Simon <michal.simon@cern.ch>
5//------------------------------------------------------------------------------
6// This file is part of the XRootD software suite.
7//
8// XRootD is free software: you can redistribute it and/or modify
9// it under the terms of the GNU Lesser General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12//
13// XRootD is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16// GNU General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public License
19// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20//
21// In applying this licence, CERN does not waive the privileges and immunities
22// granted to it by virtue of its status as an Intergovernmental Organization
23// or submit itself to any jurisdiction.
24//------------------------------------------------------------------------------
25
26#ifndef __XRD_CL_OPERATIONS_HH__
27#define __XRD_CL_OPERATIONS_HH__
28
29#include <memory>
30#include <stdexcept>
31#include <sstream>
32#include <tuple>
33#include <future>
36#include "XrdCl/XrdClArg.hh"
40
45
46namespace XrdCl
47{
48
49 class Pipeline;
50 class PipelineHandler;
51
52 //----------------------------------------------------------------------------
58 //----------------------------------------------------------------------------
59 template<bool HasHndl>
61 {
62 // Declare friendship between templates
63 template<bool>
64 friend class Operation;
65
66 friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
67
68 friend class Pipeline;
69 friend class PipelineHandler;
70
71 public:
72
73 //------------------------------------------------------------------------
75 //------------------------------------------------------------------------
76 Operation() : valid( true )
77 {
78 }
79
80 //------------------------------------------------------------------------
82 //------------------------------------------------------------------------
83 template<bool from>
85 handler( std::move( op.handler ) ), valid( true )
86 {
87 if( !op.valid ) throw std::invalid_argument( "Cannot construct "
88 "Operation from an invalid Operation!" );
89 op.valid = false;
90 }
91
92 //------------------------------------------------------------------------
94 //------------------------------------------------------------------------
95 virtual ~Operation()
96 {
97 }
98
99 //------------------------------------------------------------------------
101 //------------------------------------------------------------------------
102 virtual std::string ToString() = 0;
103
104 //------------------------------------------------------------------------
108 //------------------------------------------------------------------------
109 virtual Operation<HasHndl>* Move() = 0;
110
111 //------------------------------------------------------------------------
116 //------------------------------------------------------------------------
118
119 protected:
120
121 //------------------------------------------------------------------------
126 //------------------------------------------------------------------------
127 void Run( Timeout timeout,
128 std::promise<XRootDStatus> prms,
129 std::function<void(const XRootDStatus&)> final );
130
131 //------------------------------------------------------------------------
137 //------------------------------------------------------------------------
138 virtual XRootDStatus RunImpl( PipelineHandler *handler, uint16_t timeout ) = 0;
139
140 //------------------------------------------------------------------------
144 //------------------------------------------------------------------------
146
147 //------------------------------------------------------------------------
149 //------------------------------------------------------------------------
150 std::unique_ptr<PipelineHandler> handler;
151
152 //------------------------------------------------------------------------
154 //------------------------------------------------------------------------
155 bool valid;
156 };
157
158 //----------------------------------------------------------------------------
160 //----------------------------------------------------------------------------
161 typedef std::function<Operation<true>*(const XRootDStatus&)> rcvry_func;
162
163 //----------------------------------------------------------------------------
166 //----------------------------------------------------------------------------
168 {
169 template<bool> friend class Operation;
170
171 public:
172
173 //------------------------------------------------------------------------
177 //------------------------------------------------------------------------
179
180 //------------------------------------------------------------------------
182 //------------------------------------------------------------------------
184 {
185 }
186
187 //------------------------------------------------------------------------
189 //------------------------------------------------------------------------
190 void HandleResponseWithHosts( XRootDStatus *status, AnyObject *response,
191 HostList *hostList );
192
193 //------------------------------------------------------------------------
195 //------------------------------------------------------------------------
196 void HandleResponse( XRootDStatus *status, AnyObject *response );
197
198 //------------------------------------------------------------------------
200 //------------------------------------------------------------------------
202 {
203 }
204
205 //------------------------------------------------------------------------
209 //------------------------------------------------------------------------
210 void AddOperation( Operation<true> *operation );
211
212 //------------------------------------------------------------------------
219 //------------------------------------------------------------------------
220 void Assign( const Timeout &timeout,
221 std::promise<XRootDStatus> prms,
222 std::function<void(const XRootDStatus&)> final,
223 Operation<true> *opr );
224
225 //------------------------------------------------------------------------
227 //------------------------------------------------------------------------
228 void Assign( std::function<void(const XRootDStatus&)> final );
229
230 //------------------------------------------------------------------------
232 //------------------------------------------------------------------------
234
235 private:
236
237 //------------------------------------------------------------------------
239 //------------------------------------------------------------------------
240 void HandleResponseImpl( XRootDStatus *status, AnyObject *response, HostList *hostList );
241
242 inline void dealloc( XRootDStatus *status, AnyObject *response,
243 HostList *hostList )
244 {
245 delete status;
246 delete response;
247 delete hostList;
248 }
249
250 //------------------------------------------------------------------------
252 //------------------------------------------------------------------------
253 std::unique_ptr<ResponseHandler> responseHandler;
254
255 //------------------------------------------------------------------------
257 //------------------------------------------------------------------------
258 std::unique_ptr<Operation<true>> currentOperation;
259
260 //------------------------------------------------------------------------
262 //------------------------------------------------------------------------
263 std::unique_ptr<Operation<true>> nextOperation;
264
265 //------------------------------------------------------------------------
267 //------------------------------------------------------------------------
268 Timeout timeout;
269
270 //------------------------------------------------------------------------
272 //------------------------------------------------------------------------
273 std::promise<XRootDStatus> prms;
274
275 //------------------------------------------------------------------------
278 //------------------------------------------------------------------------
279 std::function<void(const XRootDStatus&)> final;
280 };
281
282 //----------------------------------------------------------------------------
288 //----------------------------------------------------------------------------
290 {
291 template<bool> friend class ParallelOperation;
292 friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
293 friend class PipelineHandler;
294
295 public:
296
297 //------------------------------------------------------------------------
299 //------------------------------------------------------------------------
301 {
302 }
303
304 //------------------------------------------------------------------------
306 //------------------------------------------------------------------------
308 operation( op->Move() )
309 {
310 }
311
312 //------------------------------------------------------------------------
314 //------------------------------------------------------------------------
316 operation( op.Move() )
317 {
318 }
319
320 //------------------------------------------------------------------------
322 //------------------------------------------------------------------------
324 operation( op.Move() )
325 {
326 }
327
329 operation( op->ToHandled() )
330 {
331 }
332
333 //------------------------------------------------------------------------
335 //------------------------------------------------------------------------
337 operation( op.ToHandled() )
338 {
339 }
340
341 //------------------------------------------------------------------------
343 //------------------------------------------------------------------------
345 operation( op.ToHandled() )
346 {
347 }
348
349 Pipeline( Pipeline &&pipe ) :
350 operation( std::move( pipe.operation ) )
351 {
352 }
353
354 //------------------------------------------------------------------------
356 //------------------------------------------------------------------------
358 {
359 operation = std::move( pipe.operation );
360 return *this;
361 }
362
363 //------------------------------------------------------------------------
365 //------------------------------------------------------------------------
367 {
368 operation->AddOperation( op.Move() );
369 return *this;
370 }
371
372 //------------------------------------------------------------------------
374 //------------------------------------------------------------------------
376 {
377 operation->AddOperation( op.ToHandled() );
378 return *this;
379 }
380
381 //------------------------------------------------------------------------
385 //------------------------------------------------------------------------
386 operator Operation<true>&()
387 {
388 if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." );
389 return *operation.get();
390 }
391
392 //------------------------------------------------------------------------
396 //------------------------------------------------------------------------
397 operator bool()
398 {
399 return bool( operation );
400 }
401
402 //------------------------------------------------------------------------
406 //------------------------------------------------------------------------
407 static void Stop( const XRootDStatus &status = XrdCl::XRootDStatus() );
408
409 //------------------------------------------------------------------------
411 //------------------------------------------------------------------------
412 static void Repeat();
413
414 //------------------------------------------------------------------------
416 //------------------------------------------------------------------------
417 static void Replace( Operation<false> &&opr );
418
419 //------------------------------------------------------------------------
421 //------------------------------------------------------------------------
422 static void Replace( Pipeline p );
423
424 //------------------------------------------------------------------------
426 //------------------------------------------------------------------------
427 static void Ignore();
428
429 private:
430
431 //------------------------------------------------------------------------
436 //------------------------------------------------------------------------
437 Operation<true>* operator->()
438 {
439 return operation.get();
440 }
441
442 //------------------------------------------------------------------------
447 //------------------------------------------------------------------------
448 void Run( Timeout timeout, std::function<void(const XRootDStatus&)> final = nullptr )
449 {
450 if( ftr.valid() )
451 throw std::logic_error( "Pipeline is already running!" );
452
453 // a promise that the pipe will have a result
454 std::promise<XRootDStatus> prms;
455 ftr = prms.get_future();
456
457 if( !operation ) std::logic_error( "Empty pipeline!" );
458
459 Operation<true> *opr = operation.release();
460 PipelineHandler *h = opr->handler.get();
461 if( h )
462 h->PreparePipelineStart();
463
464 opr->Run( timeout, std::move( prms ), std::move( final ) );
465 }
466
467 //------------------------------------------------------------------------
469 //------------------------------------------------------------------------
470 std::unique_ptr<Operation<true>> operation;
471
472 //------------------------------------------------------------------------
474 //------------------------------------------------------------------------
475 std::future<XRootDStatus> ftr;
476
477 };
478
479 //----------------------------------------------------------------------------
486 //----------------------------------------------------------------------------
487 inline std::future<XRootDStatus> Async( Pipeline pipeline, uint16_t timeout = 0 )
488 {
489 pipeline.Run( timeout );
490 return std::move( pipeline.ftr );
491 }
492
493 //----------------------------------------------------------------------------
501 //----------------------------------------------------------------------------
502 inline XRootDStatus WaitFor( Pipeline pipeline, uint16_t timeout = 0 )
503 {
504 return Async( std::move( pipeline ), timeout ).get();
505 }
506
507 //----------------------------------------------------------------------------
514 //----------------------------------------------------------------------------
515 template<template<bool> class Derived, bool HasHndl, typename HdlrFactory, typename ... Args>
516 class ConcreteOperation: public Operation<HasHndl>
517 {
518 template<template<bool> class, bool, typename, typename ...>
519 friend class ConcreteOperation;
520
521 public:
522
523 //------------------------------------------------------------------------
527 //------------------------------------------------------------------------
528 ConcreteOperation( Args&&... args ) : args( std::tuple<Args...>( std::move( args )... ) ),
529 timeout( 0 )
530 {
531 static_assert( !HasHndl, "It is only possible to construct operation without handler" );
532 }
533
534 //------------------------------------------------------------------------
540 //------------------------------------------------------------------------
541 template<bool from>
543 Operation<HasHndl>( std::move( op ) ), args( std::move( op.args ) ), timeout( 0 )
544 {
545 }
546
547 //------------------------------------------------------------------------
555 //------------------------------------------------------------------------
556 template<typename Hdlr>
557 Derived<true> operator>>( Hdlr &&hdlr )
558 {
559 return this->StreamImpl( HdlrFactory::Create( hdlr ) );
560 }
561
562 //------------------------------------------------------------------------
568 //------------------------------------------------------------------------
569 Derived<true> operator|( Operation<true> &op )
570 {
571 return PipeImpl( *this, op );
572 }
573
574 //------------------------------------------------------------------------
580 //------------------------------------------------------------------------
581 Derived<true> operator|( Operation<true> &&op )
582 {
583 return PipeImpl( *this, op );
584 }
585
586 //------------------------------------------------------------------------
592 //------------------------------------------------------------------------
593 Derived<true> operator|( Operation<false> &op )
594 {
595 return PipeImpl( *this, op );
596 }
597
598 //------------------------------------------------------------------------
604 //------------------------------------------------------------------------
605 Derived<true> operator|( Operation<false> &&op )
606 {
607 return PipeImpl( *this, op );
608 }
609
610 //------------------------------------------------------------------------
612 //------------------------------------------------------------------------
613 Derived<true> operator|( FinalOperation &&fo )
614 {
615 AllocHandler( *this );
616 this->handler->Assign( fo.final );
617 return this->template Transform<true>();
618 }
619
620 //------------------------------------------------------------------------
624 //------------------------------------------------------------------------
626 {
627 Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
628 return new Derived<HasHndl>( std::move( *me ) );
629 }
630
631 //------------------------------------------------------------------------
635 //------------------------------------------------------------------------
637 {
638 this->handler.reset( new PipelineHandler() );
639 Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
640 return new Derived<true>( std::move( *me ) );
641 }
642
643 //------------------------------------------------------------------------
645 //------------------------------------------------------------------------
646 Derived<HasHndl> Timeout( uint16_t timeout )
647 {
648 this->timeout = timeout;
649 Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
650 return std::move( *me );
651 }
652
653 protected:
654
655 //------------------------------------------------------------------------
659 //------------------------------------------------------------------------
660 template<bool to>
661 inline Derived<to> Transform()
662 {
663 Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
664 return Derived<to>( std::move( *me ) );
665 }
666
667 //------------------------------------------------------------------------
673 //------------------------------------------------------------------------
674 inline Derived<true> StreamImpl( ResponseHandler *handler )
675 {
676 static_assert( !HasHndl, "Operator >> is available only for operation without handler" );
677 this->handler.reset( new PipelineHandler( handler ) );
678 return Transform<true>();
679 }
680
681 //------------------------------------------------------------------------
682 // Allocate handler if necessary
683 //------------------------------------------------------------------------
684 inline static
686 {
687 // nothing to do
688 }
689
690 //------------------------------------------------------------------------
691 // Allocate handler if necessary
692 //------------------------------------------------------------------------
693 inline static
698
699 //------------------------------------------------------------------------
706 //------------------------------------------------------------------------
707 inline static
708 Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
709 Args...> &me, Operation<true> &op )
710 {
711 AllocHandler( me ); // if HasHndl is false allocate handler
712 me.AddOperation( op.Move() );
713 return me.template Transform<true>();
714 }
715
716 //------------------------------------------------------------------------
723 //------------------------------------------------------------------------
724 inline static
725 Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
726 Args...> &me, Operation<false> &op )
727 {
728 AllocHandler( me ); // if HasHndl is false allocate handler
729 me.AddOperation( op.ToHandled() );
730 return me.template Transform<true>();
731 }
732
733 //------------------------------------------------------------------------
735 //------------------------------------------------------------------------
736 std::tuple<Args...> args;
737
738 //------------------------------------------------------------------------
740 //------------------------------------------------------------------------
741 uint16_t timeout;
742 };
743
744 // Out-of-line methods for class Operation
745
746 template <bool HasHndl>
747 void Operation<HasHndl>::Run(Timeout timeout, std::promise<XRootDStatus> prms,
748 std::function<void(const XRootDStatus &)> f)
749 {
750 static_assert(HasHndl, "Only an operation that has a handler can be assigned to workflow");
751
752 XRootDStatus st;
753 handler->Assign(timeout, std::move(prms), std::move(f), this);
754 PipelineHandler *h = handler.release();
755
756 try {
757 st = RunImpl(h, timeout);
758 } catch (const operation_expired &ex) {
760 } catch (const PipelineException &ex) { // probably not needed
761 st = ex.GetError();
762 } catch (const std::exception &ex) {
763 st = XRootDStatus(stError, errInternal, 0, ex.what());
764 }
765
766 if (!st.IsOK()) {
767 ResponseJob *job = new ResponseJob(h, new XRootDStatus(st), 0, nullptr);
769 }
770 }
771
772 template <bool HasHndl>
774 {
775 if (handler)
776 handler->AddOperation(op);
777 }
778}
779
780#endif // __XRD_CL_OPERATIONS_HH__
Derived< true > operator|(Operation< true > &&op)
static void AllocHandler(ConcreteOperation< Derived, true, HdlrFactory, Args... > &me)
static void AllocHandler(ConcreteOperation< Derived, false, HdlrFactory, Args... > &me)
std::tuple< Args... > args
Operation arguments.
Derived< true > StreamImpl(ResponseHandler *handler)
ConcreteOperation(ConcreteOperation< Derived, from, HdlrFactory, Args... > &&op)
Derived< HasHndl > Timeout(uint16_t timeout)
Set operation timeout.
Derived< true > operator>>(Hdlr &&hdlr)
uint16_t timeout
Operation timeout.
ConcreteOperation(Args &&... args)
Derived< true > operator|(Operation< false > &op)
Operation< HasHndl > * Move()
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< false > &op)
Derived< true > operator|(Operation< false > &&op)
Derived< true > operator|(FinalOperation &&fo)
Adds a final operation to the pipeline.
Operation< true > * ToHandled()
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< true > &op)
Derived< true > operator|(Operation< true > &op)
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
virtual ~Operation()
Destructor.
virtual Operation< HasHndl > * Move()=0
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
friend class PipelineHandler
Operation()
Constructor.
void AddOperation(Operation< true > *op)
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
bool valid
Flag indicating if it is a valid object.
virtual std::string ToString()=0
Name of the operation.
virtual XRootDStatus RunImpl(PipelineHandler *handler, uint16_t timeout)=0
virtual Operation< true > * ToHandled()=0
std::unique_ptr< PipelineHandler > handler
Operation handler.
friend class Operation
Operation(Operation< from > &&op)
Move constructor between template instances.
Pipeline exception, wrapps an XRootDStatus.
void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Callback function.
PipelineHandler()
Default Constructor.
void PreparePipelineStart()
Called by a pipeline on the handler of its first operation before Run.
void Assign(const Timeout &timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final, Operation< true > *opr)
PipelineHandler(ResponseHandler *handler)
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
void AddOperation(Operation< true > *operation)
Pipeline(Operation< true > *op)
Constructor.
Pipeline(Operation< true > &&op)
Constructor.
Pipeline & operator|=(Operation< true > &&op)
Extend pipeline.
static void Repeat()
Repeat current operation.
Pipeline(Operation< true > &op)
Constructor.
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
friend class PipelineHandler
Pipeline(Pipeline &&pipe)
Pipeline & operator=(Pipeline &&pipe)
Constructor.
Pipeline(Operation< false > *op)
static void Stop(const XRootDStatus &status=XrdCl::XRootDStatus())
Pipeline(Operation< false > &&op)
Constructor.
static void Replace(Operation< false > &&opr)
Replace current operation.
friend class ParallelOperation
static void Ignore()
Ignore error and proceed with the pipeline.
Pipeline(Operation< false > &op)
Constructor.
Pipeline()
Default constructor.
Pipeline & operator|=(Operation< false > &&op)
Extend pipeline.
JobManager * GetJobManager()
Get the job manager object user by the post master.
Handle an async response.
Call the user callback.
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
std::function< Operation< true > *(const XRootDStatus &)> rcvry_func
Type of the recovery function to be provided by the user.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
std::vector< HostInfo > HostList
const uint16_t errInternal
Internal error.
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
bool IsOK() const
We're fine.