safeguard against redis req/res (#4)
This commit is contained in:
@@ -15,5 +15,8 @@ target_compile_options(proj_warnings INTERFACE -Wall -Werror -Wextra -Wpedantic)
|
||||
add_library(expected INTERFACE src/expected.h)
|
||||
target_link_libraries(expected INTERFACE proj_warnings)
|
||||
|
||||
add_library(redis INTERFACE src/redis.h)
|
||||
target_link_libraries(redis INTERFACE proj_warnings expected hiredis::hiredis)
|
||||
|
||||
add_executable(main ./src/main.cc)
|
||||
target_link_libraries(main PRIVATE proj_warnings expected hiredis::hiredis)
|
||||
target_link_libraries(main PRIVATE proj_warnings redis)
|
||||
|
||||
196
src/main.cc
196
src/main.cc
@@ -1,191 +1,40 @@
|
||||
#include <hiredis/hiredis.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <format>
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <stop_token>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "expected.h"
|
||||
|
||||
struct FieldVal {
|
||||
std::string field, val;
|
||||
};
|
||||
|
||||
struct KeyID {
|
||||
std::string key;
|
||||
std::string id;
|
||||
};
|
||||
|
||||
struct XaddReq {
|
||||
std::string key;
|
||||
std::vector<FieldVal> field_val_list;
|
||||
};
|
||||
|
||||
struct XreadReq {
|
||||
int count;
|
||||
int timeout_ms = 5000;
|
||||
std::vector<KeyID> key_id_list;
|
||||
};
|
||||
|
||||
struct StreamEntry {
|
||||
std::string id;
|
||||
std::vector<FieldVal> field_val_list;
|
||||
};
|
||||
|
||||
using Key = std::string;
|
||||
|
||||
using XreadRes = std::unordered_map<Key, std::vector<StreamEntry>>;
|
||||
|
||||
class RedisClient {
|
||||
public:
|
||||
RedisClient(const std::string& host = "127.0.0.1", int port = 6379) {
|
||||
ctx_ = redisConnect(host.c_str(), port);
|
||||
|
||||
if (!ctx_) {
|
||||
throw std::runtime_error("failed to allocate redis context");
|
||||
} else if (ctx_->err) {
|
||||
throw std::runtime_error(ctx_->errstr);
|
||||
}
|
||||
}
|
||||
|
||||
RedisClient(const RedisClient&) = delete;
|
||||
RedisClient& operator=(const RedisClient&) = delete;
|
||||
|
||||
RedisClient(RedisClient&& other) noexcept : ctx_(other.ctx_) {
|
||||
other.ctx_ = nullptr;
|
||||
}
|
||||
|
||||
RedisClient& operator=(RedisClient&& other) noexcept {
|
||||
if (this != &other) {
|
||||
if (ctx_) redisFree(ctx_);
|
||||
ctx_ = other.ctx_;
|
||||
other.ctx_ = nullptr;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
~RedisClient() {
|
||||
if (ctx_) redisFree(ctx_);
|
||||
}
|
||||
|
||||
Expected<void> xadd(const XaddReq& req) noexcept {
|
||||
// todo: safe guard against invalid requests
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "XADD " << req.key << " * ";
|
||||
for (const auto& [field, val] : req.field_val_list)
|
||||
ss << field << ' ' << val << ' ';
|
||||
|
||||
auto reply = submit_cmd(ss.str());
|
||||
|
||||
if (!reply) {
|
||||
return Unexpected(ctx_->errstr);
|
||||
} else if (reply->type == REDIS_REPLY_ERROR) {
|
||||
return Unexpected(reply->str ? reply->str : "unknown error");
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
Expected<XreadRes> xread(const XreadReq& req) noexcept {
|
||||
// todo: safe guard against invalid requests
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "XREAD COUNT " << req.count << ' ';
|
||||
ss << "BLOCK " << req.timeout_ms << ' ';
|
||||
ss << "STREAMS ";
|
||||
for (const auto& [key, _] : req.key_id_list) ss << key << ' ';
|
||||
for (const auto& [_, id] : req.key_id_list) ss << id << ' ';
|
||||
|
||||
auto reply = submit_cmd(ss.str());
|
||||
|
||||
if (!reply) {
|
||||
return Unexpected(ctx_->errstr);
|
||||
} else if (reply->type == REDIS_REPLY_ERROR) {
|
||||
return Unexpected(reply->str ? reply->str : "unknown reply error");
|
||||
} else if (reply->type == REDIS_REPLY_NIL) {
|
||||
return {};
|
||||
} else if (reply->type != REDIS_REPLY_ARRAY) {
|
||||
return Unexpected(std::format("unexpected reply type {}", reply->type));
|
||||
}
|
||||
|
||||
XreadRes res;
|
||||
|
||||
// todo: verify res has the right format
|
||||
for (int i = 0, n = reply->elements; i < n; ++i) {
|
||||
auto key = reply->element[i]->element[0]->str;
|
||||
|
||||
auto entry_list = reply->element[i]->element[1]->element;
|
||||
int entry_list_len = reply->element[i]->element[1]->elements;
|
||||
|
||||
for (int entry_i = 0; entry_i < entry_list_len; ++entry_i) {
|
||||
auto id = entry_list[entry_i]->element[0]->str;
|
||||
|
||||
auto raw_field_val = entry_list[entry_i]->element[1]->element;
|
||||
int raw_field_val_len = entry_list[entry_i]->element[1]->elements;
|
||||
|
||||
StreamEntry streamEntry{.id = id, .field_val_list{}};
|
||||
streamEntry.field_val_list.reserve(raw_field_val_len >> 1);
|
||||
|
||||
for (int fv_i = 0; fv_i < raw_field_val_len; fv_i += 2) {
|
||||
streamEntry.field_val_list.emplace_back(
|
||||
FieldVal{.field = raw_field_val[fv_i]->str,
|
||||
.val = raw_field_val[fv_i + 1]->str});
|
||||
}
|
||||
|
||||
res[key].emplace_back(streamEntry);
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
private:
|
||||
using Reply = std::unique_ptr<redisReply, decltype(&freeReplyObject)>;
|
||||
|
||||
redisContext* ctx_{nullptr};
|
||||
|
||||
Reply submit_cmd(const std::string& cmd) noexcept {
|
||||
redisReply* reply =
|
||||
static_cast<redisReply*>(redisCommand(ctx_, cmd.c_str()));
|
||||
return Reply(reply, freeReplyObject);
|
||||
}
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const XreadRes& res) {
|
||||
for (const auto& [key, stream_entry_list] : res) {
|
||||
os << key << '\n';
|
||||
for (const auto& [id, field_val_list] : stream_entry_list) {
|
||||
os << " " << id << '\n';
|
||||
for (const auto& [field, val] : field_val_list) {
|
||||
os << " " << field << ' ' << val << '\n';
|
||||
}
|
||||
}
|
||||
}
|
||||
return os;
|
||||
}
|
||||
#include "redis.h"
|
||||
|
||||
int main() {
|
||||
std::jthread redis_thread([](std::stop_token stop_token) {
|
||||
std::jthread xadd_thread([](std::stop_token stop_token) {
|
||||
RedisClient redis("127.0.0.1", 6379);
|
||||
std::string key{"key_01"};
|
||||
|
||||
for (int i = 0; !stop_token.stop_requested(); ++i) {
|
||||
auto res =
|
||||
redis.xadd(XaddReq{.key = key,
|
||||
.field_val_list = {{std::format("field_{}", i),
|
||||
std::format("val_{}", i)}}});
|
||||
if (!res.has_value()) std::cerr << "err: " << res.error() << std::endl;
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
}
|
||||
});
|
||||
|
||||
std::jthread xread_thread([](std::stop_token stop_token) {
|
||||
RedisClient redis("127.0.0.1", 6379);
|
||||
std::string key{"key_01"};
|
||||
std::string id{"0"};
|
||||
|
||||
while (!stop_token.stop_requested()) {
|
||||
auto res = redis.xread(XreadReq{.count = 2, .key_id_list = {{key, id}}});
|
||||
auto res = redis.xread(
|
||||
XreadReq{.count = 2, .timeout_ms = 2000, .key_id_list = {{key, id}}});
|
||||
|
||||
if (!res.has_value()) {
|
||||
std::cout << "err: " << res.error() << std::endl;
|
||||
std::cerr << "err: " << res.error() << std::endl;
|
||||
break;
|
||||
} else if (res.value().empty()) {
|
||||
break;
|
||||
continue;
|
||||
}
|
||||
|
||||
std::cout << res.value() << std::endl << "---" << std::endl;
|
||||
@@ -193,10 +42,13 @@ int main() {
|
||||
if (!res.value().contains(key)) break;
|
||||
|
||||
id = res.value()[key].back().id;
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
}
|
||||
});
|
||||
|
||||
redis_thread.join();
|
||||
xadd_thread.join();
|
||||
xread_thread.join();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
265
src/redis.h
Normal file
265
src/redis.h
Normal file
@@ -0,0 +1,265 @@
|
||||
#ifndef REDIS_H
|
||||
#define REDIS_H
|
||||
|
||||
#include <hiredis/hiredis.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <format>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "expected.h"
|
||||
|
||||
struct FieldVal {
|
||||
std::string field; // field = key
|
||||
std::string val;
|
||||
};
|
||||
|
||||
struct KeyID {
|
||||
std::string key; // key = topic
|
||||
std::string id; // id = timestamp-seq
|
||||
};
|
||||
|
||||
struct XaddReq {
|
||||
std::string key; // key = topic
|
||||
std::vector<FieldVal> field_val_list;
|
||||
};
|
||||
|
||||
struct XreadReq {
|
||||
int count;
|
||||
int timeout_ms; // blocks indefinitely when timeout_ms = 0
|
||||
std::vector<KeyID> key_id_list;
|
||||
};
|
||||
|
||||
struct StreamEntry {
|
||||
std::string id; // id = timestamp-seq
|
||||
std::vector<FieldVal> field_val_list;
|
||||
};
|
||||
|
||||
// key = topic
|
||||
using Key = std::string;
|
||||
|
||||
using XreadRes = std::unordered_map<Key, std::vector<StreamEntry>>;
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const XreadRes& res) {
|
||||
for (const auto& [key, stream_entry_list] : res) {
|
||||
os << key << '\n';
|
||||
for (const auto& [id, field_val_list] : stream_entry_list) {
|
||||
os << " " << id << '\n';
|
||||
for (const auto& [field, val] : field_val_list) {
|
||||
os << " " << field << ' ' << val << '\n';
|
||||
}
|
||||
}
|
||||
}
|
||||
return os;
|
||||
}
|
||||
|
||||
class RedisClient {
|
||||
public:
|
||||
RedisClient(const std::string& host = "127.0.0.1", int port = 6379) {
|
||||
ctx_ = redisConnect(host.c_str(), port);
|
||||
|
||||
if (!ctx_) {
|
||||
throw std::runtime_error("failed to allocate redis context");
|
||||
} else if (ctx_->err) {
|
||||
throw std::runtime_error(ctx_->errstr);
|
||||
}
|
||||
}
|
||||
|
||||
RedisClient(const RedisClient&) = delete;
|
||||
RedisClient& operator=(const RedisClient&) = delete;
|
||||
|
||||
RedisClient(RedisClient&& other) noexcept : ctx_(other.ctx_) {
|
||||
other.ctx_ = nullptr;
|
||||
}
|
||||
|
||||
RedisClient& operator=(RedisClient&& other) noexcept {
|
||||
if (this != &other) {
|
||||
if (ctx_) redisFree(ctx_);
|
||||
ctx_ = other.ctx_;
|
||||
other.ctx_ = nullptr;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
~RedisClient() {
|
||||
if (ctx_) redisFree(ctx_);
|
||||
}
|
||||
|
||||
Expected<void> xadd(const XaddReq& req) noexcept {
|
||||
std::stringstream ss;
|
||||
bool is_cmd_valid = !req.key.empty() && !req.field_val_list.empty();
|
||||
|
||||
ss << "XADD " << req.key << " * ";
|
||||
|
||||
for (const auto& [field, val] : req.field_val_list) {
|
||||
is_cmd_valid &= !field.empty() && !val.empty();
|
||||
ss << field << ' ' << val << ' ';
|
||||
}
|
||||
|
||||
if (!is_cmd_valid)
|
||||
return Unexpected(std::format("cmd `{}` is invalid", ss.str()));
|
||||
|
||||
auto reply = submit_cmd(ss.str());
|
||||
|
||||
if (!reply) {
|
||||
return Unexpected(ctx_->errstr);
|
||||
} else if (reply->type == REDIS_REPLY_ERROR) {
|
||||
return Unexpected(reply->str ? reply->str : "unknown error");
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
Expected<XreadRes> xread(const XreadReq& req) noexcept {
|
||||
std::stringstream ss;
|
||||
bool is_cmd_valid =
|
||||
req.count > 0 && req.timeout_ms >= 0 && !req.key_id_list.empty();
|
||||
|
||||
ss << "XREAD COUNT " << req.count << ' ';
|
||||
ss << "BLOCK " << req.timeout_ms << ' ';
|
||||
ss << "STREAMS ";
|
||||
|
||||
for (const auto& [key, _] : req.key_id_list) {
|
||||
is_cmd_valid &= !key.empty();
|
||||
ss << key << ' ';
|
||||
}
|
||||
|
||||
for (const auto& [_, id] : req.key_id_list) {
|
||||
is_cmd_valid &= !id.empty();
|
||||
ss << id << ' ';
|
||||
}
|
||||
|
||||
if (!is_cmd_valid)
|
||||
return Unexpected(std::format("cmd `{}` is invalid", ss.str()));
|
||||
|
||||
auto reply = submit_cmd(ss.str());
|
||||
|
||||
if (!reply) {
|
||||
return Unexpected(ctx_->errstr);
|
||||
} else if (reply->type == REDIS_REPLY_ERROR) {
|
||||
return Unexpected(reply->str ? reply->str : "unknown reply error");
|
||||
} else if (reply->type == REDIS_REPLY_NIL) {
|
||||
return {};
|
||||
} else if (reply->type != REDIS_REPLY_ARRAY) {
|
||||
return Unexpected(std::format("unexpected reply type {}", reply->type));
|
||||
}
|
||||
|
||||
XreadRes res;
|
||||
|
||||
// XREAD response format:
|
||||
// {
|
||||
// key_01: {
|
||||
// [
|
||||
// id_01 (timestamp-seq),
|
||||
// [
|
||||
// field_01,
|
||||
// val_01,
|
||||
// field_02,
|
||||
// val_02,
|
||||
// ...
|
||||
// ]
|
||||
// ],
|
||||
// [
|
||||
// id_02,
|
||||
// [
|
||||
// field_01,
|
||||
// val_01,
|
||||
// ...
|
||||
// ]
|
||||
// ],
|
||||
// ...
|
||||
// },
|
||||
// key_02: { ... },
|
||||
// ...
|
||||
// }
|
||||
|
||||
const auto invalid_xread_res_err = Unexpected("invalid XREAD response");
|
||||
|
||||
auto is_arr_res_valid = [](redisReply* arr_res) {
|
||||
return arr_res != nullptr && arr_res->type == REDIS_REPLY_ARRAY &&
|
||||
arr_res->element != nullptr;
|
||||
};
|
||||
|
||||
auto is_str_res_valid = [](redisReply* str_res) {
|
||||
return str_res != nullptr && str_res->type == REDIS_REPLY_STRING &&
|
||||
str_res->str != nullptr;
|
||||
};
|
||||
|
||||
auto is_key_entry_list_res_valid = [&](redisReply* key_entry_list_res) {
|
||||
return is_arr_res_valid(key_entry_list_res) &&
|
||||
key_entry_list_res->elements == 2 &&
|
||||
is_str_res_valid(key_entry_list_res->element[0]) &&
|
||||
is_arr_res_valid(key_entry_list_res->element[1]);
|
||||
};
|
||||
|
||||
auto is_stream_entry_res_valid = [&](redisReply* stream_entry_res) {
|
||||
return is_arr_res_valid(stream_entry_res) &&
|
||||
stream_entry_res->elements == 2 &&
|
||||
is_str_res_valid(stream_entry_res->element[0]) &&
|
||||
is_arr_res_valid(stream_entry_res->element[1]) &&
|
||||
(stream_entry_res->element[1]->elements & 1) == 0;
|
||||
};
|
||||
|
||||
for (int i = 0, n = reply->elements; i < n; ++i) {
|
||||
auto key_entry_list_res = reply->element[i];
|
||||
|
||||
if (!is_key_entry_list_res_valid(key_entry_list_res))
|
||||
return invalid_xread_res_err;
|
||||
|
||||
auto key = key_entry_list_res->element[0]->str;
|
||||
|
||||
auto entry_list = key_entry_list_res->element[1]->element;
|
||||
int entry_list_len = key_entry_list_res->element[1]->elements;
|
||||
|
||||
res[key].resize(entry_list_len);
|
||||
|
||||
for (int entry_i = 0; entry_i < entry_list_len; ++entry_i) {
|
||||
auto stream_entry_res = entry_list[entry_i];
|
||||
|
||||
if (!is_stream_entry_res_valid(stream_entry_res))
|
||||
return invalid_xread_res_err;
|
||||
|
||||
auto id = stream_entry_res->element[0]->str;
|
||||
|
||||
auto raw_field_val = stream_entry_res->element[1]->element;
|
||||
int raw_field_val_len = stream_entry_res->element[1]->elements;
|
||||
|
||||
auto& stream_entry = res[key][entry_i];
|
||||
stream_entry.id = id;
|
||||
stream_entry.field_val_list.reserve(raw_field_val_len >> 1);
|
||||
|
||||
for (int fv_i = 0; fv_i < raw_field_val_len; fv_i += 2) {
|
||||
auto field_res = raw_field_val[fv_i];
|
||||
auto val_res = raw_field_val[fv_i + 1];
|
||||
|
||||
if (!is_str_res_valid(field_res) || !is_str_res_valid(val_res))
|
||||
return invalid_xread_res_err;
|
||||
|
||||
stream_entry.field_val_list.emplace_back(
|
||||
FieldVal{.field = field_res->str, .val = val_res->str});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
private:
|
||||
using Reply = std::unique_ptr<redisReply, decltype(&freeReplyObject)>;
|
||||
|
||||
// ctx_ is NOT thread safe
|
||||
redisContext* ctx_{nullptr};
|
||||
|
||||
Reply submit_cmd(const std::string& cmd) noexcept {
|
||||
redisReply* reply =
|
||||
static_cast<redisReply*>(redisCommand(ctx_, cmd.c_str()));
|
||||
return Reply(reply, freeReplyObject);
|
||||
}
|
||||
};
|
||||
|
||||
#endif // REDIS_H
|
||||
Reference in New Issue
Block a user