XRootD
Loading...
Searching...
No Matches
XrdCl::ClassicCopyJob Class Reference

#include <XrdClClassicCopyJob.hh>

Inheritance diagram for XrdCl::ClassicCopyJob:
Collaboration diagram for XrdCl::ClassicCopyJob:

Public Member Functions

 ClassicCopyJob (uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
const XRootDStatus & GetResult () const
virtual XRootDStatus Run (CopyProgressHandler *progress=0)
Public Member Functions inherited from XrdCl::CopyJob
 CopyJob (uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
 Constructor.
virtual ~CopyJob ()
 Virtual destructor.
PropertyList * GetProperties ()
 Get the job properties.
PropertyList * GetResults ()
 Get the job results.
const URL & GetSource () const
 Get source.
const URL & GetTarget () const
 Get target.
void Init ()

Additional Inherited Members

Protected Attributes inherited from XrdCl::CopyJob
uint16_t pJobId
PropertyList * pProperties
PropertyList * pResults
URL pSource
URL pTarget

Detailed Description

Definition at line 27 of file XrdClClassicCopyJob.hh.

Constructor & Destructor Documentation

◆ ClassicCopyJob()

XrdCl::ClassicCopyJob::ClassicCopyJob ( uint16_t jobId,
PropertyList * jobProperties,
PropertyList * jobResults )

Definition at line 2417 of file XrdClClassicCopyJob.cc.

2419 :
2420 CopyJob( jobId, jobProperties, jobResults )
2421 {
2422 Log *log = DefaultEnv::GetLog();
2423 log->Debug( UtilityMsg, "Creating a classic copy job, from %s to %s",
2424 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
2425 }
const URL & GetSource() const
Get source.
CopyJob(uint16_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
Constructor.
const URL & GetTarget() const
Get target.
static Log * GetLog()
Get default log.
const uint64_t UtilityMsg
XrdSysError Log
Definition XrdConfig.cc:113

References XrdCl::CopyJob::CopyJob(), XrdCl::Log::Debug(), XrdCl::DefaultEnv::GetLog(), XrdCl::CopyJob::GetSource(), XrdCl::CopyJob::GetTarget(), and XrdCl::UtilityMsg.

Here is the call graph for this function:

Member Function Documentation

◆ GetResult()

const XRootDStatus & XrdCl::ClassicCopyJob::GetResult ( ) const
inline

Definition at line 48 of file XrdClClassicCopyJob.hh.

49 {
50 return result;
51 }

◆ Run()

XRootDStatus XrdCl::ClassicCopyJob::Run ( CopyProgressHandler * progress = 0)
virtual

Run the copy job

Parameters
progress: the handler to be notified about the copy progress
Returns
status of the copy operation

Implements XrdCl::CopyJob.

Definition at line 2430 of file XrdClClassicCopyJob.cc.

2431 {
2432 Log *log = DefaultEnv::GetLog();
2433
2434 std::string checkSumMode;
2435 std::string checkSumType;
2436 std::string checkSumPreset;
2437 std::string zipSource;
2438 uint16_t parallelChunks;
2439 uint32_t chunkSize;
2440 uint64_t blockSize;
2441 bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2442 rmOnBadCksum, continue_, zipappend, doserver;
2443 int32_t nbXcpSources;
2444 long long xRate;
2445 long long xRateThreshold;
2446 uint16_t cpTimeout;
2447 std::vector<std::string> addcksums;
2448
2449 pProperties->Get( "checkSumMode", checkSumMode );
2450 pProperties->Get( "checkSumType", checkSumType );
2451 pProperties->Get( "checkSumPreset", checkSumPreset );
2452 pProperties->Get( "parallelChunks", parallelChunks );
2453 pProperties->Get( "chunkSize", chunkSize );
2454 pProperties->Get( "posc", posc );
2455 pProperties->Get( "force", force );
2456 pProperties->Get( "coerce", coerce );
2457 pProperties->Get( "makeDir", makeDir );
2458 pProperties->Get( "dynamicSource", dynamicSource );
2459 pProperties->Get( "zipArchive", zip );
2460 pProperties->Get( "xcp", xcp );
2461 pProperties->Get( "xcpBlockSize", blockSize );
2462 pProperties->Get( "preserveXAttr", preserveXAttr );
2463 pProperties->Get( "xrate", xRate );
2464 pProperties->Get( "xrateThreshold", xRateThreshold );
2465 pProperties->Get( "rmOnBadCksum", rmOnBadCksum );
2466 pProperties->Get( "continue", continue_ );
2467 pProperties->Get( "cpTimeout", cpTimeout );
2468 pProperties->Get( "zipAppend", zipappend );
2469 pProperties->Get( "addcksums", addcksums );
2470 pProperties->Get( "doServer", doserver );
2471
2472 if( zip )
2473 pProperties->Get( "zipSource", zipSource );
2474
2475 if( xcp )
2476 pProperties->Get( "nbXcpSources", nbXcpSources );
2477
2478 if( force && continue_ )
2479 return SetResult( stError, errInvalidArgs, EINVAL,
2480 "Invalid argument combination: continue + force." );
2481
2482 if( zipappend && ( continue_ || force ) )
2483 return SetResult( stError, errInvalidArgs, EINVAL,
2484 "Invalid argument combination: ( continue | force ) + zip-append." );
2485
2486 //--------------------------------------------------------------------------
2487 // Start the cp t/o timer if necessary
2488 //--------------------------------------------------------------------------
2489 std::unique_ptr<timer_sec_t> cptimer;
2490 if( cpTimeout ) cptimer.reset( new timer_sec_t() );
2491
2492 //--------------------------------------------------------------------------
2493 // Remove on bad checksum implies that POSC semantics has to be enabled
2494 //--------------------------------------------------------------------------
2495 if( rmOnBadCksum ) posc = true;
2496
2497 //--------------------------------------------------------------------------
2498 // Resolve the 'auto' checksum type.
2499 //--------------------------------------------------------------------------
2500 if( checkSumType == "auto" )
2501 {
2502 checkSumType = Utils::InferChecksumType( GetSource(), GetTarget(), zip );
2503 if( checkSumType.empty() )
2504 return SetResult( stError, errCheckSumError, ENOTSUP, "Could not infer checksum type." );
2505 else
2506 log->Info( UtilityMsg, "Using inferred checksum type: %s.", checkSumType.c_str() );
2507 }
2508
2509 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2510 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2511
2512 //--------------------------------------------------------------------------
2513 // Initialize the source and the destination
2514 //--------------------------------------------------------------------------
2515 std::unique_ptr<Source> src;
2516 if( xcp )
2517 src.reset( new XRootDSourceXCp( &GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2518 else if( zip ) // TODO make zip work for xcp
2519 src.reset( new XRootDSourceZip( zipSource, &GetSource(), chunkSize, parallelChunks,
2520 checkSumType, addcksums , doserver) );
2521 else if( GetSource().GetProtocol() == "stdio" )
2522 src.reset( new StdInSource( checkSumType, chunkSize, addcksums ) );
2523 else
2524 {
2525 if( dynamicSource )
2526 src.reset( new XRootDSourceDynamic( &GetSource(), chunkSize, checkSumType, addcksums ) );
2527 else
2528 src.reset( new XRootDSource( &GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2529 }
2530
2531 XRootDStatus st = src->Initialize();
2532 if( !st.IsOK() ) return SourceError( st );
2533 uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2534
2535 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2536 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2537
2538 std::unique_ptr<Destination> dest;
2539 URL newDestUrl( GetTarget() );
2540
2541 if( GetTarget().GetProtocol() == "stdio" )
2542 dest.reset( new StdOutDestination( checkSumType ) );
2543 else if( zipappend )
2544 {
2545 std::string fn = GetSource().GetPath();
2546 size_t pos = fn.rfind( '/' );
2547 if( pos != std::string::npos )
2548 fn = fn.substr( pos + 1 );
2549 int64_t size = src->GetSize();
2550 dest.reset( new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *this ) );
2551 }
2552 //--------------------------------------------------------------------------
2553 // For xrootd destination build the oss.asize hint
2554 //--------------------------------------------------------------------------
2555 else
2556 {
2557 if( src->GetSize() >= 0 )
2558 {
2559 URL::ParamsMap params = newDestUrl.GetParams();
2560 std::ostringstream o; o << src->GetSize();
2561 params["oss.asize"] = o.str();
2562 newDestUrl.SetParams( params );
2563 // makeDir = true; // Backward compatibility for xroot destinations!!!
2564 }
2565 dest.reset( new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *this ) );
2566 }
2567
2568 dest->SetForce( force );
2569 dest->SetPOSC( posc );
2570 dest->SetCoerce( coerce );
2571 dest->SetMakeDir( makeDir );
2572 dest->SetContinue( continue_ );
2573 st = dest->Initialize();
2574 if( !st.IsOK() ) return DestinationError( st );
2575
2576 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2577 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2578
2579 //--------------------------------------------------------------------------
2580 // Copy the chunks
2581 //--------------------------------------------------------------------------
2582 if( continue_ )
2583 {
2584 size -= dest->GetSize();
2585 XrdCl::XRootDStatus st = src->StartAt( dest->GetSize() );
2586 if( !st.IsOK() ) return SetResult( st );
2587 }
2588
2589 PageInfo pageInfo;
2590 uint64_t total_processed = 0;
2591 uint64_t processed = 0;
2592 auto start = time_nsec();
2593 uint16_t threshold_interval = parallelChunks;
2594 bool threshold_draining = false;
2595 timer_nsec_t threshold_timer;
2596 while( 1 )
2597 {
2598 st = src->GetChunk( pageInfo );
2599 if( !st.IsOK() )
2600 return SourceError( st);
2601
2602 if( st.IsOK() && st.code == suDone )
2603 break;
2604
2605 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2606 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2607
2608 if( xRate )
2609 {
2610 auto elapsed = ( time_nsec() - start ).count();
2611 double transferred = total_processed + pageInfo.GetLength();
2612 double expected = double( xRate ) / to_nsec( 1 ) * elapsed;
2613 //----------------------------------------------------------------------
2614 // check if our transfer rate didn't exceeded the limit
2615 // (we are too fast)
2616 //----------------------------------------------------------------------
2617 if( elapsed && // make sure elapsed time is greater than 0
2618 transferred > expected )
2619 {
2620 auto nsec = ( transferred / xRate * to_nsec( 1 ) ) - elapsed;
2621 sleep_nsec( nsec );
2622 }
2623 }
2624
2625 if( xRateThreshold )
2626 {
2627 auto elapsed = threshold_timer.elapsed();
2628 double transferred = processed + pageInfo.GetLength();
2629 double expected = double( xRateThreshold ) / to_nsec( 1 ) * elapsed;
2630 //----------------------------------------------------------------------
2631 // check if our transfer rate dropped below the threshold
2632 // (we are too slow)
2633 //----------------------------------------------------------------------
2634 if( elapsed && // make sure elapsed time is greater than 0
2635 transferred < expected &&
2636 threshold_interval == 0 ) // we check every # parallelChunks
2637 {
2638 if( !threshold_draining )
2639 {
2640 log->Warning( UtilityMsg, "Transfer rate dropped below requested ehreshold,"
2641 " trying different source!" );
2642 XRootDStatus st = src->TryOtherServer();
2643 if( !st.IsOK() ) return SetResult( stError, errThresholdExceeded, 0,
2644 "The transfer rate dropped below "
2645 "requested threshold!" );
2646 threshold_draining = true; // before the next measurement we need to drain
2647 // all the chunks that will come from the old server
2648 }
2649 else // now that all the chunks from the old server have
2650 { // been received we can start another measurement
2651 processed = 0;
2652 threshold_timer.reset();
2653 threshold_interval = parallelChunks;
2654 threshold_draining = false;
2655 }
2656 }
2657
2658 threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2659 }
2660
2661 total_processed += pageInfo.GetLength();
2662 processed += pageInfo.GetLength();
2663
2664 st = dest->PutChunk( std::move( pageInfo ) );
2665 if( !st.IsOK() )
2666 {
2667 if( st.code == errRetry )
2668 {
2669 pResults->Set( "LastURL", dest->GetLastURL() );
2670 pResults->Set( "WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2671 return SetResult( st );
2672 }
2673 return DestinationError( st );
2674 }
2675
2676 if( progress )
2677 {
2678 progress->JobProgress( pJobId, total_processed, size );
2679 if( progress->ShouldCancel( pJobId ) )
2680 return SetResult( stError, errOperationInterrupted, kXR_Cancelled, "The copy-job has been cancelled!" );
2681 }
2682 }
2683
2684 st = dest->Flush();
2685 if( !st.IsOK() )
2686 return DestinationError( st );
2687
2688 //--------------------------------------------------------------------------
2689 // Copy extended attributes
2690 //--------------------------------------------------------------------------
2691 if( preserveXAttr && Utils::HasXAttr( GetSource() ) && Utils::HasXAttr( GetTarget() ) )
2692 {
2693 std::vector<xattr_t> xattrs;
2694 st = src->GetXAttr( xattrs );
2695 if( !st.IsOK() ) return SourceError( st );
2696 st = dest->SetXAttr( xattrs );
2697 if( !st.IsOK() ) return DestinationError( st );
2698 }
2699
2700 //--------------------------------------------------------------------------
2701 // The size of the source is known and not enough data has been transferred
2702 // to the destination
2703 //--------------------------------------------------------------------------
2704 if( src->GetSize() >= 0 && size != total_processed )
2705 {
2706 log->Error( UtilityMsg, "The declared source size is %llu bytes, but "
2707 "received %llu bytes.", (unsigned long long) size, (unsigned long long) total_processed );
2708 return SetResult( stError, errDataError );
2709 }
2710 pResults->Set( "size", total_processed );
2711
2712 //--------------------------------------------------------------------------
2713 // Finalize the destination
2714 //--------------------------------------------------------------------------
2715 st = dest->Finalize();
2716 if( !st.IsOK() )
2717 return DestinationError( st );
2718
2719 //--------------------------------------------------------------------------
2720 // Verify the checksums if needed
2721 //--------------------------------------------------------------------------
2722 if( checkSumMode != "none" )
2723 {
2724 log->Debug( UtilityMsg, "Attempting checksum calculation, mode: %s.",
2725 checkSumMode.c_str() );
2726 std::string sourceCheckSum;
2727 std::string targetCheckSum;
2728
2729 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2730 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2731
2732 //------------------------------------------------------------------------
2733 // Get the check sum at source
2734 //------------------------------------------------------------------------
2735 timeval oStart, oEnd;
2736 XRootDStatus st;
2737
2738 if( checkSumMode == "end2end" || checkSumMode == "source" ||
2739 !checkSumPreset.empty() )
2740 {
2741 gettimeofday( &oStart, 0 );
2742 if( !checkSumPreset.empty() )
2743 {
2744 sourceCheckSum = checkSumType + ":";
2745 sourceCheckSum += Utils::NormalizeChecksum( checkSumType,
2746 checkSumPreset );
2747 }
2748 else
2749 {
2750 st = src->GetCheckSum( sourceCheckSum, checkSumType );
2751 }
2752 gettimeofday( &oEnd, 0 );
2753
2754 if( !st.IsOK() )
2755 return SourceError( st );
2756
2757 pResults->Set( "sourceCheckSum", sourceCheckSum );
2758 }
2759
2760 if( !addcksums.empty() )
2761 pResults->Set( "additionalCkeckSum", src->GetAddCks() );
2762
2763 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2764 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2765
2766 //------------------------------------------------------------------------
2767 // Get the check sum at destination
2768 //------------------------------------------------------------------------
2769 timeval tStart, tEnd;
2770
2771 if( checkSumMode == "end2end" || checkSumMode == "target" )
2772 {
2773 gettimeofday( &tStart, 0 );
2774 st = dest->GetCheckSum( targetCheckSum, checkSumType );
2775 if( !st.IsOK() )
2776 return DestinationError( st );
2777 gettimeofday( &tEnd, 0 );
2778 pResults->Set( "targetCheckSum", targetCheckSum );
2779 }
2780
2781 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2782 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2783
2784 //------------------------------------------------------------------------
2785 // Make sure the checksums are both lower case
2786 //------------------------------------------------------------------------
2787 auto sanitize_cksum = []( char c )
2788 {
2789 std::locale loc;
2790 if( std::isalpha( c ) ) return std::tolower( c, loc );
2791 return c;
2792 };
2793
2794 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2795 sourceCheckSum.begin(), sanitize_cksum );
2796
2797 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2798 targetCheckSum.begin(), sanitize_cksum );
2799
2800 //------------------------------------------------------------------------
2801 // Compare and inform monitoring
2802 //------------------------------------------------------------------------
2803 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2804 {
2805 bool match = false;
2806 if( sourceCheckSum == targetCheckSum )
2807 match = true;
2808
2809 Monitor *mon = DefaultEnv::GetMonitor();
2810 if( mon )
2811 {
2812 Monitor::CheckSumInfo i;
2813 i.transfer.origin = &GetSource();
2814 i.transfer.target = &GetTarget();
2815 i.cksum = sourceCheckSum;
2816 i.oTime = Utils::GetElapsedMicroSecs( oStart, oEnd );
2817 i.tTime = Utils::GetElapsedMicroSecs( tStart, tEnd );
2818 i.isOK = match;
2819 mon->Event( Monitor::EvCheckSum, &i );
2820 }
2821
2822 if( !match )
2823 {
2824 if( rmOnBadCksum )
2825 {
2826 FileSystem fs( newDestUrl );
2827 st = fs.Rm( newDestUrl.GetPath() );
2828 if( !st.IsOK() )
2829 log->Error( UtilityMsg, "Invalid checksum: failed to remove the target file: %s", st.ToString().c_str() );
2830 else
2831 log->Info( UtilityMsg, "Target file removed due to bad checksum!" );
2832 }
2833
2834 st = dest->Finalize();
2835 if( !st.IsOK() )
2836 log->Error( UtilityMsg, "Failed to finalize the destination: %s", st.ToString().c_str() );
2837
2838 return SetResult( stError, errCheckSumError, 0 );
2839 }
2840
2841 log->Info( UtilityMsg, "Checksum verification: succeeded." );
2842 }
2843 }
2844
2845 return SetResult();
2846 }
@ kXR_Cancelled
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
PropertyList * pResults
PropertyList * pProperties
static Monitor * GetMonitor()
Get the monitor object.
@ EvCheckSum
CheckSumInfo: File checksummed.
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static bool HasXAttr(const XrdCl::URL &url)
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
const uint16_t errInvalidArgs
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errCheckSumError
const uint16_t suDone
const uint16_t errThresholdExceeded
const uint16_t errOperationInterrupted
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.

References XrdCl::Monitor::CheckSumInfo::cksum, XrdCl::Status::code, XrdCl::Log::Debug(), XrdCl::errCheckSumError, XrdCl::errDataError, XrdCl::errInvalidArgs, XrdCl::errOperationExpired, XrdCl::errOperationInterrupted, XrdCl::Log::Error(), XrdCl::errRetry, XrdCl::errThresholdExceeded, XrdCl::Monitor::EvCheckSum, XrdCl::Monitor::Event(), XrdCl::Utils::GetElapsedMicroSecs(), XrdCl::PageInfo::GetLength(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::URL::GetParams(), XrdCl::URL::GetPath(), XrdCl::CopyJob::GetSource(), XrdCl::CopyJob::GetTarget(), XrdCl::Utils::HasXAttr(), XrdCl::Utils::InferChecksumType(), XrdCl::Log::Info(), XrdCl::Status::IsOK(), XrdCl::Monitor::CheckSumInfo::isOK, XrdCl::CopyProgressHandler::JobProgress(), kXR_Cancelled, XrdCl::Utils::NormalizeChecksum(), XrdCl::Monitor::TransferInfo::origin, XrdCl::Monitor::CheckSumInfo::oTime, XrdCl::CopyJob::pJobId, XrdCl::CopyJob::pProperties, XrdCl::CopyJob::pResults, XrdCl::FileSystem::Rm(), XrdCl::URL::SetParams(), XrdCl::CopyProgressHandler::ShouldCancel(), sleep_nsec(), XrdCl::stError, XrdCl::suDone, XrdCl::Monitor::TransferInfo::target, time_nsec(), to_nsec(), XrdCl::Status::ToString(), XrdCl::Monitor::CheckSumInfo::transfer, XrdCl::Monitor::CheckSumInfo::tTime, XrdCl::UtilityMsg, and XrdCl::Log::Warning().

Here is the call graph for this function:

The documentation for this class was generated from the following files: