Send heartbeat to bitcoinity to keep the connection alive

Thanks to blueagain for figuring this out.
This allows for long queries to bitcoinity to not fail
when the requested exchange/currency pair is not received
quickly enough
This commit is contained in:
King_DuckZ 2022-05-15 01:31:32 +02:00
parent 5fd772b302
commit 5aef83f4e8
3 changed files with 61 additions and 3 deletions

View File

@ -0,0 +1,32 @@
// Copyright 2022, blueagain
// https://github.com/terrablue
import WebSocket from "ws";
const ws = new WebSocket("ws://bitcoinity.org/webs_bridge/websocket");
let ref = 2;
const send = (ws, data) => {
ws.send(JSON.stringify({...data, payload: {}, ref}));
ref++;
}
const join = (ws, topic) =>
send(ws, {topic, event: "phx_join"});
const currencies = ["USD"];
ws.on("open", () => {
ws.on("message", message => {
const parsed_message = JSON.parse(message);
if (currencies.includes(parsed_message.payload.data?.currency)) {
const {currency, exchange_name, last} = parsed_message.payload.data;
console.log(`currency: ${currency}, exchange: ${exchange_name}, last: ${last}`);
}
});
join(ws, "webs:markets");
join(ws, "webs:bitfinex_USD");
setInterval(() => {
send(ws, {topic: "phoenix", event: "heartbeat"})
}, 30000)
})

View File

@ -28,6 +28,7 @@ namespace {
constexpr std::string_view JsonKeyRef = "ref";
constexpr std::string_view JsonKeyEvent = "event";
constexpr std::string_view JsonKeyTopic = "topic";
constexpr float HeartbeatTimeout = 29.0f;
std::string force_to_string(const std::optional<std::string_view>& data) {
if (data)
@ -82,7 +83,7 @@ namespace {
return BitcoinityMessage{};
}
void init_bitcoinity (WebsocketReader& ws, std::string_view exchange, std::string_view currency) {
int init_bitcoinity (WebsocketReader& ws, std::string_view exchange, std::string_view currency) {
//thanks to blueagain from #javascript libera.chat for helping figuring
//out the websocket address and the correct sequence of messages to
//initiate the communication properly.
@ -126,6 +127,8 @@ namespace {
std::string{resp.payload.get_string_or_empty("status")} +
"\" and ref \"" + force_to_string(resp.ref) + "\"");
}
return 3;
}
std::string join_keys (const std::vector<std::string>& keys) noexcept {
@ -175,13 +178,17 @@ BitcoinityReader::BitcoinityReader(const std::string& user_agent, std::string_vi
std::string{BitcoinityPort},
std::string{BitcoinityWSPath},
user_agent
)
),
m_next_ref(1)
{
init_bitcoinity(*this, exchange, currency);
m_next_ref = init_bitcoinity(*this, exchange, currency) + 1;
m_last_heartbeat = std::chrono::steady_clock::now();
}
BitcoinityMessage BitcoinityReader::read() {
do {
send_heartbeat_ifn();
auto response = to_bitcoinity_message(WebsocketReader::read());
if (response.event == "ping") {
//received something like
@ -203,4 +210,16 @@ BitcoinityMessage BitcoinityReader::read() {
}
} while(true);
}
void BitcoinityReader::send_heartbeat_ifn() {
using std::chrono::duration;
using std::chrono::steady_clock;
if (duration<float>(steady_clock::now() - m_last_heartbeat).count() >= HeartbeatTimeout) {
std::string message = R"({"topic":"phoenix","event":"heartbeat","payload":{},"ref":")";
message += std::to_string(m_next_ref++) + R"("})";
m_last_heartbeat = std::chrono::steady_clock::now();
WebsocketReader::write(message);
}
}
} //namespace duck

View File

@ -22,6 +22,7 @@
#include <stdexcept>
#include <vector>
#include <utility>
#include <chrono>
namespace duck {
class BitcoinityError : public std::runtime_error {
@ -63,6 +64,12 @@ public:
~BitcoinityReader() = default;
BitcoinityMessage read();
private:
void send_heartbeat_ifn();
std::chrono::steady_clock::time_point m_last_heartbeat;
int m_next_ref;
};
} //namespace duck