-
This commit is contained in:
1
.clang-format
Normal file
1
.clang-format
Normal file
@@ -0,0 +1 @@
|
|||||||
|
BasedOnStyle: Google
|
||||||
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
build
|
||||||
|
CMakeUserPresets.json
|
||||||
|
vcpkg_installed
|
||||||
16
CMakeLists.txt
Normal file
16
CMakeLists.txt
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
cmake_minimum_required(VERSION 3.16)
|
||||||
|
|
||||||
|
set(CMAKE_CXX_EXTENSIONS OFF)
|
||||||
|
set(CMAKE_CXX_STANDARD 23)
|
||||||
|
set(CMAKE_CXX_STANDARD_REQUIRED ON)
|
||||||
|
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2")
|
||||||
|
|
||||||
|
project(redis_playground LANGUAGES CXX)
|
||||||
|
|
||||||
|
find_package(hiredis CONFIG REQUIRED)
|
||||||
|
|
||||||
|
add_library(proj_warnings INTERFACE)
|
||||||
|
target_compile_options(proj_warnings INTERFACE -Wall -Werror -Wextra -Wpedantic)
|
||||||
|
|
||||||
|
add_executable(main ./src/main.cc)
|
||||||
|
target_link_libraries(main PRIVATE proj_warnings hiredis::hiredis)
|
||||||
13
CMakePresets.json
Normal file
13
CMakePresets.json
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
{
|
||||||
|
"version": 2,
|
||||||
|
"configurePresets": [
|
||||||
|
{
|
||||||
|
"name": "vcpkg",
|
||||||
|
"generator": "Ninja",
|
||||||
|
"binaryDir": "${sourceDir}/build",
|
||||||
|
"cacheVariables": {
|
||||||
|
"CMAKE_TOOLCHAIN_FILE": "$env{VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
95
README.md
95
README.md
@@ -0,0 +1,95 @@
|
|||||||
|
## setup dev env
|
||||||
|
|
||||||
|
### install cmake & ninja
|
||||||
|
|
||||||
|
```
|
||||||
|
# linux
|
||||||
|
sudo apt install ninja-build
|
||||||
|
|
||||||
|
CMAKE_VER=v4.2.1
|
||||||
|
sudo apt install libssl-dev openssl
|
||||||
|
git clone --branch $CMAKE_VER --depth 1 https://github.com/Kitware/CMake.git
|
||||||
|
cd CMake
|
||||||
|
./bootstrap && make && sudo make install
|
||||||
|
|
||||||
|
# mac
|
||||||
|
brew install cmake ninja
|
||||||
|
```
|
||||||
|
|
||||||
|
### install dependencies
|
||||||
|
|
||||||
|
```bash
|
||||||
|
vcpkg --disable-metrics install --recurse
|
||||||
|
```
|
||||||
|
|
||||||
|
### update vcpkg baseline
|
||||||
|
|
||||||
|
```bash
|
||||||
|
vcpkg --disable-metrics x-update-baseline
|
||||||
|
```
|
||||||
|
|
||||||
|
### show package versions
|
||||||
|
|
||||||
|
```bash
|
||||||
|
vcpkg --disable-metrics list
|
||||||
|
```
|
||||||
|
|
||||||
|
### create `CMakeUserPresets.json`
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"version": 2,
|
||||||
|
"configurePresets": [
|
||||||
|
{
|
||||||
|
"name": "default",
|
||||||
|
"inherits": "vcpkg",
|
||||||
|
"environment": {
|
||||||
|
"VCPKG_ROOT": "path/to/vcpkg"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## format
|
||||||
|
|
||||||
|
```bash
|
||||||
|
find . \( -path ./build -o -path ./vcpkg_installed \) -prune -o \
|
||||||
|
-type f \( -name '*.cc' -o -name '*.h' \) -exec clang-format -i {} +
|
||||||
|
```
|
||||||
|
|
||||||
|
## build
|
||||||
|
|
||||||
|
```
|
||||||
|
cmake --preset=default
|
||||||
|
cmake --build build
|
||||||
|
```
|
||||||
|
|
||||||
|
## run
|
||||||
|
|
||||||
|
spin up a local redis server
|
||||||
|
|
||||||
|
```
|
||||||
|
docker run --name redis -p 6379:6379 -it redis:8.4.0
|
||||||
|
```
|
||||||
|
|
||||||
|
run the binary
|
||||||
|
|
||||||
|
```
|
||||||
|
./build/main
|
||||||
|
```
|
||||||
|
|
||||||
|
push messages with `redis-cli`
|
||||||
|
|
||||||
|
```
|
||||||
|
docker exec -it redis redis-cli -h localhost -p 6379
|
||||||
|
|
||||||
|
XADD key_01 * f1 v1 f2 v2
|
||||||
|
```
|
||||||
|
|
||||||
|
## clean
|
||||||
|
|
||||||
|
```
|
||||||
|
rm -rf ./build
|
||||||
|
rm -rf $HOME/.cache/vcpkg
|
||||||
|
```
|
||||||
|
|||||||
202
src/main.cc
Normal file
202
src/main.cc
Normal file
@@ -0,0 +1,202 @@
|
|||||||
|
#include <hiredis/hiredis.h>
|
||||||
|
|
||||||
|
#include <expected>
|
||||||
|
#include <format>
|
||||||
|
#include <iostream>
|
||||||
|
#include <memory>
|
||||||
|
#include <sstream>
|
||||||
|
#include <stdexcept>
|
||||||
|
#include <stop_token>
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
struct FieldVal {
|
||||||
|
std::string field, val;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct KeyID {
|
||||||
|
std::string key;
|
||||||
|
std::string id;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct XaddReq {
|
||||||
|
std::string key;
|
||||||
|
std::vector<FieldVal> field_val_list;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct XreadReq {
|
||||||
|
int count;
|
||||||
|
int timeout_ms = 5000;
|
||||||
|
std::vector<KeyID> key_id_list;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct StreamEntry {
|
||||||
|
std::string id;
|
||||||
|
std::vector<FieldVal> field_val_list;
|
||||||
|
};
|
||||||
|
|
||||||
|
using Key = std::string;
|
||||||
|
|
||||||
|
using XreadRes = std::unordered_map<Key, std::vector<StreamEntry>>;
|
||||||
|
|
||||||
|
class RedisClient {
|
||||||
|
public:
|
||||||
|
RedisClient(const std::string& host = "127.0.0.1", int port = 6379) {
|
||||||
|
ctx_ = redisConnect(host.c_str(), port);
|
||||||
|
|
||||||
|
if (!ctx_) {
|
||||||
|
throw std::runtime_error("failed to allocate redis context");
|
||||||
|
} else if (ctx_->err) {
|
||||||
|
throw std::runtime_error(ctx_->errstr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
RedisClient(const RedisClient&) = delete;
|
||||||
|
RedisClient& operator=(const RedisClient&) = delete;
|
||||||
|
|
||||||
|
RedisClient(RedisClient&& other) noexcept : ctx_(other.ctx_) {
|
||||||
|
other.ctx_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
RedisClient& operator=(RedisClient&& other) noexcept {
|
||||||
|
if (this != &other) {
|
||||||
|
if (ctx_) redisFree(ctx_);
|
||||||
|
ctx_ = other.ctx_;
|
||||||
|
other.ctx_ = nullptr;
|
||||||
|
}
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
~RedisClient() {
|
||||||
|
if (ctx_) redisFree(ctx_);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::expected<void, std::string> xadd(const XaddReq& req) noexcept {
|
||||||
|
// todo: safe guard against invalid requests
|
||||||
|
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "XADD " << req.key << " * ";
|
||||||
|
for (const auto& [field, val] : req.field_val_list)
|
||||||
|
ss << field << ' ' << val << ' ';
|
||||||
|
|
||||||
|
auto reply = submit_cmd(ss.str());
|
||||||
|
|
||||||
|
if (!reply) {
|
||||||
|
return std::unexpected(ctx_->errstr);
|
||||||
|
} else if (reply->type == REDIS_REPLY_ERROR) {
|
||||||
|
return std::unexpected(reply->str ? reply->str : "unknown error");
|
||||||
|
}
|
||||||
|
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
std::expected<XreadRes, std::string> xread(const XreadReq& req) noexcept {
|
||||||
|
// todo: safe guard against invalid requests
|
||||||
|
|
||||||
|
std::stringstream ss;
|
||||||
|
ss << "XREAD COUNT " << req.count << ' ';
|
||||||
|
ss << "BLOCK " << req.timeout_ms << ' ';
|
||||||
|
ss << "STREAMS ";
|
||||||
|
for (const auto& [key, _] : req.key_id_list) ss << key << ' ';
|
||||||
|
for (const auto& [_, id] : req.key_id_list) ss << id << ' ';
|
||||||
|
|
||||||
|
auto reply = submit_cmd(ss.str());
|
||||||
|
|
||||||
|
if (!reply) {
|
||||||
|
return std::unexpected(ctx_->errstr);
|
||||||
|
} else if (reply->type == REDIS_REPLY_ERROR) {
|
||||||
|
return std::unexpected(reply->str ? reply->str : "unknown reply error");
|
||||||
|
} else if (reply->type == REDIS_REPLY_NIL) {
|
||||||
|
return {};
|
||||||
|
} else if (reply->type != REDIS_REPLY_ARRAY) {
|
||||||
|
return std::unexpected(
|
||||||
|
std::format("unexpected reply type {}", reply->type));
|
||||||
|
}
|
||||||
|
|
||||||
|
XreadRes res;
|
||||||
|
|
||||||
|
// todo: verify res has the right format
|
||||||
|
for (int i = 0, n = reply->elements; i < n; ++i) {
|
||||||
|
auto key = reply->element[i]->element[0]->str;
|
||||||
|
|
||||||
|
auto entry_list = reply->element[i]->element[1]->element;
|
||||||
|
int entry_list_len = reply->element[i]->element[1]->elements;
|
||||||
|
|
||||||
|
for (int entry_i = 0; entry_i < entry_list_len; ++entry_i) {
|
||||||
|
auto id = entry_list[entry_i]->element[0]->str;
|
||||||
|
|
||||||
|
auto raw_field_val = entry_list[entry_i]->element[1]->element;
|
||||||
|
int raw_field_val_len = entry_list[entry_i]->element[1]->elements;
|
||||||
|
|
||||||
|
StreamEntry streamEntry{.id = id, .field_val_list{}};
|
||||||
|
streamEntry.field_val_list.reserve(raw_field_val_len >> 1);
|
||||||
|
|
||||||
|
for (int fv_i = 0; fv_i < raw_field_val_len; fv_i += 2) {
|
||||||
|
streamEntry.field_val_list.emplace_back(
|
||||||
|
FieldVal{.field = raw_field_val[fv_i]->str,
|
||||||
|
.val = raw_field_val[fv_i + 1]->str});
|
||||||
|
}
|
||||||
|
|
||||||
|
res[key].emplace_back(streamEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
using Reply = std::unique_ptr<redisReply, decltype(&freeReplyObject)>;
|
||||||
|
|
||||||
|
redisContext* ctx_{nullptr};
|
||||||
|
|
||||||
|
Reply submit_cmd(const std::string& cmd) noexcept {
|
||||||
|
redisReply* reply =
|
||||||
|
static_cast<redisReply*>(redisCommand(ctx_, cmd.c_str()));
|
||||||
|
return Reply(reply, freeReplyObject);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
std::ostream& operator<<(std::ostream& os, const XreadRes& res) {
|
||||||
|
for (const auto& [key, stream_entry_list] : res) {
|
||||||
|
os << key << '\n';
|
||||||
|
for (const auto& [id, field_val_list] : stream_entry_list) {
|
||||||
|
os << " " << id << '\n';
|
||||||
|
for (const auto& [field, val] : field_val_list) {
|
||||||
|
os << " " << field << ' ' << val << '\n';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return os;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
std::jthread redis_thread([](std::stop_token stop_token) {
|
||||||
|
RedisClient redis("127.0.0.1", 6379);
|
||||||
|
|
||||||
|
std::string key{"key_01"};
|
||||||
|
std::string id{"0"};
|
||||||
|
|
||||||
|
while (!stop_token.stop_requested()) {
|
||||||
|
auto res = redis.xread(XreadReq{.count = 2, .key_id_list = {{key, id}}});
|
||||||
|
|
||||||
|
if (!res.has_value()) {
|
||||||
|
std::cout << "err: " << res.error() << std::endl;
|
||||||
|
break;
|
||||||
|
} else if (res.value().empty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << res.value() << std::endl << "---" << std::endl;
|
||||||
|
|
||||||
|
if (!res.value().contains(key)) break;
|
||||||
|
|
||||||
|
id = res.value()[key].back().id;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
redis_thread.join();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
14
vcpkg-configuration.json
Normal file
14
vcpkg-configuration.json
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
{
|
||||||
|
"default-registry": {
|
||||||
|
"kind": "git",
|
||||||
|
"baseline": "d2452281016c6d77b6ef199d599d9929eafe4807",
|
||||||
|
"repository": "https://github.com/microsoft/vcpkg"
|
||||||
|
},
|
||||||
|
"registries": [
|
||||||
|
{
|
||||||
|
"kind": "artifact",
|
||||||
|
"location": "https://github.com/microsoft/vcpkg-ce-catalog/archive/refs/heads/main.zip",
|
||||||
|
"name": "microsoft"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
5
vcpkg.json
Normal file
5
vcpkg.json
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
{
|
||||||
|
"dependencies": [
|
||||||
|
"hiredis"
|
||||||
|
]
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user