You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

183 lines
4.1 KiB

  1. #ifdef HAVE_CONFIG_H
  2. #include "config.h"
  3. #endif
  4. #include "mplexer.hh"
  5. #include "sstuff.hh"
  6. #include <iostream>
  7. #include <poll.h>
  8. #include "misc.hh"
  9. #include "namespaces.hh"
  10. FDMultiplexer* FDMultiplexer::getMultiplexerSilent()
  11. {
  12. FDMultiplexer* ret = nullptr;
  13. for(const auto& i : FDMultiplexer::getMultiplexerMap()) {
  14. try {
  15. ret = i.second();
  16. return ret;
  17. }
  18. catch(const FDMultiplexerException& fe) {
  19. }
  20. catch(...) {
  21. }
  22. }
  23. return ret;
  24. }
  25. class PollFDMultiplexer : public FDMultiplexer
  26. {
  27. public:
  28. PollFDMultiplexer()
  29. {}
  30. virtual ~PollFDMultiplexer()
  31. {
  32. }
  33. virtual int run(struct timeval* tv, int timeout=500) override;
  34. virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
  35. virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) override;
  36. virtual void removeFD(callbackmap_t& cbmap, int fd) override;
  37. string getName() const override
  38. {
  39. return "poll";
  40. }
  41. private:
  42. vector<struct pollfd> preparePollFD() const;
  43. };
  44. static FDMultiplexer* make()
  45. {
  46. return new PollFDMultiplexer();
  47. }
  48. static struct RegisterOurselves
  49. {
  50. RegisterOurselves() {
  51. FDMultiplexer::getMultiplexerMap().insert(make_pair(1, &make));
  52. }
  53. } doIt;
  54. void PollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd)
  55. {
  56. accountingAddFD(cbmap, fd, toDo, parameter, ttd);
  57. }
  58. void PollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
  59. {
  60. if(d_inrun && d_iter->d_fd==fd) // trying to remove us!
  61. ++d_iter;
  62. if(!cbmap.erase(fd))
  63. throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
  64. }
  65. vector<struct pollfd> PollFDMultiplexer::preparePollFD() const
  66. {
  67. vector<struct pollfd> pollfds;
  68. pollfds.reserve(d_readCallbacks.size() + d_writeCallbacks.size());
  69. struct pollfd pollfd;
  70. for(const auto& cb : d_readCallbacks) {
  71. pollfd.fd = cb.d_fd;
  72. pollfd.events = POLLIN;
  73. pollfds.push_back(pollfd);
  74. }
  75. for(const auto& cb : d_writeCallbacks) {
  76. pollfd.fd = cb.d_fd;
  77. pollfd.events = POLLOUT;
  78. pollfds.push_back(pollfd);
  79. }
  80. return pollfds;
  81. }
  82. void PollFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
  83. {
  84. auto pollfds = preparePollFD();
  85. int ret = poll(&pollfds[0], pollfds.size(), timeout);
  86. if (ret < 0 && errno != EINTR)
  87. throw FDMultiplexerException("poll returned error: " + stringerror());
  88. for(const auto& pollfd : pollfds) {
  89. if (pollfd.revents & POLLIN || pollfd.revents & POLLOUT) {
  90. fds.push_back(pollfd.fd);
  91. }
  92. }
  93. }
  94. int PollFDMultiplexer::run(struct timeval* now, int timeout)
  95. {
  96. if(d_inrun) {
  97. throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
  98. }
  99. auto pollfds = preparePollFD();
  100. int ret=poll(&pollfds[0], pollfds.size(), timeout);
  101. gettimeofday(now, 0); // MANDATORY!
  102. if(ret < 0 && errno!=EINTR)
  103. throw FDMultiplexerException("poll returned error: "+stringerror());
  104. d_iter=d_readCallbacks.end();
  105. d_inrun=true;
  106. for(const auto& pollfd : pollfds) {
  107. if(pollfd.revents & POLLIN) {
  108. d_iter=d_readCallbacks.find(pollfd.fd);
  109. if(d_iter != d_readCallbacks.end()) {
  110. d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter);
  111. continue; // so we don't refind ourselves as writable!
  112. }
  113. }
  114. else if(pollfd.revents & POLLOUT) {
  115. d_iter=d_writeCallbacks.find(pollfd.fd);
  116. if(d_iter != d_writeCallbacks.end()) {
  117. d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter);
  118. }
  119. }
  120. }
  121. d_inrun=false;
  122. return ret;
  123. }
  124. #if 0
  125. void acceptData(int fd, boost::any& parameter)
  126. {
  127. cout<<"Have data on fd "<<fd<<endl;
  128. Socket* sock=boost::any_cast<Socket*>(parameter);
  129. string packet;
  130. IPEndpoint rem;
  131. sock->recvFrom(packet, rem);
  132. cout<<"Received "<<packet.size()<<" bytes!\n";
  133. }
  134. int main()
  135. {
  136. Socket s(AF_INET, SOCK_DGRAM);
  137. IPEndpoint loc("0.0.0.0", 2000);
  138. s.bind(loc);
  139. PollFDMultiplexer sfm;
  140. sfm.addReadFD(s.getHandle(), &acceptData, &s);
  141. for(int n=0; n < 100 ; ++n) {
  142. sfm.run();
  143. }
  144. sfm.removeReadFD(s.getHandle());
  145. sfm.removeReadFD(s.getHandle());
  146. }
  147. #endif