XRootD
Loading...
Searching...
No Matches
XrdSys::IOEvents::Poller Class Referenceabstract

#include <XrdSysIOEvents.hh>

Inheritance diagram for XrdSys::IOEvents::Poller:
Collaboration diagram for XrdSys::IOEvents::Poller:

Classes

struct  PipeData

Public Types

enum  CreateOpts { optTOM }

Public Member Functions

 Poller (int cFD, int rFD)
virtual ~Poller ()
 Destructor. Stop() is effecively called when this object is deleted.
void Stop ()

Static Public Member Functions

static PollerCreate (int &eNum, const char **eTxt=0, int crOpts=0)

Protected Member Functions

virtual void Begin (XrdSysSemaphore *syncp, int &rc, const char **eTxt)=0
void CbkTMO ()
bool CbkXeq (Channel *cP, int events, int eNum, const char *eTxt)
 CPP_ATOMIC_TYPE (bool) wakePend
virtual void Exclude (Channel *cP, bool &isLocked, bool dover=1)=0
int GetFault (Channel *cP)
int GetPollEnt (Channel *cP)
int GetRequest ()
virtual bool Include (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
bool Init (Channel *cP, int &eNum, const char **eTxt, bool &isLockd)
void LockChannel (Channel *cP)
virtual bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
int Poll2Enum (short events)
int SendCmd (PipeData &cmd)
void SetPollEnt (Channel *cP, int ptEnt)
virtual void Shutdown ()=0
bool TmoAdd (Channel *cP, int tmoSet)
void TmoDel (Channel *cP)
int TmoGet ()
void UnLockChannel (Channel *cP)

Protected Attributes

ChannelattBase
bool chDead
int cmdFD
int pipeBlen
char * pipeBuff
struct pollfd pipePoll
pthread_t pollTid
PipeData reqBuff
int reqFD
ChanneltmoBase
unsigned char tmoMask

Static Protected Attributes

static time_t maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
static pid_t parentPID = getpid()

Friends

class BootStrap
class Channel

Detailed Description

Define a poller object interface. A poller fields and dispatches event callbacks. An actual instance of a poller object is obtained by using the Create() method. You cannot simply create an instance of this object using new or in-place declaration since it is abstract. Any number of these objects may created. Each creation spawns a polling thread.

Definition at line 371 of file XrdSysIOEvents.hh.

Member Enumeration Documentation

◆ CreateOpts

Create a specialized instance of a poller object, initialize it, and start the polling process. You must call Create() to obtain a specialized poller.

Parameters
eNumPlace where errno is placed upon failure.
eTxtPlace where a pointer to the description of the failing operation is to be set. If null, no description is returned.
crOptsPoller options (see static const optxxx): optTOM - Timeout resumption after a timeout event must be manually reenabled. By default, event timeouts are automatically renabled after successful callbacks.
Returns
!0 Poller successfully created and started. eNum contains zero. eTxt if not null contains a null string. The returned value is a pointer to the Poller object. 0 Poller could not be created. eNum contains the associated errno value. eTxt if not null contains the failing operation.
Enumerator
optTOM 

Definition at line 398 of file XrdSysIOEvents.hh.

Constructor & Destructor Documentation

◆ Poller()

XrdSys::IOEvents::Poller::Poller ( int cFD,
int rFD )

Constructor

Parameters
cFDThe file descriptor to send commands to the poll thread.
rFDThe file descriptor to recv commands in the poll thread.

Definition at line 571 of file XrdSysIOEvents.cc.

572{
573
574// Now initialize local class members
575//
576 attBase = 0;
577 tmoBase = 0;
578 cmdFD = cFD;
579 reqFD = rFD;
580 wakePend = false;
581 pipeBuff = 0;
582 pipeBlen = 0;
583 pipePoll.fd = rFD;
584 pipePoll.events = POLLIN | POLLRDNORM;
585 tmoMask = 255;
586}

References attBase, cmdFD, pipeBlen, pipeBuff, pipePoll, reqFD, tmoBase, and tmoMask.

Referenced by XrdSys::IOEvents::PollE::PollE(), XrdSys::IOEvents::PollerErr1::PollerErr1(), XrdSys::IOEvents::PollerInit::PollerInit(), XrdSys::IOEvents::PollerWait::PollerWait(), XrdSys::IOEvents::PollKQ::PollKQ(), XrdSys::IOEvents::PollPoll::PollPoll(), and XrdSys::IOEvents::PollPort::PollPort().

Here is the caller graph for this function:

◆ ~Poller()

virtual XrdSys::IOEvents::Poller::~Poller ( )
inlinevirtual

Destructor. Stop() is effecively called when this object is deleted.

Definition at line 430 of file XrdSysIOEvents.hh.

430{}

Member Function Documentation

◆ Begin()

virtual void XrdSys::IOEvents::Poller::Begin ( XrdSysSemaphore * syncp,
int & rc,
const char ** eTxt )
protectedpure virtual

Start the polling event loop. An implementation must be supplied. Begin() is called via the internal BootStrap class from a new thread.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

Referenced by XrdSys::IOEvents::BootStrap::Start().

Here is the caller graph for this function:

◆ CbkTMO()

void XrdSys::IOEvents::Poller::CbkTMO ( )
protected

Definition at line 614 of file XrdSysIOEvents.cc.

615{
616 Channel *cP;
617
618// Process each element in the timeout queue, calling the callback function
619// if the timeout has passed. As this method can be called with a lock on the
620// channel mutex, we need to drop it prior to calling the callback.
621//
622 toMutex.Lock();
623 while((cP = tmoBase) && cP->deadLine <= time(0))
624 {int dlType = cP->dlType;
625 toMutex.UnLock();
626 CbkXeq(cP, dlType, 0, 0);
627 toMutex.Lock();
628 }
629 toMutex.UnLock();
630}
bool CbkXeq(Channel *cP, int events, int eNum, const char *eTxt)

References CbkXeq(), Channel, and tmoBase.

Referenced by XrdSys::IOEvents::PollE::Begin(), XrdSys::IOEvents::PollKQ::Begin(), XrdSys::IOEvents::PollPoll::Begin(), XrdSys::IOEvents::PollPort::Begin(), and TmoGet().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ CbkXeq()

bool XrdSys::IOEvents::Poller::CbkXeq ( Channel * cP,
int events,
int eNum,
const char * eTxt )
protected

Definition at line 636 of file XrdSysIOEvents.cc.

638{
639 XrdSysMutexHelper cbkMHelp(cP->chMutex);
640 char oldEvents;
641 bool cbok, retval, isRead, isWrite, isLocked = true;
642
643// Perform any required tracing
644//
645 if (TRACING)
646 {const char *cbtype = (cP->chPoller == cP->chPollXQ ? "norm" :
647 (cP->chPoller == &pollInit ? "init" :
648 (cP->chPoller == &pollWait ? "wait" : "err")));
649 DO_TRACE(CbkXeq,cP->chFD,"callback events=" <<events
650 <<" chev=" <<static_cast<int>(cP->chEvents)
651 <<" toq=" <<(cP->inTOQ != 0) <<" erc=" <<eNum
652 <<" callback " <<(cP->chCB ? "present" : "missing")
653 <<" poller=" <<cbtype);
654 }
655
656// Remove this from the timeout queue if there and reset the deadlines based
657// on the event we are reflecting. This separates read and write deadlines
658//
659 if (cP->inTOQ)
660 {TmoDel(cP);
661 cP->dlType |= (events & CallBack::ValidEvents) << 4;
662 isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut);
663 if (isRead) cP->rdDL = maxTime;
665 if (isWrite) cP->wrDL = maxTime;
666 } else {
667 cP->dlType &= CallBack::ValidEvents;
668 isRead = isWrite = false;
669 }
670
671// Verify that there is a callback here and the channel is ready. If not,
672// disable this channel for the events being refelcted unless the event is a
673// fatal error. In this case we need to abandon the channel since error events
674// may continue to be generated as we can't always disable them.
675//
676 if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
677 {if (eNum)
678 {cP->chPoller = &pollErr1; cP->chFault = eNum;
679 cP->inPSet = 0;
680 return false;
681 }
682 oldEvents = cP->chEvents;
683 cP->chEvents = 0;
684 retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
685 TRACE_MOD(CbkXeq,cP->chFD,0);
686 if (!isLocked) cP->chMutex.Lock();
687 cP->chEvents = oldEvents;
688 return true;
689 }
690
691// Resolve the problem where we get an error event but the channel wants them
692// presented as a read or write event. If neither is possible then defer the
693// error until the channel is enabled again.
694//
695 if (eNum)
696 {if (cP->chEvents & Channel::errorEvents)
697 {cP->chPoller = &pollErr1; cP->chFault = eNum;
698 cP->chStat = Channel::isCBMode;
699 chDead = false;
700 cbkMHelp.UnLock();
701 cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt);
702 if (chDead) return true;
703 cbkMHelp.Lock(&(cP->chMutex));
704 cP->inPSet = 0;
705 return false;
706 }
707 if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead;
708 else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite;
709 else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0;
710 return false;
711 }
712 }
713
714// Indicate that we are in callback mode then drop the channel lock and effect
715// the callback. This allows the callback to freely manage locks.
716//
717 cP->chStat = Channel::isCBMode;
718 chDead = false;
719 // Detach() may be called after unlocking the channel and would zero the
720 // callback pointer and argument. So keep a copy.
721 CallBack *cb = cP->chCB;
722 void *cba = cP->chCBA;
723 cbkMHelp.UnLock();
724 IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <<events);
725 cbok = cb->Event(cP,cba, events);
726 IF_TRACE(CbkXeq,cP->chFD,"callback returned " <<BOOLNAME(cbok));
727
728// If channel destroyed by the callback, bail really fast. Otherwise, regain
729// the channel lock.
730//
731 if (chDead) return true;
732 cbkMHelp.Lock(&(cP->chMutex));
733
734// If the channel is being destroyed; then another thread must have done so.
735// Tell it the callback has finished and just return.
736//
737 if (cP->chStat != Channel::isCBMode)
738 {if (cP->chStat == Channel::isDead)
739 {XrdSysSemaphore *theSem = (XrdSysSemaphore *)cP->chCBA;
740 // channel will be destroyed shortly after post, unlock mutex before
741 cbkMHelp.UnLock();
742 theSem->Post();
743 }
744 return true;
745 }
746 cP->chStat = Channel::isClear;
747
748// Handle enable or disable here. If we keep the channel enabled then reset
749// the timeout if it hasn't been handled via a call from the callback.
750//
751 if (!cbok) Detach(cP,isLocked,false);
752 else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
753 TmoAdd(cP, 0);
754
755// All done. While the mutex should not have been unlocked, we relock it if
756// it has to keep the mutex helper from croaking.
757//
758 if (!isLocked) cP->chMutex.Lock();
759 return true;
760}
#define IF_TRACE(x, fd, y)
#define DO_TRACE(x, fd, y)
#define REVENTS(x)
#define BOOLNAME(x)
#define TRACE_MOD(x, fd, y)
#define WEVENTS(x)
#define TRACING(x)
Definition XrdTrace.hh:70
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ ValidEvents
Mask to test for valid events.
@ errorEvents
Error event non-r/w specific.
bool TmoAdd(Channel *cP, int tmoSet)

References BOOLNAME, CbkXeq(), chDead, DO_TRACE, XrdSys::IOEvents::Channel::errorEvents, XrdSys::IOEvents::CallBack::Event(), XrdSys::IOEvents::CallBack::Fatal(), IF_TRACE, XrdSysMutex::Lock(), XrdSysMutexHelper::Lock(), maxTime, Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollInit, XrdSys::IOEvents::pollWait, XrdSysSemaphore::Post(), XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REVENTS, TmoAdd(), TmoDel(), TRACE_MOD, TRACING, XrdSysMutexHelper::UnLock(), XrdSys::IOEvents::CallBack::ValidEvents, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.

Referenced by CbkTMO(), and CbkXeq().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ CPP_ATOMIC_TYPE()

XrdSys::IOEvents::Poller::CPP_ATOMIC_TYPE ( bool )
protected

◆ Create()

XrdSys::IOEvents::Poller * XrdSys::IOEvents::Poller::Create ( int & eNum,
const char ** eTxt = 0,
int crOpts = 0 )
static

Definition at line 766 of file XrdSysIOEvents.cc.

769{
770 int fildes[2];
771 struct pollArg pArg;
772 pthread_t tid;
773
774// Create a pipe used to break the poll wait loop
775//
776 if (XrdSysFD_Pipe(fildes))
777 {eNum = errno;
778 if (eTxt) *eTxt = "creating poll pipe";
779 return 0;
780 }
781
782// Create an actual implementation of a poller
783//
784 if (!(pArg.pollP = newPoller(fildes, eNum, eTxt)))
785 {close(fildes[0]);
786 close(fildes[1]);
787 return 0;
788 }
789
790// Now start a thread to handle this poller object
791//
793 (void *)&pArg, XRDSYSTHREAD_BIND, "Poller")))
794 {if (eTxt) *eTxt = "creating poller thread"; return 0;}
795
796// Now wait for the thread to finish initializing before we allow use
797// Note that the bootstrap takes ownership of the semaphore and will delete it
798// once the thread positing the semaphore actually ends. This is to avoid
799// semaphore bugs present in certain (e.g. Linux) kernels.
800//
801 pArg.pollSync->Wait();
802
803// Check if all went well
804//
805 if (pArg.retCode)
806 {if (eTxt) *eTxt = (pArg.retMsg ? pArg.retMsg : "starting poller");
807 eNum = pArg.retCode;
808 delete pArg.pollP;
809 return 0;
810 }
811
812// Set creation options in the new poller
813//
814 if (crOpts & optTOM)
815 pArg.pollP->tmoMask = ~(CallBack::ReadTimeOut|CallBack::WriteTimeOut);
816
817// All done
818//
819 eNum = 0;
820 if (eTxt) *eTxt = "";
821 return pArg.pollP;
822}
#define close(a)
Definition XrdPosix.hh:48
#define XRDSYSTHREAD_BIND
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void * Start(void *parg)

References close, optTOM, XrdSys::IOEvents::pollArg::pollP, XrdSys::IOEvents::pollArg::pollSync, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::pollArg::retCode, XrdSys::IOEvents::pollArg::retMsg, XrdSysThread::Run(), XrdSys::IOEvents::BootStrap::Start(), tmoMask, XrdSysSemaphore::Wait(), XrdSys::IOEvents::CallBack::WriteTimeOut, and XRDSYSTHREAD_BIND.

Referenced by XrdCl::PollerBuiltIn::Start().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Exclude()

virtual void XrdSys::IOEvents::Poller::Exclude ( Channel * cP,
bool & isLocked,
bool dover = 1 )
protectedpure virtual

Remove a channel to the poll set. An implementation must be supplied. The channel is locked when this method is called but must be unlocked by the method if a command is sent to the poller thread and isLocked set to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

References Channel.

Here is the call graph for this function:

◆ GetFault()

int XrdSys::IOEvents::Poller::GetFault ( Channel * cP)
inlineprotected

Definition at line 437 of file XrdSysIOEvents.hh.

437{return cP->chFault;}

References Channel.

Referenced by XrdSys::IOEvents::PollerErr1::Include(), and XrdSys::IOEvents::PollerErr1::Modify().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ GetPollEnt()

int XrdSys::IOEvents::Poller::GetPollEnt ( Channel * cP)
inlineprotected

Definition at line 438 of file XrdSysIOEvents.hh.

438{return cP->pollEnt;}

References Channel.

Referenced by XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Exclude(), XrdSys::IOEvents::PollPoll::Include(), XrdSys::IOEvents::PollKQ::Modify(), and XrdSys::IOEvents::PollPoll::Modify().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ GetRequest()

int XrdSys::IOEvents::Poller::GetRequest ( )
protected

Definition at line 874 of file XrdSysIOEvents.cc.

875{
876 ssize_t rlen;
877 int rc;
878
879// See if we are to resume a read or start a fresh one
880//
881 if (!pipeBlen)
882 {pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}
883
884// Wait for the next request. Some OS's (like Linux) don't support non-blocking
885// pipes. So, we must front the read with a poll.
886//
887 do {rc = poll(&pipePoll, 1, 0);}
888 while(rc < 0 && (errno == EAGAIN || errno == EINTR));
889 if (rc < 1) return 0;
890
891// Now we can put up a read without a delay. Normally a full command will be
892// present. Under some heavy conditions, this may not be the case.
893//
894 do {rlen = read(reqFD, pipeBuff, pipeBlen);}
895 while(rlen < 0 && errno == EINTR);
896 if (rlen <= 0)
897 {std::cerr <<"Poll: "<<XrdSysE2T(errno)<<" reading from request pipe\n"<< std::flush;
898 return 0;
899 }
900
901// Check if all the data has arrived. If not all the data is present, defer
902// this request until more data arrives.
903//
904 if (!(pipeBlen -= rlen)) return 1;
905 pipeBuff += rlen;
906 return 0;
907}
#define read(a, b, c)
Definition XrdPosix.hh:82
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104

References pipeBlen, pipeBuff, pipePoll, read, reqBuff, reqFD, and XrdSysE2T().

Here is the call graph for this function:

◆ Include()

virtual bool XrdSys::IOEvents::Poller::Include ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLocked )
protectedpure virtual

Add a channel to the poll set. An implementation must be supplied. The channel is locked when this method is called but must be unlocked by the method if a command is sent to the poller thread and isLocked set to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

References Channel.

Referenced by Init().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Init()

bool XrdSys::IOEvents::Poller::Init ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLockd )
protected

Definition at line 913 of file XrdSysIOEvents.cc.

915{
916// The channel must be locked upon entry!
917//
918 bool retval;
919
920
921// If we are already in progress then simply update the shadow events and
922// resuppress all current events.
923//
924 if (cP->chPoller == &pollWait)
925 {cP->reMod = cP->chEvents;
926 cP->chEvents = 0;
927 IF_TRACE(Init,cP->chFD,"defer events=" <<cP->reMod);
928 return true;
929 }
930
931// Trace this entry
932//
933 IF_TRACE(Init,cP->chFD,"begin events=" <<int(cP->chEvents));
934
935// If no events are enabled at this point, just return
936//
937 if (!(cP->chEvents)) return true;
938
939// Refuse to enable a channel without a callback function
940//
941 if (!(cP->chCB))
942 {eNum = EDESTADDRREQ;
943 if (eTxt) *eTxt = "enabling without a callback";
944 return false;
945 }
946
947// So, now we can include the channel in the poll set. We will include it
948// with no events enabled to prevent callbacks prior to completion here.
949//
950 cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
951 retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
952 IF_TRACE(Init,cP->chFD,"Include() returned " <<BOOLNAME(retval) <<TRACE_LOK);
953 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
954
955// Determine what future poller to use. If we can use the regular poller then
956// set the correct event mask for the channel. Note that we could have lost
957// control but the correct events will be reflected in the "reMod" member.
958//
959 if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
960 else {cP->chPoller = cP->chPollXQ;
961 cP->inPSet = 1;
962 if (cP->reMod)
963 {cP->chEvents = cP->reMod;
964 retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
965 TRACE_MOD(Init,cP->chFD,int(cP->reMod));
966 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
967 } else {
968 TRACE_NOD(Init,cP->chFD,0);
969 }
970 }
971
972// All done
973//
974 cP->reMod = 0;
975 return retval;
976}
#define TRACE_LOK
#define TRACE_NOD(x, fd, y)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Init(Channel *cP, int &eNum, const char **eTxt, bool &isLockd)

References BOOLNAME, IF_TRACE, Include(), Init(), XrdSysMutex::Lock(), Modify(), XrdSys::IOEvents::pollErr1, XrdSys::IOEvents::pollWait, TRACE_LOK, TRACE_MOD, and TRACE_NOD.

Referenced by Init(), XrdSys::IOEvents::PollerInit::Modify(), and XrdSys::IOEvents::PollerWait::Modify().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ LockChannel()

void XrdSys::IOEvents::Poller::LockChannel ( Channel * cP)
inlineprotected

Definition at line 441 of file XrdSysIOEvents.hh.

441{cP->chMutex.Lock();}

References Channel, and XrdSysMutex::Lock().

Here is the call graph for this function:

◆ Modify()

virtual bool XrdSys::IOEvents::Poller::Modify ( Channel * cP,
int & eNum,
const char ** eTxt,
bool & isLocked )
protectedpure virtual

Modify the event status of a channel. An implementation must be supplied. The channel is locked when this method is called but must be unlocked by the method if a command is sent to the poller thread and isLocked set to false.

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

References Channel.

Referenced by CbkXeq(), and Init().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Poll2Enum()

int XrdSys::IOEvents::Poller::Poll2Enum ( short events)
protected

Definition at line 982 of file XrdSysIOEvents.cc.

983{
984 if (events & POLLERR) return EPIPE;
985
986 if (events & POLLHUP) return ECONNRESET;
987
988 if (events & POLLNVAL) return EBADF;
989
990 return EOPNOTSUPP;
991}

◆ SendCmd()

int XrdSys::IOEvents::Poller::SendCmd ( PipeData & cmd)
protected

Definition at line 997 of file XrdSysIOEvents.cc.

998{
999 int wlen;
1000
1001// Pipe writes are atomic so we don't need locks. Some commands require
1002// confirmation. We handle that here based on the command. Note that pipes
1003// gaurantee that all of the data will be written or we will block.
1004//
1005 if (cmd.req >= PipeData::Post)
1006 {XrdSysSemaphore mySem(0);
1007 cmd.theSem = &mySem;
1008 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1009 while (wlen < 0 && errno == EINTR);
1010 if (wlen > 0) mySem.Wait();
1011 } else {
1012 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1013 while (wlen < 0 && errno == EINTR);
1014 }
1015
1016// All done
1017//
1018 return (wlen >= 0 ? 0 : errno);
1019}
#define write(a, b, c)
Definition XrdPosix.hh:115

References cmdFD, XrdSys::IOEvents::Poller::PipeData::Post, XrdSys::IOEvents::Poller::PipeData::req, XrdSys::IOEvents::Poller::PipeData::theSem, XrdSysSemaphore::Wait(), and write.

Referenced by XrdSys::IOEvents::PollE::Exclude(), XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Exclude(), XrdSys::IOEvents::PollPort::Exclude(), XrdSys::IOEvents::PollPoll::Include(), XrdSys::IOEvents::PollPoll::Modify(), and Stop().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ SetPollEnt()

void XrdSys::IOEvents::Poller::SetPollEnt ( Channel * cP,
int ptEnt )
protected

Definition at line 1025 of file XrdSysIOEvents.cc.

1026{
1027 cP->pollEnt = pe;
1028}

Referenced by XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Include(), and XrdSys::IOEvents::PollKQ::Modify().

Here is the caller graph for this function:

◆ Shutdown()

virtual void XrdSys::IOEvents::Poller::Shutdown ( )
protectedpure virtual

Shutdown the poller. An implementation must be supplied. The shutdown method must release any allocated storage and close private file descriptors. The polling thread will have already been terminated and x-thread pipe closed. Warning: the derived destructor must call Stop() and do nothing else!

Implemented in XrdSys::IOEvents::PollE, XrdSys::IOEvents::PollerErr1, XrdSys::IOEvents::PollerInit, XrdSys::IOEvents::PollerWait, XrdSys::IOEvents::PollKQ, XrdSys::IOEvents::PollPoll, and XrdSys::IOEvents::PollPort.

Referenced by Stop().

Here is the caller graph for this function:

◆ Stop()

void XrdSys::IOEvents::Poller::Stop ( )

Stop a poller object. Active callbacks are completed. Pending callbacks are discarded. After which the poller event thread exits. Subsequently, each associated channel is disabled and removed from the poller object. If the channel is enabled for a StopEvent, the stop callback is invoked. However, any attempt to use the channel methods that require an active poller will return an error.

Since a stopped poller cannot be restarted; the only thing left is to delete it. This also applies to all the associated channels since they no longer have an active poller.

Definition at line 1034 of file XrdSysIOEvents.cc.

1035{
1036 PipeData cmdbuff;
1037 CallBack *theCB;
1038 Channel *cP;
1039 void *cbArg;
1040 int doCB;
1041
1042// Initialize the pipdata structure
1043//
1044 memset(static_cast<void*>( &cmdbuff ), 0, sizeof(cmdbuff));
1045 cmdbuff.req = PipeData::Stop;
1046
1047// Lock all of this
1048//
1049 adMutex.Lock();
1050
1051// If we are already shutdown then we are done
1052//
1053 if (cmdFD == -1) {adMutex.UnLock(); return;}
1054
1055// First we must stop the poller thread in an orderly fashion.
1056//
1057 adMutex.UnLock();
1058 SendCmd(cmdbuff);
1059 adMutex.Lock();
1060
1061// Close the pipe communication mechanism
1062//
1063 close(cmdFD); cmdFD = -1;
1064 close(reqFD); reqFD = -1;
1065
1066// Run through cleaning up the channels. While there should not be any other
1067// operations happening on this poller, we take the conservative approach.
1068//
1069 while((cP = attBase))
1070 {REMOVE(attBase, attList, cP);
1071 adMutex.UnLock();
1072 cP->chMutex.Lock();
1073 doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent);
1074 if (cP->inTOQ) TmoDel(cP);
1075 cP->Reset(&pollErr1, cP->chFD, EIDRM);
1076 cP->chPollXQ = &pollErr1;
1077 if (doCB)
1078 {cP->chStat = Channel::isClear;
1079 theCB = cP->chCB; cbArg = cP->chCBA;
1080 cP->chMutex.UnLock();
1081 theCB->Stop(cP, cbArg);
1082 } else cP->chMutex.UnLock();
1083 adMutex.Lock();
1084 }
1085
1086// Now invoke the poller specific shutdown
1087//
1088 Shutdown();
1089 adMutex.UnLock();
1090}
#define REMOVE(dlbase, dlvar, curitem)
@ stopEvent
Poller stop event.
int SendCmd(PipeData &cmd)
virtual void Shutdown()=0

References attBase, Channel, close, cmdFD, XrdSysMutex::Lock(), XrdSys::IOEvents::pollErr1, REMOVE, XrdSys::IOEvents::Poller::PipeData::req, reqFD, SendCmd(), Shutdown(), XrdSys::IOEvents::CallBack::Stop(), XrdSys::IOEvents::Poller::PipeData::Stop, XrdSys::IOEvents::Channel::stopEvent, TmoDel(), and XrdSysMutex::UnLock().

Referenced by XrdSys::IOEvents::PollE::~PollE(), XrdSys::IOEvents::PollKQ::~PollKQ(), XrdSys::IOEvents::PollPoll::~PollPoll(), XrdSys::IOEvents::PollPort::~PollPort(), and XrdCl::PollerBuiltIn::Stop().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ TmoAdd()

bool XrdSys::IOEvents::Poller::TmoAdd ( Channel * cP,
int tmoSet )
protected

Definition at line 1096 of file XrdSysIOEvents.cc.

1097{
1098 XrdSysMutexHelper mHelper(toMutex);
1099 time_t tNow;
1100 Channel *ncP;
1101 bool setRTO, setWTO;
1102
1103// Do some tracing
1104//
1105 IF_TRACE(TmoAdd,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1106 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1107
1108// Remove element from timeout queue if it is there
1109//
1110 if (cP->inTOQ)
1111 {REMOVE(tmoBase, tmoList, cP);
1112 cP->inTOQ = 0;
1113 }
1114
1115// Determine which timeouts need to be reset
1116//
1117 tmoSet|= cP->dlType >> 4;
1120
1121// Reset the required deadlines
1122//
1123 tNow = time(0);
1124 if (setRTO && REVENTS(cP->chEvents) && cP->chRTO)
1125 cP->rdDL = cP->chRTO + tNow;
1126 if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO)
1127 cP->wrDL = cP->chWTO + tNow;
1128
1129// Calculate the closest enabled deadline
1130//
1131 if (cP->rdDL < cP->wrDL)
1132 {cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut;
1133 } else {
1134 cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut;
1135 if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut;
1136 }
1137 IF_TRACE(TmoAdd, cP->chFD, "t=" <<tNow <<" rdDL=" <<setRTO <<' ' <<cP->rdDL
1138 <<" wrDL=" <<setWTO <<' ' <<cP->wrDL);
1139
1140// If no timeout really applies, we are done
1141//
1142 if (cP->deadLine == maxTime) return false;
1143
1144// Add the channel to the timeout queue in correct deadline position.
1145//
1146 if ((ncP = tmoBase))
1147 {do {if (cP->deadLine < ncP->deadLine) break;
1148 ncP = ncP->tmoList.next;
1149 } while(ncP != tmoBase);
1150 INSERT(tmoList, ncP, cP);
1151 if (cP->deadLine < tmoBase->deadLine) tmoBase = cP;
1152 } else tmoBase = cP;
1153 cP->inTOQ = 1;
1154
1155// Indicate to the caller whether or not a wakeup is required
1156//
1157 return (tmoBase == cP);
1158}
#define INSERT(dlvar, curitem, newitem)
#define STATUSOF(x)

References BOOLNAME, Channel, IF_TRACE, INSERT, maxTime, XrdSys::IOEvents::CallBack::ReadTimeOut, XrdSys::IOEvents::CallBack::ReadyToRead, XrdSys::IOEvents::CallBack::ReadyToWrite, REMOVE, REVENTS, STATUSOF, TmoAdd(), tmoBase, tmoMask, WEVENTS, and XrdSys::IOEvents::CallBack::WriteTimeOut.

Referenced by CbkXeq(), and TmoAdd().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ TmoDel()

void XrdSys::IOEvents::Poller::TmoDel ( Channel * cP)
protected

Definition at line 1164 of file XrdSysIOEvents.cc.

1165{
1166
1167// Do some tracing
1168//
1169 IF_TRACE(TmoDel,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1170 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1171
1172// Get the timeout queue lock and remove the channel from the queue
1173//
1174 toMutex.Lock();
1175 REMOVE(tmoBase, tmoList, cP);
1176 cP->inTOQ = 0;
1177 toMutex.UnLock();
1178}

References BOOLNAME, IF_TRACE, REMOVE, STATUSOF, tmoBase, and TmoDel().

Referenced by CbkXeq(), Stop(), and TmoDel().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ TmoGet()

int XrdSys::IOEvents::Poller::TmoGet ( )
protected

Definition at line 1184 of file XrdSysIOEvents.cc.

1185{
1186 int wtval;
1187
1188// Lock the timeout queue
1189//
1190 toMutex.Lock();
1191
1192// Calculate wait time. If the deadline passed, invoke the timeout callback.
1193// we will need to drop the timeout lock as we don't have the channel lock.
1194//
1195 do {if (!tmoBase) {wtval = -1; break;}
1196 wtval = (tmoBase->deadLine - time(0)) * 1000;
1197 if (wtval > 0) break;
1198 toMutex.UnLock();
1199 CbkTMO();
1200 toMutex.Lock();
1201 } while(1);
1202
1203// Return the value
1204//
1205 CPP_ATOMIC_STORE(wakePend, false, std::memory_order_release);
1206 toMutex.UnLock();
1207 return wtval;
1208}
#define CPP_ATOMIC_STORE(x, val, order)

References CbkTMO(), CPP_ATOMIC_STORE, and tmoBase.

Referenced by XrdSys::IOEvents::PollE::Begin(), XrdSys::IOEvents::PollKQ::Begin(), XrdSys::IOEvents::PollPoll::Begin(), and XrdSys::IOEvents::PollPort::BegTO().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ UnLockChannel()

void XrdSys::IOEvents::Poller::UnLockChannel ( Channel * cP)
inlineprotected

Definition at line 448 of file XrdSysIOEvents.hh.

448{cP->chMutex.UnLock();}

References Channel, and XrdSysMutex::UnLock().

Referenced by XrdSys::IOEvents::PollE::Exclude(), XrdSys::IOEvents::PollKQ::Exclude(), XrdSys::IOEvents::PollPoll::Exclude(), XrdSys::IOEvents::PollPort::Exclude(), XrdSys::IOEvents::PollPoll::Include(), and XrdSys::IOEvents::PollPoll::Modify().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ BootStrap

friend class BootStrap
friend

Definition at line 373 of file XrdSysIOEvents.hh.

References BootStrap.

Referenced by BootStrap.

◆ Channel

Member Data Documentation

◆ attBase

Channel* XrdSys::IOEvents::Poller::attBase
protected

Definition at line 488 of file XrdSysIOEvents.hh.

Referenced by Poller(), and Stop().

◆ chDead

bool XrdSys::IOEvents::Poller::chDead
protected

Definition at line 511 of file XrdSysIOEvents.hh.

Referenced by CbkXeq(), and XrdSys::IOEvents::Channel::Delete().

◆ cmdFD

int XrdSys::IOEvents::Poller::cmdFD
protected

Definition at line 494 of file XrdSysIOEvents.hh.

Referenced by Poller(), SendCmd(), and Stop().

◆ maxTime

time_t XrdSys::IOEvents::Poller::maxTime = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff)
staticprotected

Definition at line 513 of file XrdSysIOEvents.hh.

Referenced by CbkXeq(), XrdSys::IOEvents::Channel::Enable(), and TmoAdd().

◆ parentPID

pid_t XrdSys::IOEvents::Poller::parentPID = getpid()
staticprotected

◆ pipeBlen

int XrdSys::IOEvents::Poller::pipeBlen
protected

Definition at line 508 of file XrdSysIOEvents.hh.

Referenced by Poller(), and GetRequest().

◆ pipeBuff

char* XrdSys::IOEvents::Poller::pipeBuff
protected

Definition at line 507 of file XrdSysIOEvents.hh.

Referenced by Poller(), and GetRequest().

◆ pipePoll

struct pollfd XrdSys::IOEvents::Poller::pipePoll
protected

Definition at line 493 of file XrdSysIOEvents.hh.

Referenced by Poller(), and GetRequest().

◆ pollTid

pthread_t XrdSys::IOEvents::Poller::pollTid
protected

◆ reqBuff

PipeData XrdSys::IOEvents::Poller::reqBuff
protected

Definition at line 506 of file XrdSysIOEvents.hh.

Referenced by GetRequest().

◆ reqFD

int XrdSys::IOEvents::Poller::reqFD
protected

Definition at line 495 of file XrdSysIOEvents.hh.

Referenced by Poller(), XrdSys::IOEvents::PollKQ::PollKQ(), GetRequest(), and Stop().

◆ tmoBase

Channel* XrdSys::IOEvents::Poller::tmoBase
protected

Definition at line 489 of file XrdSysIOEvents.hh.

Referenced by Poller(), CbkTMO(), TmoAdd(), TmoDel(), and TmoGet().

◆ tmoMask

unsigned char XrdSys::IOEvents::Poller::tmoMask
protected

Definition at line 509 of file XrdSysIOEvents.hh.

Referenced by Poller(), Create(), and TmoAdd().


The documentation for this class was generated from the following files: