// NOTE(review): the hiredis header path is assumed to be <hiredis/hiredis.h>;
// adjust if the build uses a different include root.
#include <hiredis/hiredis.h>

#include <cstddef>
#include <format>
#include <iostream>
#include <memory>
#include <ostream>
#include <sstream>
#include <stdexcept>
#include <stop_token>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include "expected.h"

/// One field/value pair of a stream entry.
struct FieldVal {
    std::string field;
    std::string val;
};

/// A stream key together with the entry id to read after.
struct KeyID {
    std::string key;
    std::string id;
};

/// Arguments for RedisClient::xadd.
struct XaddReq {
    std::string key;
    std::vector<FieldVal> field_val_list;
};

/// Arguments for RedisClient::xread.
struct XreadReq {
    int count;             ///< Sent as the COUNT argument; has no default, callers must set it.
    int timeout_ms = 5000; ///< Sent as the BLOCK argument.
    std::vector<KeyID> key_id_list;
};

/// A single entry read back from a stream.
struct StreamEntry {
    std::string id;
    std::vector<FieldVal> field_val_list;
};

using Key = std::string;
/// Entries returned by XREAD, grouped by the stream key they belong to.
using XreadRes = std::unordered_map<Key, std::vector<StreamEntry>>;

/// Move-only owner of a hiredis connection exposing XADD and XREAD.
///
/// NOTE(review): the template arguments of Expected (declared in "expected.h")
/// are assumed to be <value type, std::string> based on how errors are built
/// and printed in this file; confirm against that header.
class RedisClient {
public:
    /// Connects to the server at host:port.
    /// @throws std::runtime_error if the context cannot be allocated or the
    ///         connection attempt reports an error.
    RedisClient(const std::string& host = "127.0.0.1", int port = 6379) {
        ctx_ = redisConnect(host.c_str(), port);
        if (!ctx_) {
            throw std::runtime_error("failed to allocate redis context");
        }
        if (ctx_->err) {
            // The destructor does not run when a constructor throws, so the
            // context must be released here; copy the message out first.
            const std::string message{ctx_->errstr};
            redisFree(ctx_);
            ctx_ = nullptr;
            throw std::runtime_error(message);
        }
    }

    RedisClient(const RedisClient&) = delete;
    RedisClient& operator=(const RedisClient&) = delete;

    RedisClient(RedisClient&& other) noexcept
        : ctx_(std::exchange(other.ctx_, nullptr)) {}

    RedisClient& operator=(RedisClient&& other) noexcept {
        if (this != &other) {
            if (ctx_) redisFree(ctx_);
            ctx_ = std::exchange(other.ctx_, nullptr);
        }
        return *this;
    }

    ~RedisClient() {
        if (ctx_) redisFree(ctx_);
    }

    /// Appends one entry to the stream req.key, passing `*` as the entry id.
    /// @return an empty Expected on success, otherwise the connection error or
    ///         the error text of the server reply.
    Expected<void, std::string> xadd(const XaddReq& req) noexcept {
        // todo: safe guard against invalid requests
        std::vector<std::string> args;
        args.reserve(3 + 2 * req.field_val_list.size());
        args.emplace_back("XADD");
        args.push_back(req.key);
        args.emplace_back("*");
        for (const auto& [field, val] : req.field_val_list) {
            args.push_back(field);
            args.push_back(val);
        }

        const Reply reply = submit_cmd(args);
        if (!reply) {
            return Unexpected(last_error());
        }
        if (reply->type == REDIS_REPLY_ERROR) {
            return Unexpected(reply->str ? reply->str : "unknown error");
        }
        return {};
    }

    /// Issues XREAD COUNT req.count BLOCK req.timeout_ms STREAMS keys... ids...
    /// @return the entries grouped by stream key; an empty result when the
    ///         server answers with a nil reply; otherwise an error string
    ///         (connection error, server error, unexpected reply type, or a
    ///         reply whose nesting does not match the expected layout).
    Expected<XreadRes, std::string> xread(const XreadReq& req) noexcept {
        // todo: safe guard against invalid requests
        std::vector<std::string> args;
        args.reserve(6 + 2 * req.key_id_list.size());
        args.emplace_back("XREAD");
        args.emplace_back("COUNT");
        args.push_back(std::to_string(req.count));
        args.emplace_back("BLOCK");
        args.push_back(std::to_string(req.timeout_ms));
        args.emplace_back("STREAMS");
        // All keys come first, followed by all ids in the same order.
        for (const auto& key_id : req.key_id_list) args.push_back(key_id.key);
        for (const auto& key_id : req.key_id_list) args.push_back(key_id.id);

        const Reply reply = submit_cmd(args);
        if (!reply) {
            return Unexpected(last_error());
        }
        if (reply->type == REDIS_REPLY_ERROR) {
            return Unexpected(reply->str ? reply->str : "unknown reply error");
        }
        if (reply->type == REDIS_REPLY_NIL) {
            return {};
        }
        if (reply->type != REDIS_REPLY_ARRAY) {
            return Unexpected(std::format("unexpected reply type {}", reply->type));
        }

        // Layout walked below:
        //   [ [key, [ [id, [field, value, ...]], ... ]], ... ]
        XreadRes res;
        for (std::size_t stream_i = 0; stream_i < reply->elements; ++stream_i) {
            const redisReply* stream = reply->element[stream_i];
            if (!is_array(stream, 2) || !has_text(stream->element[0]) ||
                !is_array(stream->element[1], 0)) {
                return Unexpected(std::string("malformed XREAD reply"));
            }
            const std::string key(stream->element[0]->str, stream->element[0]->len);
            const redisReply* entries = stream->element[1];

            for (std::size_t entry_i = 0; entry_i < entries->elements; ++entry_i) {
                const redisReply* entry = entries->element[entry_i];
                if (!is_array(entry, 2) || !has_text(entry->element[0]) ||
                    !is_array(entry->element[1], 0) ||
                    entry->element[1]->elements % 2 != 0) {
                    // An odd count would make the pairwise walk read past the end.
                    return Unexpected(std::string("malformed XREAD reply"));
                }
                const redisReply* fields = entry->element[1];

                StreamEntry stream_entry{
                    .id = std::string(entry->element[0]->str, entry->element[0]->len),
                    .field_val_list = {}};
                stream_entry.field_val_list.reserve(fields->elements / 2);
                for (std::size_t fv_i = 0; fv_i < fields->elements; fv_i += 2) {
                    const redisReply* field = fields->element[fv_i];
                    const redisReply* val = fields->element[fv_i + 1];
                    if (!has_text(field) || !has_text(val)) {
                        return Unexpected(std::string("malformed XREAD reply"));
                    }
                    stream_entry.field_val_list.emplace_back(
                        FieldVal{.field = std::string(field->str, field->len),
                                 .val = std::string(val->str, val->len)});
                }
                res[key].push_back(std::move(stream_entry));
            }
        }
        return res;
    }

private:
    using Reply = std::unique_ptr<redisReply, decltype(&freeReplyObject)>;

    redisContext* ctx_{nullptr};

    /// True when r is an array reply holding at least min_elements children.
    static bool is_array(const redisReply* r, std::size_t min_elements) noexcept {
        return r != nullptr && r->type == REDIS_REPLY_ARRAY &&
               r->elements >= min_elements &&
               (r->elements == 0 || r->element != nullptr);
    }

    /// True when r carries a non-null string payload.
    static bool has_text(const redisReply* r) noexcept {
        return r != nullptr && r->str != nullptr;
    }

    /// Error text of the connection, or a fixed message when this client no
    /// longer owns a context (for example after being moved from).
    std::string last_error() const {
        return ctx_ ? std::string(ctx_->errstr)
                    : std::string("redis client has no connection");
    }

    /// Sends one command whose arguments are passed verbatim with explicit
    /// lengths, so `%`, whitespace and embedded NUL bytes in an argument are
    /// never interpreted as formatting or as argument separators.
    /// @return the owned reply, or a null Reply when there is no context or
    ///         hiredis returns no reply.
    Reply submit_cmd(const std::vector<std::string>& args) noexcept {
        if (!ctx_) {
            return Reply(nullptr, freeReplyObject);
        }
        std::vector<const char*> argv;
        std::vector<std::size_t> argvlen;
        argv.reserve(args.size());
        argvlen.reserve(args.size());
        for (const auto& arg : args) {
            argv.push_back(arg.data());
            argvlen.push_back(arg.size());
        }
        auto* reply = static_cast<redisReply*>(redisCommandArgv(
            ctx_, static_cast<int>(argv.size()), argv.data(), argvlen.data()));
        return Reply(reply, freeReplyObject);
    }
};

/// Prints each stream key, then its entry ids, then each entry's field/value
/// pairs, one item per line.
std::ostream& operator<<(std::ostream& os, const XreadRes& res) {
    for (const auto& [key, stream_entry_list] : res) {
        os << key << '\n';
        for (const auto& [id, field_val_list] : stream_entry_list) {
            os << " " << id << '\n';
            for (const auto& [field, val] : field_val_list) {
                os << " " << field << ' ' << val << '\n';
            }
        }
    }
    return os;
}

/// Reads the stream "key_01" from id "0" in batches of two on a worker thread,
/// printing every batch, until a read fails or returns nothing.
int main() {
    std::jthread redis_thread([](std::stop_token stop_token) {
        // An exception escaping a thread function terminates the process, so
        // connection failures are reported here instead.
        try {
            RedisClient redis("127.0.0.1", 6379);
            std::string key{"key_01"};
            std::string id{"0"};
            while (!stop_token.stop_requested()) {
                auto res = redis.xread(XreadReq{.count = 2, .key_id_list = {{key, id}}});
                if (!res.has_value()) {
                    std::cout << "err: " << res.error() << std::endl;
                    break;
                } else if (res.value().empty()) {
                    break;
                }
                std::cout << res.value() << std::endl << "---" << std::endl;
                if (!res.value().contains(key)) break;
                // Continue after the last entry that was just received.
                id = res.value()[key].back().id;
            }
        } catch (const std::exception& e) {
            std::cout << "err: " << e.what() << std::endl;
        }
    });
    redis_thread.join();
    return 0;
}