#include <XrdClXCpCtx.hh>
Definition at line 40 of file XrdClXCpCtx.hh.
◆ XCpCtx()
| XrdCl::XCpCtx::XCpCtx |
( |
const std::vector< std::string > & | urls, |
|
|
uint64_t | blockSize, |
|
|
uint8_t | parallelSrc, |
|
|
uint64_t | chunkSize, |
|
|
uint64_t | parallelChunks, |
|
|
int64_t | fileSize ) |
Constructor
- Parameters
-
| urls | : list of replica urls |
| blockSize | : the default block size |
| parallelSrc | : maximum number of parallel sources |
| chunkSize | : the default chunk size |
| parallelChunks | : the default number of parallel chunks per source |
| fileSize | : the file size if specified in the metalink file (-1 indicates that the file size is not known and a stat should be done) |
Definition at line 36 of file XrdClXCpCtx.cc.
36 :
37 pUrls( std::deque<std::string>( urls.begin(), urls.end() ) ), pBlockSize( blockSize ),
38 pParallelSrc( parallelSrc ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ),
39 pOffset( 0 ), pFileSize( -1 ), pFileSizeCV( 0 ), pDataReceived( 0 ), pDone( false ),
40 pDoneCV( 0 ), pRefCount( 1 ), pDeleteCV( 0 ), pDelete( false )
41{
43}
void SetFileSize(int64_t size)
References SetFileSize().
Referenced by Self().
◆ AllDone()
| bool XrdCl::XCpCtx::AllDone |
( |
| ) |
|
Returns true if all chunks have been transferred, otherwise blocks until NotifyIdleSrc is called, or a 1 minute timeout occurs.
- Returns
- : true if all chunks have been transferred, false otherwise.
Definition at line 193 of file XrdClXCpCtx.cc.
194{
195 XrdSysCondVarHelper lck( pDoneCV );
196
197 if( !pDone )
198 pDoneCV.Wait( 60 );
199
200 return pDone;
201}
◆ Delete()
| void XrdCl::XCpCtx::Delete |
( |
| ) |
|
|
inline |
Decrements the reference count and then waits for it to reach zero, then deletes the instance. Should only be called once.
Definition at line 62 of file XrdClXCpCtx.hh.
63 {
64 XrdSysMutexHelper lckmtx( pMtx );
65 --pRefCount;
66 if( !pRefCount )
67 {
68 lckmtx.UnLock();
69 delete this;
70 return;
71 }
72 lckmtx.UnLock();
73
74 XrdSysCondVarHelper lckcv( pDoneCV );
75 pDone = true;
76 pDoneCV.Broadcast();
77 lckcv.UnLock();
78
79 lckcv.Lock( &pDeleteCV );
80 while( !pDelete ) pDeleteCV.Wait();
81 lckcv.UnLock();
82 delete this;
83 }
References XrdSysCondVarHelper::Lock(), XrdSysCondVarHelper::UnLock(), and XrdSysMutexHelper::UnLock().
◆ GetBlock()
| std::pair< uint64_t, uint64_t > XrdCl::XCpCtx::GetBlock |
( |
| ) |
|
Get next block that has to be transferred
- Returns
- : pair of offset and block size
Definition at line 95 of file XrdClXCpCtx.cc.
96{
97 XrdSysMutexHelper lck( pMtx );
98
99 uint64_t blkSize = pBlockSize, offset = pOffset;
100 if( pOffset + blkSize > uint64_t( pFileSize ) )
101 blkSize = pFileSize - pOffset;
102 pOffset += blkSize;
103
104 return std::make_pair( offset, blkSize );
105}
◆ GetChunk()
Gets the next chunk from the sink; blocks if the sink is empty.
- Parameters
-
| ci | : the chunk retrieved from sink (output parameter) |
- Returns
- : stError if we failed to transfer the file, stOK otherwise, with one of the following codes:
- suDone : the whole file has been transferred, we are done
- suContinue : a chunk has been written into ci, continue calling GetChunk in order to retrieve remaining chunks
- suRetry : a chunk has not been written into ci, try again.
Definition at line 156 of file XrdClXCpCtx.cc.
157{
158
159 if( pDataReceived == uint64_t( pFileSize ) )
160 {
161 XrdSysCondVarHelper lck( pDoneCV );
162 pDone = true;
163 pDoneCV.Broadcast();
165 }
166
167
168 if( GetRunning() == 0 )
169 {
170 XrdSysCondVarHelper lck( pDoneCV );
171 pDone = true;
172 pDoneCV.Broadcast();
174 }
175
176 PageInfo *chunk = pSink.Get();
177 if( chunk )
178 {
179 pDataReceived += chunk->GetLength();
180 ci = std::move( *chunk );
181 delete chunk;
183 }
184
186}
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t stOK
Everything went OK.
const uint16_t suContinue
const uint16_t errNoMoreReplicas
No more replicas to try.
References XrdCl::errNoMoreReplicas, XrdCl::PageInfo::GetLength(), XrdCl::stError, XrdCl::stOK, XrdCl::suContinue, XrdCl::suDone, and XrdCl::suRetry.
◆ GetNextUrl()
| bool XrdCl::XCpCtx::GetNextUrl |
( |
std::string & | url | ) |
|
Gets the next URL from the list of file replicas
- Parameters
-
| url | : the output parameter |
- Returns
- : true if a url has been written to the url parameter, false otherwise
Definition at line 57 of file XrdClXCpCtx.cc.
58{
59 XrdSysMutexHelper lck( pMtx );
60 if( pUrls.empty() ) return false;
61 url = pUrls.front();
62 pUrls.pop();
63 return true;
64}
◆ GetSize()
| int64_t XrdCl::XCpCtx::GetSize |
( |
| ) |
|
|
inline |
Get file size. The call blocks until the file size has been set using SetFileSize.
Definition at line 157 of file XrdClXCpCtx.hh.
158 {
159 XrdSysCondVarHelper lck( pFileSizeCV );
160 while( pFileSize < 0 && GetRunning() > 0 ) pFileSizeCV.Wait();
161 return pFileSize;
162 }
◆ Initialize()
Starts one thread per source. Each thread tries to open a file, stats the file if necessary, and then starts reading the file; all chunks read go to the sink.
- Returns
- Error if we were not able to create any threads
Definition at line 124 of file XrdClXCpCtx.cc.
125{
126 for( uint8_t i = 0; i < pParallelSrc; ++i )
127 {
128 XCpSrc *src = new XCpSrc( pChunkSize, pParallelChunks, pFileSize, this );
129 pSources.push_back( src );
130 }
131
132 auto scpy = pSources;
133 bool ok = false;
134 for(auto src: scpy) {
135 if( src->Start() )
136 {
137
138 src->Delete();
139 }
140 else
141 {
142 ok = true;
143 }
144 }
145
146 if( !ok )
147 {
149 log->Error(
UtilityMsg,
"Failed to initialize (failed to create new threads)" );
150 return XRootDStatus(
stError,
errInternal, EAGAIN,
"XCpCtx: failed to create new threads." );
151 }
152
153 return XRootDStatus();
154}
static Log * GetLog()
Get default log.
const uint16_t errInternal
Internal error.
const uint64_t UtilityMsg
References XrdCl::errInternal, XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::stError, and XrdCl::UtilityMsg.
◆ NotifyIdleSrc()
| void XrdCl::XCpCtx::NotifyIdleSrc |
( |
| ) |
|
Notify idle sources, used in two cases:
- if one of the sources failed and an idle source needs to take over
- or if we are done and all idle sources should be stopped
Definition at line 188 of file XrdClXCpCtx.cc.
189{
190 pDoneCV.Broadcast();
191}
◆ NotifyInitExpectant()
| void XrdCl::XCpCtx::NotifyInitExpectant |
( |
| ) |
|
|
inline |
Notify those who are waiting for initialization. In particular the GetSize() caller will be waiting on the result of initialization.
Definition at line 225 of file XrdClXCpCtx.hh.
226 {
227 pFileSizeCV.Broadcast();
228 }
◆ PutChunk()
| void XrdCl::XCpCtx::PutChunk |
( |
PageInfo * | chunk | ) |
|
Put a chunk into the sink
- Parameters
-
Definition at line 90 of file XrdClXCpCtx.cc.
91{
92 pSink.Put( chunk );
93}
◆ Release()
| void XrdCl::XCpCtx::Release |
( |
| ) |
|
|
inline |
Decrements the reference count and signals when it reaches 0
Definition at line 88 of file XrdClXCpCtx.hh.
89 {
90 XrdSysMutexHelper lck( pMtx );
91 --pRefCount;
92 if( !pRefCount )
93 {
94 XrdSysCondVarHelper lckcv( pDeleteCV );
95 pDelete = true;
96 pDeleteCV.Broadcast();
97 }
98 }
◆ RemoveSrc()
| void XrdCl::XCpCtx::RemoveSrc |
( |
XCpSrc * | src | ) |
|
|
inline |
Remove given source
- Parameters
-
| src | : the source to be removed |
Definition at line 195 of file XrdClXCpCtx.hh.
196 {
197 XrdSysMutexHelper lck( pMtx );
198 pSources.remove( src );
199 }
◆ Self()
| XCpCtx * XrdCl::XCpCtx::Self |
( |
| ) |
|
|
inline |
Increments the reference counter.
- Returns
- : myself.
Definition at line 105 of file XrdClXCpCtx.hh.
106 {
107 XrdSysMutexHelper lck( pMtx );
108 ++pRefCount;
109 return this;
110 }
References XCpCtx().
◆ SetFileSize()
| void XrdCl::XCpCtx::SetFileSize |
( |
int64_t | size | ) |
|
Set the file size (GetSize will block until SetFileSize is called). Also calculates the block size.
- Parameters
-
Definition at line 107 of file XrdClXCpCtx.cc.
108{
109 XrdSysCondVarHelper lckcv( pFileSizeCV );
110 XrdSysMutexHelper lckmtx( pMtx );
111 if( pFileSize < 0 && size >= 0 )
112 {
113 pFileSize = size;
114 pFileSizeCV.Broadcast();
115
116 if( pBlockSize > uint64_t( pFileSize ) / pParallelSrc )
117 pBlockSize = pFileSize / pParallelSrc;
118
119 if( pBlockSize < pChunkSize )
120 pBlockSize = pChunkSize;
121 }
122}
Referenced by XCpCtx().
◆ WeakestLink()
Get the 'weakest' source
- Parameters
-
| exclude | : the source that is excluded from the search |
- Returns
- : the weakest source
Definition at line 66 of file XrdClXCpCtx.cc.
67{
68 uint64_t transferRate = -1;
69 XCpSrc *ret = 0;
70
71 std::list<XCpSrc*>::iterator itr;
72 XrdSysMutexHelper lck( pMtx );
73
74 for( itr = pSources.begin() ; itr != pSources.end() ; ++itr )
75 {
76 XCpSrc *src = *itr;
77 if( src == exclude ) continue;
78 uint64_t tmp = src->TransferRate();
79 if( src->HasData() && tmp < transferRate )
80 {
81 ret = src;
82 transferRate = tmp;
83 }
84 }
85
86 if( !ret ) return ret;
87 return ret->Self();
88}
References XrdCl::XCpSrc::HasData(), XrdCl::XCpSrc::Self(), and XrdCl::XCpSrc::TransferRate().
The documentation for this class was generated from the following files: