Skip to content

Commit

Permalink
xstreams parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
appkins committed Nov 24, 2018
1 parent f12f155 commit 7db1df0
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 14 deletions.
2 changes: 1 addition & 1 deletion examples/cpp_redis_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ main(void) {
}, [](cpp_redis::reply &reply) {
std::cout << "set hello 42: " << reply << std::endl;
cpp_redis::xstream_reply msg(reply);
std::cout << "Mes: " << msg[0].Messages[0].Values["message"] << std::endl;
std::cout << msg << std::endl;
// if (reply.is_string())
// do_something_with_string(reply.as_string())
});
Expand Down
19 changes: 15 additions & 4 deletions includes/cpp_redis/core/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1590,18 +1590,29 @@ namespace cpp_redis {

std::future<reply> xrange(const std::string &key,
const range_type_t &range_args);
//XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

client &xrevrange(const std::string &key, const range_type_t &range_args, const reply_callback_t &reply_callback);
client &
xread(const xread_args_t &a, const reply_callback_t &reply_callback);

std::future<reply> xrevrange(const std::string &key,
const range_type_t &range_args);
//XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
std::future<reply> xread(const xread_args_t &a);

client &
xreadgroup(const xreadgroup_args_t &a, const reply_callback_t &reply_callback);

std::future<reply> xreadgroup(const xreadgroup_args_t &a);

client &xrevrange(const std::string &key, const range_type_t &range_args, const reply_callback_t &reply_callback);

std::future<reply> xrevrange(const std::string &key,
const range_type_t &range_args);

client &xtrim(const std::string &key, int max_len, const reply_callback_t &reply_callback);
std::future<reply> xtrim(const std::string &key, int max_len);

client &xtrim_approx(const std::string &key, int max_len, const reply_callback_t &reply_callback);
std::future<reply> xtrim_approx(const std::string &key, int max_len);

client &zadd(const std::string &key, const std::vector<std::string> &options,
const std::multimap<std::string, std::string> &score_members,
const reply_callback_t &reply_callback);
Expand Down
16 changes: 14 additions & 2 deletions includes/cpp_redis/core/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,22 @@


namespace cpp_redis {
typedef std::int32_t milliseconds;
//! \brief first array is the session name, second is ids
typedef std::pair<std::vector<std::string>, std::vector<std::string>> streams_t;

typedef struct xread_args {
streams_t Streams;
std::int32_t Count;
std::chrono::milliseconds Block;
std::int32_t Block;
} xread_args_t;

typedef struct xreadgroup_args {
std::string Group;
std::string Consumer;
streams_t Streams;
std::int32_t Count;
int Block;
std::int32_t Block;
bool NoAck;
} xreadgroup_args_t;

Expand All @@ -55,9 +56,18 @@ namespace cpp_redis {
std::int32_t Count;
} range_type_t;

typedef struct xclaim_args {
std::string Stream;
std::string Group;
std::string Consumer;
milliseconds MinIdle;
std::vector<std::string> Messages;
} xclaim_args_t;

class xmessage {
public:
explicit xmessage(reply data);
friend std::ostream& operator<<(std::ostream& os, const xmessage& xm);
std::string Id;
std::map<std::string, std::string> Values;
};
Expand All @@ -67,6 +77,7 @@ namespace cpp_redis {
class xstream {
public:
explicit xstream(reply data);
friend std::ostream& operator<<(std::ostream& os, const xstream& xs);
std::string Stream;
std::vector<xmessage_t> Messages;
};
Expand All @@ -76,6 +87,7 @@ namespace cpp_redis {
class xstream_reply : public std::vector<xstream_t> {
public:
explicit xstream_reply(reply data);
friend std::ostream& operator<<(std::ostream& os, const xstream_reply& xs);
};

class range {
Expand Down
67 changes: 60 additions & 7 deletions sources/core/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2352,12 +2352,24 @@ namespace cpp_redis {
return *this;
}

client &
client::xrevrange(const std::string &key, const range_type_t &range_args, const reply_callback_t &reply_callback) {
std::vector<std::string> cmd = {"XREVRANGE", key, range_args.Start, range_args.Stop};
if (range_args.Count > 0) {
cmd.emplace_back(std::to_string(range_args.Count));
client &client::xread(const xread_args_t &a, const client::reply_callback_t &reply_callback) {
std::vector<std::string> cmd = {"XREAD"};
if (a.Count > 0) {
cmd.emplace_back("COUNT");
cmd.push_back(std::to_string(a.Count));
}

if (a.Block >= 0) {
cmd.emplace_back("BLOCK");
cmd.push_back(std::to_string(a.Block));
}

// Add streams
cmd.emplace_back("STREAMS");
cmd.insert(cmd.end(), a.Streams.first.begin(), a.Streams.first.end());
// Add ids
cmd.insert(cmd.end(), a.Streams.second.begin(), a.Streams.second.end());

send(cmd, reply_callback);
return *this;
}
Expand Down Expand Up @@ -2390,6 +2402,29 @@ namespace cpp_redis {
return *this;
}

client &
client::xrevrange(const std::string &key, const range_type_t &range_args, const reply_callback_t &reply_callback) {
std::vector<std::string> cmd = {"XREVRANGE", key, range_args.Start, range_args.Stop};
if (range_args.Count > 0) {
cmd.emplace_back(std::to_string(range_args.Count));
}
send(cmd, reply_callback);
return *this;
}

client &client::xtrim(const std::string &key, int max_len, const client::reply_callback_t &reply_callback) {
std::vector<std::string> cmd = {"XTRIM", key, "MAXLEN", std::to_string(max_len)};
send(cmd, reply_callback);
return *this;
}

client &client::xtrim_approx(const std::string &key, int max_len,
const cpp_redis::client::reply_callback_t &reply_callback) {
std::vector<std::string> cmd = {"XTRIM", key, "MAXLEN", "~", std::to_string(max_len)};
send(cmd, reply_callback);
return *this;
}

client &
client::zadd(const std::string &key, const std::vector<std::string> &options,
const std::multimap<std::string, std::string> &score_members, const reply_callback_t &reply_callback) {
Expand Down Expand Up @@ -4278,15 +4313,33 @@ namespace cpp_redis {
});
}

std::future<reply> client::xreadgroup(const xreadgroup_args_t &a) {
return exec_cmd([=](const reply_callback_t &cb) -> client & {
return xreadgroup(a, cb);
});
}

std::future<reply> client::xread(const xread_args_t &a) {
return exec_cmd([=](const reply_callback_t &cb) -> client & {
return xread(a, cb);
});
}

std::future<reply> client::xrevrange(const std::string &key, const range_type_t &range_args) {
return exec_cmd([=](const reply_callback_t &cb) -> client & {
return xrevrange(key, range_args, cb);
});
}

std::future<reply> client::xreadgroup(const xreadgroup_args_t &a) {
std::future<reply> client::xtrim(const std::string &key, int max_len) {
return exec_cmd([=](const reply_callback_t &cb) -> client & {
return xreadgroup(a, cb);
return xtrim(key, max_len, cb);
});
}

std::future<reply> client::xtrim_approx(const std::string &key, int max_len) {
return exec_cmd([=](const reply_callback_t &cb) -> client & {
return xtrim_approx(key, max_len, cb);
});
}

Expand Down
25 changes: 25 additions & 0 deletions sources/core/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,34 @@ namespace cpp_redis {
}
}

std::ostream &operator<<(std::ostream &os, const xmessage &xm) {
os << "\n\t\t\"id\": " << xm.Id << "\n\t\t\"values\": {";
for (auto &v : xm.Values) {
os << "\n\t\t\t\"" << v.first << "\": " << v.second << ",";
}
os << "\n\t\t}";
return os;
}

std::ostream &operator<<(std::ostream &os, const xstream &xs) {
os << "{\n\t\"stream\": " << xs.Stream << "\n\t\"messages\": [";
for (auto &m : xs.Messages) {
os << m;
}
os << "\n\t]\n}";
return os;
}

xstream_reply::xstream_reply(reply data) {
for (auto &d : data.as_array()) {
emplace_back(xstream(d));
}
}

std::ostream &operator<<(std::ostream &os, const xstream_reply &xs) {
for (auto &x : xs) {
os << x;
}
return os;
}
} // namespace cpp_redis

0 comments on commit 7db1df0

Please sign in to comment.