Better comunication with Bitcoinity's websocket

I think I have to send 3 messages to the websocket
to initialise it. In response, it seems to send the last
ticker price reasonably quickly, compared to earlier
when it was just waiting for the next available transaction
before notifying me.
This commit is contained in:
King_DuckZ 2022-05-14 06:58:49 +02:00
parent 2016f19c3d
commit d80908da4e
10 changed files with 436 additions and 62 deletions

View file

@ -0,0 +1,20 @@
SEND >> {"topic":"all","event":"phx_join","payload":{},"ref":"1"}
RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"1","topic":"all"}
SEND >> {"topic":"webs:markets","event":"phx_join","payload":{},"ref":"2"}
RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"2","topic":"webs:markets"}
SEND >> {"topic":"webs:markets_kraken_USD","event":"phx_join","payload":{},"ref":"3"}
RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"3","topic":"webs:markets_kraken_USD"}
RECV << {"event":"new_msg","payload":{"data":{"currency":"USD","exchange_name":"bitfinex","last":29420.94784483}},"ref":null,"topic":"webs:markets"}
---
SEND >> {"topic":"all","event":"phx_join","payload":{},"ref":"1"}
RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"1","topic":"all"}
SEND >> {"topic":"webs:markets","event":"phx_join","payload":{},"ref":"2"}
RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"2","topic":"webs:markets"}
SEND >> {"topic":"webs:markets_kraken_USD","event":"phx_join","payload":{},"ref":"3"}
RECV << {"event":"phx_reply","payload":{"response":{},"status":"ok"},"ref":"3","topic":"webs:markets_kraken_USD"}
RECV << {"event":"new_msg","payload":{"data":{"connected_count":1355}},"ref":null,"topic":"webs:markets"}
RECV << {"event":"new_msg","payload":{"data":{"currency":"EUR","exchange_name":"paymium","last":28363.44}},"ref":null,"topic":"webs:markets"}

View file

@ -0,0 +1,45 @@
/* Copyright 2022, Michele Santullo
* This file is part of duckticker.
*
* Wrenpp is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Wrenpp is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with duckticker. If not, see <http://www.gnu.org/licenses/>.
*/
#include "bitcoinity_payload.hpp"
namespace duck {
BitcoinityPayload::BitcoinityPayload(
simdjson::padded_string&& body,
simdjson::ondemand::parser&& parser
) :
m_body(std::move(body)),
m_parser(std::move(parser)), //moving parser invalidates the document
m_document(m_parser.iterate(m_body)),
m_payload_node(m_document[JsonKeyPayload])
{
}
BitcoinityPayload::BitcoinityPayload(BitcoinityPayload&& other) :
BitcoinityPayload(std::move(other.m_body), std::move(other.m_parser))
{
}
BitcoinityPayload& BitcoinityPayload::operator= (BitcoinityPayload&& other) {
m_body = std::move(other.m_body);
m_parser = std::move(other.m_parser);
m_document = m_parser.iterate(m_body);
m_payload_node = m_document[JsonKeyPayload];
return *this;
}
} //namespace duck

View file

@ -0,0 +1,91 @@
/* Copyright 2022, Michele Santullo
* This file is part of duckticker.
*
* Wrenpp is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Wrenpp is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with duckticker. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <simdjson.h>
#include <string_view>
namespace duck {
namespace detail {
template <typename As, typename Key, typename... Keys>
inline std::optional<As> get_from_node(simdjson::ondemand::object& node, Key&& key, Keys&&... keys) {
auto item_result = node[key];
if (item_result.error())
return {};
else {
if constexpr (sizeof...(Keys) > 0) {
simdjson::ondemand::object item = item_result;
return get_from_node<As>(item, std::forward<Keys>(keys)...);
}
else {
return static_cast<As>(item_result);
}
}
}
} //namespace detail
class BitcoinityPayload {
public:
static constexpr std::string_view JsonKeyPayload = "payload";
BitcoinityPayload(
simdjson::padded_string&& body_p,
simdjson::ondemand::parser&& parser_p
);
BitcoinityPayload(BitcoinityPayload&& other);
BitcoinityPayload& operator= (BitcoinityPayload&& other);
template <typename As, typename... Keys>
std::optional<As> get (Keys&&... keys) const {
return detail::get_from_node<As>(m_payload_node, std::forward<Keys>(keys)...);
}
template <typename... Keys>
std::optional<std::string_view> get_string (Keys&&... keys) const {
return this->get<std::string_view>(std::forward<Keys>(keys)...);
}
template <typename... Keys>
std::optional<double> get_double (Keys&&... keys) const {
return this->get<double>(std::forward<Keys>(keys)...);
}
template <typename... Keys>
std::string_view get_string_or_empty (Keys&&... keys) const {
auto retval = this->get_string(std::forward<Keys>(keys)...);
if (retval)
return *retval;
else
return {};
}
template <typename... Keys>
bool has_key (Keys&&... keys) const {
return !!this->get<simdjson::ondemand::object>(std::forward<Keys>(keys)...);
}
private:
simdjson::padded_string m_body;
mutable simdjson::ondemand::parser m_parser;
mutable simdjson::ondemand::document m_document;
mutable simdjson::ondemand::object m_payload_node;
};
} //namespace duck

164
src/bitcoinity_reader.cpp Normal file
View file

@ -0,0 +1,164 @@
/* Copyright 2022, Michele Santullo
* This file is part of duckticker.
*
* Wrenpp is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Wrenpp is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with duckticker. If not, see <http://www.gnu.org/licenses/>.
*/
#include "bitcoinity_reader.hpp"
#include <string_view>
#include <cassert>
namespace duck {
namespace {
constexpr std::string_view BitcoinityHostname = "bitcoinity.org";
constexpr std::string_view BitcoinityPort = "80";
constexpr std::string_view BitcoinityWSPath = "/webs_bridge/websocket";
constexpr std::string_view JsonKeyPayload = BitcoinityPayload::JsonKeyPayload;
constexpr std::string_view JsonKeyRef = "ref";
constexpr std::string_view JsonKeyEvent = "event";
constexpr std::string_view JsonKeyTopic = "topic";
std::string force_to_string(const std::optional<std::string_view>& data) {
if (data)
return std::string{*data};
else
return {};
}
BitcoinityMessage to_bitcoinity_message (const std::string& message) {
if (message.empty())
throw BitcoinityError("Unable to make a BitcoinityMessage from an empty message");
simdjson::ondemand::parser parser;
simdjson::padded_string body = message;
simdjson::ondemand::document doc = parser.iterate(body);
auto event_result = doc[JsonKeyEvent];
if (event_result.error())
throw BitcoinityMissingKeyError({std::string(JsonKeyEvent)}, message);
std::string_view event = event_result;
auto payload_result = doc[JsonKeyPayload];
if (payload_result.error())
throw BitcoinityMissingKeyError({std::string(JsonKeyPayload)}, message);
auto ref_result = doc[JsonKeyRef];
if (ref_result.error())
throw BitcoinityMissingKeyError({std::string(JsonKeyRef)}, message);
std::string_view ref = (ref_result.is_null() ? std::string_view{} : ref_result.value());
auto topic_result = doc[JsonKeyTopic];
if (topic_result.error())
throw BitcoinityMissingKeyError({std::string(JsonKeyTopic)}, message);
std::string_view topic = topic_result;
return BitcoinityMessage{
BitcoinityPayload{std::move(body), std::move(parser)},
std::move(ref),
std::move(event),
std::move(topic)
};
}
void init_bitcoinity (WebsocketReader& ws, std::string_view exchange, std::string_view currency) {
{
ws.write(R"({"topic":"all","event":"phx_join","payload":{},"ref":"1"})");
auto resp = to_bitcoinity_message(ws.read());
if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "1")
throw BitcoinityError("Server responded to message 1 \"" +
std::string{resp.event} + "\" with status \"" +
std::string{resp.payload.get_string_or_empty("status")} +
"\" and ref \"" + force_to_string(resp.ref) + "\"");
}
{
ws.write(R"({"topic":"webs:markets","event":"phx_join","payload":{},"ref":"2"})");
auto resp = to_bitcoinity_message(ws.read());
if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "2")
throw BitcoinityError("Server responded to message 2 \"" +
std::string{resp.event} + "\" with status \"" +
std::string{resp.payload.get_string_or_empty("status")} +
"\" and ref \"" + force_to_string(resp.ref) + "\"");
}
{
ws.write(
R"({"topic":"webs:markets_)" + std::string{exchange} + "_" +
std::string{currency} + R"(","event":"phx_join","payload":{},"ref":"3"})"
);
auto resp = to_bitcoinity_message(ws.read());
if (resp.event != "phx_reply" or resp.payload.get_string_or_empty("status") != "ok" or not resp.ref or *resp.ref != "3")
throw BitcoinityError("Server responded to message 3 \"" +
std::string{resp.event} + "\" with status \"" +
std::string{resp.payload.get_string_or_empty("status")} +
"\" and ref \"" + force_to_string(resp.ref) + "\"");
}
}
std::string join_keys (const std::vector<std::string>& keys) noexcept {
try {
std::string retval;
for (const auto& key : keys) {
retval += "/" + key;
}
return retval;
}
catch (...) {
return {};
}
}
} //unnamed namespace
BitcoinityMissingKeyError::BitcoinityMissingKeyError(const std::vector<std::string>& keys, const std::string& json) :
BitcoinityError("Missing key \"" + join_keys(keys) + "\" from input json:\n" + json),
m_keys(keys),
m_json(json)
{ }
const std::vector<std::string>& BitcoinityMissingKeyError::keys() const noexcept {
return m_keys;
}
const std::string& BitcoinityMissingKeyError::json() const noexcept {
return m_json;
}
BitcoinityMessage::BitcoinityMessage (
BitcoinityPayload&& payload_p,
std::optional<std::string_view> ref_p,
std::string_view event_p,
std::string_view topic_p
) :
payload(std::move(payload_p)),
ref(std::move(ref_p)),
event(std::move(event_p)),
topic(std::move(topic_p))
{
}
BitcoinityReader::BitcoinityReader(const std::string& user_agent, std::string_view exchange, std::string_view currency) :
WebsocketReader(std::string{BitcoinityHostname},
std::string{BitcoinityPort},
std::string{BitcoinityWSPath},
user_agent
)
{
init_bitcoinity(*this, exchange, currency);
}
BitcoinityMessage BitcoinityReader::read() {
return to_bitcoinity_message(WebsocketReader::read());
}
} //namespace duck

67
src/bitcoinity_reader.hpp Normal file
View file

@ -0,0 +1,67 @@
/* Copyright 2022, Michele Santullo
* This file is part of duckticker.
*
* Wrenpp is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Wrenpp is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with duckticker. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "websocket_reader.hpp"
#include "bitcoinity_payload.hpp"
#include <stdexcept>
#include <vector>
#include <utility>
namespace duck {
class BitcoinityError : public std::runtime_error {
public:
BitcoinityError(const std::string& message) : std::runtime_error(message) {}
private:
};
class BitcoinityMissingKeyError : public BitcoinityError {
public:
BitcoinityMissingKeyError (const std::vector<std::string>& keys, const std::string& json);
const std::vector<std::string>& keys() const noexcept;
const std::string& json() const noexcept;
private:
std::vector<std::string> m_keys;
std::string m_json;
};
struct BitcoinityMessage {
BitcoinityMessage (
BitcoinityPayload&& payload_p,
std::optional<std::string_view> ref_p,
std::string_view event_p,
std::string_view topic_p
);
BitcoinityPayload payload;
std::optional<std::string_view> ref;
std::string_view event;
std::string_view topic;
};
class BitcoinityReader : private WebsocketReader {
public:
BitcoinityReader(const std::string& user_agent, std::string_view exchange, std::string_view currency);
~BitcoinityReader() = default;
BitcoinityMessage read();
};
} //namespace duck

View file

@ -15,8 +15,7 @@
* along with duckticker. If not, see <http://www.gnu.org/licenses/>. * along with duckticker. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "websocket_reader.hpp" #include "bitcoinity_reader.hpp"
#include <simdjson.h>
#include <wrenpp/vm_fun.hpp> #include <wrenpp/vm_fun.hpp>
#include <wrenpp/def_configuration.hpp> #include <wrenpp/def_configuration.hpp>
#include <wrenpp/callback_manager.hpp> #include <wrenpp/callback_manager.hpp>
@ -53,7 +52,6 @@ class App {
var the_app = App.new("USD", "kraken") var the_app = App.new("USD", "kraken")
)script"; )script";
class TickerPrice { class TickerPrice {
public: public:
TickerPrice() = default; TickerPrice() = default;
@ -80,67 +78,39 @@ private:
TickerPrice ticker_price_bitoinity (std::string_view currency, std::string_view exchange) { TickerPrice ticker_price_bitoinity (std::string_view currency, std::string_view exchange) {
static const TickerPrice error_price {0.0, "", ""}; static const TickerPrice error_price {0.0, "", ""};
duck::WebsocketReader websocket{"bitcoinity.org", duck::BitcoinityReader bitcoinity{
"80",
"/webs_bridge/websocket",
std::string{duck::WebsocketReader::BeastVersionString} + " websocket-client-coro", std::string{duck::WebsocketReader::BeastVersionString} + " websocket-client-coro",
R"({"topic":"webs:markets_)" + std::string{exchange} + "_" + exchange,
std::string{currency} + R"(","event":"phx_join","payload":{},"ref":"3"})" currency
}; };
simdjson::ondemand::parser parser; do {
{ //possible values I observed:
simdjson::padded_string body = websocket.read(); // {"event":"new_msg","payload":{"data":{"currency":"EUR","exchange_name":"paymium","last":28363.44}},"ref":null,"topic":"webs:markets"}
std::cout << body << '\n'; // {"event":"new_msg","payload":{"data":{"currency":"USD","depth":{"price":30071,"type":1,"volume":-1.4123},"exchange_name":"bitfinex"}},"ref":null,"topic":"webs:markets_bitfinex_USD"}
if (body.size() == 0) // {"event":"new_msg","payload":{"data":{"currency":"USD","exchange_name":"bitfinex","trade":{"amount":0.0486,"date":1652471682,"exchange_name":"bitfinex","price":30072}}},"ref":null,"topic":"webs:markets_bitfinex_USD"}
return error_price;
simdjson::ondemand::document doc = parser.iterate(body);
if (doc["event"].get_string().value() != "phx_reply" or doc["payload"]["status"].get_string().value() != "ok")
return error_price;
}
{
double price_out; double price_out;
std::string currency_out; std::optional<double> price;
std::string exchange_out; auto msg = bitcoinity.read();
do {
simdjson::padded_string body = websocket.read();
std::cout << body << '\n';
simdjson::ondemand::document doc = parser.iterate(body);
auto payload_result = doc["payload"];
if (payload_result.error())
return error_price;
simdjson::ondemand::object payload = payload_result;
auto data_result = payload["data"];
if (data_result.error())
return error_price;
simdjson::ondemand::object data = data_result;
simdjson::error_code price_error; if (msg.payload.has_key("data", "depth_shot"))
if (not data["depth"].error()) { continue;
price_error = data["depth"]["price"].get(price_out);
}
else if (not data["trade"].error()) {
price_error = data["trade"]["price"].get(price_out);
}
else if (not data["depth_shot"].error()) {
continue;
}
std::string_view currency_out_view, exchange_out_view; if ((price = msg.payload.get_double("data", "last")))
const auto currency_error = data["currency"].get(currency_out_view); price_out = *price;
const auto exchange_error = data["exchange_name"].get(exchange_out_view); else if ((price = msg.payload.get_double("data", "depth", "price")))
price_out = *price;
else if ((price = msg.payload.get_double("data", "trade", "price")))
price_out = *price;
else
continue; //keep trying, sometimes I get a message like:
//{"event":"new_msg","payload":{"data":{"connected_count":1355}},"ref":null,"topic":"webs:markets"}
if (price_error or currency_error or exchange_error) std::string_view currency_out = msg.payload.get_string_or_empty("data", "currency");
return error_price; std::string_view exchange_out = msg.payload.get_string_or_empty("data", "exchange_name");
return {price_out, std::string(currency_out), std::string(exchange_out)};
currency_out = currency_out_view; } while (true);
exchange_out = exchange_out_view;
} while (false);
return {price_out, std::move(currency_out), std::move(exchange_out)};
}
} }
void ticker_price_bitcoinity_wren (wren::VM& vm) { void ticker_price_bitcoinity_wren (wren::VM& vm) {

View file

@ -24,6 +24,8 @@ executable(meson.project_name(),
'nap/page_fetch.cpp', 'nap/page_fetch.cpp',
'nap/quick_rest.cpp', 'nap/quick_rest.cpp',
'websocket_reader.cpp', 'websocket_reader.cpp',
'bitcoinity_reader.cpp',
'bitcoinity_payload.cpp',
install: true, install: true,
dependencies: [ dependencies: [
curlcpp_dep, curlcpp_dep,

View file

@ -15,9 +15,14 @@
* along with duckticker. If not, see <http://www.gnu.org/licenses/>. * along with duckticker. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define QUIET_SEND_RECV
#include "websocket_reader.hpp" #include "websocket_reader.hpp"
#include <boost/beast.hpp> #include <boost/beast.hpp>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#if !defined(QUIET_SEND_RECV)
# include <iostream>
#endif
namespace duck { namespace duck {
const std::string_view WebsocketReader::BeastVersionString = BOOST_BEAST_VERSION_STRING; const std::string_view WebsocketReader::BeastVersionString = BOOST_BEAST_VERSION_STRING;
@ -38,8 +43,7 @@ struct WebsocketReader::LocalData {
WebsocketReader::WebsocketReader (std::string host, WebsocketReader::WebsocketReader (std::string host,
const std::string& port, const std::string& port,
const std::string& path, const std::string& path,
const std::string& user_agent, const std::string& user_agent
const std::string& message
) : ) :
m_local(std::make_unique<LocalData>()) m_local(std::make_unique<LocalData>())
{ {
@ -54,13 +58,23 @@ WebsocketReader::WebsocketReader (std::string host,
} }
)); ));
m_local->ws.handshake(host, path); m_local->ws.handshake(host, path);
m_local->ws.write(boost::asio::buffer(message));
} }
std::string WebsocketReader::read() { std::string WebsocketReader::read() {
boost::beast::flat_buffer buffer; boost::beast::flat_buffer buffer;
m_local->ws.read(buffer); m_local->ws.read(buffer);
return boost::beast::buffers_to_string(buffer.data()); std::string retval = boost::beast::buffers_to_string(buffer.data());
#if !defined(QUIET_SEND_RECV)
std::cout << "RECV << " << retval << '\n';
#endif
return retval;
}
void WebsocketReader::write(const std::string_view& message) {
#if !defined(QUIET_SEND_RECV)
std::cout << "SEND >> " << message << '\n';
#endif
m_local->ws.write(boost::asio::buffer(message));
} }
WebsocketReader::~WebsocketReader() { WebsocketReader::~WebsocketReader() {

View file

@ -19,6 +19,7 @@
#include <string> #include <string>
#include <string_view> #include <string_view>
#include <optional>
#include <memory> #include <memory>
namespace duck { namespace duck {
@ -29,13 +30,13 @@ public:
WebsocketReader (std::string host, WebsocketReader (std::string host,
const std::string& port, const std::string& port,
const std::string& path, const std::string& path,
const std::string& user_agent, const std::string& user_agent
const std::string& message
); );
~WebsocketReader(); ~WebsocketReader();
std::string read(); std::string read();
void write(const std::string_view& message);
private: private:
struct LocalData; struct LocalData;