47 throw std::invalid_argument(
"Cannot add null socket to poller");
50 if (has_socket(socket.handle())) {
51 throw std::invalid_argument(
"Socket already exists in poller");
54 _poll_items.push_back({socket.handle(), 0, ZMQ_POLLIN, 0});
58 auto handle = socket.handle();
59 _poll_items.erase(std::remove_if(_poll_items.begin(), _poll_items.end(),
60 [handle](zmq::pollitem_t
const& item) { return item.socket == handle; }),
67 return zmq::socket_ref{};
71 auto const n_items = zmq::poll(_poll_items, timeout);
77 return zmq::socket_ref{};
80 for (std::size_t i = 0; i < _poll_items.size(); ++i) {
81 if (_poll_items[i].revents == ZMQ_POLLIN) {
82 return zmq::socket_ref{zmq::from_handle, _poll_items[i].socket};
86 }
catch (zmq::error_t
const& e) {
87 auto const error = e.num();
92 }
else if (error == ETERM) {
98 return zmq::socket_ref{};
102 std::vector<zmq::socket_ref> result{};
109 auto const n_items = zmq::poll(_poll_items, timeout);
118 result.reserve(n_items);
119 for (std::size_t i = 0; i < _poll_items.size(); ++i) {
120 if (_poll_items[i].revents == ZMQ_POLLIN) {
121 result.emplace_back(zmq::socket_ref{zmq::from_handle, _poll_items[i].socket});
125 }
catch (zmq::error_t
const& e) {
126 auto const error = e.num();
127 if (error == EINTR) {
131 }
else if (error == ETERM) {
140bool poller_t::has_socket(
void* socket_handle)
const {
141 return std::any_of(_poll_items.begin(), _poll_items.end(),
142 [socket_handle](
const zmq::pollitem_t& item) { return item.socket == socket_handle; });
void add(zmq::socket_ref socket)
Add a socket to the polling set.
zmq::socket_ref wait(std::chrono::milliseconds timeout=std::chrono::milliseconds{-1})
Wait for any socket to become ready for receiving.
bool is_interruptible() const noexcept
Check if polling is interruptible.
std::vector< zmq::socket_ref > wait_all(std::chrono::milliseconds timeout=std::chrono::milliseconds{-1})
Wait for at least one socket to become ready for receiving and return all ready.
void remove(zmq::socket_ref socket)
Remove a socket from the polling set.
Signal interrupt handling for graceful application shutdown.
CZZE_EXPORT bool is_interrupted() noexcept
Check if a program interrupt was received.
Event polling for monitoring multiple ZMQ sockets.