CppZmqZoltanExt 0.0.1
Loading...
Searching...
No Matches
CppZmqZoltanExt

CppZmqZoltanExt is a extension library for cppzmq, the modern C++ binding for ZeroMQ (ZMQ). It provides high-level abstractions and utilities for building robust, concurrent, and event-driven applications with ZeroMQ.

CppZmqZoltanExt aims to offer a subset of the well-stablished features found in czmq, but in a modern C++ interface build on top of cppzmq, simplifying common patterns in ZeroMQ-based applications, such as interrupt handling, socket polling, event loops, and the actor model for concurrent programming.

The design is mostly based on the wonderful work done in czmq and its concepts, although the implementation is entirely new and idiomatic C++. It is also important to mention that the actor model implementation uses some concepts from the zmqpp, another C++ binding for ZeroMQ.

Note

This library is a side project in beta version currently under active development and may undergo significant changes. While it is functional for many use cases, users should be aware that APIs and features may evolve as the library matures. Contributions and feedback are welcome to help shape its future direction.

Features Overview

Interrupt Handling

The interrupt handling module provides signal management for SIGINT (Ctrl+C) and SIGTERM termination signals. It establishes a mechanism for detecting and responding to interrupt signals, enabling clean application shutdown without abrupt termination.

  • Signal Handler Installation: Register signal handlers for SIGINT and SIGTERM
  • Atomic State Tracking: Interrupt state is tracked via an atomic flag
  • Integration: Works seamlessly with the Poller and Event Loop for responsive shutdown behavior
  • Thread-Safe Monitoring: Allows applications to safely detect interruption requests from multiple threads

Poller

The Poller provides efficient monitoring of multiple ZeroMQ sockets simultaneously. It wraps ZMQ's native polling mechanism with an intuitive C++ API, allowing your application to react to socket events without busy-waiting or managing complex threading logic.

  • Multi-Socket Monitoring: Add and remove sockets dynamically for event monitoring
  • Flexible Waiting: Wait for a single socket to become ready or check all registered sockets
  • Configurable Timeouts: Control how long the poller waits for socket events
  • Interrupt Awareness: Automatically checks for interrupt signals during polling operations
  • Termination Detection: Detect when the application should shut down

Event Loop

The Event Loop combines socket polling with timer management to create a complete reactive event-driven architecture. It monitors registered sockets for read readiness and executes scheduled timers, enabling event-driven applications that respond to both socket messages and time-based events.

  • Socket Event Handling: Register callbacks that fire when sockets become ready for receiving
  • Timer Management: Schedule one-shot and recurring timer events with flexible callback handlers
  • Event-Driven Architecture: Process both socket read and timer events in a single unified loop
  • Graceful Shutdown: Integrate with interrupt handling for clean application termination
  • Cross-Platform Support: Configurable interrupt checking intervals for reliable behavior on all platforms

Actor Pattern

The actor pattern provides a powerful abstraction for concurrent programming, enabling isolated execution units that communicate exclusively through message passing. Each actor runs in its own thread with its own socket pair, eliminating shared memory and making concurrent programs easier to reason about and maintain.

  • Isolated Execution: Each actor runs independently in its own thread
  • Message-Based Communication: Parent and child threads communicate via ZeroMQ PAIR sockets
  • Synchronized Initialization: The parent thread blocks until the actor confirms successful initialization
  • Exception Propagation: Exceptions during actor initialization are safely propagated to the parent thread
  • Graceful Termination: Coordinated shutdown protocol ensures clean resource cleanup
  • Memory Safety: Minimal shared state between threads reduces concurrency bugs

ZPL Configuration

The ZPL Configuration module parses ZPL (ZeroMQ Property Language) files into a navigable tree of properties, offering a simple read-only API for configuration access and traversal.

  • Stream and File Parsing: Load configuration data from input streams or file paths
  • Hierarchical Navigation: Access nested sections and properties via relative paths
  • Ordered Children: Preserve and iterate over properties in source order
  • Flexible Lookup: Throwing and non-throwing retrieval methods for safe access

Examples

Actor/Event Loop Example

This example demonstrates a complete application using the core features of CppZmqZoltanExt:

  • Interrupt handling for graceful shutdown
  • Actor pattern for concurrent processing
  • Event loop with socket and timer event handling
#include <chrono>
#include <functional>
#include <iostream>
#include <string>
#include <zmq.hpp>
using namespace zmqzext;
bool actor_socket_handler(loop_t& loop, zmq::socket_ref socket) {
try {
// Receive message from parent
zmq::message_t msg;
auto result = recv_retry_on_eintr(socket, msg, zmq::recv_flags::dontwait);
if (!result) {
return true; // Continue loop
}
auto signal = signal_t::check_signal(msg);
if (signal && signal->is_stop()) {
return false; // Exit loop
}
std::cout << "[Actor] Received: " << msg.to_string_view() << std::endl;
// Echo the message back to parent
send_retry_on_eintr(socket, msg, zmq::send_flags::none);
} catch (...) {
// Ignore exceptions and continue, but usually should communicate them to parent
}
return true; // Continue loop
}
bool actor_runner(zmq::socket_t& socket) {
zmq::context_t actor_context;
loop_t loop;
std::cout << "[Actor] Started" << std::endl;
// Register the actor socket to receive messages from parent
loop.add(socket, actor_socket_handler);
// Send success signal to parent
send_retry_on_eintr(socket, signal_t::create_success(), zmq::send_flags::none);
// Run the event loop
try {
loop.run(false); // Non-interruptible mode, so the actor is stopped only by parent stop signal
} catch (...) {
// As the application exceptions were already caught in the socket handler,
// the exceptions left here should be only ZMQ related. So, we still could
// try to comunicate the failure to parent and wait to be stopped or return
// if that fails.
}
std::cout << "[Actor] Finished" << std::endl;
return false;
}
bool parent_api_socket_handler(loop_t& loop, zmq::socket_ref socket, zmq::socket_ref actor_socket) {
try {
zmq::message_t msg;
// Receive echoed message from actor
auto result = socket.recv(msg, zmq::recv_flags::dontwait);
if (!result) {
return false; // Exit loop
}
std::cout << "[Main] Received request. Delivering it to actor: " << msg.to_string_view() << std::endl;
// Forward the message to the actor
auto send_result = actor_socket.send(msg, zmq::send_flags::none);
if (!send_result) {
return false; // Exit loop
}
// Send reply back to API client
auto reply_msg = zmq::message_t{std::string{"Ok"}};
send_result = socket.send(reply_msg, zmq::send_flags::none);
if (!send_result) {
return false; // Exit loop
}
} catch (...) {
return false; // Exit loop
}
return true; // Continue loop
}
bool parent_actors_socket_handler(loop_t& loop, zmq::socket_ref socket) {
try {
zmq::message_t msg;
// Receive echoed message from actor
auto result = socket.recv(msg, zmq::recv_flags::dontwait);
if (!result) {
return false; // Exit loop
}
std::cout << "[Main] Received from actor: " << msg.to_string_view() << std::endl;
} catch (...) {
return false; // Exit loop
}
return true; // Continue loop
}
int main() {
std::cout << "[Main] Starting application" << std::endl;
// Install interrupt handler for graceful shutdown
try {
// Create ZMQ context
zmq::context_t context;
// Create and start the actor
std::cout << "[Main] Creating and starting actor" << std::endl;
actor_t actor(context);
actor.start(actor_runner);
// Create a REP socket for request-reply communication
zmq::socket_t rep_socket(context, zmq::socket_type::rep);
rep_socket.bind("tcp://127.0.0.1:5555");
// Create the main event loop
loop_t loop;
// Register actor socket handler
loop.add(actor.socket(), parent_actors_socket_handler);
// Register REP socket handler
loop.add(rep_socket, std::bind(parent_api_socket_handler, std::placeholders::_1, std::placeholders::_2,
zmq::socket_ref(actor.socket())));
// Register a timer to print status every 2 seconds
loop.add_timer(std::chrono::milliseconds(2000),
0, // Infinite occurrences
std::cout << "[Main] Timer event - application is running" << std::endl;
return true; // Recurring timer
});
std::cout << "[Main] Running loop" << std::endl;
// Run the event loop
// The loop will continue until:
// - A handler returns false
// - An interrupt signal (Ctrl+C) is received
// On Windows, we must assure the loop will check for interrupts on a regular basis
// On linux, the loop will be interrupted by signals automatically when waiting on zmq::poll
loop.run(true, std::chrono::milliseconds{500}); // interruptible mode with 500ms check interval
std::cout << "[Main] Loop finished" << std::endl;
// No need to stop the actor explicitly, as its destructor will handle it
std::cout << "[Main] Stopping actor" << std::endl;
} catch (...) {
return 1;
}
std::cout << "[Main] Actor stopped" << std::endl;
std::cout << "[Main] Application finished" << std::endl;
return 0;
}
Actor pattern implementation using ZeroMQ PAIR sockets.
Class that implements the Actor pattern using ZMQ PAIR sockets.
Definition: actor.h:131
Event loop for managing socket and timer events.
Definition: loop.h:134
void run(bool interruptible=true, std::chrono::milliseconds interruptCheckInterval=std::chrono::milliseconds{-1})
Run the event loop.
Definition: loop.cpp:77
timer_id_t add_timer(std::chrono::milliseconds timeout, std::size_t occurences, fn_timer_handler_t fn)
Register a timer with an expiration handler.
Definition: loop.cpp:52
void add(zmq::socket_ref socket, fn_socket_handler_t fn)
Register a socket with an I/O handler.
Definition: loop.cpp:42
Helper utilities for robust ZMQ message sending and receiving.
CZZE_EXPORT zmq::send_result_t send_retry_on_eintr(T &socket, zmq::const_buffer const &buf, zmq::send_flags flags=zmq::send_flags::none)
Send a message buffer with automatic retry on EINTR.
Definition: helpers.cpp:43
Signal interrupt handling for graceful application shutdown.
CZZE_EXPORT void install_interrupt_handler() noexcept
Install signal handlers for SIGINT and SIGTERM.
Definition: interrupt.cpp:82
Event loop for managing sockets and timers.
std::size_t timer_id_t
Unique identifier for timer instances.
Definition: loop.h:78
Signal definitions and utilities for actor inter-thread communication.

ZPL Configuration Example

This example shows how to parse a ZPL configuration and navigate the resulting tree.

#include <cassert>
#include <iostream>
#include <sstream>
using namespace zmqzext;
int main() {
const char* zpl_text = R"ZPL(
# This is a sample ZPL configuration for a telemetry gateway
app
name = "ZPL's Example Telemetry Gateway"
environment = production # values can be 'production', 'staging', 'development'
logging
# Indented comment
level = info
outputs/0 = stdout
outputs/1 = file
file
path = /var/log/telemetry-gw.log
rotate
size_mb = 50
keep = 5
network
http
host = 0.0.0.0
port = 8080
grpc
host = 127.0.0.1
port = 50051
rules
sampling
default = 0.1
per_device/device-123 = 0.5
)ZPL";
std::istringstream input(zpl_text);
zpl_config_t root = zpl_config_t::from_stream(input);
// Basic access (relative paths from root)
std::cout << "app.name: " << root.get("app/name") << "\n";
std::cout << "http.port: " << root.get("network/http/port") << "\n";
// Non-throwing access
auto maybe_rotations = root.try_get("app/logging/file/rotate/keep");
if (maybe_rotations) {
std::cout << "log.rotate.keep: " << *maybe_rotations << "\n";
}
// Default value when missing
std::string region = root.get_or("app/region", "us-east-1");
std::cout << "region: " << region << "\n";
// Navigate to a child node and enumerate its children
zpl_config_t logging = root.child("app/logging");
for (const auto& child : logging.children()) {
std::cout << "logging child: " << child.name() << " = " << child.value() << "\n";
}
// Check existence
assert(root.contains("rules/sampling/per_device/device-123"));
assert(!root.contains("rules/sampling/per_device/device-999"));
return 0;
}
ZPL configuration tree loaded from text or file.
Definition: zpl_config.h:137
bool contains(const std::string &path) const noexcept
Check if a path exists.
Definition: zpl_config.cpp:418
const std::string & name() const noexcept
Get the node name.
Definition: zpl_config.cpp:404
std::string get_or(const std::string &path, std::string default_value) const noexcept
Get a property value or return a default.
Definition: zpl_config.cpp:438
std::optional< std::string > try_get(const std::string &path) const noexcept
Try to get a property value by path.
Definition: zpl_config.cpp:430
const std::string & get(const std::string &path) const
Get a property value by path.
Definition: zpl_config.cpp:422
std::vector< zpl_config_t > children() const noexcept
Get direct children of this node.
Definition: zpl_config.cpp:468
zpl_config_t child(const std::string &path) const
Get a child node by path.
Definition: zpl_config.cpp:446
ZPL (ZeroMQ Property Language) parser.

Output:

app.name: ZPL's Example Telemetry Gateway
http.port: 8080
log.rotate.keep: 5
region: us-east-1
logging child: level = info
logging child: outputs =
logging child: file =

Documentation

For detailed documentation, please refer to the CppZmqZoltanExt Documentation.

Additionally, you can explore the examples provided in the examples directory of the repository, build and run them.

Building and Installing

Dependencies

libzmq

  1. Build and install libzmq from source by cmake.
$ git clone https://github.com/zeromq/libzmq.git
$ cd libzmq
$ mkdir build
$ cmake -B build -DCMAKE_BUILD_TYPE=Release
$ cmake --build build
$ sudo cmake --install build
  1. Optionally, you can install libzmq using your system's package manager.

See detailed instructions in the libzmq repository.

cppzmq

Build and install cppzmq from source by cmake.

$ git clone https://github.com/zeromq/cppzmq.git
$ cd cppzmq
$ mkdir build
$ cmake -B build -DCMAKE_BUILD_TYPE=Release -DCPPZMQ_BUILD_TESTS=OFF
$ cmake --build build
$ sudo cmake --install build

See detailed instructions in the cppzmq repository.

CppZmqZoltanExt

Build and install CppZmqZoltanExt from source by cmake.

$ git clone https://github.com/luan-young/cppzmqzoltanext.git
$ cd cppzmqzoltanext
$ mkdir build
$ cmake -B build -DCMAKE_BUILD_TYPE=Release -DCZZE_BUILD_TESTS=ON
$ cmake --build build
$ sudo cmake --install build

Run the tests (if built):

$ ctest --test-dir build

Using CppZmqZoltanExt in Your CMake Project

To use CppZmqZoltanExt in your CMake project, you can use the following snippet in your CMakeLists.txt:

find_package(cppzmqzoltanext REQUIRED)
target_link_libraries(your_target PRIVATE cppzmqzoltanext::cppzmqzoltanext)

Contributing

Contributions are welcome! As this is an early-stage project, your feedback and contributions can help shape its development.

Please feel free to submit issues and pull requests on the GitHub repository.

Licensing

CppZmqZoltanExt is licensed under the MIT License. See the LICENSE file for details.