XRootD
Loading...
Searching...
No Matches
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

Inheritance diagram for XrdPfc::Cache:
Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor.
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block in write queue.
virtual XrdOucCacheIOAttach (XrdOucCacheIO *, int Options=0)
bool blocksize_str2value (const char *from, const char *str, long long &val, long long min, long long max) const
void ClearPurgeProtectedSet ()
bool Config (const char *config_filename, const char *parameters, XrdOucEnv *env)
 Parse configuration file.
virtual int ConsiderCached (const char *url)
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached.
bool DecideIfConsideredCached (long long file_size, long long bytes_on_disk)
void DeRegisterPrefetchFile (File *)
long long DetermineFullFileSize (const std::string &cinfo_fname)
void ExecuteCommandUrl (const std::string &command_url)
void FileSyncDone (File *, bool high_debug)
FileGetFile (const std::string &, IO *, long long off=0, long long filesize=0)
XrdXrootdGStreamGetGStream ()
XrdSysErrorGetLog () const
FileGetNextFileToPrefetch ()
XrdOssGetOss () const
PurgePinGetPurgePin () const
XrdSysTraceGetTrace () const
bool is_prefetch_enabled () const
bool IsFileActiveOrPurgeProtected (const std::string &) const
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
void Prefetch ()
bool prefetch_str2value (const char *from, const char *str, int &val, int min, int max) const
virtual int Prepare (const char *url, int oflags, mode_t mode)
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk.
const ConfigurationRefConfiguration () const
 Reference XrdPfc configuration.
ResourceMonitorRefResMon ()
void RegisterPrefetchFile (File *)
void ReleaseFile (File *, IO *)
void ReleaseRAM (char *buf, long long size)
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.
char * RequestRAM (long long size)
void ScheduleFileSync (File *f)
virtual int Stat (const char *url, struct stat &sbuff)
virtual int Unlink (const char *url)
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache.
void WriteFileSizeXAttr (int cinfo_fd, long long file_size)
long long WritesSinceLastCall ()
Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
virtual ~XrdOucCache ()
 Destructor.
virtual int Rename (const char *oldp, const char *newp)
virtual int Rmdir (const char *dirp)
virtual int Truncate (const char *path, off_t size)
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)

Static Public Member Functions

static const ConfigurationConf ()
static CacheCreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation.
static CacheGetInstance ()
 Singleton access.
static ResourceMonitorResMon ()
static const CacheTheOne ()
static bool VCheck (XrdVersionInfo &urVersion)
 Version check.

Static Public Attributes

static XrdSchedulerschedP = nullptr
Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file)
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write)
static const int optRW = 0x0004
 File is read/write (o/w read/only)
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache.

Additional Inherited Members

Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
enum  XeqCmd { xeqNoop = 0 }
Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc).
XrdOucCacheStats Statistics

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 162 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger * logger,
XrdOucEnv * env )

Constructor.

Definition at line 158 of file XrdPfc.cc.

158 :
159 XrdOucCache("pfc"),
160 m_env(env),
161 m_log(logger, "XrdPfc_"),
162 m_trace(new XrdSysTrace("XrdPfc", logger)),
163 m_traceID("Cache"),
164 m_oss(0),
165 m_gstream(0),
166 m_purge_pin(0),
167 m_prefetch_condVar(0),
168 m_prefetch_enabled(false),
169 m_RAM_used(0),
170 m_RAM_write_queue(0),
171 m_RAM_std_size(0),
172 m_isClient(false),
173 m_active_cond(0)
174{
175 // Default log level is Warning.
176 m_trace->What = 2;
177}
XrdOucCache(const char *ctype)

References XrdOucCache::XrdOucCache().

Referenced by CreateInstance(), GetInstance(), and TheOne().

Here is the call graph for this function:
Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block * b,
bool from_read )

Add downloaded block in write queue.

Definition at line 221 of file XrdPfc.cc.

222{
223 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
224
225 {
226 XrdSysMutexHelper lock(&m_RAM_mutex);
227 m_RAM_write_queue += b->get_size();
228 }
229
230 m_writeQ.condVar.Lock();
231 if (fromRead)
232 m_writeQ.queue.push_back(b);
233 else
234 m_writeQ.queue.push_front(b);
235 m_writeQ.size++;
236 m_writeQ.condVar.Signal();
237 m_writeQ.condVar.UnLock();
238}
#define TRACE(act, x)
Definition XrdTrace.hh:63
int get_size() const
long long m_offset
File * get_file() const
const std::string & GetLocalPath() const

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

Here is the call graph for this function:

◆ Attach()

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO * io,
int Options = 0 )
virtual

Implements XrdOucCache.

Definition at line 179 of file XrdPfc.cc.

180{
181 const char* tpfx = "Attach() ";
182
183 if (Cache::GetInstance().Decide(io))
184 {
185 TRACE(Info, tpfx << obfuscateAuth(io->Path()));
186
187 IO *cio;
188
189 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
190 {
191 cio = new IOFileBlock(io, *this);
192 }
193 else
194 {
195 IOFile *iof = new IOFile(io, *this);
196
197 if ( ! iof->HasFile())
198 {
199 delete iof;
200 // TODO - redirect instead. But this is kind of an awkward place for it.
201 // errno is set during IOFile construction.
202 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
203 return io;
204 }
205
206 cio = iof;
207 }
208
209 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
210 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
211
212 return cio;
213 }
214 else
215 {
216 TRACE(Info, tpfx << "decision decline " << io->Path());
217 }
218 return io;
219}
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
bool Debug
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:215
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:137
bool HasFile() const
Check if File was opened successfully.

References Debug, Decide(), Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

Here is the call graph for this function:

◆ blocksize_str2value()

bool Cache::blocksize_str2value ( const char * from,
const char * str,
long long & val,
long long min,
long long max ) const

Definition at line 103 of file XrdPfcConfiguration.cc.

105{
106 if (XrdOuca2x::a2sz(m_log, "Error parsing block-size", str, &val, min, max))
107 return false;
108
109 if (val & 0xFFF) {
110 val &= ~0x0FFF;
111 val += 0x1000;
112 m_log.Emsg(from, "blocksize must be a multiple of 4 kB. Rounded up.");
113 }
114
115 return true;
116}
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257

References XrdOuca2x::a2sz().

Here is the call graph for this function:

◆ ClearPurgeProtectedSet()

void Cache::ClearPurgeProtectedSet ( )

Definition at line 685 of file XrdPfc.cc.

686{
687 XrdSysCondVarHelper lock(&m_active_cond);
688 m_purge_delay_set.clear();
689}

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), and XrdPfc::ResourceMonitor::perform_purge_task_cleanup().

Here is the caller graph for this function:

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 134 of file XrdPfc.cc.

134{ return m_instance->RefConfiguration(); }

Referenced by XrdPfc::ResourceMonitor::heart_beat(), XrdPfc::OldStylePurgeDriver(), XrdPfc::ResourceMonitor::perform_purge_check(), Proto_ResourceMonitorHeartBeat(), XrdPfc::ResourceMonitor::update_vs_and_file_usage_info(), and XrdPfc::DataFsSnapshot::write_json_file().

Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char * config_filename,
const char * parameters,
XrdOucEnv * env )

Parse configuration file.

Parameters
config_filenamepath to configuration file
parametersoptional parameters to be passed
envoptional environment to use for configuration
Returns
parse status

Definition at line 431 of file XrdPfcConfiguration.cc.

432{
433 // Indicate whether or not we are a client instance
434 const char *theINS = getenv("XRDINSTANCE");
435 m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
436
437 // Tell everyone else we are a caching proxy
438 XrdOucEnv::Export("XRDPFC", 1);
439
440 XrdOucEnv emptyEnv;
441 XrdOucEnv *myEnv = env ? env : &emptyEnv;
442
443 XrdOucStream Config(&m_log, theINS, myEnv, "=====> ");
444
445 if (! config_filename || ! *config_filename)
446 {
447 TRACE(Error, "Config() configuration file not specified.");
448 return false;
449 }
450
451 int fd;
452 if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
453 {
454 TRACE( Error, "Config() can't open configuration file " << config_filename);
455 return false;
456 }
457
458 Config.Attach(fd);
459 static const char *cvec[] = { "*** pfc plugin config:", 0 };
460 Config.Capture(cvec);
461
462 // Obtain OFS configurator for OSS plugin.
463 XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
464 &XrdVERSIONINFOVAR(XrdOucGetCache));
465 if (! ofsCfg) return false;
466
467 TmpConfiguration tmpc;
468
469 Configuration &CFG = m_configuration;
470
471 // Adjust default parameters for client/serverless caching
472 if (m_isClient)
473 {
474 m_configuration.m_bufferSize = 128 * 1024; // same as normal.
475 m_configuration.m_wqueue_blocks = 8;
476 m_configuration.m_wqueue_threads = 1;
477 }
478
479 // If network checksum processing is the default, indicate so.
480 if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
481
482 // Actual parsing of the config file.
483 bool retval = true, aOK = true;
484 char *var;
485 while ((var = Config.GetMyFirstWord()))
486 {
487 if (! strcmp(var,"pfc.osslib"))
488 {
489 retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
490 }
491 else if (! strcmp(var,"pfc.cschk"))
492 {
493 retval = xcschk(Config);
494 }
495 else if (! strcmp(var,"pfc.decisionlib"))
496 {
497 retval = xdlib(Config);
498 }
499 else if (! strcmp(var,"pfc.purgelib"))
500 {
501 retval = xplib(Config);
502 }
503 else if (! strcmp(var,"pfc.trace"))
504 {
505 retval = xtrace(Config);
506 }
507 else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
508 {
509 m_configuration.m_allow_xrdpfc_command = true;
510 }
511 else if (! strncmp(var,"pfc.", 4))
512 {
513 retval = ConfigParameters(std::string(var+4), Config, tmpc);
514 }
515
516 if ( ! retval)
517 {
518 TRACE(Error, "Config() error in parsing");
519 aOK = false;
520 }
521 }
522
523 Config.Close();
524
525 // Load OSS plugin.
526 auto orig_runmode = myEnv->Get("oss.runmode");
527 myEnv->Put("oss.runmode", "pfc");
528 if (m_configuration.is_cschk_cache())
529 {
530 char csi_conf[128];
531 if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
532 {
533 ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
534 } else {
535 TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
536 return false;
537 }
538 }
539 if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, myEnv))
540 {
541 ofsCfg->Plugin(m_oss);
542 }
543 else
544 {
545 TRACE(Error, "Config() Unable to create an OSS object");
546 return false;
547 }
548 if (orig_runmode) myEnv->Put("oss.runmode", orig_runmode);
549 else myEnv->Put("oss.runmode", "");
550
551 // Test if OSS is operational, determine optional features.
552 aOK &= test_oss_basics_and_features();
553
554 // sets default value for disk usage
555 XrdOssVSInfo sP;
556 {
557 if (m_configuration.m_meta_space != m_configuration.m_data_space &&
558 m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
559 {
560 m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
561 return false;
562 }
563 if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
564 {
565 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
566 m_configuration.m_meta_space.c_str());
567 return false;
568 }
569 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
570 {
571 m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
572 return false;
573 }
574 if (sP.Total < 10ll << 20)
575 {
576 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
577 m_configuration.m_data_space.c_str());
578 return false;
579 }
580
581 m_configuration.m_diskTotalSpace = sP.Total;
582
583 if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
584 cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
585 {
586 if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
587 m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
588 aOK = false;
589 }
590 }
591 else aOK = false;
592
593 if ( ! tmpc.m_fileUsageMax.empty())
594 {
595 if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
596 cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
597 cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
598 {
599 if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
600 m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
601 m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
602 {
603 m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
604 aOK = false;
605 }
606
607
608 if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
609 {
610 m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
611 aOK = false;
612 }
613 }
614 else aOK = false;
615 }
616 }
617
618 // sets flush frequency
619 if ( ! tmpc.m_flushRaw.empty())
620 {
621 if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
622 {
623 if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
624 &m_configuration.m_flushCnt,
625 100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
626 {
627 return false;
628 }
629 m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
630 }
631 else
632 {
633 if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
634 &m_configuration.m_flushCnt, 100, 100000))
635 {
636 return false;
637 }
638 }
639 }
640
641 // get number of available RAM blocks after process configuration
642 if (m_configuration.m_RamAbsAvailable == 0)
643 {
644 m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
645 char buff[1024];
646 snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
647 m_log.Say("Config info: ", buff);
648 }
649 // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
650 m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
651
652 // Set tracing to debug if this is set in environment
653 char* cenv = getenv("XRDDEBUG");
654 if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
655
656 if (aOK)
657 {
658// 000 001 010
659 const char *csc[] = {"off", "cache nonet", "nocache net notls",
660// 011
661 "cache net notls",
662// 100 101 110
663 "off", "cache nonet", "nocache net tls",
664// 111
665 "cache net tls"};
666 char uvk[32];
667 if (m_configuration.m_cs_UVKeep < 0)
668 strcpy(uvk, "lru");
669 else
670 sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
671 float ram_gb = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
672
673 char urlcgi_blks[64] = "ignore", urlcgi_npref[32] = "ignore";
675 snprintf(urlcgi_blks, sizeof(urlcgi_blks), "%lldk %lldk",
676 CFG.m_cgi_min_bufferSize >> 10, CFG.m_cgi_max_bufferSize >> 10);
678 snprintf(urlcgi_npref, sizeof(urlcgi_npref), "%d %d",
680
681 char buff[8192];
682 int loff = 0;
683 loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
684 " pfc.cschk %s uvkeep %s\n"
685 " pfc.blocksize %lldk\n"
686 " pfc.prefetch %d\n"
687 " pfc.urlcgi blocksize %s prefetch %s\n"
688 " pfc.ram %.fg\n"
689 " pfc.writequeue %d %d\n"
690 " # Total available disk: %lld\n"
691 " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
692 " pfc.spaces %s %s\n"
693 " pfc.trace %d\n"
694 " pfc.flush %lld\n"
695 " pfc.acchistorysize %d\n"
696 " pfc.onlyIfCachedMinBytes %lld\n"
697 " pfc.onlyIfCachedMinFrac %.2f\n",
698 config_filename,
699 csc[int(m_configuration.m_cs_Chk)], uvk,
700 m_configuration.m_bufferSize >> 10,
701 m_configuration.m_prefetch_max_blocks,
702 urlcgi_blks, urlcgi_npref,
703 ram_gb,
704 m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
705 sP.Total,
706 m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
707 m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
708 m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
709 m_configuration.m_data_space.c_str(),
710 m_configuration.m_meta_space.c_str(),
711 m_trace->What,
712 m_configuration.m_flushCnt,
713 m_configuration.m_accHistorySize,
714 m_configuration.m_onlyIfCachedMinSize,
715 m_configuration.m_onlyIfCachedMinFrac);
716
717 if (m_configuration.is_dir_stat_reporting_on())
718 {
719 loff += snprintf(buff + loff, sizeof(buff) - loff,
720 " pfc.dirstats interval %d maxdepth %d (internal: size_of_dirlist %d, size_of_globlist %d)\n",
721 m_configuration.m_dirStatsInterval, m_configuration.m_dirStatsStoreDepth,
722 (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
723 loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
724 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
725 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
726 loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
727 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
728 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
729 }
730
731 if (m_configuration.m_hdfsmode)
732 {
733 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
734 }
735
736 if (m_configuration.m_username.empty())
737 {
738 char unameBuff[256];
739 XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
740 m_configuration.m_username = unameBuff;
741 }
742 else
743 {
744 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
745 }
746
747 m_log.Say(buff);
748
749 m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
750 }
751
752 // Derived settings
753 m_prefetch_enabled = CFG.m_prefetch_max_blocks > 0 || CFG.m_cgi_max_prefetch_max_blocks > 0;
755
756 m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
757
758 m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
759
760 // Create the ResourceMonitor and get it ready for starting the main thread function.
761 if (aOK)
762 {
763 m_res_mon = new ResourceMonitor(*m_oss);
764 m_res_mon->init_before_main();
765 }
766
767 m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
768
769 if (ofsCfg) delete ofsCfg;
770
771 // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
772 // Building of xrdpfc_print fails when this is enabled.
773#ifdef XRDPFC_CKSUM_TEST
774 {
775 int xxx = m_configuration.m_cs_Chk;
776
777 for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
778 {
779 Info::TestCksumStuff();
780 }
781
782 m_configuration.m_cs_Chk = xxx;
783 }
784#endif
785
786 return aOK;
787}
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:76
#define open
Definition XrdPosix.hh:76
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition XrdOssVS.hh:90
char * Get(const char *varname)
Definition XrdOucEnv.hh:69
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:170
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters, XrdOucEnv *env)
Parse configuration file.
static size_t s_maxNumAccess
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:115
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition XrdPfc.hh:100
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:116
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition XrdPfc.hh:119
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition XrdPfc.hh:112
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:114
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:117
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition XrdPfc.hh:118
std::string m_diskUsageLWM
Definition XrdPfc.hh:141
std::string m_diskUsageHWM
Definition XrdPfc.hh:142
std::string m_fileUsageBaseline
Definition XrdPfc.hh:143
std::string m_fileUsageNominal
Definition XrdPfc.hh:144
std::string m_flushRaw
Definition XrdPfc.hh:146
std::string m_fileUsageMax
Definition XrdPfc.hh:145

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), Config(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, Error, XrdOucEnv::Export(), XrdOucEnv::Get(), XrdOfsConfigPI::Load(), XrdPfc::Configuration::m_accHistorySize, XrdPfc::Configuration::m_cgi_blocksize_allowed, XrdPfc::Configuration::m_cgi_max_bufferSize, XrdPfc::Configuration::m_cgi_max_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_min_bufferSize, XrdPfc::Configuration::m_cgi_min_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_prefetch_allowed, XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::Configuration::m_prefetch_max_blocks, XrdOfsConfigPI::New(), open, XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), XrdPfc::Info::s_maxNumAccess, XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), and XrdOucGetCache().

Referenced by Config(), and XrdOucGetCache().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char * curl)
virtual
Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why.
>0 - Reserved for future use.

Definition at line 1000 of file XrdPfc.cc.

1001{
1002 static const char* tpfx = "ConsiderCached ";
1003
1004 TRACE(Debug, tpfx << curl);
1005
1006 XrdCl::URL url(curl);
1007 std::string f_name = url.GetPath();
1008
1009 File *file = nullptr;
1010 {
1011 XrdSysCondVarHelper lock(&m_active_cond);
1012 auto it = m_active.find(f_name);
1013 if (it != m_active.end()) {
1014 file = it->second;
1015 // If the file-open is in progress, `file` is a nullptr
1016 // so we cannot increase the reference count. For now,
1017 // simply treat it as if the file open doesn't exist instead
1018 // of trying to wait and see if it succeeds.
1019 if (file) {
1020 inc_ref_cnt(file, false, false);
1021 }
1022 }
1023 }
1024 if (file) {
1025 struct stat sbuff;
1026 int res = file->Fstat(sbuff);
1027 dec_ref_cnt(file, false);
1028 if (res)
1029 return res;
1030 // DecideIfConsideredCached() already called in File::Fstat().
1031 return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1032 }
1033
1034 struct stat sbuff;
1035 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1036 if (res != XrdOssOK) {
1037 TRACE(Debug, tpfx << curl << " -> " << res);
1038 return res;
1039 }
1040 if (S_ISDIR(sbuff.st_mode))
1041 {
1042 TRACE(Debug, tpfx << curl << " -> EISDIR");
1043 return -EISDIR;
1044 }
1045
1046 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1047 if (file_size < 0) {
1048 TRACE(Debug, tpfx << curl << " -> " << file_size);
1049 return (int) file_size;
1050 }
1051 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1052
1053 return is_cached ? 0 : -EREMOTE;
1054}
#define XrdOssOK
Definition XrdOss.hh:50
#define stat(a, b)
Definition XrdPosix.hh:101
XrdOucString File
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition XrdPfc.cc:926
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition XrdPfc.cc:967
int Fstat(struct stat &sbuff)
static const char * s_infoExtension

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

Here is the call graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger * logger,
XrdOucEnv * env )
static

Singleton creation.

Definition at line 125 of file XrdPfc.cc.

126{
127 assert (m_instance == 0);
128 m_instance = new Cache(logger, env);
129 return *m_instance;
130}
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:158

References Cache().

Referenced by XrdOucGetCache().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO * io)

Makes decision if the original XrdOucCacheIO should be cached.

Parameters
&URL of file
Returns
decision if IO object will be cached.

Definition at line 137 of file XrdPfc.cc.

138{
139 if (! m_decisionpoints.empty())
140 {
141 XrdCl::URL url(io->Path());
142 std::string filename = url.GetPath();
143 std::vector<Decision*>::const_iterator it;
144 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
145 {
146 XrdPfc::Decision *d = *it;
147 if (! d) continue;
148 if (! d->Decide(filename, *m_oss))
149 {
150 return false;
151 }
152 }
153 }
154
155 return true;
156}
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ DecideIfConsideredCached()

bool Cache::DecideIfConsideredCached ( long long file_size,
long long bytes_on_disk )

Definition at line 967 of file XrdPfc.cc.

968{
969 if (file_size == 0 || bytes_on_disk >= file_size)
970 return true;
971
972 double frac_on_disk = (double) bytes_on_disk / file_size;
973
974 if (file_size <= m_configuration.m_onlyIfCachedMinSize)
975 {
976 if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
977 return true;
978 }
979 else
980 {
981 if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
982 frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
983 return true;
984 }
985 return false;
986}

Referenced by ConsiderCached(), and Stat().

Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File * file)

Definition at line 711 of file XrdPfc.cc.

712{
713 // Can be called with other locks held.
714
715 if ( ! m_prefetch_enabled)
716 {
717 return;
718 }
719
720 m_prefetch_condVar.Lock();
721 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
722 {
723 if (*it == file)
724 {
725 m_prefetchList.erase(it);
726 break;
727 }
728 }
729 m_prefetch_condVar.UnLock();
730}

◆ DetermineFullFileSize()

long long Cache::DetermineFullFileSize ( const std::string & cinfo_fname)

Definition at line 926 of file XrdPfc.cc.

927{
928 if (m_metaXattr) {
929 char pfn[4096];
930 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
931 long long fsize = -1ll;
932 int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
933 if (res == sizeof(long long))
934 {
935 return fsize;
936 }
937 else
938 {
939 TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
940 }
941 }
942
943 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
944 XrdOucEnv env;
945 long long ret;
946 int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
947 if (res < 0) {
948 ret = res;
949 } else {
950 Info info(m_trace, 0);
951 if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
952 ret = -EBADF;
953 } else {
954 ret = info.GetFileSize();
955 }
956 infoFile->Close();
957 }
958 delete infoFile;
959 return ret;
960}
XrdSysXAttr * XrdSysXAttrActive
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0

References XrdOssDF::Close(), Debug, XrdPfc::Info::GetFileSize(), XrdOssDF::Open(), XrdPfc::Info::Read(), TRACE, and XrdSysXAttrActive.

Referenced by ConsiderCached(), and Stat().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string & command_url)

Definition at line 51 of file XrdPfcCommand.cc.

52{
53 static const char *top_epfx = "ExecuteCommandUrl ";
54
55 SplitParser cp(command_url, "/");
56
57 std::string token = cp.get_token();
58
59 if (token != "xrdpfc_command")
60 {
61 TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
62 return;
63 }
64
65 // Get the command
66 token = cp.get_token_as_string();
67
68 auto get_opt = [](SplitParser &sp) -> char {
69 const char *t = sp.get_token();
70 if (t)
71 return (t[0] == '-' && t[1] != 0) ? t[1] : 0;
72 else
73 return -1;
74 };
75
76 //================================================================
77 // create_file
78 //================================================================
79
80 if (token == "create_file")
81 {
82 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
83 static const char* usage =
84 "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
85 " Creates a cache file with given parameters. Data in file is random.\n"
86 " Useful for cache purge testing.\n"
87 "Notes:\n"
88 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
89 " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
90 " . -t and -d can be given multiple times to record several accesses.\n"
91 " . Negative arguments given to -t are interpreted as relative to now.\n";
92
93 const Configuration &conf = m_configuration;
94
95 token = cp.get_token_as_string();
96 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
97 if (token.empty()) {
98 TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
99 return;
100 }
101 TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
102 if ( ! cp.has_reminder()) {
103 TRACE(Error, err_prefix << "Path section must not be empty.");
104 return;
105 }
106
107 long long file_size = ONE_GB;
108 long long block_size = conf.m_bufferSize;
109 int access_time [MAX_ACCESSES];
110 int access_duration[MAX_ACCESSES];
111 int at_count = 0, ad_count = 0;
112
113 time_t time_now = time(0);
114
115 SplitParser ap(token, " ");
116 char theOpt;
117
118 while ((theOpt = get_opt(ap)) != (char) -1)
119 {
120 switch (theOpt)
121 {
122 case 'h': {
123 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
124 return;
125 }
126 case 's': {
127 if (XrdOuca2x::a2sz(m_log, "Error getting filesize", ap.get_token(),
128 &file_size, 0ll, 32 * ONE_GB))
129 return;
130 break;
131 }
132 case 'b': {
133 if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", ap.get_token(),
134 &block_size, 0ll, 64 * ONE_MB))
135 return;
136 break;
137 }
138 case 't': {
139 if (XrdOuca2x::a2i(m_log, "Error getting access time", ap.get_token(),
140 &access_time[at_count++], INT_MIN, INT_MAX))
141 return;
142 break;
143 }
144 case 'd': {
145 if (XrdOuca2x::a2i(m_log, "Error getting access duration", ap.get_token(),
146 &access_duration[ad_count++], 0, 24 * 3600))
147 return;
148 break;
149 }
150 default: {
151 TRACE(Error, err_prefix << "Unhandled command argument.");
152 return;
153 }
154 }
155 }
156
157 if (at_count < 1) access_time [at_count++] = time_now - 10;
158 if (ad_count < 1) access_duration[ad_count++] = 10;
159
160 if (at_count != ad_count)
161 {
162 TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
163 return;
164 }
165
166 std::string file_path (cp.get_reminder_with_delim());
167 std::string cinfo_path(file_path + Info::s_infoExtension);
168
169 TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
170
171 // Check if cinfo exists ... bail out if it does.
172 {
173 struct stat infoStat;
174 if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
175 {
176 TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
177 return;
178 }
179 }
180
181 TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
182
183 {
184 const char *myUser = conf.m_username.c_str();
185 XrdOucEnv myEnv;
186
187 // Create the data file.
188
189 char size_str[32]; sprintf(size_str, "%lld", file_size);
190 myEnv.Put("oss.asize", size_str);
191 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
192 int cret;
193 if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
194 {
195 TRACE(Error, err_prefix << "Create failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
196 return;
197 }
198
199 XrdOssDF *myFile = GetOss()->newFile(myUser);
200 if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
201 {
202 TRACE(Error, err_prefix << "Open failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
203 delete myFile;
204 return;
205 }
206
207 // Create the info file.
208
209 myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
210 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
211 if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
212 {
213 TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
214 myFile->Close(); delete myFile;
215 return;
216 }
217
218 XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
219 if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
220 {
221 TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
222 delete myInfoFile;
223 myFile->Close(); delete myFile;
224 return;
225 }
226
227 // Allocate space for the data file.
228
229 if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
230 {
231 TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ERRNO_AND_ERRSTR(cret));
232 }
233
234 // Fill up cinfo.
235
236 Info myInfo(m_trace, false);
237 myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
238 myInfo.SetAllBitsSynced();
239
240 for (int i = 0; i < at_count; ++i)
241 {
242 time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
243
244 myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
245 }
246
247 myInfo.Write(myInfoFile, cinfo_path.c_str());
248
249 // Fake last modified time to the last access_time
250 {
251 time_t last_detach;
252 myInfo.GetLatestDetachTime(last_detach);
253 struct timespec acc_mod_time[2] = { {last_detach, UTIME_OMIT}, {last_detach, 0} };
254
255 futimens(myInfoFile->getFD(), acc_mod_time);
256 }
257
258 myInfoFile->Close(); delete myInfoFile;
259 myFile->Close(); delete myFile;
260
261 struct stat dstat;
262 GetOss()->Stat(file_path.c_str(), &dstat);
263 TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB, "
264 << "st_blocks=" << dstat.st_blocks);
265
266 {
267 XrdSysCondVarHelper lock(&m_writeQ.condVar);
268
269 m_writeQ.writes_between_purges += file_size;
270 }
271 {
272 int token = m_res_mon->register_file_open(file_path, time_now, false);
273 XrdPfc::Stats stats;
274 stats.m_BytesWritten = file_size;
275 stats.m_StBlocksAdded = dstat.st_blocks;
276 m_res_mon->register_file_update_stats(token, stats);
277 m_res_mon->register_file_close(token, time(0), stats);
278 }
279 }
280 }
281
282 //================================================================
283 // remove_file
284 //================================================================
285
286 else if (token == "remove_file")
287 {
288 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
289 static const char* usage =
290 "Usage: remove_file/ [-h] /<path>\n"
291 " Removes given file from the cache unless it is currently open.\n"
292 " Useful for removal of stale files or duplicate files in a caching cluster.\n"
293 "Notes:\n"
294 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
295
296 token = cp.get_token_as_string();
297 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
298 if (token.empty()) {
299 TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
300 return;
301 }
302 TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
303 if ( ! cp.has_reminder()) {
304 TRACE(Error, err_prefix << "Path section must not be empty.");
305 return;
306 }
307
308 SplitParser ap(token, " ");
309 char theOpt;
310
311 while ((theOpt = get_opt(ap)) != (char) -1)
312 {
313 switch (theOpt)
314 {
315 case 'h': {
316 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
317 return;
318 }
319 default: {
320 TRACE(Error, err_prefix << "Unhandled command argument.");
321 return;
322 }
323 }
324 }
325
326 std::string f_name(cp.get_reminder_with_delim());
327
328 TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
329
330 int ret = UnlinkFile(f_name, true);
331
332 TRACE(Info, err_prefix << "returned with status " << ret);
333 }
334
335 //================================================================
336 // unknown command
337 //================================================================
338
339 else
340 {
341 TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
342 }
343}
struct stat Stat
Definition XrdCks.cc:49
void usage()
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define ERRNO_AND_ERRSTR(err_code)
bool Create
virtual int getFD()
Definition XrdOss.hh:426
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1188
XrdOss * GetOss() const
Definition XrdPfc.hh:280
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesWritten
number of bytes written to disk
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:88
long long m_bufferSize
cache block size, default 128 kB
Definition XrdPfc.hh:107
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:89
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:87

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Debug, ERRNO_AND_ERRSTR, Error, XrdPfc::SplitParser::get_reminder(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdPfc::SplitParser::get_token_as_string(), XrdOssDF::getFD(), XrdPfc::Info::GetLatestDetachTime(), GetOss(), XrdPfc::SplitParser::has_reminder(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Stats::m_BytesWritten, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Stats::m_StBlocksAdded, XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::Info::s_infoExtension, XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), Stat, XrdOss::Stat(), stat, TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File * f,
bool high_debug )

Definition at line 542 of file XrdPfc.cc.

543{
544 dec_ref_cnt(f, high_debug);
545}

◆ GetFile()

File * Cache::GetFile ( const std::string & path,
IO * io,
long long off = 0,
long long filesize = 0 )

Definition at line 389 of file XrdPfc.cc.

390{
391 // Called from virtual IOFile constructor.
392
393 TRACE(Debug, "GetFile " << path << ", io " << io);
394
395 ActiveMap_i it;
396
397 {
398 XrdSysCondVarHelper lock(&m_active_cond);
399
400 while (true)
401 {
402 it = m_active.find(path);
403
404 // File is not open or being opened. Mark it as being opened and
405 // proceed to opening it outside of while loop.
406 if (it == m_active.end())
407 {
408 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
409 break;
410 }
411
412 if (it->second != 0)
413 {
414 it->second->AddIO(io);
415 inc_ref_cnt(it->second, false, true);
416
417 return it->second;
418 }
419 else
420 {
421 // Wait for some change in m_active, then recheck.
422 m_active_cond.Wait();
423 }
424 }
425 }
426
427 // This is always true, now that IOFileBlock is unsupported.
428 if (filesize == 0)
429 {
430 struct stat st;
431 int res = io->Fstat(st);
432 if (res < 0) {
433 errno = res;
434 TRACE(Error, "GetFile, could not get valid stat");
435 } else if (res > 0) {
436 errno = ENOTSUP;
437 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
438 } else {
439 filesize = st.st_size;
440 }
441 }
442
443 File *file = 0;
444
445 if (filesize >= 0)
446 {
447 file = File::FileOpen(path, off, filesize, io->GetInput());
448 }
449
450 {
451 XrdSysCondVarHelper lock(&m_active_cond);
452
453 if (file)
454 {
455 inc_ref_cnt(file, false, true);
456 it->second = file;
457
458 file->AddIO(io);
459 }
460 else
461 {
462 m_active.erase(it);
463 }
464
465 m_active_cond.Broadcast();
466 }
467
468 return file;
469}
virtual int Fstat(struct stat &sbuff)
void AddIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:31

References XrdPfc::File::AddIO(), Debug, Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), XrdPfc::IO::GetInput(), stat, and TRACE.

Referenced by XrdPfc::IOFile::IOFile().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream * XrdPfc::Cache::GetGStream ( )
inline

Definition at line 298 of file XrdPfc.hh.

298{ return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 132 of file XrdPfc.cc.

132{ return *m_instance; }

References Cache().

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::ResourceMonitor::perform_purge_check(), XrdPfc::ResourceMonitor::perform_purge_task_cleanup(), PrefetchThread(), ProcessWriteTaskThread(), Proto_ResourceMonitorHeartBeat(), XrdPfc::File::Sync(), and XrdPfc::File::WriteBlockToDisk().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ GetLog()

XrdSysError * XrdPfc::Cache::GetLog ( ) const
inline

Definition at line 294 of file XrdPfc.hh.

294{ return &m_log; }

Referenced by XrdPfc::File::GetLog().

Here is the caller graph for this function:

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 733 of file XrdPfc.cc.

734{
735 m_prefetch_condVar.Lock();
736 while (m_prefetchList.empty())
737 {
738 m_prefetch_condVar.Wait();
739 }
740
741 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
742
743 size_t l = m_prefetchList.size();
744 int idx = rand() % l;
745 File* f = m_prefetchList[idx];
746
747 m_prefetch_condVar.UnLock();
748 return f;
749}

Referenced by Prefetch().

Here is the caller graph for this function:

◆ GetOss()

XrdOss * XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 280 of file XrdPfc.hh.

280{ return m_oss; }

Referenced by ExecuteCommandUrl().

Here is the caller graph for this function:

◆ GetPurgePin()

PurgePin * XrdPfc::Cache::GetPurgePin ( ) const
inline

Definition at line 284 of file XrdPfc.hh.

284{ return m_purge_pin; }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace * XrdPfc::Cache::GetTrace ( ) const
inline

Definition at line 295 of file XrdPfc.hh.

295{ return m_trace; }

Referenced by XrdPfc::File::GetTrace().

Here is the caller graph for this function:

◆ is_prefetch_enabled()

bool XrdPfc::Cache::is_prefetch_enabled ( ) const
inline

Definition at line 307 of file XrdPfc.hh.

307{ return m_prefetch_enabled; }

Referenced by XrdOucGetCache().

Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string & path) const

Definition at line 677 of file XrdPfc.cc.

678{
679 XrdSysCondVarHelper lock(&m_active_cond);
680
681 return m_active.find(path) != m_active.end() ||
682 m_purge_delay_set.find(path) != m_purge_delay_set.end();
683}

◆ LocalFilePath()

int Cache::LocalFilePath ( const char * curl,
char * buff = 0,
int blen = 0,
LFP_Reason why = ForAccess,
bool forall = false )
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supllied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForCheck or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 794 of file XrdPfc.cc.

796{
797 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
798 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
799 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
800
801 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
802
803 if (buff && blen > 0) buff[0] = 0;
804
805 XrdCl::URL url(curl);
806 std::string f_name = url.GetPath();
807 std::string i_name = f_name + Info::s_infoExtension;
808
809 if (why == ForPath)
810 {
811 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
812 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
813 return ret;
814 }
815
816 {
817 XrdSysCondVarHelper lock(&m_active_cond);
818 m_purge_delay_set.insert(f_name);
819 }
820
821 struct stat sbuff, sbuff2;
822 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
823 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
824 {
825 if (S_ISDIR(sbuff.st_mode))
826 {
827 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
828 return -EISDIR;
829 }
830 else
831 {
832 bool read_ok = false;
833 bool is_complete = false;
834
835 // Lock and check if the file is active. If NOT, keep the lock
836 // and add dummy access after successful reading of info file.
837 // If it IS active, just release the lock, this ongoing access will
838 // assure the file continues to exist.
839
840 // XXXX How can I just loop over the cinfo file when active?
841 // Can I not get is_complete from the existing file?
842 // Do I still want to inject access record?
843 // Oh, it writes only if not active .... still let's try to use existing File.
844
845 m_active_cond.Lock();
846
847 bool is_active = m_active.find(f_name) != m_active.end();
848
849 if (is_active) m_active_cond.UnLock();
850
851 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
852 XrdOucEnv myEnv;
853 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
854 if (res >= 0)
855 {
856 Info info(m_trace, 0);
857 if (info.Read(infoFile, i_name.c_str()))
858 {
859 read_ok = true;
860
861 is_complete = info.IsComplete();
862
863 // Add full-size access if reason is for access.
864 if ( ! is_active && is_complete && why == ForAccess)
865 {
866 info.WriteIOStatSingle(info.GetFileSize());
867 info.Write(infoFile, i_name.c_str());
868 }
869 }
870 infoFile->Close();
871 }
872 delete infoFile;
873
874 if ( ! is_active) m_active_cond.UnLock();
875
876 if (read_ok)
877 {
878 if ((is_complete || why == ForInfo) && buff != 0)
879 {
880 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
881 if (res2 < 0)
882 return res2;
883
884 // Normally, files are owned by us but when direct cache access
885 // is wanted and possible, make sure the file is world readable.
886 if (why == ForAccess)
887 {mode_t mode = (forall ? worldReadable : groupReadable);
888 if (((sbuff.st_mode & worldReadable) != mode)
889 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
890 {is_complete = false;
891 *buff = 0;
892 }
893 }
894 }
895
896 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
897 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
898
899 return is_complete ? 0 : -EREMOTE;
900 }
901 }
902 }
903
904 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
905 return -ENOENT;
906}

References XrdOssDF::Close(), Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat, TRACE, XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 752 of file XrdPfc.cc.

753{
754 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
755
756 while (true)
757 {
758 m_RAM_mutex.Lock();
759 bool doPrefetch = (m_RAM_used < limit_RAM);
760 m_RAM_mutex.UnLock();
761
762 if (doPrefetch)
763 {
765 f->Prefetch();
766 }
767 else
768 {
770 }
771 }
772}
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:733
static void Wait(int milliseconds)

References GetNextFileToPrefetch(), XrdPfc::File::Prefetch(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ prefetch_str2value()

bool Cache::prefetch_str2value ( const char * from,
const char * str,
int & val,
int min,
int max ) const

Definition at line 118 of file XrdPfcConfiguration.cc.

120{
121 if (XrdOuca2x::a2i(m_log, "Error parsing prefetch block count", str, &val, min, max))
122 return false;
123
124 return true;
125}

References XrdOuca2x::a2i().

Here is the call graph for this function:

◆ Prepare()

int Cache::Prepare ( const char * curl,
int oflags,
mode_t mode )
virtual

Preapare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow defering an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1066 of file XrdPfc.cc.

1067{
1068 XrdCl::URL url(curl);
1069 std::string f_name = url.GetPath();
1070 std::string i_name = f_name + Info::s_infoExtension;
1071
1072 // Do not allow write access.
1073 if ((oflags & O_ACCMODE) != O_RDONLY)
1074 {
1075 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1076 return -EROFS;
1077 }
1078
1079 // Intercept xrdpfc_command requests.
1080 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1081 {
1082 // Schedule a job to process command request.
1083 {
1084 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1085
1086 schedP->Schedule(ce);
1087 }
1088
1089 return -EAGAIN;
1090 }
1091
1092 {
1093 XrdSysCondVarHelper lock(&m_active_cond);
1094 m_purge_delay_set.insert(f_name);
1095 }
1096
1097 struct stat sbuff;
1098 if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1099 {
1100 TRACE(Dump, "Prepare defer open " << f_name);
1101 return 1;
1102 }
1103 else
1104 {
1105 return 0;
1106 }
1107}
static XrdScheduler * schedP
Definition XrdPfc.hh:302

References XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, schedP, stat, TRACE, and XrdOssOK.

Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from ram to disk.

Definition at line 273 of file XrdPfc.cc.

274{
275 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
276
277 while (true)
278 {
279 m_writeQ.condVar.Lock();
280 while (m_writeQ.size == 0)
281 {
282 m_writeQ.condVar.Wait();
283 }
284
285 // MT -- optimize to pop several blocks if they are available (or swap the list).
286 // This makes sense especially for smallish block sizes.
287
288 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
289 long long sum_size = 0;
290
291 for (int bi = 0; bi < n_pushed; ++bi)
292 {
293 Block* block = m_writeQ.queue.front();
294 m_writeQ.queue.pop_front();
295 m_writeQ.writes_between_purges += block->get_size();
296 sum_size += block->get_size();
297
298 blks_to_write[bi] = block;
299
300 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
301 }
302 m_writeQ.size -= n_pushed;
303
304 m_writeQ.condVar.UnLock();
305
306 {
307 XrdSysMutexHelper lock(&m_RAM_mutex);
308 m_RAM_write_queue -= sum_size;
309 }
310
311 for (int bi = 0; bi < n_pushed; ++bi)
312 {
313 Block* block = blks_to_write[bi];
314
315 block->m_file->WriteBlockToDisk(block);
316 }
317 }
318}
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration & XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 215 of file XrdPfc.hh.

215{ return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), and XrdOucGetCache().

Here is the caller graph for this function:

◆ RefResMon()

ResourceMonitor & XrdPfc::Cache::RefResMon ( )
inline

Definition at line 297 of file XrdPfc.hh.

297{ return *m_res_mon; }

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File * file)

Definition at line 695 of file XrdPfc.cc.

696{
697 // Can be called with other locks held.
698
699 if ( ! m_prefetch_enabled)
700 {
701 return;
702 }
703
704 m_prefetch_condVar.Lock();
705 m_prefetchList.push_back(file);
706 m_prefetch_condVar.Signal();
707 m_prefetch_condVar.UnLock();
708}

◆ ReleaseFile()

void Cache::ReleaseFile ( File * f,
IO * io )

Definition at line 471 of file XrdPfc.cc.

472{
473 // Called from virtual IO::DetachFinalize.
474
475 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
476
477 {
478 XrdSysCondVarHelper lock(&m_active_cond);
479
480 f->RemoveIO(io);
481 }
482 dec_ref_cnt(f, true);
483}
void RemoveIO(IO *io)

References Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char * buf,
long long size )

Definition at line 371 of file XrdPfc.cc.

372{
373 bool std_size = (size == m_configuration.m_bufferSize);
374 {
375 XrdSysMutexHelper lock(&m_RAM_mutex);
376
377 m_RAM_used -= size;
378
379 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
380 {
381 m_RAM_std_blocks.push_back(buf);
382 ++m_RAM_std_size;
383 return;
384 }
385 }
386 free(buf);
387}

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File * f)

Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction.

Definition at line 240 of file XrdPfc.cc.

241{
242 std::list<Block*> removed_blocks;
243 long long sum_size = 0;
244
245 m_writeQ.condVar.Lock();
246 std::list<Block*>::iterator i = m_writeQ.queue.begin();
247 while (i != m_writeQ.queue.end())
248 {
249 if ((*i)->m_file == file)
250 {
251 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
252 std::list<Block*>::iterator j = i++;
253 removed_blocks.push_back(*j);
254 sum_size += (*j)->get_size();
255 m_writeQ.queue.erase(j);
256 --m_writeQ.size;
257 }
258 else
259 {
260 ++i;
261 }
262 }
263 m_writeQ.condVar.UnLock();
264
265 {
266 XrdSysMutexHelper lock(&m_RAM_mutex);
267 m_RAM_write_queue -= sum_size;
268 }
269
270 file->BlocksRemovedFromWriteQ(removed_blocks);
271}

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long size)

Definition at line 331 of file XrdPfc.cc.

332{
333 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
334
335 bool std_size = (size == m_configuration.m_bufferSize);
336
337 m_RAM_mutex.Lock();
338
339 long long total = m_RAM_used + size;
340
341 if (total <= m_configuration.m_RamAbsAvailable)
342 {
343 m_RAM_used = total;
344 if (std_size && m_RAM_std_size > 0)
345 {
346 char *buf = m_RAM_std_blocks.back();
347 m_RAM_std_blocks.pop_back();
348 --m_RAM_std_size;
349
350 m_RAM_mutex.UnLock();
351
352 return buf;
353 }
354 else
355 {
356 m_RAM_mutex.UnLock();
357 char *buf;
358 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
359 {
360 // Report out of mem? Probably should report it at least the first time,
361 // then periodically.
362 return 0;
363 }
364 return buf;
365 }
366 }
367 m_RAM_mutex.UnLock();
368 return 0;
369}

◆ ResMon()

ResourceMonitor & Cache::ResMon ( )
static

Definition at line 135 of file XrdPfc.cc.

135{ return m_instance->RefResMon(); }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), ResourceMonitorThread(), and XrdPfc::UnlinkPurgeStateFilesInMap().

Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File * f)
inline

Definition at line 290 of file XrdPfc.hh.

290{ schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char * curl,
struct stat & sbuff )
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1117 of file XrdPfc.cc.

1118{
1119 const char *tpfx = "Stat ";
1120
1121 XrdCl::URL url(curl);
1122 std::string f_name = url.GetPath();
1123
1124 File *file = nullptr;
1125 {
1126 XrdSysCondVarHelper lock(&m_active_cond);
1127 auto it = m_active.find(f_name);
1128 if (it != m_active.end()) {
1129 file = it->second;
1130 // If `file` is nullptr, the file-open is in progress; instead
1131 // of waiting for the file-open to finish, simply treat it as if
1132 // the file-open doesn't exist.
1133 if (file) {
1134 inc_ref_cnt(file, false, false);
1135 }
1136 }
1137 }
1138 if (file) {
1139 int res = file->Fstat(sbuff);
1140 dec_ref_cnt(file, false);
1141 TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1142 return res;
1143 }
1144
1145 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1146 if (res != XrdOssOK) {
1147 TRACE(Debug, tpfx << curl << " -> " << res);
1148 return 1; // res; -- for only-if-cached
1149 }
1150 if (S_ISDIR(sbuff.st_mode))
1151 {
1152 TRACE(Debug, tpfx << curl << " -> EISDIR");
1153 return -EISDIR;
1154 }
1155
1156 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1157 if (file_size < 0) {
1158 TRACE(Debug, tpfx << curl << " -> " << file_size);
1159 return 1; // (int) file_size; -- for only-if-cached
1160 }
1161 sbuff.st_size = file_size;
1162 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1163 if ( ! is_cached)
1164 sbuff.st_atime = 0;
1165
1166 TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1167
1168 return 0;
1169}

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

Here is the call graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 133 of file XrdPfc.cc.

133{ return *m_instance; }

References Cache().

Referenced by XrdPfc::File::GetLog(), XrdPfc::File::GetTrace(), XrdPfc::OldStylePurgeDriver(), and XrdPfc::UnlinkPurgeStateFilesInMap().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Unlink()

int Cache::Unlink ( const char * curl)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information.

Reimplemented from XrdOucCache.

Definition at line 1178 of file XrdPfc.cc.

1179{
1180 XrdCl::URL url(curl);
1181 std::string f_name = url.GetPath();
1182
1183 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1184
1185 return UnlinkFile(f_name, false);
1186}

References XrdCl::URL::GetPath(), and UnlinkFile().

Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string & f_name,
bool fail_if_open )

Remove cinfo and data files from cache.

Definition at line 1188 of file XrdPfc.cc.

1189{
1190 static const char* trc_pfx = "UnlinkFile ";
1191 ActiveMap_i it;
1192 File *file = 0;
1193 long long st_blocks_to_purge = 0;
1194 {
1195 XrdSysCondVarHelper lock(&m_active_cond);
1196
1197 it = m_active.find(f_name);
1198
1199 if (it != m_active.end())
1200 {
1201 if (fail_if_open)
1202 {
1203 TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1204 return -EBUSY;
1205 }
1206
1207 // Null File* in m_active map means an operation is ongoing, probably
1208 // Attach() with possible File::Open(). Ask for retry.
1209 if (it->second == 0)
1210 {
1211 TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1212 return -EAGAIN;
1213 }
1214
1215 file = it->second;
1216 st_blocks_to_purge = file->initiate_emergency_shutdown();
1217 it->second = 0;
1218 }
1219 else
1220 {
1221 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1222 }
1223 }
1224
1225 if (file) {
1227 } else {
1228 struct stat f_stat;
1229 if (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK)
1230 st_blocks_to_purge = f_stat.st_blocks;
1231 }
1232
1233 std::string i_name = f_name + Info::s_infoExtension;
1234
1235 // Unlink file & cinfo
1236 int f_ret = m_oss->Unlink(f_name.c_str());
1237 int i_ret = m_oss->Unlink(i_name.c_str());
1238
1239 if (st_blocks_to_purge)
1240 m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1241
1242 TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1243
1244 {
1245 XrdSysCondVarHelper lock(&m_active_cond);
1246 m_active.erase(it);
1247 m_active_cond.Broadcast();
1248 }
1249
1250 return std::min(f_ret, i_ret);
1251}
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:240
long long initiate_emergency_shutdown()

References Debug, XrdPfc::File::initiate_emergency_shutdown(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, stat, TRACE, and XrdOssOK.

Referenced by ExecuteCommandUrl(), XrdPfc::File::Sync(), and Unlink().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ VCheck()

bool XrdPfc::Cache::VCheck ( XrdVersionInfo & urVersion)
inlinestatic

Version check.

Definition at line 245 of file XrdPfc.hh.

245{ return true; }

◆ WriteFileSizeXAttr()

void Cache::WriteFileSizeXAttr ( int cinfo_fd,
long long file_size )

Definition at line 911 of file XrdPfc.cc.

912{
913 if (m_metaXattr) {
914 int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
915 if (res != 0) {
916 TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
917 }
918 }
919}
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0

References Debug, TRACE, and XrdSysXAttrActive.

◆ WritesSinceLastCall()

long long Cache::WritesSinceLastCall ( )

Definition at line 320 of file XrdPfc.cc.

321{
322 // Called from ResourceMonitor for an alternative estimation of disk writes.
323 XrdSysCondVarHelper lock(&m_writeQ.condVar);
324 long long ret = m_writeQ.writes_between_purges;
325 m_writeQ.writes_between_purges = 0;
326 return ret;
327}

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

Here is the caller graph for this function:

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = nullptr
static

The documentation for this class was generated from the following files: