diff --git a/CMakeLists.txt b/CMakeLists.txt index b064c43..f20de25 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,5 +15,8 @@ target_compile_options(proj_warnings INTERFACE -Wall -Werror -Wextra -Wpedantic) add_library(expected INTERFACE src/expected.h) target_link_libraries(expected INTERFACE proj_warnings) +add_library(redis INTERFACE src/redis.h) +target_link_libraries(redis INTERFACE proj_warnings expected hiredis::hiredis) + add_executable(main ./src/main.cc) -target_link_libraries(main PRIVATE proj_warnings expected hiredis::hiredis) +target_link_libraries(main PRIVATE proj_warnings redis) diff --git a/src/main.cc b/src/main.cc index 8a84956..7937723 100644 --- a/src/main.cc +++ b/src/main.cc @@ -1,191 +1,40 @@ -#include - +#include #include #include -#include -#include -#include #include -#include #include -#include -#include -#include "expected.h" - -struct FieldVal { - std::string field, val; -}; - -struct KeyID { - std::string key; - std::string id; -}; - -struct XaddReq { - std::string key; - std::vector field_val_list; -}; - -struct XreadReq { - int count; - int timeout_ms = 5000; - std::vector key_id_list; -}; - -struct StreamEntry { - std::string id; - std::vector field_val_list; -}; - -using Key = std::string; - -using XreadRes = std::unordered_map>; - -class RedisClient { - public: - RedisClient(const std::string& host = "127.0.0.1", int port = 6379) { - ctx_ = redisConnect(host.c_str(), port); - - if (!ctx_) { - throw std::runtime_error("failed to allocate redis context"); - } else if (ctx_->err) { - throw std::runtime_error(ctx_->errstr); - } - } - - RedisClient(const RedisClient&) = delete; - RedisClient& operator=(const RedisClient&) = delete; - - RedisClient(RedisClient&& other) noexcept : ctx_(other.ctx_) { - other.ctx_ = nullptr; - } - - RedisClient& operator=(RedisClient&& other) noexcept { - if (this != &other) { - if (ctx_) redisFree(ctx_); - ctx_ = other.ctx_; - other.ctx_ = nullptr; - } - return *this; - } - - ~RedisClient() { - if (ctx_) redisFree(ctx_); - } - - Expected xadd(const XaddReq& req) noexcept { - // todo: safe guard against invalid requests - - std::stringstream ss; - ss << "XADD " << req.key << " * "; - for (const auto& [field, val] : req.field_val_list) - ss << field << ' ' << val << ' '; - - auto reply = submit_cmd(ss.str()); - - if (!reply) { - return Unexpected(ctx_->errstr); - } else if (reply->type == REDIS_REPLY_ERROR) { - return Unexpected(reply->str ? reply->str : "unknown error"); - } - - return {}; - } - - Expected xread(const XreadReq& req) noexcept { - // todo: safe guard against invalid requests - - std::stringstream ss; - ss << "XREAD COUNT " << req.count << ' '; - ss << "BLOCK " << req.timeout_ms << ' '; - ss << "STREAMS "; - for (const auto& [key, _] : req.key_id_list) ss << key << ' '; - for (const auto& [_, id] : req.key_id_list) ss << id << ' '; - - auto reply = submit_cmd(ss.str()); - - if (!reply) { - return Unexpected(ctx_->errstr); - } else if (reply->type == REDIS_REPLY_ERROR) { - return Unexpected(reply->str ? reply->str : "unknown reply error"); - } else if (reply->type == REDIS_REPLY_NIL) { - return {}; - } else if (reply->type != REDIS_REPLY_ARRAY) { - return Unexpected(std::format("unexpected reply type {}", reply->type)); - } - - XreadRes res; - - // todo: verify res has the right format - for (int i = 0, n = reply->elements; i < n; ++i) { - auto key = reply->element[i]->element[0]->str; - - auto entry_list = reply->element[i]->element[1]->element; - int entry_list_len = reply->element[i]->element[1]->elements; - - for (int entry_i = 0; entry_i < entry_list_len; ++entry_i) { - auto id = entry_list[entry_i]->element[0]->str; - - auto raw_field_val = entry_list[entry_i]->element[1]->element; - int raw_field_val_len = entry_list[entry_i]->element[1]->elements; - - StreamEntry streamEntry{.id = id, .field_val_list{}}; - streamEntry.field_val_list.reserve(raw_field_val_len >> 1); - - for (int fv_i = 0; fv_i < raw_field_val_len; fv_i += 2) { - streamEntry.field_val_list.emplace_back( - FieldVal{.field = raw_field_val[fv_i]->str, - .val = raw_field_val[fv_i + 1]->str}); - } - - res[key].emplace_back(streamEntry); - } - } - - return res; - } - - private: - using Reply = std::unique_ptr; - - redisContext* ctx_{nullptr}; - - Reply submit_cmd(const std::string& cmd) noexcept { - redisReply* reply = - static_cast(redisCommand(ctx_, cmd.c_str())); - return Reply(reply, freeReplyObject); - } -}; - -std::ostream& operator<<(std::ostream& os, const XreadRes& res) { - for (const auto& [key, stream_entry_list] : res) { - os << key << '\n'; - for (const auto& [id, field_val_list] : stream_entry_list) { - os << " " << id << '\n'; - for (const auto& [field, val] : field_val_list) { - os << " " << field << ' ' << val << '\n'; - } - } - } - return os; -} +#include "redis.h" int main() { - std::jthread redis_thread([](std::stop_token stop_token) { + std::jthread xadd_thread([](std::stop_token stop_token) { RedisClient redis("127.0.0.1", 6379); + std::string key{"key_01"}; + for (int i = 0; !stop_token.stop_requested(); ++i) { + auto res = + redis.xadd(XaddReq{.key = key, + .field_val_list = {{std::format("field_{}", i), + std::format("val_{}", i)}}}); + if (!res.has_value()) std::cerr << "err: " << res.error() << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + }); + + std::jthread xread_thread([](std::stop_token stop_token) { + RedisClient redis("127.0.0.1", 6379); std::string key{"key_01"}; std::string id{"0"}; while (!stop_token.stop_requested()) { - auto res = redis.xread(XreadReq{.count = 2, .key_id_list = {{key, id}}}); + auto res = redis.xread( + XreadReq{.count = 2, .timeout_ms = 2000, .key_id_list = {{key, id}}}); if (!res.has_value()) { - std::cout << "err: " << res.error() << std::endl; + std::cerr << "err: " << res.error() << std::endl; break; } else if (res.value().empty()) { - break; + continue; } std::cout << res.value() << std::endl << "---" << std::endl; @@ -193,10 +42,13 @@ int main() { if (!res.value().contains(key)) break; id = res.value()[key].back().id; + + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } }); - redis_thread.join(); + xadd_thread.join(); + xread_thread.join(); return 0; } diff --git a/src/redis.h b/src/redis.h new file mode 100644 index 0000000..92016fc --- /dev/null +++ b/src/redis.h @@ -0,0 +1,265 @@ +#ifndef REDIS_H +#define REDIS_H + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "expected.h" + +struct FieldVal { + std::string field; // field = key + std::string val; +}; + +struct KeyID { + std::string key; // key = topic + std::string id; // id = timestamp-seq +}; + +struct XaddReq { + std::string key; // key = topic + std::vector field_val_list; +}; + +struct XreadReq { + int count; + int timeout_ms; // blocks indefinitely when timeout_ms = 0 + std::vector key_id_list; +}; + +struct StreamEntry { + std::string id; // id = timestamp-seq + std::vector field_val_list; +}; + +// key = topic +using Key = std::string; + +using XreadRes = std::unordered_map>; + +std::ostream& operator<<(std::ostream& os, const XreadRes& res) { + for (const auto& [key, stream_entry_list] : res) { + os << key << '\n'; + for (const auto& [id, field_val_list] : stream_entry_list) { + os << " " << id << '\n'; + for (const auto& [field, val] : field_val_list) { + os << " " << field << ' ' << val << '\n'; + } + } + } + return os; +} + +class RedisClient { + public: + RedisClient(const std::string& host = "127.0.0.1", int port = 6379) { + ctx_ = redisConnect(host.c_str(), port); + + if (!ctx_) { + throw std::runtime_error("failed to allocate redis context"); + } else if (ctx_->err) { + throw std::runtime_error(ctx_->errstr); + } + } + + RedisClient(const RedisClient&) = delete; + RedisClient& operator=(const RedisClient&) = delete; + + RedisClient(RedisClient&& other) noexcept : ctx_(other.ctx_) { + other.ctx_ = nullptr; + } + + RedisClient& operator=(RedisClient&& other) noexcept { + if (this != &other) { + if (ctx_) redisFree(ctx_); + ctx_ = other.ctx_; + other.ctx_ = nullptr; + } + return *this; + } + + ~RedisClient() { + if (ctx_) redisFree(ctx_); + } + + Expected xadd(const XaddReq& req) noexcept { + std::stringstream ss; + bool is_cmd_valid = !req.key.empty() && !req.field_val_list.empty(); + + ss << "XADD " << req.key << " * "; + + for (const auto& [field, val] : req.field_val_list) { + is_cmd_valid &= !field.empty() && !val.empty(); + ss << field << ' ' << val << ' '; + } + + if (!is_cmd_valid) + return Unexpected(std::format("cmd `{}` is invalid", ss.str())); + + auto reply = submit_cmd(ss.str()); + + if (!reply) { + return Unexpected(ctx_->errstr); + } else if (reply->type == REDIS_REPLY_ERROR) { + return Unexpected(reply->str ? reply->str : "unknown error"); + } + + return {}; + } + + Expected xread(const XreadReq& req) noexcept { + std::stringstream ss; + bool is_cmd_valid = + req.count > 0 && req.timeout_ms >= 0 && !req.key_id_list.empty(); + + ss << "XREAD COUNT " << req.count << ' '; + ss << "BLOCK " << req.timeout_ms << ' '; + ss << "STREAMS "; + + for (const auto& [key, _] : req.key_id_list) { + is_cmd_valid &= !key.empty(); + ss << key << ' '; + } + + for (const auto& [_, id] : req.key_id_list) { + is_cmd_valid &= !id.empty(); + ss << id << ' '; + } + + if (!is_cmd_valid) + return Unexpected(std::format("cmd `{}` is invalid", ss.str())); + + auto reply = submit_cmd(ss.str()); + + if (!reply) { + return Unexpected(ctx_->errstr); + } else if (reply->type == REDIS_REPLY_ERROR) { + return Unexpected(reply->str ? reply->str : "unknown reply error"); + } else if (reply->type == REDIS_REPLY_NIL) { + return {}; + } else if (reply->type != REDIS_REPLY_ARRAY) { + return Unexpected(std::format("unexpected reply type {}", reply->type)); + } + + XreadRes res; + + // XREAD response format: + // { + // key_01: { + // [ + // id_01 (timestamp-seq), + // [ + // field_01, + // val_01, + // field_02, + // val_02, + // ... + // ] + // ], + // [ + // id_02, + // [ + // field_01, + // val_01, + // ... + // ] + // ], + // ... + // }, + // key_02: { ... }, + // ... + // } + + const auto invalid_xread_res_err = Unexpected("invalid XREAD response"); + + auto is_arr_res_valid = [](redisReply* arr_res) { + return arr_res != nullptr && arr_res->type == REDIS_REPLY_ARRAY && + arr_res->element != nullptr; + }; + + auto is_str_res_valid = [](redisReply* str_res) { + return str_res != nullptr && str_res->type == REDIS_REPLY_STRING && + str_res->str != nullptr; + }; + + auto is_key_entry_list_res_valid = [&](redisReply* key_entry_list_res) { + return is_arr_res_valid(key_entry_list_res) && + key_entry_list_res->elements == 2 && + is_str_res_valid(key_entry_list_res->element[0]) && + is_arr_res_valid(key_entry_list_res->element[1]); + }; + + auto is_stream_entry_res_valid = [&](redisReply* stream_entry_res) { + return is_arr_res_valid(stream_entry_res) && + stream_entry_res->elements == 2 && + is_str_res_valid(stream_entry_res->element[0]) && + is_arr_res_valid(stream_entry_res->element[1]) && + (stream_entry_res->element[1]->elements & 1) == 0; + }; + + for (int i = 0, n = reply->elements; i < n; ++i) { + auto key_entry_list_res = reply->element[i]; + + if (!is_key_entry_list_res_valid(key_entry_list_res)) + return invalid_xread_res_err; + + auto key = key_entry_list_res->element[0]->str; + + auto entry_list = key_entry_list_res->element[1]->element; + int entry_list_len = key_entry_list_res->element[1]->elements; + + res[key].resize(entry_list_len); + + for (int entry_i = 0; entry_i < entry_list_len; ++entry_i) { + auto stream_entry_res = entry_list[entry_i]; + + if (!is_stream_entry_res_valid(stream_entry_res)) + return invalid_xread_res_err; + + auto id = stream_entry_res->element[0]->str; + + auto raw_field_val = stream_entry_res->element[1]->element; + int raw_field_val_len = stream_entry_res->element[1]->elements; + + auto& stream_entry = res[key][entry_i]; + stream_entry.id = id; + stream_entry.field_val_list.reserve(raw_field_val_len >> 1); + + for (int fv_i = 0; fv_i < raw_field_val_len; fv_i += 2) { + auto field_res = raw_field_val[fv_i]; + auto val_res = raw_field_val[fv_i + 1]; + + if (!is_str_res_valid(field_res) || !is_str_res_valid(val_res)) + return invalid_xread_res_err; + + stream_entry.field_val_list.emplace_back( + FieldVal{.field = field_res->str, .val = val_res->str}); + } + } + } + + return res; + } + + private: + using Reply = std::unique_ptr; + + // ctx_ is NOT thread safe + redisContext* ctx_{nullptr}; + + Reply submit_cmd(const std::string& cmd) noexcept { + redisReply* reply = + static_cast(redisCommand(ctx_, cmd.c_str())); + return Reply(reply, freeReplyObject); + } +}; + +#endif // REDIS_H