You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

220 lines
7.4 KiB

  1. #include <unistd.h>
  2. #include <sys/un.h>
  3. #include "config.h"
  4. #include "fstrm_logger.hh"
  5. #ifdef RECURSOR
  6. #include "logger.hh"
  7. #else
  8. #include "dolog.hh"
  9. #endif
  10. #define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
  11. #ifdef HAVE_FSTRM
  12. FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect,
  13. const std::unordered_map<string,unsigned>& options): d_family(family), d_address(address)
  14. {
  15. fstrm_res res;
  16. try {
  17. d_fwopt = fstrm_writer_options_init();
  18. if (!d_fwopt) {
  19. throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed.");
  20. }
  21. res = fstrm_writer_options_add_content_type(d_fwopt, DNSTAP_CONTENT_TYPE, sizeof(DNSTAP_CONTENT_TYPE) - 1);
  22. if (res != fstrm_res_success) {
  23. throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res));
  24. }
  25. if (d_family == AF_UNIX) {
  26. struct sockaddr_un local;
  27. if (makeUNsockaddr(d_address, &local)) {
  28. throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "', it is not a valid UNIX socket path.");
  29. }
  30. d_uwopt = fstrm_unix_writer_options_init();
  31. if (!d_uwopt) {
  32. throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed.");
  33. }
  34. // void return, no error checking.
  35. fstrm_unix_writer_options_set_socket_path(d_uwopt, d_address.c_str());
  36. d_writer = fstrm_unix_writer_init(d_uwopt, d_fwopt);
  37. if (!d_writer) {
  38. throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed.");
  39. }
  40. #ifdef HAVE_FSTRM_TCP_WRITER_INIT
  41. } else if (family == AF_INET) {
  42. d_twopt = fstrm_tcp_writer_options_init();
  43. if (!d_twopt) {
  44. throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed.");
  45. }
  46. try {
  47. ComboAddress ca(d_address);
  48. // void return, no error checking.
  49. fstrm_tcp_writer_options_set_socket_address(d_twopt, ca.toString().c_str());
  50. fstrm_tcp_writer_options_set_socket_port(d_twopt, std::to_string(ca.getPort()).c_str());
  51. } catch (PDNSException &e) {
  52. throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "': " + e.reason);
  53. }
  54. d_writer = fstrm_tcp_writer_init(d_twopt, d_fwopt);
  55. if (!d_writer) {
  56. throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed.");
  57. }
  58. #endif
  59. } else {
  60. throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family) + " not supported");
  61. }
  62. d_iothropt = fstrm_iothr_options_init();
  63. if (!d_iothropt) {
  64. throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed.");
  65. }
  66. res = fstrm_iothr_options_set_queue_model(d_iothropt, FSTRM_IOTHR_QUEUE_MODEL_MPSC);
  67. if (res != fstrm_res_success) {
  68. throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
  69. }
  70. if (options.find("bufferHint") != options.end() && options.at("bufferHint")) {
  71. res = fstrm_iothr_options_set_buffer_hint(d_iothropt, options.at("bufferHint"));
  72. if (res != fstrm_res_success) {
  73. throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_buffer_hint failed: " + std::to_string(res));
  74. }
  75. }
  76. if (options.find("flushTimeout") != options.end() && options.at("flushTimeout")) {
  77. res = fstrm_iothr_options_set_flush_timeout(d_iothropt, options.at("flushTimeout"));
  78. if (res != fstrm_res_success) {
  79. throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_flush_timeout failed: " + std::to_string(res));
  80. }
  81. }
  82. if (options.find("inputQueueSize") != options.end() && options.at("inputQueueSize")) {
  83. res = fstrm_iothr_options_set_input_queue_size(d_iothropt, options.at("inputQueueSize"));
  84. if (res != fstrm_res_success) {
  85. throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_input_queue_size failed: " + std::to_string(res));
  86. }
  87. }
  88. if (options.find("outputQueueSize") != options.end() && options.at("outputQueueSize")) {
  89. res = fstrm_iothr_options_set_output_queue_size(d_iothropt, options.at("outputQueueSize"));
  90. if (res != fstrm_res_success) {
  91. throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_output_queue_size failed: " + std::to_string(res));
  92. }
  93. }
  94. if (options.find("queueNotifyThreshold") != options.end() && options.at("queueNotifyThreshold")) {
  95. res = fstrm_iothr_options_set_queue_notify_threshold(d_iothropt, options.at("queueNotifyThreshold"));
  96. if (res != fstrm_res_success) {
  97. throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_notify_threshold failed: " + std::to_string(res));
  98. }
  99. }
  100. if (options.find("setReopenInterval") != options.end() && options.at("setReopenInterval")) {
  101. res = fstrm_iothr_options_set_reopen_interval(d_iothropt, options.at("setReopenInterval"));
  102. if (res != fstrm_res_success) {
  103. throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_reopen_interval failed: " + std::to_string(res));
  104. }
  105. }
  106. if (connect) {
  107. d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
  108. if (!d_iothr) {
  109. throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed.");
  110. }
  111. d_ioqueue = fstrm_iothr_get_input_queue(d_iothr);
  112. if (!d_ioqueue) {
  113. throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed.");
  114. }
  115. }
  116. } catch (std::runtime_error &e) {
  117. this->cleanup();
  118. throw;
  119. }
  120. }
  121. void FrameStreamLogger::cleanup()
  122. {
  123. if (d_iothr != nullptr) {
  124. fstrm_iothr_destroy(&d_iothr);
  125. d_iothr = nullptr;
  126. }
  127. if (d_iothropt != nullptr) {
  128. fstrm_iothr_options_destroy(&d_iothropt);
  129. d_iothropt = nullptr;
  130. }
  131. if (d_writer != nullptr) {
  132. fstrm_writer_destroy(&d_writer);
  133. d_writer = nullptr;
  134. }
  135. if (d_uwopt != nullptr) {
  136. fstrm_unix_writer_options_destroy(&d_uwopt);
  137. d_uwopt = nullptr;
  138. }
  139. #ifdef HAVE_FSTRM_TCP_WRITER_INIT
  140. if (d_twopt != nullptr) {
  141. fstrm_tcp_writer_options_destroy(&d_twopt);
  142. d_twopt = nullptr;
  143. }
  144. #endif
  145. if (d_fwopt != nullptr) {
  146. fstrm_writer_options_destroy(&d_fwopt);
  147. d_fwopt = nullptr;
  148. }
  149. }
  150. FrameStreamLogger::~FrameStreamLogger()
  151. {
  152. this->cleanup();
  153. }
  154. void FrameStreamLogger::queueData(const std::string& data)
  155. {
  156. if (!d_ioqueue || !d_iothr) {
  157. return;
  158. }
  159. uint8_t *frame = (uint8_t*)malloc(data.length());
  160. if (!frame) {
  161. #ifdef RECURSOR
  162. g_log<<Logger::Warning<<"FrameStreamLogger: cannot allocate memory for stream."<<std::endl;
  163. #else
  164. warnlog("FrameStreamLogger: cannot allocate memory for stream.");
  165. #endif
  166. return;
  167. }
  168. memcpy(frame, data.c_str(), data.length());
  169. fstrm_res res;
  170. res = fstrm_iothr_submit(d_iothr, d_ioqueue, frame, data.length(), fstrm_free_wrapper, nullptr);
  171. if (res == fstrm_res_success) {
  172. // Frame successfully queued.
  173. ++d_framesSent;
  174. } else if (res == fstrm_res_again) {
  175. free(frame);
  176. #ifdef RECURSOR
  177. g_log<<Logger::Debug<<"FrameStreamLogger: queue full, dropping."<<std::endl;
  178. #else
  179. vinfolog("FrameStreamLogger: queue full, dropping.");
  180. #endif
  181. ++d_queueFullDrops;
  182. } else {
  183. // Permanent failure.
  184. free(frame);
  185. #ifdef RECURSOR
  186. g_log<<Logger::Warning<<"FrameStreamLogger: submitting to queue failed."<<std::endl;
  187. #else
  188. warnlog("FrameStreamLogger: submitting to queue failed.");
  189. #endif
  190. ++d_permanentFailures;
  191. }
  192. }
  193. #endif /* HAVE_FSTRM */