CppZmqZoltanExt is an extension library for cppzmq, the modern C++ binding for ZeroMQ (ZMQ). It provides high-level abstractions and utilities for building robust, concurrent, and event-driven applications with ZeroMQ.
CppZmqZoltanExt aims to offer a subset of the well-established features found in czmq through a modern C++ interface built on top of cppzmq. It simplifies common patterns in ZeroMQ-based applications, such as interrupt handling, socket polling, event loops, and the actor model for concurrent programming.
The design is mostly based on the wonderful work done in czmq and its concepts, although the implementation is entirely new and written in idiomatic C++. The actor model implementation also borrows some concepts from zmqpp, another C++ binding for ZeroMQ.
Note
This library is a side project, currently in beta and under active development, and it may undergo significant changes. While it is functional for many use cases, users should be aware that APIs and features may evolve as the library matures. Contributions and feedback are welcome to help shape its future direction.
Features Overview
Interrupt Handling
The interrupt handling module provides signal management for SIGINT (Ctrl+C) and SIGTERM termination signals. It establishes a mechanism for detecting and responding to interrupt signals, enabling clean application shutdown without abrupt termination.
- Signal Handler Installation: Register signal handlers for SIGINT and SIGTERM
- Atomic State Tracking: Interrupt state is tracked via an atomic flag
- Integration: Works seamlessly with the Poller and Event Loop for responsive shutdown behavior
- Thread-Safe Monitoring: Allows applications to safely detect interruption requests from multiple threads
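The mechanism described in this list can be illustrated with a minimal, standard-library-only sketch. It is not the library's implementation: the `sketch` namespace and the names `install_handlers`, `on_signal`, and `is_interrupted` are hypothetical and exist only for this illustration. It shows a handler for SIGINT and SIGTERM that does nothing but set a lock-free atomic flag, which any thread can then read.

```cpp
#include <atomic>
#include <csignal>

namespace sketch {

// Set from the signal handler, readable from any thread.
std::atomic<bool> interrupted{false};
static_assert(ATOMIC_BOOL_LOCK_FREE == 2,
              "a lock-free atomic is needed for use inside a signal handler");

// Signal handlers may only do async-signal-safe work, so this one just sets the flag.
extern "C" void on_signal(int) { interrupted.store(true); }

// Hypothetical helper: registers the same handler for SIGINT and SIGTERM.
void install_handlers() {
    std::signal(SIGINT, on_signal);
    std::signal(SIGTERM, on_signal);
}

// Hypothetical helper: lets application code check for a pending shutdown request.
bool is_interrupted() { return interrupted.load(); }

}  // namespace sketch
```

A long-running loop would call `sketch::is_interrupted()` between units of work and exit cleanly once it returns true. Keeping the handler this small is the usual design choice, because almost nothing else is safe to do inside a signal handler.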
Poller
The Poller provides efficient monitoring of multiple ZeroMQ sockets simultaneously. It wraps ZMQ's native polling mechanism with an intuitive C++ API, allowing your application to react to socket events without busy-waiting or managing complex threading logic.
- Multi-Socket Monitoring: Add and remove sockets dynamically for event monitoring
- Flexible Waiting: Wait for a single socket to become ready or check all registered sockets
- Configurable Timeouts: Control how long the poller waits for socket events
- Interrupt Awareness: Automatically checks for interrupt signals during polling operations
- Termination Detection: Detect when the application should shut down
Event Loop
The Event Loop combines socket polling with timer management to create a complete reactive event-driven architecture. It monitors registered sockets for read readiness and executes scheduled timers, enabling event-driven applications that respond to both socket messages and time-based events.
- Socket Event Handling: Register callbacks that fire when sockets become ready for receiving
- Timer Management: Schedule one-shot and recurring timer events with flexible callback handlers
- Event-Driven Architecture: Process both socket read and timer events in a single unified loop
- Graceful Shutdown: Integrate with interrupt handling for clean application termination
- Cross-Platform Support: Configurable interrupt checking intervals for reliable behavior on all platforms
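The timer half of such a loop can be sketched with the standard library alone. The code below is an illustration under stated assumptions, not the library's event loop: `sketch::timer_loop` and its members are hypothetical, sockets are left out entirely, and the convention that an occurrence count of 0 means "repeat forever" is borrowed from czmq's zloop rather than confirmed for this library. It shows one-shot and recurring timers sharing one loop, and a handler stopping the loop by returning false.

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

namespace sketch {

using clock_type = std::chrono::steady_clock;
using timer_handler = std::function<bool()>;  // return false to stop the loop

class timer_loop {
public:
    // occurrences == 0 repeats forever (assumed convention); otherwise fires that many times.
    void add_timer(std::chrono::milliseconds interval, std::size_t occurrences, timer_handler fn) {
        timers_.push_back(timer{interval, occurrences, clock_type::now() + interval, std::move(fn)});
    }

    // Runs until a handler returns false or no timers remain.
    void run() {
        while (!timers_.empty()) {
            // Pick the timer that expires first and sleep until it is due.
            std::size_t next = 0;
            for (std::size_t i = 1; i < timers_.size(); ++i) {
                if (timers_[i].due < timers_[next].due) next = i;
            }
            std::this_thread::sleep_until(timers_[next].due);

            timer& t = timers_[next];
            const bool keep_running = t.fn();
            if (t.remaining == 1) {
                // Last occurrence: drop the timer.
                timers_.erase(timers_.begin() + static_cast<std::ptrdiff_t>(next));
            } else {
                if (t.remaining > 1) --t.remaining;
                t.due += t.interval;  // schedule the next occurrence
            }
            if (!keep_running) return;
        }
    }

private:
    struct timer {
        std::chrono::milliseconds interval;
        std::size_t remaining;  // 0 = unlimited
        clock_type::time_point due;
        timer_handler fn;
    };
    std::vector<timer> timers_;
};

}  // namespace sketch
```

A real event loop would replace the plain sleep with a socket poll whose timeout is the time remaining until the next timer, so that socket and timer events are served from the same wait.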
Actor Pattern
The actor pattern provides a powerful abstraction for concurrent programming, enabling isolated execution units that communicate exclusively through message passing. Each actor runs in its own thread with its own socket pair, eliminating shared memory and making concurrent programs easier to reason about and maintain.
- Isolated Execution: Each actor runs independently in its own thread
- Message-Based Communication: Parent and child threads communicate via ZeroMQ PAIR sockets
- Synchronized Initialization: The parent thread blocks until the actor confirms successful initialization
- Exception Propagation: Exceptions during actor initialization are safely propagated to the parent thread
- Graceful Termination: Coordinated shutdown protocol ensures clean resource cleanup
- Memory Safety: Minimal shared state between threads reduces concurrency bugs
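The lifecycle in this list (synchronized start, exception propagation, coordinated stop) can be sketched without ZeroMQ. The code below deliberately swaps the PAIR sockets for a mutex-protected queue so that it stays self-contained; `sketch::channel`, `sketch::actor`, and the `"$STOP"` message are all hypothetical names for this illustration and are not the library's API.

```cpp
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>

namespace sketch {

// Stand-in for one direction of a PAIR socket: a blocking, thread-safe queue.
class channel {
public:
    void send(std::string msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(msg));
        }
        ready_.notify_one();
    }
    std::string recv() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !queue_.empty(); });
        std::string msg = std::move(queue_.front());
        queue_.pop();
        return msg;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::string> queue_;
};

class actor {
public:
    using init_fn = std::function<void()>;                       // may throw
    using handler_fn = std::function<void(const std::string&)>;  // called once per message

    // Blocks until init() has finished in the actor thread and rethrows its exception, if any.
    void start(init_fn init, handler_fn handle) {
        std::promise<void> initialized;
        std::future<void> result = initialized.get_future();
        thread_ = std::thread([this, init, handle, p = std::move(initialized)]() mutable {
            try {
                init();
                p.set_value();
            } catch (...) {
                p.set_exception(std::current_exception());
                return;
            }
            for (;;) {
                const std::string msg = inbox_.recv();
                if (msg == "$STOP") return;  // hypothetical stop signal
                handle(msg);
            }
        });
        try {
            result.get();
        } catch (...) {
            thread_.join();  // the actor thread has already exited
            throw;
        }
    }

    void send(std::string msg) { inbox_.send(std::move(msg)); }

    // Asks the actor to finish and waits for its thread to exit.
    void stop() {
        if (thread_.joinable()) {
            inbox_.send("$STOP");
            thread_.join();
        }
    }

    ~actor() { stop(); }

private:
    channel inbox_;
    std::thread thread_;
};

}  // namespace sketch
```

The promise/future pair plays the role of the initialization handshake: the parent learns either that the actor is ready or why it failed before `start` returns. Messages sent before `stop()` are processed first, because the stop request travels through the same ordered channel.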
ZPL Configuration
The ZPL Configuration module parses ZPL (ZeroMQ Property Language) files into a navigable tree of properties, offering a simple read-only API for configuration access and traversal.
- Stream and File Parsing: Load configuration data from input streams or file paths
- Hierarchical Navigation: Access nested sections and properties via relative paths
- Ordered Children: Preserve and iterate over properties in source order
- Flexible Lookup: Throwing and non-throwing retrieval methods for safe access
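To make the idea of path-based lookup concrete, here is a deliberately simplified sketch of reading ZPL-style text. It is not the library's parser: `sketch::parse_zpl` and `sketch::flat_config` are hypothetical, the result is a flat map from full paths to values instead of a tree, and only 4-space indentation, bare section names, `name = value` lines, and whole-line `#` comments are handled (no quoting, no trailing comments).

```cpp
#include <cstddef>
#include <istream>
#include <map>
#include <sstream>
#include <string>
#include <vector>

namespace sketch {

// Maps full paths such as "network/http/port" to their values.
using flat_config = std::map<std::string, std::string>;

inline std::string trim(const std::string& s) {
    const auto first = s.find_first_not_of(" \t");
    if (first == std::string::npos) return "";
    const auto last = s.find_last_not_of(" \t");
    return s.substr(first, last - first + 1);
}

// Simplified reader: nesting depth is the number of leading spaces divided by 4.
flat_config parse_zpl(std::istream& in) {
    flat_config out;
    std::vector<std::string> stack;  // names on the path to the current line
    std::string line;
    while (std::getline(in, line)) {
        const std::string body = trim(line);
        if (body.empty() || body[0] == '#') continue;  // blank line or comment

        const std::size_t depth = line.find_first_not_of(' ') / 4;
        if (depth > stack.size()) continue;  // indentation skips a level: ignore the line
        stack.resize(depth);                 // drop siblings and deeper levels

        const auto eq = body.find('=');
        const std::string name = trim(body.substr(0, eq));
        const std::string value = (eq == std::string::npos) ? "" : trim(body.substr(eq + 1));

        std::string path;
        for (const auto& part : stack) path += part + "/";
        path += name;
        out[path] = value;      // sections are stored with an empty value
        stack.push_back(name);  // becomes the parent of any deeper lines that follow
    }
    return out;
}

}  // namespace sketch
```

With this sketch, feeding a `std::istringstream` holding a `network` section with a nested `http` section and a `port = 8080` property yields an entry for `"network/http/port"` with the value `"8080"`. A tree of nodes, as the module describes, additionally preserves child order and allows relative navigation, which a flat map cannot do.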
Examples
Actor/Event Loop Example
This example demonstrates a complete application using the core features of CppZmqZoltanExt:
- Interrupt handling for graceful shutdown
- Actor pattern for concurrent processing
- Event loop with socket and timer event handling
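#include <chrono>
#include <functional>
#include <iostream>
#include <string>
#include <zmq.hpp>
// Also include the CppZmqZoltanExt headers for the actor, loop, helpers, interrupt, and signal modules.

using namespace zmqzext;
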
// Handles messages arriving on the actor's end of the PAIR socket.
// Returning false stops the actor's loop.
bool actor_socket_handler(loop_t& loop, zmq::socket_ref socket) {
    try {
        zmq::message_t msg;
        auto result = recv_retry_on_eintr(socket, msg, zmq::recv_flags::dontwait);
        if (!result) {
            return true;
        }
        auto signal = signal_t::check_signal(msg);
        if (signal && signal->is_stop()) {
            return false;
        }
        std::cout << "[Actor] Received: " << msg.to_string_view() << std::endl;
    } catch (...) {
    }
    return true;
}

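// Runs in the actor's thread: registers the socket handler and runs a loop until it stops.
bool actor_runner(zmq::socket_t& socket) {
    zmq::context_t actor_context;
    loop_t loop;  // assumption: loop_t is default-constructible
    std::cout << "[Actor] Started" << std::endl;
    loop.add(socket, actor_socket_handler);
    try {
        loop.run();  // assumption: run with default arguments
    } catch (...) {
    }
    std::cout << "[Actor] Finished" << std::endl;
    return false;
}
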
// Forwards each request from the REP socket to the actor and replies "Ok" to the client.
bool parent_api_socket_handler(loop_t& loop, zmq::socket_ref socket, zmq::socket_ref actor_socket) {
    try {
        zmq::message_t msg;
        auto result = socket.recv(msg, zmq::recv_flags::dontwait);
        if (!result) {
            return false;
        }
        std::cout << "[Main] Received request. Delivering it to actor: " << msg.to_string_view() << std::endl;
        auto send_result = actor_socket.send(msg, zmq::send_flags::none);
        if (!send_result) {
            return false;
        }
        auto reply_msg = zmq::message_t{std::string{"Ok"}};
        send_result = socket.send(reply_msg, zmq::send_flags::none);
        if (!send_result) {
            return false;
        }
    } catch (...) {
        return false;
    }
    return true;
}

// Prints messages sent by the actor to the parent thread.
bool parent_actors_socket_handler(loop_t& loop, zmq::socket_ref socket) {
    try {
        zmq::message_t msg;
        auto result = socket.recv(msg, zmq::recv_flags::dontwait);
        if (!result) {
            return false;
        }
        std::cout << "[Main] Received from actor: " << msg.to_string_view() << std::endl;
    } catch (...) {
        return false;
    }
    return true;
}

int main() {
    std::cout << "[Main] Starting application" << std::endl;
    install_interrupt_handler();
    try {
        zmq::context_t context;
        loop_t loop;             // assumption: loop_t is default-constructible
        actor_t actor(context);  // assumption: class name and constructor arguments
        std::cout << "[Main] Creating and starting actor" << std::endl;
        actor.start(actor_runner);
        zmq::socket_t rep_socket(context, zmq::socket_type::rep);
        rep_socket.bind("tcp://127.0.0.1:5555");
        loop.add(actor.socket(), parent_actors_socket_handler);
        loop.add(rep_socket, std::bind(parent_api_socket_handler, std::placeholders::_1, std::placeholders::_2,
                                       zmq::socket_ref(actor.socket())));
        loop.add_timer(std::chrono::milliseconds(2000),
                       0,
                       [](loop_t&, timer_id_t) {  // assumption: timer handler parameters
                           std::cout << "[Main] Timer event - application is running" << std::endl;
                           return true;
                       });
        std::cout << "[Main] Running loop" << std::endl;
        // Interruptible run that checks for SIGINT/SIGTERM every 500 ms.
        loop.run(true, std::chrono::milliseconds{500});
        std::cout << "[Main] Loop finished" << std::endl;
        std::cout << "[Main] Stopping actor" << std::endl;
        actor.stop();  // assumption: method name and default arguments
    } catch (...) {
        return 1;
    }
    std::cout << "[Main] Actor stopped" << std::endl;
    std::cout << "[Main] Application finished" << std::endl;
    return 0;
}
ZPL Configuration Example
This example shows how to parse a ZPL configuration and navigate the resulting tree.
#include <cassert>
#include <iostream>
#include <sstream>
#include <string>
// Also include the CppZmqZoltanExt ZPL configuration header.

using namespace zmqzext;

int main() {
    // ZPL nesting is expressed by 4-space indentation, so the text starts at column 0.
    const char* zpl_text = R"ZPL(
# This is a sample ZPL configuration for a telemetry gateway
app
    name = "ZPL's Example Telemetry Gateway"
    environment = production # values can be 'production', 'staging', 'development'
    logging
        # Indented comment
        level = info
        outputs/0 = stdout
        outputs/1 = file
        file
            path = /var/log/telemetry-gw.log
            rotate
                size_mb = 50
                keep = 5
network
    http
        host = 0.0.0.0
        port = 8080
    grpc
        host = 127.0.0.1
        port = 50051
rules
    sampling
        default = 0.1
        per_device/device-123 = 0.5
)ZPL";
    std::istringstream input(zpl_text);
    zpl_config_t root = zpl_config_t::from_stream(input);  // assumption: factory function name

    // Throwing lookup: get() fails if the path does not exist.
    std::cout << "app.name: " << root.get("app/name") << "\n";
    std::cout << "http.port: " << root.get("network/http/port") << "\n";

    // Non-throwing lookup: try_get() returns an empty optional for a missing path.
    auto maybe_rotations = root.try_get("app/logging/file/rotate/keep");
    if (maybe_rotations) {
        std::cout << "log.rotate.keep: " << *maybe_rotations << "\n";
    }

    // Lookup with a fallback value for a missing path.
    std::string region = root.get_or("app/region", "us-east-1");
    std::cout << "region: " << region << "\n";

    // Iterate over the direct children of a section in source order.
    zpl_config_t logging = root.child("app/logging");
    for (const auto& child : logging.children()) {
        std::cout << "logging child: " << child.name() << " = " << child.value() << "\n";
    }

    assert(root.contains("rules/sampling/per_device/device-123"));
    assert(!root.contains("rules/sampling/per_device/device-999"));
    return 0;
}
Output:
app.name: ZPL's Example Telemetry Gateway
http.port: 8080
log.rotate.keep: 5
region: us-east-1
logging child: level = info
logging child: outputs =
logging child: file =
Documentation
For detailed documentation, please refer to the CppZmqZoltanExt Documentation.
You can also build and run the examples provided in the examples directory of the repository.
Building and Installing
Dependencies
libzmq
- Build and install libzmq from source with CMake.
$ git clone https://github.com/zeromq/libzmq.git
$ cd libzmq
$ mkdir build
$ cmake -B build -DCMAKE_BUILD_TYPE=Release
$ cmake --build build
$ sudo cmake --install build
- Alternatively, install libzmq using your system's package manager.
See detailed instructions in the libzmq repository.
cppzmq
Build and install cppzmq from source with CMake.
$ git clone https://github.com/zeromq/cppzmq.git
$ cd cppzmq
$ mkdir build
$ cmake -B build -DCMAKE_BUILD_TYPE=Release -DCPPZMQ_BUILD_TESTS=OFF
$ cmake --build build
$ sudo cmake --install build
See detailed instructions in the cppzmq repository.
CppZmqZoltanExt
Build and install CppZmqZoltanExt from source with CMake.
$ git clone https://github.com/luan-young/cppzmqzoltanext.git
$ cd cppzmqzoltanext
$ mkdir build
$ cmake -B build -DCMAKE_BUILD_TYPE=Release -DCZZE_BUILD_TESTS=ON
$ cmake --build build
$ sudo cmake --install build
Run the tests (if built):
Using CppZmqZoltanExt in Your CMake Project
To use CppZmqZoltanExt in your CMake project, you can use the following snippet in your CMakeLists.txt:
find_package(cppzmqzoltanext REQUIRED)
target_link_libraries(your_target PRIVATE cppzmqzoltanext::cppzmqzoltanext)
Contributing
Contributions are welcome! As this is an early-stage project, your feedback and contributions can help shape its development.
Please feel free to submit issues and pull requests on the GitHub repository.
Licensing
CppZmqZoltanExt is licensed under the MIT License. See the LICENSE file for details.