From d80908da4e3e7d43484aa61d44fe9190ac9b3973 Mon Sep 17 00:00:00 2001 From: King_DuckZ Date: Sat, 14 May 2022 06:58:49 +0200 Subject: [PATCH] Better comunication with Bitcoinity's websocket I think I have to send 3 messages to the websocket to initialise it. In response, it seems to send the last ticker price reasonably quickly, compared to earlier when it was just waiting for the next available transaction before notifying me. --- docs/sample_communication.txt | 20 +++ .../sample_responses.json | 0 src/bitcoinity_payload.cpp | 45 +++++ src/bitcoinity_payload.hpp | 91 ++++++++++ src/bitcoinity_reader.cpp | 164 ++++++++++++++++++ src/bitcoinity_reader.hpp | 67 +++++++ src/main.cpp | 82 +++------ src/meson.build | 2 + src/websocket_reader.cpp | 22 ++- src/websocket_reader.hpp | 5 +- 10 files changed, 436 insertions(+), 62 deletions(-) create mode 100644 docs/sample_communication.txt rename sample_responses.json => docs/sample_responses.json (100%) create mode 100644 src/bitcoinity_payload.cpp create mode 100644 src/bitcoinity_payload.hpp create mode 100644 src/bitcoinity_reader.cpp create mode 100644 src/bitcoinity_reader.hpp diff --git a/docs/sample_communication.txt b/docs/sample_communication.txt new file mode 100644 index 0000000..edf5bfe --- /dev/null +++ b/docs/sample_communication.txt @@ -0,0 +1,20 @@ +SEND >> {"topic":"all","event":"phx_join","payload":{},"ref":"1"} +RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"1","topic":"all"} +SEND >> {"topic":"webs:markets","event":"phx_join","payload":{},"ref":"2"} +RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"2","topic":"webs:markets"} +SEND >> {"topic":"webs:markets_kraken_USD","event":"phx_join","payload":{},"ref":"3"} +RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"3","topic":"webs:markets_kraken_USD"} +RECV << {"event":"new_msg","payload":{"data":{"currency":"USD","exchange_name":"bitfinex","last":29420.94784483}},"ref":null,"topic":"webs:markets"} + + +--- + + +SEND >> {"topic":"all","event":"phx_join","payload":{},"ref":"1"} +RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"1","topic":"all"} +SEND >> {"topic":"webs:markets","event":"phx_join","payload":{},"ref":"2"} +RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"2","topic":"webs:markets"} +SEND >> {"topic":"webs:markets_kraken_USD","event":"phx_join","payload":{},"ref":"3"} +RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"3","topic":"webs:markets_kraken_USD"} +RECV << {"event":"new_msg","payload":{"data":{"connected_count":1355}},"ref":null,"topic":"webs:markets"} +RECV << {"event":"new_msg","payload":{"data":{"currency":"EUR","exchange_name":"paymium","last":28363.44}},"ref":null,"topic":"webs:markets"} diff --git a/sample_responses.json b/docs/sample_responses.json similarity index 100% rename from sample_responses.json rename to docs/sample_responses.json diff --git a/src/bitcoinity_payload.cpp b/src/bitcoinity_payload.cpp new file mode 100644 index 0000000..3999b0d --- /dev/null +++ b/src/bitcoinity_payload.cpp @@ -0,0 +1,45 @@ +/* Copyright 2022, Michele Santullo + * This file is part of duckticker. + * + * Wrenpp is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Wrenpp is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with duckticker. If not, see . + */ + +#include "bitcoinity_payload.hpp" + +namespace duck { +BitcoinityPayload::BitcoinityPayload( + simdjson::padded_string&& body, + simdjson::ondemand::parser&& parser +) : + m_body(std::move(body)), + m_parser(std::move(parser)), //moving parser invalidates the document + m_document(m_parser.iterate(m_body)), + m_payload_node(m_document[JsonKeyPayload]) +{ +} + +BitcoinityPayload::BitcoinityPayload(BitcoinityPayload&& other) : + BitcoinityPayload(std::move(other.m_body), std::move(other.m_parser)) +{ +} + +BitcoinityPayload& BitcoinityPayload::operator= (BitcoinityPayload&& other) { + m_body = std::move(other.m_body); + m_parser = std::move(other.m_parser); + m_document = m_parser.iterate(m_body); + m_payload_node = m_document[JsonKeyPayload]; + return *this; +} + +} //namespace duck diff --git a/src/bitcoinity_payload.hpp b/src/bitcoinity_payload.hpp new file mode 100644 index 0000000..bb44ae2 --- /dev/null +++ b/src/bitcoinity_payload.hpp @@ -0,0 +1,91 @@ +/* Copyright 2022, Michele Santullo + * This file is part of duckticker. + * + * Wrenpp is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Wrenpp is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with duckticker. If not, see . + */ + +#pragma once + +#include +#include + +namespace duck { +namespace detail { + template + inline std::optional get_from_node(simdjson::ondemand::object& node, Key&& key, Keys&&... keys) { + auto item_result = node[key]; + if (item_result.error()) + return {}; + else { + if constexpr (sizeof...(Keys) > 0) { + simdjson::ondemand::object item = item_result; + return get_from_node(item, std::forward(keys)...); + } + else { + return static_cast(item_result); + } + } + } +} //namespace detail + +class BitcoinityPayload { +public: + static constexpr std::string_view JsonKeyPayload = "payload"; + + BitcoinityPayload( + simdjson::padded_string&& body_p, + simdjson::ondemand::parser&& parser_p + ); + + BitcoinityPayload(BitcoinityPayload&& other); + + BitcoinityPayload& operator= (BitcoinityPayload&& other); + + template + std::optional get (Keys&&... keys) const { + return detail::get_from_node(m_payload_node, std::forward(keys)...); + } + + template + std::optional get_string (Keys&&... keys) const { + return this->get(std::forward(keys)...); + } + + template + std::optional get_double (Keys&&... keys) const { + return this->get(std::forward(keys)...); + } + + template + std::string_view get_string_or_empty (Keys&&... keys) const { + auto retval = this->get_string(std::forward(keys)...); + if (retval) + return *retval; + else + return {}; + } + + template + bool has_key (Keys&&... keys) const { + return !!this->get(std::forward(keys)...); + } + +private: + simdjson::padded_string m_body; + mutable simdjson::ondemand::parser m_parser; + mutable simdjson::ondemand::document m_document; + mutable simdjson::ondemand::object m_payload_node; +}; + +} //namespace duck diff --git a/src/bitcoinity_reader.cpp b/src/bitcoinity_reader.cpp new file mode 100644 index 0000000..46b9d7d --- /dev/null +++ b/src/bitcoinity_reader.cpp @@ -0,0 +1,164 @@ +/* Copyright 2022, Michele Santullo + * This file is part of duckticker. + * + * Wrenpp is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Wrenpp is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with duckticker. If not, see . + */ + +#include "bitcoinity_reader.hpp" +#include +#include + +namespace duck { +namespace { + constexpr std::string_view BitcoinityHostname = "bitcoinity.org"; + constexpr std::string_view BitcoinityPort = "80"; + constexpr std::string_view BitcoinityWSPath = "/webs_bridge/websocket"; + constexpr std::string_view JsonKeyPayload = BitcoinityPayload::JsonKeyPayload; + constexpr std::string_view JsonKeyRef = "ref"; + constexpr std::string_view JsonKeyEvent = "event"; + constexpr std::string_view JsonKeyTopic = "topic"; + + std::string force_to_string(const std::optional& data) { + if (data) + return std::string{*data}; + else + return {}; + } + + BitcoinityMessage to_bitcoinity_message (const std::string& message) { + if (message.empty()) + throw BitcoinityError("Unable to make a BitcoinityMessage from an empty message"); + + simdjson::ondemand::parser parser; + simdjson::padded_string body = message; + simdjson::ondemand::document doc = parser.iterate(body); + + auto event_result = doc[JsonKeyEvent]; + if (event_result.error()) + throw BitcoinityMissingKeyError({std::string(JsonKeyEvent)}, message); + std::string_view event = event_result; + + auto payload_result = doc[JsonKeyPayload]; + if (payload_result.error()) + throw BitcoinityMissingKeyError({std::string(JsonKeyPayload)}, message); + + auto ref_result = doc[JsonKeyRef]; + if (ref_result.error()) + throw BitcoinityMissingKeyError({std::string(JsonKeyRef)}, message); + std::string_view ref = (ref_result.is_null() ? std::string_view{} : ref_result.value()); + + auto topic_result = doc[JsonKeyTopic]; + if (topic_result.error()) + throw BitcoinityMissingKeyError({std::string(JsonKeyTopic)}, message); + std::string_view topic = topic_result; + + return BitcoinityMessage{ + BitcoinityPayload{std::move(body), std::move(parser)}, + std::move(ref), + std::move(event), + std::move(topic) + }; + } + + void init_bitcoinity (WebsocketReader& ws, std::string_view exchange, std::string_view currency) { + { + ws.write(R"({"topic":"all","event":"phx_join","payload":{},"ref":"1"})"); + auto resp = to_bitcoinity_message(ws.read()); + if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "1") + throw BitcoinityError("Server responded to message 1 \"" + + std::string{resp.event} + "\" with status \"" + + std::string{resp.payload.get_string_or_empty("status")} + + "\" and ref \"" + force_to_string(resp.ref) + "\""); + } + + { + ws.write(R"({"topic":"webs:markets","event":"phx_join","payload":{},"ref":"2"})"); + auto resp = to_bitcoinity_message(ws.read()); + if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "2") + throw BitcoinityError("Server responded to message 2 \"" + + std::string{resp.event} + "\" with status \"" + + std::string{resp.payload.get_string_or_empty("status")} + + "\" and ref \"" + force_to_string(resp.ref) + "\""); + } + + { + ws.write( + R"({"topic":"webs:markets_)" + std::string{exchange} + "_" + + std::string{currency} + R"(","event":"phx_join","payload":{},"ref":"3"})" + ); + auto resp = to_bitcoinity_message(ws.read()); + if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "3") + throw BitcoinityError("Server responded to message 3 \"" + + std::string{resp.event} + "\" with status \"" + + std::string{resp.payload.get_string_or_empty("status")} + + "\" and ref \"" + force_to_string(resp.ref) + "\""); + } + } + + std::string join_keys (const std::vector& keys) noexcept { + try { + std::string retval; + for (const auto& key : keys) { + retval += "/" + key; + } + + return retval; + } + catch (...) { + return {}; + } + } +} //unnamed namespace + +BitcoinityMissingKeyError::BitcoinityMissingKeyError(const std::vector& keys, const std::string& json) : + BitcoinityError("Missing key \"" + join_keys(keys) + "\" from input json:\n" + json), + m_keys(keys), + m_json(json) +{ } + +const std::vector& BitcoinityMissingKeyError::keys() const noexcept { + return m_keys; +} + +const std::string& BitcoinityMissingKeyError::json() const noexcept { + return m_json; +} + +BitcoinityMessage::BitcoinityMessage ( + BitcoinityPayload&& payload_p, + std::optional ref_p, + std::string_view event_p, + std::string_view topic_p +) : + payload(std::move(payload_p)), + ref(std::move(ref_p)), + event(std::move(event_p)), + topic(std::move(topic_p)) +{ +} + +BitcoinityReader::BitcoinityReader(const std::string& user_agent, std::string_view exchange, std::string_view currency) : + WebsocketReader(std::string{BitcoinityHostname}, + std::string{BitcoinityPort}, + std::string{BitcoinityWSPath}, + user_agent + ) +{ + init_bitcoinity(*this, exchange, currency); +} + +BitcoinityMessage BitcoinityReader::read() { + return to_bitcoinity_message(WebsocketReader::read()); +} +} //namespace duck diff --git a/src/bitcoinity_reader.hpp b/src/bitcoinity_reader.hpp new file mode 100644 index 0000000..170cdee --- /dev/null +++ b/src/bitcoinity_reader.hpp @@ -0,0 +1,67 @@ +/* Copyright 2022, Michele Santullo + * This file is part of duckticker. + * + * Wrenpp is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Wrenpp is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with duckticker. If not, see . + */ + +#pragma once + +#include "websocket_reader.hpp" +#include "bitcoinity_payload.hpp" +#include +#include +#include + +namespace duck { +class BitcoinityError : public std::runtime_error { +public: + BitcoinityError(const std::string& message) : std::runtime_error(message) {} +private: +}; + +class BitcoinityMissingKeyError : public BitcoinityError { +public: + BitcoinityMissingKeyError (const std::vector& keys, const std::string& json); + + const std::vector& keys() const noexcept; + const std::string& json() const noexcept; + +private: + std::vector m_keys; + std::string m_json; +}; + +struct BitcoinityMessage { + BitcoinityMessage ( + BitcoinityPayload&& payload_p, + std::optional ref_p, + std::string_view event_p, + std::string_view topic_p + ); + + BitcoinityPayload payload; + std::optional ref; + std::string_view event; + std::string_view topic; +}; + +class BitcoinityReader : private WebsocketReader { +public: + BitcoinityReader(const std::string& user_agent, std::string_view exchange, std::string_view currency); + ~BitcoinityReader() = default; + + BitcoinityMessage read(); +}; + +} //namespace duck diff --git a/src/main.cpp b/src/main.cpp index 97dbd39..038741a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -15,8 +15,7 @@ * along with duckticker. If not, see . */ -#include "websocket_reader.hpp" -#include +#include "bitcoinity_reader.hpp" #include #include #include @@ -53,7 +52,6 @@ class App { var the_app = App.new("USD", "kraken") )script"; - class TickerPrice { public: TickerPrice() = default; @@ -80,67 +78,39 @@ private: TickerPrice ticker_price_bitoinity (std::string_view currency, std::string_view exchange) { static const TickerPrice error_price {0.0, "", ""}; - duck::WebsocketReader websocket{"bitcoinity.org", - "80", - "/webs_bridge/websocket", + duck::BitcoinityReader bitcoinity{ std::string{duck::WebsocketReader::BeastVersionString} + " websocket-client-coro", - R"({"topic":"webs:markets_)" + std::string{exchange} + "_" + - std::string{currency} + R"(","event":"phx_join","payload":{},"ref":"3"})" + exchange, + currency }; - simdjson::ondemand::parser parser; - { - simdjson::padded_string body = websocket.read(); - std::cout << body << '\n'; - if (body.size() == 0) - return error_price; - simdjson::ondemand::document doc = parser.iterate(body); + do { + //possible values I observed: + // {"event":"new_msg","payload":{"data":{"currency":"EUR","exchange_name":"paymium","last":28363.44}},"ref":null,"topic":"webs:markets"} + // {"event":"new_msg","payload":{"data":{"currency":"USD","depth":{"price":30071,"type":1,"volume":-1.4123},"exchange_name":"bitfinex"}},"ref":null,"topic":"webs:markets_bitfinex_USD"} + // {"event":"new_msg","payload":{"data":{"currency":"USD","exchange_name":"bitfinex","trade":{"amount":0.0486,"date":1652471682,"exchange_name":"bitfinex","price":30072}}},"ref":null,"topic":"webs:markets_bitfinex_USD"} - if (doc["event"].get_string().value() != "phx_reply" or doc["payload"]["status"].get_string().value() != "ok") - return error_price; - } - - { double price_out; - std::string currency_out; - std::string exchange_out; - do { - simdjson::padded_string body = websocket.read(); - std::cout << body << '\n'; - simdjson::ondemand::document doc = parser.iterate(body); - auto payload_result = doc["payload"]; - if (payload_result.error()) - return error_price; - simdjson::ondemand::object payload = payload_result; - auto data_result = payload["data"]; - if (data_result.error()) - return error_price; - simdjson::ondemand::object data = data_result; + std::optional price; + auto msg = bitcoinity.read(); - simdjson::error_code price_error; - if (not data["depth"].error()) { - price_error = data["depth"]["price"].get(price_out); - } - else if (not data["trade"].error()) { - price_error = data["trade"]["price"].get(price_out); - } - else if (not data["depth_shot"].error()) { - continue; - } + if (msg.payload.has_key("data", "depth_shot")) + continue; - std::string_view currency_out_view, exchange_out_view; - const auto currency_error = data["currency"].get(currency_out_view); - const auto exchange_error = data["exchange_name"].get(exchange_out_view); + if ((price = msg.payload.get_double("data", "last"))) + price_out = *price; + else if ((price = msg.payload.get_double("data", "depth", "price"))) + price_out = *price; + else if ((price = msg.payload.get_double("data", "trade", "price"))) + price_out = *price; + else + continue; //keep trying, sometimes I get a message like: + //{"event":"new_msg","payload":{"data":{"connected_count":1355}},"ref":null,"topic":"webs:markets"} - if (price_error or currency_error or exchange_error) - return error_price; - - currency_out = currency_out_view; - exchange_out = exchange_out_view; - } while (false); - - return {price_out, std::move(currency_out), std::move(exchange_out)}; - } + std::string_view currency_out = msg.payload.get_string_or_empty("data", "currency"); + std::string_view exchange_out = msg.payload.get_string_or_empty("data", "exchange_name"); + return {price_out, std::string(currency_out), std::string(exchange_out)}; + } while (true); } void ticker_price_bitcoinity_wren (wren::VM& vm) { diff --git a/src/meson.build b/src/meson.build index 67a8cd4..22c9996 100644 --- a/src/meson.build +++ b/src/meson.build @@ -24,6 +24,8 @@ executable(meson.project_name(), 'nap/page_fetch.cpp', 'nap/quick_rest.cpp', 'websocket_reader.cpp', + 'bitcoinity_reader.cpp', + 'bitcoinity_payload.cpp', install: true, dependencies: [ curlcpp_dep, diff --git a/src/websocket_reader.cpp b/src/websocket_reader.cpp index 1c7e078..5307f32 100644 --- a/src/websocket_reader.cpp +++ b/src/websocket_reader.cpp @@ -15,9 +15,14 @@ * along with duckticker. If not, see . */ +#define QUIET_SEND_RECV + #include "websocket_reader.hpp" #include #include +#if !defined(QUIET_SEND_RECV) +# include +#endif namespace duck { const std::string_view WebsocketReader::BeastVersionString = BOOST_BEAST_VERSION_STRING; @@ -38,8 +43,7 @@ struct WebsocketReader::LocalData { WebsocketReader::WebsocketReader (std::string host, const std::string& port, const std::string& path, - const std::string& user_agent, - const std::string& message + const std::string& user_agent ) : m_local(std::make_unique()) { @@ -54,13 +58,23 @@ WebsocketReader::WebsocketReader (std::string host, } )); m_local->ws.handshake(host, path); - m_local->ws.write(boost::asio::buffer(message)); } std::string WebsocketReader::read() { boost::beast::flat_buffer buffer; m_local->ws.read(buffer); - return boost::beast::buffers_to_string(buffer.data()); + std::string retval = boost::beast::buffers_to_string(buffer.data()); +#if !defined(QUIET_SEND_RECV) + std::cout << "RECV << " << retval << '\n'; +#endif + return retval; +} + +void WebsocketReader::write(const std::string_view& message) { +#if !defined(QUIET_SEND_RECV) + std::cout << "SEND >> " << message << '\n'; +#endif + m_local->ws.write(boost::asio::buffer(message)); } WebsocketReader::~WebsocketReader() { diff --git a/src/websocket_reader.hpp b/src/websocket_reader.hpp index d18e984..656551a 100644 --- a/src/websocket_reader.hpp +++ b/src/websocket_reader.hpp @@ -19,6 +19,7 @@ #include #include +#include #include namespace duck { @@ -29,13 +30,13 @@ public: WebsocketReader (std::string host, const std::string& port, const std::string& path, - const std::string& user_agent, - const std::string& message + const std::string& user_agent ); ~WebsocketReader(); std::string read(); + void write(const std::string_view& message); private: struct LocalData;