Skip to content

Commit

Permalink
added official StatsHouse client, redesigned StatsHouse metrics (#874)
Browse files Browse the repository at this point in the history
Co-authored-by: Denis Vaksman <[email protected]>
  • Loading branch information
troy4eg and DrDet authored Sep 19, 2023
1 parent afdf709 commit 99d4257
Show file tree
Hide file tree
Showing 21 changed files with 1,571 additions and 576 deletions.
8 changes: 0 additions & 8 deletions common/server/statsd-client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ class statsd_stats_t : public stats_t {
// ignore it
}

bool need_aggregated_stats() noexcept final {
return true;
}

protected:
void add_stat(char type, const char *key, double value) noexcept final {
sb_printf(&sb, "%s.%s:", stats_prefix, normalize_key(key, "%s", ""));
Expand All @@ -71,10 +67,6 @@ class statsd_stats_t : public stats_t {
sb_printf(&sb, "%lld", value);
sb_printf(&sb, "|%c\n", type);
};

void add_multiple_stats(const char *key [[maybe_unused]], std::vector<double> &&values [[maybe_unused]]) noexcept final {
assert(false && "unimplemented");
}
};
} // namespace

Expand Down
8 changes: 0 additions & 8 deletions common/server/tl-stats-t.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ class tl_stats_t : public stats_t {
va_end(ap);
}

bool need_aggregated_stats() noexcept final {
return true;
}

protected:
void add_stat(char type [[maybe_unused]], const char *key, double value) noexcept final {
sb_printf(&sb, "%s\t", key);
Expand All @@ -45,8 +41,4 @@ class tl_stats_t : public stats_t {
sb_printf(&sb, "%lld", value);
sb_printf(&sb, "\n");
}

void add_multiple_stats(const char *key [[maybe_unused]], std::vector<double> &&values [[maybe_unused]]) noexcept final {
assert(false && "unimplemented");
}
};
13 changes: 0 additions & 13 deletions common/stats/provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,12 @@ class stats_t {
add_gauge_stat(stat_key, value);
}

void add_multiple_gauge_stats(std::vector<double> &&values, const char *key1, const char *key2 = "") noexcept {
const size_t key1_len = std::strlen(key1);
const size_t key2_len = std::strlen(key2);
char stat_key[key1_len + key2_len + 1];
std::memcpy(stat_key, key1, key1_len);
std::memcpy(stat_key + key1_len, key2, key2_len + 1);

add_multiple_stats(stat_key, std::move(values));
}

template<typename T>
void add_gauge_stat(const std::atomic<T> &value, const char *key1, const char *key2 = "", const char *key3 = "") noexcept {
add_gauge_stat(value.load(std::memory_order_relaxed), key1, key2, key3);
}

virtual void add_general_stat(const char *key, const char *value_format, ...) noexcept __attribute__((format(printf, 3, 4))) = 0;
virtual bool need_aggregated_stats() noexcept = 0;

virtual ~stats_t() = default;

Expand All @@ -94,8 +83,6 @@ class stats_t {
virtual void add_stat_with_tag_type(char type, const char *key, const char *type_tag, double value) noexcept = 0;
virtual void add_stat_with_tag_type(char type, const char *key, const char *type_tag, long long value) noexcept = 0;

virtual void add_multiple_stats(const char *key, std::vector<double> &&values) noexcept = 0;

char *normalize_key(const char *key, const char *format, const char *prefix) noexcept;
};

Expand Down
6 changes: 2 additions & 4 deletions runtime/memory_resource/memory_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ void MemoryStats::write_stats_to(stats_t *stats, const char *prefix) const noexc
stats->add_gauge_stat(memory_limit, prefix, ".memory.limit");
stats->add_gauge_stat(memory_used, prefix, ".memory.used");
stats->add_gauge_stat(real_memory_used, prefix, ".memory.real_used");
if (stats->need_aggregated_stats()) {
stats->add_gauge_stat(max_memory_used, prefix, ".memory.used_max");
stats->add_gauge_stat(max_real_memory_used, prefix, ".memory.real_used_max");
}
stats->add_gauge_stat(max_memory_used, prefix, ".memory.used_max");
stats->add_gauge_stat(max_real_memory_used, prefix, ".memory.real_used_max");
stats->add_gauge_stat(defragmentation_calls, prefix, ".memory.defragmentation_calls");
stats->add_gauge_stat(huge_memory_pieces, prefix, ".memory.huge_memory_pieces");
stats->add_gauge_stat(small_memory_pieces, prefix, ".memory.small_memory_pieces");
Expand Down
5 changes: 2 additions & 3 deletions server/confdata-stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,8 @@ void ConfdataStats::write_stats_to(stats_t *stats, const memory_resource::Memory
stats->add_gauge_stat("confdata.updates.ignored", ignored_updates);
stats->add_gauge_stat("confdata.updates.total", total_updates);

if (stats->need_aggregated_stats()) {
stats->add_gauge_stat("confdata.elements.total", total_elements);
}
stats->add_gauge_stat("confdata.elements.total", total_elements);

stats->add_gauge_stat_with_type_tag("confdata.elements", "simple_key", simple_key_elements);
stats->add_gauge_stat_with_type_tag("confdata.elements", "one_dot_wildcard", one_dot_wildcard_elements);
stats->add_gauge_stat_with_type_tag("confdata.elements", "two_dots_wildcard", two_dots_wildcard_elements);
Expand Down
13 changes: 6 additions & 7 deletions server/php-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
#include "server/shared-data-worker-cache.h"
#include "server/signal-handlers.h"
#include "server/statshouse/statshouse-client.h"
#include "server/statshouse/worker-stats-buffer.h"
#include "server/workers-control.h"

using job_workers::JobWorkersContext;
Expand Down Expand Up @@ -1423,7 +1422,6 @@ void cron() {
}
vk::singleton<SharedDataWorkerCache>::get().on_worker_cron();
vk::singleton<ServerStats>::get().update_this_worker_stats();
vk::singleton<statshouse::WorkerStatsBuffer>::get().flush_if_needed();
}

void reopen_json_log() {
Expand Down Expand Up @@ -1677,7 +1675,6 @@ void init_all() {

init_php_scripts();
vk::singleton<ServerStats>::get().set_idle_worker_status();
vk::singleton<StatsHouseClient>::get();

worker_id = (int)lrand48();

Expand Down Expand Up @@ -2086,11 +2083,13 @@ int main_args_handler(int i, const char *long_option) {
kprintf("--%s option: can't find ':'\n", long_option);
return -1;
}
auto host = std::string(optarg, colon - optarg);
auto port = atoi(colon + 1);
if (host.empty()) {
host = "127.0.0.1";
}

auto &statshouse_client = vk::singleton<StatsHouseClient>::get();
statshouse_client.set_host(std::string(optarg, colon - optarg));
statshouse_client.set_port(atoi(colon + 1));
vk::singleton<statshouse::WorkerStatsBuffer>::get().enable();
StatsHouseClient::init(host, port);
return 0;
}
case 2027: {
Expand Down
73 changes: 35 additions & 38 deletions server/php-master.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
#include "server/server-stats.h"
#include "server/shared-data-worker-cache.h"
#include "server/shared-data.h"
#include "server/statshouse/add-metrics-batch.h"
#include "server/statshouse/statshouse-client.h"
#include "server/workers-control.h"

Expand All @@ -78,6 +77,7 @@
#include "server/job-workers/shared-memory-manager.h"
#include "server/json-logger.h"
#include "server/server-config.h"
#include "server/workers-stats.h"

using job_workers::JobWorkersContext;

Expand All @@ -101,13 +101,8 @@ static sigset_t empty_mask;
static double my_now;

/*** Stats ***/
static long tot_workers_started{0};
static long tot_workers_dead{0};
static long tot_workers_strange_dead{0};
static long workers_killed{0};
static long workers_hung{0};
static long workers_terminated{0};
static long workers_failed{0};

static workers_stats_t workers_stats{};

struct CpuStatTimestamp {
double timestamp;
Expand Down Expand Up @@ -461,7 +456,7 @@ void terminate_worker(worker_info_t *w) {
if (w->type == WorkerType::general_worker) {
changed = 1;
}
workers_terminated++;
workers_stats.workers_terminated++;
}

int kill_worker(WorkerType worker_type) {
Expand Down Expand Up @@ -492,7 +487,7 @@ void kill_hanging_workers() {
if (!worker->is_dying && worker->last_activity_time + get_max_hanging_time_sec() <= my_now) {
tvkprintf(master_process, 1, "No stats received from %s [pid = %d]. Terminate it\n",
worker->type == WorkerType::general_worker ? "general worker" : "job worker", static_cast<int>(worker->pid));
workers_hung++;
workers_stats.workers_hung++;
terminate_worker(worker);
last_terminated = my_now;
break;
Expand All @@ -505,7 +500,7 @@ void kill_hanging_workers() {
kprintf("master kill hanging %s : send SIGKILL to [pid = %d]\n",
workers[i]->type == WorkerType::general_worker ? "general worker" : "job worker", static_cast<int>(workers[i]->pid));
kill(workers[i]->pid, SIGKILL);
workers_killed++;
workers_stats.workers_killed++;

workers[i]->kill_flag = 1;

Expand Down Expand Up @@ -606,7 +601,7 @@ int run_worker(WorkerType worker_type) {

assert (vk::singleton<WorkersControl>::get().get_all_alive() < WorkersControl::max_workers_count);

tot_workers_started++;
workers_stats.tot_workers_started++;
const uint16_t worker_unique_id = vk::singleton<WorkersControl>::get().on_worker_creating(worker_type);
pid_t new_pid = fork();
if (new_pid == -1) {
Expand Down Expand Up @@ -718,7 +713,7 @@ void remove_worker(pid_t pid) {
if (workers[i]->type == WorkerType::general_worker && !workers[i]->is_dying) {
failed++;
}
workers_failed++;
workers_stats.workers_failed++;

delete_worker(workers[i]);

Expand All @@ -739,9 +734,9 @@ void update_workers() {
pid_t pid = waitpid(-1, &status, WNOHANG);
if (pid > 0) {
if (!WIFEXITED (status)) {
tot_workers_strange_dead++;
workers_stats.tot_workers_strange_dead++;
}
tot_workers_dead++;
workers_stats.tot_workers_dead++;
remove_worker(pid);
changed = 1;
} else {
Expand Down Expand Up @@ -972,13 +967,13 @@ std::string php_master_prepare_stats(bool add_worker_pids) {
<< "total_workers\t" << general_workers_stat.total_workers + job_workers_stat.total_workers << "\n"
<< "running_workers\t" << general_workers_stat.running_workers + job_workers_stat.running_workers << "\n"
<< "paused_workers\t" << general_workers_stat.waiting_workers + job_workers_stat.waiting_workers << "\n"
<< "tot_workers_started\t" << tot_workers_started << "\n"
<< "tot_workers_dead\t" << tot_workers_dead << "\n"
<< "tot_workers_strange_dead\t" << tot_workers_strange_dead << "\n"
<< "workers_killed\t" << workers_killed << "\n"
<< "workers_hung\t" << workers_hung << "\n"
<< "workers_terminated\t" << workers_terminated << "\n"
<< "workers_failed\t" << workers_failed << "\n";
<< "tot_workers_started\t" << workers_stats.tot_workers_started << "\n"
<< "tot_workers_dead\t" << workers_stats.tot_workers_dead << "\n"
<< "tot_workers_strange_dead\t" << workers_stats.tot_workers_strange_dead << "\n"
<< "workers_killed\t" << workers_stats.workers_killed << "\n"
<< "workers_hung\t" << workers_stats.workers_hung << "\n"
<< "workers_terminated\t" << workers_stats.workers_terminated << "\n"
<< "workers_failed\t" << workers_stats.workers_failed << "\n";
stats.write_stats_to(oss, add_worker_pids);

std::for_each(workers, last_worker, [&oss](const worker_info_t *w) {
Expand Down Expand Up @@ -1121,23 +1116,21 @@ STATS_PROVIDER_TAGGED(kphp_stats, 100, stats_tag_kphp_server) {
stats->add_gauge_stat("workers.job.processes.working", job_worker_group.running_workers);
stats->add_gauge_stat("workers.job.processes.working_but_waiting", job_worker_group.waiting_workers);

if (stats->need_aggregated_stats()) {
auto running_stats = server_stats.misc_stat_for_general_workers[1].get_stat();
stats->add_gauge_stat("workers.general.processes.running.avg_1m", running_stats.running_workers_avg);
stats->add_gauge_stat("workers.general.processes.running.max_1m", running_stats.running_workers_max);
auto running_stats = server_stats.misc_stat_for_general_workers[1].get_stat();
stats->add_gauge_stat("workers.general.processes.running.avg_1m", running_stats.running_workers_avg);
stats->add_gauge_stat("workers.general.processes.running.max_1m", running_stats.running_workers_max);

running_stats = server_stats.misc_stat_for_job_workers[1].get_stat();
stats->add_gauge_stat("workers.job.processes.running.avg_1m", running_stats.running_workers_avg);
stats->add_gauge_stat("workers.job.processes.running.max_1m", running_stats.running_workers_max);
}
running_stats = server_stats.misc_stat_for_job_workers[1].get_stat();
stats->add_gauge_stat("workers.job.processes.running.avg_1m", running_stats.running_workers_avg);
stats->add_gauge_stat("workers.job.processes.running.max_1m", running_stats.running_workers_max);

stats->add_gauge_stat("server.workers.started", tot_workers_started);
stats->add_gauge_stat("server.workers.dead", tot_workers_dead);
stats->add_gauge_stat("server.workers.strange_dead", tot_workers_strange_dead);
stats->add_gauge_stat("server.workers.killed", workers_killed);
stats->add_gauge_stat("server.workers.hung", workers_hung);
stats->add_gauge_stat("server.workers.terminated", workers_terminated);
stats->add_gauge_stat("server.workers.failed", workers_failed);
stats->add_gauge_stat("server.workers.started", workers_stats.tot_workers_started);
stats->add_gauge_stat("server.workers.dead", workers_stats.tot_workers_dead);
stats->add_gauge_stat("server.workers.strange_dead", workers_stats.tot_workers_strange_dead);
stats->add_gauge_stat("server.workers.killed", workers_stats.workers_killed);
stats->add_gauge_stat("server.workers.hung", workers_stats.workers_hung);
stats->add_gauge_stat("server.workers.terminated", workers_stats.workers_terminated);
stats->add_gauge_stat("server.workers.failed", workers_stats.workers_failed);

const auto cpu_stats = server_stats.cpu[1].get_stat();
stats->add_gauge_stat("cpu.stime", cpu_stats.cpu_s_usage);
Expand Down Expand Up @@ -1405,7 +1398,11 @@ static void cron() {
if (!other->is_alive || in_old_master_on_restart()) {
// write stats at the beginning to avoid spikes in graphs
send_data_to_statsd_with_prefix(vk::singleton<ServerConfig>::get().get_statsd_prefix(), stats_tag_kphp_server);
vk::singleton<StatsHouseClient>::get().master_send_metrics();
if (StatsHouseClient::has()) {
const auto cpu_stats = server_stats.cpu[1].get_stat();
StatsHouseClient::get().send_common_master_stats(workers_stats, instance_cache_get_memory_stats(), cpu_stats.cpu_s_usage, cpu_stats.cpu_u_usage,
instance_cache_memory_swaps_ok, instance_cache_memory_swaps_fail);
}
}
create_all_outbound_connections();
vk::singleton<ServerStats>::get().aggregate_stats();
Expand Down
Loading

0 comments on commit 99d4257

Please sign in to comment.