Improve BitcoinityReader.

It sends a pong message back, but I'm not sure it's
having an effect
This commit is contained in:
King_DuckZ 2022-05-15 00:50:39 +02:00
parent b3b77d5cad
commit 5fd772b302
3 changed files with 52 additions and 16 deletions

View file

@ -44,6 +44,8 @@ class BitcoinityPayload {
public: public:
static constexpr std::string_view JsonKeyPayload = "payload"; static constexpr std::string_view JsonKeyPayload = "payload";
BitcoinityPayload() = default;
BitcoinityPayload( BitcoinityPayload(
simdjson::padded_string&& body_p, simdjson::padded_string&& body_p,
simdjson::ondemand::parser&& parser_p simdjson::ondemand::parser&& parser_p

View file

@ -37,6 +37,7 @@ namespace {
} }
BitcoinityMessage to_bitcoinity_message (const std::string& message) { BitcoinityMessage to_bitcoinity_message (const std::string& message) {
typedef std::optional<std::string> OptionalString;
if (message.empty()) if (message.empty())
throw BitcoinityError("Unable to make a BitcoinityMessage from an empty message"); throw BitcoinityError("Unable to make a BitcoinityMessage from an empty message");
@ -56,7 +57,8 @@ namespace {
auto ref_result = doc[JsonKeyRef]; auto ref_result = doc[JsonKeyRef];
if (ref_result.error()) if (ref_result.error())
throw BitcoinityMissingKeyError({std::string(JsonKeyRef)}, message); throw BitcoinityMissingKeyError({std::string(JsonKeyRef)}, message);
std::string_view ref = (ref_result.is_null() ? std::string_view{} : ref_result.value()); OptionalString ref =
(ref_result.is_null() ? OptionalString{} : OptionalString{static_cast<std::string_view>(ref_result.value())});
auto topic_result = doc[JsonKeyTopic]; auto topic_result = doc[JsonKeyTopic];
if (topic_result.error()) if (topic_result.error())
@ -66,11 +68,20 @@ namespace {
return BitcoinityMessage{ return BitcoinityMessage{
BitcoinityPayload{std::move(body), std::move(parser)}, BitcoinityPayload{std::move(body), std::move(parser)},
std::move(ref), std::move(ref),
std::move(event), std::string(event),
std::move(topic) std::string(topic)
}; };
} }
BitcoinityMessage wait_for_event (WebsocketReader& ws, std::string_view event, int max_messages) {
do {
auto resp = to_bitcoinity_message(ws.read());
if (resp.event == event)
return resp;
} while (--max_messages > 0);
return BitcoinityMessage{};
}
void init_bitcoinity (WebsocketReader& ws, std::string_view exchange, std::string_view currency) { void init_bitcoinity (WebsocketReader& ws, std::string_view exchange, std::string_view currency) {
//thanks to blueagain from #javascript libera.chat for helping figuring //thanks to blueagain from #javascript libera.chat for helping figuring
//out the websocket address and the correct sequence of messages to //out the websocket address and the correct sequence of messages to
@ -81,9 +92,11 @@ namespace {
//3) {"topic":"webs:markets_bitfinex_USD","event":"phx_join","payload":{},"ref":"3"} //3) {"topic":"webs:markets_bitfinex_USD","event":"phx_join","payload":{},"ref":"3"}
//See docs/bitcoinity/sample_communication.md for a real world example //See docs/bitcoinity/sample_communication.md for a real world example
constexpr int MaxDiscardedReplies = 5;
{ {
ws.write(R"({"topic":"all","event":"phx_join","payload":{},"ref":"1"})"); ws.write(R"({"topic":"all","event":"phx_join","payload":{},"ref":"1"})");
auto resp = to_bitcoinity_message(ws.read()); auto resp = wait_for_event(ws, "phx_reply", MaxDiscardedReplies);
if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "1") if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "1")
throw BitcoinityError("Server responded to message 1 \"" + throw BitcoinityError("Server responded to message 1 \"" +
std::string{resp.event} + "\" with status \"" + std::string{resp.event} + "\" with status \"" +
@ -93,7 +106,7 @@ namespace {
{ {
ws.write(R"({"topic":"webs:markets","event":"phx_join","payload":{},"ref":"2"})"); ws.write(R"({"topic":"webs:markets","event":"phx_join","payload":{},"ref":"2"})");
auto resp = to_bitcoinity_message(ws.read()); auto resp = wait_for_event(ws, "phx_reply", MaxDiscardedReplies);
if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "2") if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "2")
throw BitcoinityError("Server responded to message 2 \"" + throw BitcoinityError("Server responded to message 2 \"" +
std::string{resp.event} + "\" with status \"" + std::string{resp.event} + "\" with status \"" +
@ -106,7 +119,7 @@ namespace {
R"({"topic":"webs:markets_)" + std::string{exchange} + "_" + R"({"topic":"webs:markets_)" + std::string{exchange} + "_" +
std::string{currency} + R"(","event":"phx_join","payload":{},"ref":"3"})" std::string{currency} + R"(","event":"phx_join","payload":{},"ref":"3"})"
); );
auto resp = to_bitcoinity_message(ws.read()); auto resp = wait_for_event(ws, "phx_reply", MaxDiscardedReplies);
if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "3") if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "3")
throw BitcoinityError("Server responded to message 3 \"" + throw BitcoinityError("Server responded to message 3 \"" +
std::string{resp.event} + "\" with status \"" + std::string{resp.event} + "\" with status \"" +
@ -146,9 +159,9 @@ const std::string& BitcoinityMissingKeyError::json() const noexcept {
BitcoinityMessage::BitcoinityMessage ( BitcoinityMessage::BitcoinityMessage (
BitcoinityPayload&& payload_p, BitcoinityPayload&& payload_p,
std::optional<std::string_view> ref_p, std::optional<std::string> ref_p,
std::string_view event_p, std::string event_p,
std::string_view topic_p std::string topic_p
) : ) :
payload(std::move(payload_p)), payload(std::move(payload_p)),
ref(std::move(ref_p)), ref(std::move(ref_p)),
@ -168,6 +181,26 @@ BitcoinityReader::BitcoinityReader(const std::string& user_agent, std::string_vi
} }
BitcoinityMessage BitcoinityReader::read() { BitcoinityMessage BitcoinityReader::read() {
return to_bitcoinity_message(WebsocketReader::read()); do {
auto response = to_bitcoinity_message(WebsocketReader::read());
if (response.event == "ping") {
//received something like
// {"event":"ping","payload":{"token":"wassup"},"ref":null,"topic":"all"}
std::string reply = R"({"event":"pong","payload":{"token":")";
reply += response.payload.get_string_or_empty("token");
reply += R"("},"ref":)";
if (response.ref)
reply += "\"" + std::string{*response.ref} + "\"";
else
reply += "null";
reply += R"(,"topic":")";
reply += response.topic;
reply += R"("})";
WebsocketReader::write(reply);
}
else {
return response;
}
} while(true);
} }
} //namespace duck } //namespace duck

View file

@ -43,17 +43,18 @@ private:
}; };
struct BitcoinityMessage { struct BitcoinityMessage {
BitcoinityMessage() = default;
BitcoinityMessage ( BitcoinityMessage (
BitcoinityPayload&& payload_p, BitcoinityPayload&& payload_p,
std::optional<std::string_view> ref_p, std::optional<std::string> ref_p,
std::string_view event_p, std::string event_p,
std::string_view topic_p std::string topic_p
); );
BitcoinityPayload payload; BitcoinityPayload payload;
std::optional<std::string_view> ref; std::optional<std::string> ref;
std::string_view event; std::string event;
std::string_view topic; std::string topic;
}; };
class BitcoinityReader : private WebsocketReader { class BitcoinityReader : private WebsocketReader {