...
 
Commits (3)
......@@ -3,7 +3,7 @@ workspace(name = "org_iota_entangled")
git_repository(
name = "rules_iota",
commit = "cf7ffa1d71b3bc751b6cd13166fa34625673fdfd",
remote = "https://gitlab.com/iota-foundation/software/rules_iota.git",
remote = "https://github.com/iotaledger/rules_iota.git",
)
android_sdk_repository(
......
......@@ -69,7 +69,8 @@ class IotaAPI {
virtual std::vector<std::string> findTransactions(
nonstd::optional<std::vector<std::string>> addresses,
nonstd::optional<std::vector<std::string>> bundles) = 0;
nonstd::optional<std::vector<std::string>> bundles,
nonstd::optional<std::vector<std::string>> approvees) = 0;
virtual NodeInfo getNodeInfo() = 0;
......
This diff is collapsed.
......@@ -32,7 +32,8 @@ class IotaJsonAPI : virtual public IotaAPI {
std::vector<std::string> findTransactions(
nonstd::optional<std::vector<std::string>> addresses,
nonstd::optional<std::vector<std::string>> bundles) override;
nonstd::optional<std::vector<std::string>> bundles,
nonstd::optional<std::vector<std::string>> approvees) override;
std::unordered_set<std::string> filterConsistentTails(
const std::vector<std::string>& tails) override;
......
......@@ -9,6 +9,7 @@
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <nlohmann/json.hpp>
#include <glog/logging.h>
using json = nlohmann::json;
......@@ -39,6 +40,8 @@ nonstd::optional<json> BeastIotaAPI::post(const json& input) {
req.body() = input.dump();
req.content_length(req.body().size());
VLOG(7) << __FUNCTION__ << " - req:\n" << req;
http::write(socket, req);
boost::beast::flat_buffer buffer;
http::response<http::string_body> res;
......@@ -46,6 +49,8 @@ nonstd::optional<json> BeastIotaAPI::post(const json& input) {
http::read(socket, buffer, res);
result = json::parse(res.body());
VLOG(7) << __FUNCTION__ << " - res:\n" << res;
socket.shutdown(tcp::socket::shutdown_both, ec);
if (ec && ec != boost::system::errc::not_connected)
......
......@@ -10,6 +10,7 @@ namespace cppclient {
struct GetTransactionsToApproveResponse {
std::string trunkTransaction;
std::string branchTransaction;
uint32_t duration;
};
struct GetNodeInfoResponse {
......
......@@ -6,6 +6,12 @@
#include <iota/tanglescope/common/zmqpub.hpp>
using namespace iota::tanglescope;
std::map<std::string, std::string> BlowballCollector::nameToDescHistogram = {
{BlowballCollector::TX_NUM_APPROVERS,
"Number of transactions approving a single transaction as observed "
"across multiple nodes"}};
bool BlowballCollector::parseConfiguration(const YAML::Node& conf) {
if (!PrometheusCollector::parseConfiguration(conf)) {
return false;
......@@ -35,7 +41,7 @@ void BlowballCollector::collect() {
auto registry = std::make_shared<Registry>();
exposer.RegisterCollectable(registry);
_families = buildHistogramsMap(registry, {});
histograms = buildHistogramsMap(registry, "blowball", {}, nameToDescHistogram);
analyzeBlowballsPeriodically();
refCountPublishedTransactions();
......@@ -68,7 +74,7 @@ void BlowballCollector::analyzeBlowballs(const std::vector<double>& buckets) {
auto now = std::chrono::system_clock::now();
for (const auto& it : refCountIt) {
_families.at(TX_NUM_APPROVERS).get().Add({}, buckets).Observe(it.second);
histograms.at(TX_NUM_APPROVERS).get().Add({}, buckets).Observe(it.second);
std::chrono::system_clock::time_point lastUpdateTime;
_txToLastUpdateTime.find(it.first, lastUpdateTime);
auto timeSinceLastUpdate =
......@@ -118,28 +124,3 @@ void BlowballCollector::refCountPublishedTransactions() {
},
[]() {});
}
using namespace prometheus;
PrometheusCollector::HistogramsMap BlowballCollector::buildHistogramsMap(
std::shared_ptr<Registry> registry,
const std::map<std::string, std::string>& labels) {
static std::map<std::string, std::string> nameToDesc = {
{TX_NUM_APPROVERS,
"Number of transactions approving a single transaction as observed "
"across multiple nodes"}};
std::map<std::string, std::reference_wrapper<Family<Histogram>>> families;
for (const auto& kv : nameToDesc) {
auto& curr_family = BuildHistogram()
.Name("blowball_collector" + kv.first)
.Help(kv.second)
.Labels(labels)
.Register(*registry);
families.insert(
std::make_pair(std::string(kv.first), std::ref(curr_family)));
}
return std::move(families);
}
\ No newline at end of file
......@@ -32,10 +32,6 @@ class BlowballCollector : public PrometheusCollector {
private:
// methods
HistogramsMap buildHistogramsMap(
std::shared_ptr<prometheus::Registry> registry,
const std::map<std::string, std::string>& labels);
void analyzeBlowballs(const std::vector<double>& buckets);
// Configuration
......@@ -43,7 +39,7 @@ class BlowballCollector : public PrometheusCollector {
uint32_t _snapshotInterval;
// Others
ZmqObservable _zmqObservable;
PrometheusCollector::HistogramsMap _families;
PrometheusCollector::HistogramsMap histograms;
// state
cuckoohash_map<std::string, uint8_t> _txToRefCount;
cuckoohash_map<std::string, std::chrono::system_clock::time_point>
......@@ -52,6 +48,8 @@ class BlowballCollector : public PrometheusCollector {
uint32_t _histogramRange;
uint32_t _bucketSize;
static std::map<std::string, std::string> nameToDescHistogram;
};
} // namespace tanglescope
......
......@@ -22,6 +22,21 @@ constexpr static auto DEPTH = 3;
using namespace iota::tanglescope;
std::map<std::string, std::string> EchoCatcher::nameToDescHistogram = {
{"time_elapsed_received",
"#Milliseconds it took for tx to travel back to transaction's "
"original source(\"listen_node\") [each interval is " +
std::to_string(BUCKET_WIDTH) + " milliseconds]"},
{"time_elapsed_arrived",
"#Milliseconds it took for tx to arrive to destination "
"(\"publish_node\") [each interval is " +
std::to_string(BUCKET_WIDTH) + " milliseconds]"},
{"time_elapsed_unseen_tx_published",
"#Milliseconds it took for tx to be published since the moment "
"client "
"first learned about it [each interval is " +
std::to_string(BUCKET_WIDTH) + " milliseconds]"}};
const std::string TX_TRYTES =
"99999999999999999999999999999999999999999999999999999999999999999999999999"
"99999999999999999999999999999999999999999999999999999999999999999999999999"
......@@ -222,8 +237,9 @@ void EchoCatcher::subscribeToTransactions(
std::string zmqURL, const EchoCatcher::ZmqObservable& zmqObservable,
std::shared_ptr<Registry> registry) {
std::atomic<bool> haveAllTXReturned = false;
auto families = buildHistogramsMap(
registry, {{"listen_node", _iriHost}, {"zmq_url", zmqURL}});
auto histograms = buildHistogramsMap(
registry, "echocatcher", {{"listen_node", _iriHost}, {"zmq_url", zmqURL}},
nameToDescHistogram);
std::vector<boost::future<void>> tasks;
std::vector<double> buckets(400);
......@@ -251,7 +267,7 @@ void EchoCatcher::subscribeToTransactions(
auto tx = std::static_pointer_cast<iri::TXMessage>(std::move(msg));
auto received = std::chrono::system_clock::now();
auto task =
handleUnseenTransactions(tx, received, families, buckets);
handleUnseenTransactions(tx, received, histograms, buckets);
tasks.push_back(std::move(task));
......@@ -266,12 +282,12 @@ void EchoCatcher::subscribeToTransactions(
tx->arrivalTime() - broadcastTime)
.count();
families.at("time_elapsed_received")
histograms.at("time_elapsed_received")
.get()
.Add({{"bundle_size", std::to_string(tx->lastIndex() + 1)}},
buckets)
.Observe(elapsedUntilReceived);
families.at("time_elapsed_arrived")
histograms.at("time_elapsed_arrived")
.get()
.Add({{"bundle_size", std::to_string(tx->lastIndex() + 1)}},
buckets)
......@@ -290,7 +306,7 @@ void EchoCatcher::subscribeToTransactions(
boost::future<void> EchoCatcher::handleUnseenTransactions(
std::shared_ptr<iri::TXMessage> tx,
std::chrono::time_point<std::chrono::system_clock> received,
HistogramsMap& families, const std::vector<double>& buckets) {
HistogramsMap& histograms, const std::vector<double>& buckets) {
static std::atomic<std::chrono::time_point<std::chrono::system_clock>>
lastDiscoveryTime = received;
std::chrono::system_clock::time_point txTime;
......@@ -301,7 +317,7 @@ boost::future<void> EchoCatcher::handleUnseenTransactions(
.count();
_hashToDiscoveryTime.erase(tx->hash());
families.at("time_elapsed_unseen_tx_published")
histograms.at("time_elapsed_unseen_tx_published")
.get()
.Add({{"bundle_size", std::to_string(tx->lastIndex() + 1)}}, buckets)
.Observe(txArrivalLatency);
......@@ -320,37 +336,4 @@ boost::future<void> EchoCatcher::handleUnseenTransactions(
}
return {};
}
PrometheusCollector::HistogramsMap EchoCatcher::buildHistogramsMap(
std::shared_ptr<Registry> registry,
const std::map<std::string, std::string>& labels) {
static std::map<std::string, std::string> nameToDesc = {
{"time_elapsed_received",
"#Milliseconds it took for tx to travel back to transaction's "
"original source(\"listen_node\") [each interval is " +
std::to_string(BUCKET_WIDTH) + " milliseconds]"},
{"time_elapsed_arrived",
"#Milliseconds it took for tx to arrive to destination "
"(\"publish_node\") [each interval is " +
std::to_string(BUCKET_WIDTH) + " milliseconds]"},
{"time_elapsed_unseen_tx_published",
"#Milliseconds it took for tx to be published since the moment "
"client "
"first learned about it [each interval is " +
std::to_string(BUCKET_WIDTH) + " milliseconds]"}};
std::map<std::string, std::reference_wrapper<Family<Histogram>>> famillies;
for (const auto& kv : nameToDesc) {
auto& curr_family = BuildHistogram()
.Name("echocatcher_" + kv.first)
.Help(kv.second)
.Labels(labels)
.Register(*registry);
famillies.insert(
std::make_pair(std::string(kv.first), std::ref(curr_family)));
}
return std::move(famillies);
}
\ No newline at end of file
......@@ -47,14 +47,9 @@ class EchoCatcher : public PrometheusCollector {
boost::future<void> handleUnseenTransactions(
std::shared_ptr<iri::TXMessage> tx,
std::chrono::time_point<std::chrono::system_clock> received,
HistogramsMap& families, const std::vector<double>& buckets);
HistogramsMap& histograms, const std::vector<double>& buckets);
private:
// methods
HistogramsMap buildHistogramsMap(
std::shared_ptr<prometheus::Registry> registry,
const std::map<std::string, std::string>& labels);
const std::vector<double>& histogramBuckets() const;
virtual void subscribeToTransactions(
......@@ -78,6 +73,8 @@ class EchoCatcher : public PrometheusCollector {
mutable std::shared_mutex _milestoneMutex;
std::string _latestSolidMilestoneHash;
static std::map<std::string, std::string> nameToDescHistogram;
};
} // namespace tanglescope
......
......@@ -2,7 +2,7 @@
namespace iota {
namespace tanglescope {
bool PrometheusCollector::parseConfiguration(const YAML::Node &conf) {
bool PrometheusCollector::parseConfiguration(const YAML::Node& conf) {
if (!CollectorBase::parseConfiguration(conf)) {
return false;
}
......@@ -12,5 +12,68 @@ bool PrometheusCollector::parseConfiguration(const YAML::Node &conf) {
}
return false;
}
using namespace prometheus;
PrometheusCollector::HistogramsMap PrometheusCollector::buildHistogramsMap(
std::shared_ptr<Registry> registry, const std::string& metricName,
const std::map<std::string, std::string>& labels,
const std::map<std::string, std::string>& nameToDesc) {
std::map<std::string, std::reference_wrapper<Family<Histogram>>> families;
for (const auto& kv : nameToDesc) {
auto& curr_family = BuildHistogram()
.Name(metricName + std::string("_") + kv.first)
.Help(kv.second)
.Labels(labels)
.Register(*registry);
families.insert(
std::make_pair(std::string(kv.first), std::ref(curr_family)));
}
return families;
}
using namespace prometheus;
PrometheusCollector::CountersMap PrometheusCollector::buildCountersMap(
std::shared_ptr<Registry> registry, const std::string& metricName,
const std::map<std::string, std::string>& labels,
const std::map<std::string, std::string>& nameToDesc) {
std::map<std::string, std::reference_wrapper<Family<Counter>>> families;
for (const auto& kv : nameToDesc) {
auto& curr_family = BuildCounter()
.Name(metricName + std::string("_") + kv.first)
.Help(kv.second)
.Labels(labels)
.Register(*registry);
families.insert(
std::make_pair(std::string(kv.first), std::ref(curr_family)));
}
return families;
}
PrometheusCollector::GaugeMap PrometheusCollector::buildGaugeMap(
std::shared_ptr<Registry> registry, const std::string& metricName,
const std::map<std::string, std::string>& labels,
const std::map<std::string, std::string>& nameToDesc) {
std::map<std::string, std::reference_wrapper<Family<Gauge>>> families;
for (const auto& kv : nameToDesc) {
auto& curr_family = BuildGauge()
.Name(metricName + std::string("_") + kv.first)
.Help(kv.second)
.Labels(labels)
.Register(*registry);
families.insert(
std::make_pair(std::string(kv.first), std::ref(curr_family)));
}
return families;
}
} // namespace tanglescope
} // namespace iota
\ No newline at end of file
......@@ -24,6 +24,24 @@ class PrometheusCollector : public CollectorBase {
// methods
bool parseConfiguration(const YAML::Node& conf) override;
static HistogramsMap buildHistogramsMap(
std::shared_ptr<prometheus::Registry> registry,
const std::string& metricName,
const std::map<std::string, std::string>& labels,
const std::map<std::string, std::string>& nameToDesc);
static CountersMap buildCountersMap(
std::shared_ptr<prometheus::Registry> registry,
const std::string& metricName,
const std::map<std::string, std::string>& labels,
const std::map<std::string, std::string>& nameToDesc);
static GaugeMap buildGaugeMap(
std::shared_ptr<prometheus::Registry> registry,
const std::string& metricName,
const std::map<std::string, std::string>& labels,
const std::map<std::string, std::string>& nameToDesc);
protected:
std::string _prometheusExpURI;
};
......
cc_library(name='shared',
srcs=glob(['**/*.cpp'],
exclude=['tanglescope.cpp', 'tests/**/*.cpp']),
hdrs=glob(
['**/*.hpp'],
exclude=['tests/**/*.hpp']),
deps=[
'//tanglescope/echocatcher:echocatcher',
'//tanglescope/statscollector:statscollector',
'//tanglescope/blowballcollector:blowballcollector',
'@yaml_cpp//:yaml_cpp',
])
cc_library(
name = "shared",
srcs = glob(
["**/*.cpp"],
exclude = [
"tanglescope.cpp",
"tests/**/*.cpp",
],
),
hdrs = glob(
["**/*.hpp"],
exclude = ["tests/**/*.hpp"],
),
deps = [
"//tanglescope/blowballcollector",
"//tanglescope/echocatcher",
"//tanglescope/statscollector",
"//tanglescope/tipselectioncollector",
"@yaml_cpp",
],
)
cc_binary(name='tanglescope',
srcs=["runner.cpp"],
deps=[':shared'],
data = [":copy_configuration",],
cc_binary(
name = "tanglescope",
srcs = ["runner.cpp"],
data = [":copy_configuration"],
deps = [":shared"],
)
cc_test(name='tests',
srcs=glob(['tests/**/*.cpp', '**/*.hpp']),
deps=['@gtest//:gtest_main', '@gtest//:gtest',
':shared',],
timeout="short")
cc_test(
name = "tests",
timeout = "short",
srcs = glob([
"tests/**/*.cpp",
"**/*.hpp",
]),
deps = [
":shared",
"@gtest",
"@gtest//:gtest_main",
],
)
genrule(name = 'copy_configuration',
local = 1,
srcs = glob(['configuration.yaml']),
outs = ["default_configuration.yaml"],
cmd = "cp $(SRCS) $(@)",
output_to_bindir = 1)
genrule(
name = "copy_configuration",
srcs = glob(["configuration.yaml"]),
outs = ["default_configuration.yaml"],
cmd = "cp $(SRCS) $(@)",
local = 1,
output_to_bindir = 1,
)
......@@ -48,4 +48,23 @@ blowballcollector:
#histogram's bucket's size (refcount of tx approvers)
bucket_size: 1
#histogram's range (maximum value of refcount to report)
histogram_range: 40
\ No newline at end of file
histogram_range: 40
tipselectioncollector:
#url of iri node
iri_host : "iri01.testnet.iota.cafe"
iri_port: 14265
#IP / Port that the Prometheus Exposer binds to
prometheus_exposer_uri: "0.0.0.0:8083"
#depth to give to "getTransactionsToApprove"
depth: 10
#in - between samples interval[seconds]
sample_interval: 20
#of calls to getTransactionsToApprove in each sample
sample_size: 20
#histogram granularity configuration
#duration's histogram (time it took 'getTransactionsToApprove') bucket's size [ms]
duration_bucket_size: 20
#duration's histogram range (maximum time it took 'getTransactionsToApprove')
duration_histogram_range: 1000
......@@ -7,6 +7,7 @@
#include <iota/tanglescope/common/txauxiliary.hpp>
#include <iota/tanglescope/echocatcher/echocatcher.hpp>
#include <iota/tanglescope/statscollector.hpp>
#include <iota/tanglescope/tipselectioncollector/tipselectioncollector.hpp>
#include <list>
DEFINE_string(ConfigurationPath, "", "YAML's configuration file path");
......@@ -18,33 +19,47 @@ int main(int argc, char** argv) {
std::list<boost::future<void>> tasks;
// need to parse yaml file and get arguments
auto conf = YAML::LoadFile(FLAGS_ConfigurationPath.empty()
? "default_configuration.yaml"
: FLAGS_ConfigurationPath);
iota::tanglescope::EchoCatcher echoCatcher;
iota::tanglescope::statscollector::StatsCollector statsCollector;
iota::tanglescope::BlowballCollector blowballCollector;
if (echoCatcher.parseConfiguration(conf["echocatcher"])) {
auto task = boost::async(boost::launch::async,
[&echoCatcher]() { echoCatcher.collect(); });
tasks.push_back(std::move(task));
}
try {
auto conf = YAML::LoadFile(FLAGS_ConfigurationPath.empty()
? "default_configuration.yaml"
: FLAGS_ConfigurationPath);
iota::tanglescope::EchoCatcher echoCatcher;
iota::tanglescope::statscollector::StatsCollector statsCollector;
iota::tanglescope::BlowballCollector blowballCollector;
iota::tanglescope::TipSelectionCollector tipSelectionCollector;
if (statsCollector.parseConfiguration(conf["statscollector"])) {
auto task = boost::async(boost::launch::async,
[&statsCollector]() { statsCollector.collect(); });
tasks.push_back(std::move(task));
}
if (echoCatcher.parseConfiguration(conf["echocatcher"])) {
auto task = boost::async(boost::launch::async,
[&echoCatcher]() { echoCatcher.collect(); });
tasks.push_back(std::move(task));
}
if (blowballCollector.parseConfiguration(conf["blowballcollector"])) {
auto task = boost::async(boost::launch::async, [&blowballCollector]() {
blowballCollector.collect();
});
tasks.push_back(std::move(task));
}
if (statsCollector.parseConfiguration(conf["statscollector"])) {
auto task = boost::async(boost::launch::async, [&statsCollector]() {
statsCollector.collect();
});
tasks.push_back(std::move(task));
}
std::for_each(tasks.begin(), tasks.end(), [](auto& task) { task.wait(); });
if (blowballCollector.parseConfiguration(conf["blowballcollector"])) {
auto task = boost::async(boost::launch::async, [&blowballCollector]() {
blowballCollector.collect();
});
tasks.push_back(std::move(task));
}
if (tipSelectionCollector.parseConfiguration(
conf["tipselectioncollector"])) {
auto task = boost::async(
boost::launch::async,
[&tipSelectionCollector]() { tipSelectionCollector.collect(); });
tasks.push_back(std::move(task));
}
std::for_each(tasks.begin(), tasks.end(), [](auto& task) { task.wait(); });
} catch (const std::exception& e) {
LOG(ERROR) << __FUNCTION__ << " Exception: " << e.what();
}
return 0;
}
......@@ -22,6 +22,25 @@ namespace iota {
namespace tanglescope {
namespace statscollector {
std::map<std::string, std::string> ZMQCollectorImpl::nameToDescCounters = {
{"transactions_new", "New TXs count"},
{"transactions_reattached", "Reattached TXs count"},
{"transactions_confirmed", "confirmed TXs count"},
{"bundles_new", "new bundles count"},
{"bundles_confirmed", "confirmed bundles count"},
{"value_new", "new tx's accumulated value"},
{"value_confirmed", "confirmed tx's accumulated value"}};
std::map<std::string, std::string> ZMQCollectorImpl::nameToDescHistograms = {
{"bundle_confirmation_duration", "bundle's confirmation duration [ms]"}};
std::map<std::string, std::string> ZMQCollectorImpl::nameToDescGauges = {
{"to_process", "Number of transactions to process"},
{"to_broadcast", "Number of transactions to broadcast to other nodes"},
{"to_request", "Number of transactions to request from other nodes"},
{"to_reply", "Number of transactions to reply to nodes who requested"},
{"total_transactions", "Number of transactions stored in node"}};
bool StatsCollector::parseConfiguration(const YAML::Node& conf) {
if (!PrometheusCollector::parseConfiguration(conf)) {
return false;
......@@ -77,9 +96,9 @@ ZMQCollectorImpl::ZMQCollectorImpl(
if (useURLLable) {
lables.insert(std::make_pair("publish_node", zmqURL));
}
_counters = buildCountersMap(registry, lables);
_histograms = buildHistogramsMap(registry, lables);
_gauges = buildGaugeMap(registry, lables);
_counters = PrometheusCollector::buildCountersMap(registry, METRIC_PREFIX, lables, nameToDescCounters);
_histograms = PrometheusCollector::buildHistogramsMap(registry, METRIC_PREFIX, lables, nameToDescHistograms);
_gauges = PrometheusCollector::buildGaugeMap(registry, METRIC_PREFIX, lables, nameToDescGauges);
}
void ZMQCollectorImpl::collect(uint32_t bundleConfirmationHistogramRange,
......@@ -142,83 +161,6 @@ void ZMQCollectorImpl::collect(uint32_t bundleConfirmationHistogramRange,
[]() {});
}
using namespace prometheus;
PrometheusCollector::CountersMap ZMQCollectorImpl::buildCountersMap(
std::shared_ptr<Registry> registry,
const std::map<std::string, std::string>& labels) {
static std::map<std::string, std::string> nameToDesc = {
{"transactions_new", "New TXs count"},
{"transactions_reattached", "Reattached TXs count"},
{"transactions_confirmed", "confirmed TXs count"},
{"bundles_new", "new bundles count"},
{"bundles_confirmed", "confirmed bundles count"},
{"value_new", "new tx's accumulated value"},
{"value_confirmed", "confirmed tx's accumulated value"}};
std::map<std::string, std::reference_wrapper<Family<Counter>>> families;
for (const auto& kv : nameToDesc) {
auto& curr_family = BuildCounter()
.Name("statscollector_" + kv.first)
.Help(kv.second)
.Labels(labels)
.Register(*registry);
families.insert(
std::make_pair(std::string(kv.first), std::ref(curr_family)));
}
return std::move(families);
}
PrometheusCollector::HistogramsMap ZMQCollectorImpl::buildHistogramsMap(
std::shared_ptr<Registry> registry,
const std::map<std::string, std::string>& labels) {
static std::map<std::string, std::string> nameToDesc = {
{"bundle_confirmation_duration", "bundle's confirmation duration [ms]"}};
std::map<std::string, std::reference_wrapper<Family<Histogram>>> families;
for (const auto& kv : nameToDesc) {
auto& curr_family = BuildHistogram()
.Name("statscollector_" + kv.first)
.Help(kv.second)
.Labels(labels)
.Register(*registry);
families.insert(
std::make_pair(std::string(kv.first), std::ref(curr_family)));
}
return std::move(families);
}
PrometheusCollector::GaugeMap ZMQCollectorImpl::buildGaugeMap(
std::shared_ptr<Registry> registry,
const std::map<std::string, std::string>& labels) {
static std::map<std::string, std::string> nameToDesc = {
{"to_process", "Number of transactions to process"},
{"to_broadcast", "Number of transactions to broadcast to other nodes"},
{"to_request", "Number of transactions to request from other nodes"},
{"to_reply", "Number of transactions to reply to nodes who requested"},
{"total_transactions", "Number of transactions stored in node"}};
std::map<std::string, std::reference_wrapper<Family<Gauge>>> families;
for (const auto& kv : nameToDesc) {
auto& curr_family = BuildGauge()
.Name("statscollector_rstat_" + kv.first)
.Help(kv.second)
.Labels(labels)
.Register(*registry);
families.insert(
std::make_pair(std::string(kv.first), std::ref(curr_family)));
}
return std::move(families);
}
} // namespace statscollector
} // namespace tanglescope
} // namespace iota
......@@ -17,6 +17,8 @@ constexpr static auto BUNDLE_CONFIRMATION_HISTOGRAM_RANGE =
constexpr static auto BUNDLE_CONFIRMATION_BUCKET_SIZE =
"bundle_confirmation_bucket_size";
constexpr static auto METRIC_PREFIX = "statscollector";
class ZMQCollectorImpl {
public:
ZMQCollectorImpl(std::string zmqURL,
......@@ -26,22 +28,15 @@ class ZMQCollectorImpl {
uint32_t bundleConfirmationBucketSize);
private:
PrometheusCollector::CountersMap buildCountersMap(
std::shared_ptr<prometheus::Registry> registry,
const std::map<std::string, std::string>& labels);
PrometheusCollector::HistogramsMap buildHistogramsMap(
std::shared_ptr<prometheus::Registry> registry,
const std::map<std::string, std::string>& labels);
PrometheusCollector::GaugeMap buildGaugeMap(
std::shared_ptr<prometheus::Registry> registry,
const std::map<std::string, std::string>& labels);
std::string _zmqURL;
PrometheusCollector::CountersMap _counters;
PrometheusCollector::HistogramsMap _histograms;
PrometheusCollector::GaugeMap _gauges;
static std::map<std::string, std::string> nameToDescHistograms;
static std::map<std::string, std::string> nameToDescCounters;
static std::map<std::string, std::string> nameToDescGauges;
};
class StatsCollector : public PrometheusCollector {
public:
......
cc_library(
name = "shared",
srcs = glob(
["**/*.cpp"],
exclude = [
"tipselectioncollector.cpp",
"tests/**/*.cpp",
],
),
hdrs = glob(
["**/*.hpp"],
exclude = ["tests/**/*.hpp"],
),
include_prefix = "iota/tanglescope/tipselectioncollector",
deps = [
"//tanglescope/common",
"//tanglescope/prometheus_collector",
"@com_github_gflags_gflags//:gflags",
"@glog",
],
)
cc_library(
name = "tipselectioncollector",
srcs = ["tipselectioncollector.cpp"],
hdrs = glob(["tipselectioncollector.hpp"]),
include_prefix = "iota/tanglescope",
visibility = ["//visibility:public"],
deps = [
":shared",
"@prometheus_cpp",
],
)
#include <glog/logging.h>
#include <chrono>
#include <thread>
#include <unordered_map>
#include <vector>
#include <iota/tanglescope/tipselectioncollector/tipselectioncollector.hpp>
using namespace iota::tanglescope;
using namespace cppclient;
std::map<std::string, std::string> TipSelectionCollector::nameToDescHistogram =
{{TipSelectionCollector::NUM_TX_WAS_SELECTED,
"# of times tip/tx was selected"},
{TipSelectionCollector::NUM_TRUNK_EQ_BRANCH,
"# of times both selected tips were the same"},
{TipSelectionCollector::NUM_TX_WAS_NOT_A_TIP,
"# of times selected tx was not a tip"},
{TipSelectionCollector::TIP_SELECTION_DURATION,
"The duration_of_tip_selection"}};
bool TipSelectionCollector::parseConfiguration(const YAML::Node& conf) {
if (!PrometheusCollector::parseConfiguration(conf)) {
return false;
}
if (!conf[IRI_HOST] || !conf[IRI_PORT] || !conf[SAMPLE_INTERVAL] ||
!conf[SAMPLE_SIZE] || !conf[DEPTH] || !conf[DURATION_HISTOGRAM_RANGE] ||
!conf[DURATION_BUCKET_SIZE]) {
return false;
}
_iriHost = conf[IRI_HOST].as<std::string>();
_iriPort = conf[IRI_PORT].as<uint32_t>();
_sampleInterval = conf[SAMPLE_INTERVAL].as<uint32_t>();
_sampleSize = conf[SAMPLE_SIZE].as<uint16_t>();
_depth = conf[DEPTH].as<uint16_t>();
_durationBucketSize = conf[DURATION_BUCKET_SIZE].as<uint16_t>();
_durationBucketRange = conf[DURATION_HISTOGRAM_RANGE].as<uint16_t>();
return true;
}
void TipSelectionCollector::collect() {
using namespace prometheus;
VLOG(3) << __FUNCTION__;
std::vector<double> txSelectionCountBuckets(_sampleSize);
double currInterval = 0;
std::generate(txSelectionCountBuckets.begin(), txSelectionCountBuckets.end(),
[&currInterval]() { return currInterval++; });
_txSelectionCountBuckets = txSelectionCountBuckets;
std::vector<double> durationBuckets(_durationBucketRange /
_durationBucketSize);
currInterval = 0;
std::generate(durationBuckets.begin(), durationBuckets.end(),
[&currInterval, durationBucketSize = _durationBucketSize]() {
currInterval += durationBucketSize;
return currInterval;
});
_durationBuckets = durationBuckets;
Exposer exposer{_prometheusExpURI};
auto registry = std::make_shared<Registry>();
exposer.RegisterCollectable(registry);
_api = std::make_shared<cppclient::BeastIotaAPI>(_iriHost, _iriPort);
_histograms = std::move(
buildHistogramsMap(registry, "tipselection", {}, nameToDescHistogram));
queryTipSelectionPeriodically();
}
void TipSelectionCollector::queryTipSelectionPeriodically() {
auto pubThread = rxcpp::schedulers::make_current_thread();
auto pubWorker = pubThread.create_worker();
if (_sampleInterval > 0) {
pubWorker.schedule_periodically(pubThread.now(),
std::chrono::seconds(_sampleInterval),
[&](auto scbl) { queryTipSelection(); });
} else {
queryTipSelection();
}
}
void TipSelectionCollector::queryTipSelection() {
std::vector<boost::future<GetTransactionsToApproveResponse>> futures;
uint16_t branchEqTrunkCounter = 0;
uint16_t numSelectedTXWasNotATip = 0;
std::unordered_map<std::string, uint16_t> txToNumSelected;
std::unordered_map<std::string, uint64_t> txToTimeTipSelectionStrated;
try {
for (uint16_t i = 0; i < _sampleSize; ++i) {
auto fu = boost::async(boost::launch::async, [this] {
return _api->getTransactionsToApprove(_depth);
});
futures.push_back(std::move(fu));
}
for (uint16_t i = 0; i < _sampleSize; ++i) {
const auto& resp = futures[i].get();
++txToNumSelected[resp.trunkTransaction];
++txToNumSelected[resp.branchTransaction];
auto nowMilliseconds =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
auto timeTipSelectionRequested = nowMilliseconds - resp.duration;
txToTimeTipSelectionStrated[resp.trunkTransaction] =
timeTipSelectionRequested;
txToTimeTipSelectionStrated[resp.branchTransaction] =
timeTipSelectionRequested;
_histograms.at(TIP_SELECTION_DURATION)
.get()
.Add({}, _durationBuckets)
.Observe(resp.duration);
if (resp.trunkTransaction == resp.branchTransaction) {
++branchEqTrunkCounter;
}
}
auto& hist = _histograms.at(NUM_TX_WAS_SELECTED)
.get()
.Add({}, _txSelectionCountBuckets);
for (const auto& kv : txToNumSelected) {
hist.Observe(kv.second);
std::vector<std::string> txHashes = {kv.first};
auto approvers = _api->findTransactions({}, {}, {txHashes});
if (!approvers.empty()) {
auto transactions = _api->getTransactions(approvers);
for (const auto& approver : transactions) {
auto txTimestampMS =
std::chrono::duration_cast<std::chrono::milliseconds>(
approver.timestamp.time_since_epoch())
.count();
if (txToTimeTipSelectionStrated[kv.first] > txTimestampMS)
++numSelectedTXWasNotATip;
break;
}
}
}
_histograms.at(NUM_TRUNK_EQ_BRANCH)
.get()
.Add({}, _txSelectionCountBuckets)
.Observe(branchEqTrunkCounter);
_histograms.at(NUM_TX_WAS_NOT_A_TIP)
.get()
.Add({}, _txSelectionCountBuckets)
.Observe(numSelectedTXWasNotATip);
} catch (const std::exception& e) {
LOG(ERROR) << __FUNCTION__ << " Exception: " << e.what();
}
}
\ No newline at end of file
#pragma once
#include <prometheus/exposer.h>
#include <boost/thread/executor.hpp>
#include <boost/thread/future.hpp>
#include <chrono>
#include <iota/tanglescope/common/iri.hpp>
#include <iota/tanglescope/prometheus_collector/prometheus_collector.hpp>
#include <libcuckoo/cuckoohash_map.hh>
#include <list>
#include <memory>
#include <rx.hpp>
#include <shared_mutex>
#include <string>
#include "cppclient/beast.h"
namespace iota {
namespace tanglescope {
class TipSelectionCollector : public PrometheusCollector {
public:
constexpr static auto IRI_HOST = "iri_host";
constexpr static auto IRI_PORT = "iri_port";
constexpr static auto SAMPLE_INTERVAL = "sample_interval";
constexpr static auto SAMPLE_SIZE = "sample_size";
constexpr static auto DEPTH = "depth";
constexpr static auto DURATION_BUCKET_SIZE = "duration_bucket_size";
constexpr static auto DURATION_HISTOGRAM_RANGE = "duration_histogram_range";
// metric names
constexpr static auto NUM_TX_WAS_SELECTED = "num_tx_was_selected";
constexpr static auto NUM_TRUNK_EQ_BRANCH = "num_trunk_eq_branch";
constexpr static auto NUM_TX_WAS_NOT_A_TIP = "num_tx_was_not_a_tip";
constexpr static auto TIP_SELECTION_DURATION = "tip_selection_duration";
void collect() override;
bool parseConfiguration(const YAML::Node& conf) override;
using ZmqObservable = rxcpp::observable<std::shared_ptr<iri::IRIMessage>>;
protected: // gmock classes
virtual void queryTipSelectionPeriodically();
virtual void queryTipSelection();
private:
// Configuration
std::string _iriHost;
uint32_t _iriPort;
uint32_t _sampleInterval;
uint16_t _sampleSize;
uint16_t _depth;
uint16_t _durationBucketSize;
uint16_t _durationBucketRange;
// Others
ZmqObservable _zmqObservable;
PrometheusCollector::HistogramsMap _histograms;
std::vector<double> _txSelectionCountBuckets;
std::vector<double> _durationBuckets;
static std::map<std::string, std::string> nameToDescHistogram;
std::shared_ptr<cppclient::IotaAPI> _api;
};
} // namespace tanglescope
} // namespace iota