Commit 4cf9e751 authored by Mikhail Makurov's avatar Mikhail Makurov
Browse files

Added Clickhouse data preloading to avoid read congestion

parent 425de40f
Pipeline #64966476 passed with stage
in 1 minute and 9 seconds
......@@ -25,9 +25,13 @@
#include "zbxhistory.h"
#include "zbxself.h"
#include "history.h"
//#include "../zbxdbcache/valuecache.h"
/* curl_multi_wait() is supported starting with version 7.28.0 (0x071c00) */
#if defined(HAVE_LIBCURL) && LIBCURL_VERSION_NUM >= 0x071c00
size_t DCconfig_get_itemids_by_valuetype( int value_type, zbx_vector_uint64_t *vector_itemids);
int zbx_vc_simple_add(zbx_uint64_t itemids, zbx_history_record_t *record);
extern char *CONFIG_HISTORY_STORAGE_URL;
extern int CONFIG_HISTORY_STORAGE_PIPELINES;
......@@ -38,6 +42,7 @@ extern char *CONFIG_CLICKHOUSE_USERNAME;
extern char *CONFIG_CLICKHOUSE_PASSWORD;
extern int CONFIG_SERVER_STARTUP_TIME;
extern int CONFIG_CLICKHOUSE_VALUECACHE_FILL_TIME;
extern int CONFIG_CLICKHOUSE_PRELOAD_VALUES;
typedef struct
{
......@@ -504,6 +509,7 @@ static int clickhouse_flush(zbx_history_iface_t *hist)
int zbx_history_clickhouse_init(zbx_history_iface_t *hist, unsigned char value_type, char **error)
{
zbx_clickhouse_data_t *data;
size_t alloc = 0, offset = 0;
if (0 != curl_global_init(CURL_GLOBAL_ALL))
{
......@@ -534,6 +540,161 @@ int zbx_history_clickhouse_init(zbx_history_iface_t *hist, unsigned char value_t
hist->get_values = clickhouse_get_values;
hist->requires_trends = 0;
//preloading support
//we only load data of the type value_type
//of string, text and double types
if (CONFIG_CLICKHOUSE_PRELOAD_VALUES > 0 ) {
unsigned long rows_num;
zbx_vector_uint64_t vector_itemids;
zbx_vector_history_record_t values;
size_t i;
char *query=NULL;
size_t q_len = 0, q_offset = 0, valuecount = 0;
zbx_history_record_t hr;
zbx_vector_uint64_create(&vector_itemids);
zabbix_log(LOG_LEVEL_DEBUG,"Fetching list of items of type %d",value_type);
size_t items=DCconfig_get_itemids_by_valuetype( value_type, &vector_itemids);
zabbix_log(LOG_LEVEL_DEBUG,"Got %ld items for the type",items);
if ( items > 0 ) {
zbx_snprintf_alloc(&query,&q_len,&q_offset,"SELECT itemid, clock, value, value_dbl, value_str FROM %s WHERE (itemid IN (",
CONFIG_HISTORY_STORAGE_TABLE_NAME);
zbx_snprintf_alloc(&query,&q_len,&q_offset,"%ld",vector_itemids.values[0]);
//for ( i = 1; i < vector_itemids.values_num && i< 20; i++ ) {
for ( i = 1; i < vector_itemids.values_num ; i++ ) {
zbx_snprintf_alloc(&query,&q_len,&q_offset,",%ld",vector_itemids.values[i]);
//if (i > 40) break;
}
zbx_snprintf_alloc(&query,&q_len,&q_offset,")) AND (day = today() OR day = today()-1 ) ORDER BY itemid ASC, clock DESC LIMIT 10 BY itemid");
zbx_snprintf_alloc(&query, &q_len, &q_offset, " format JSON ");
zabbix_log(LOG_LEVEL_DEBUG,"Length of the query: '%ld'",strlen(query));
}
zbx_vector_uint64_destroy(&vector_itemids);
CURL *handle = NULL;
zbx_httppage_t page_r;
bzero(&page_r,sizeof(zbx_httppage_t));
struct curl_slist *curl_headers = NULL;
char errbuf[CURL_ERROR_SIZE];
CURLcode err;
if (NULL == (handle = curl_easy_init()))
{
zabbix_log(LOG_LEVEL_ERR, "cannot initialize cURL session");
} ;
page_r.offset = 0;
*errbuf = '\0';
curl_easy_setopt(handle, CURLOPT_URL, data->url);
curl_easy_setopt(handle, CURLOPT_POSTFIELDS, query);
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curl_write_cb);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, &page_r);
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, curl_headers);
curl_easy_setopt(handle, CURLOPT_FAILONERROR, 1L);
curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errbuf);
if (CURLE_OK != (err = curl_easy_perform(handle)))
{
clickhouse_log_error(handle, err, errbuf,&page_r);
zabbix_log(LOG_LEVEL_WARNING, "Failed query %s",query);
//goto out;
}
if (NULL != page_r.data) {
zabbix_log(LOG_LEVEL_INFORMATION, "Query copleted, filling value cache");
struct zbx_json_parse jp, jp_row, jp_data;
const char *p = NULL;
zbx_json_open(page_r.data, &jp);
zbx_json_brackets_by_name(&jp, "data", &jp_data);
while (NULL != (p = zbx_json_next(&jp_data, p)))
{
char *clck = NULL, *value = NULL, *value_dbl = NULL, *value_str = NULL , *itemid_str = NULL;
size_t clck_alloc=0, value_alloc = 0, value_dbl_alloc = 0, value_str_alloc = 0, itemid_alloc = 0;
struct zbx_json_parse jp_row;
zbx_uint64_t itemid=0;
if (SUCCEED == zbx_json_brackets_open(p, &jp_row)) {
if (SUCCEED == zbx_json_value_by_name_dyn(&jp_row, "itemid", &itemid_str, &itemid_alloc) &&
SUCCEED == zbx_json_value_by_name_dyn(&jp_row, "clock", &clck, &clck_alloc) &&
SUCCEED == zbx_json_value_by_name_dyn(&jp_row, "value", &value, &value_alloc) &&
SUCCEED == zbx_json_value_by_name_dyn(&jp_row, "value_dbl", &value_dbl, &value_dbl_alloc) &&
SUCCEED == zbx_json_value_by_name_dyn(&jp_row, "value_str", &value_str, &value_str_alloc))
{
hr.timestamp.sec = atoi(clck);
itemid=atoi(itemid_str);
switch (hist->value_type)
{
case ITEM_VALUE_TYPE_UINT64:
zabbix_log(LOG_LEVEL_DEBUG, "Parsed as UINT64 %s",value);
hr.value = history_str2value(value, hist->value_type);
break;
case ITEM_VALUE_TYPE_FLOAT:
zabbix_log(LOG_LEVEL_DEBUG, "Parsed as DBL field %s",value_dbl);
hr.value = history_str2value(value_dbl, hist->value_type);
break;
case ITEM_VALUE_TYPE_STR:
case ITEM_VALUE_TYPE_TEXT:
zabbix_log(LOG_LEVEL_DEBUG, "Parsed as STR/TEXT type %s",value_str);
hr.value = history_str2value(value_str, hist->value_type);
break;
default:
//todo: does server really need's to read logs????
goto out;
}
if (FAIL == zbx_vc_simple_add(itemid,&hr)) {
zabbix_log(LOG_LEVEL_INFORMATION,"Couldn't add value to vc");
} else {
valuecount++;
}
}
out:
zbx_free(itemid_str);
zbx_free(clck);
zbx_free(value);
zbx_free(value_dbl);
zbx_free(value_str);
} else {
zabbix_log(LOG_LEVEL_DEBUG,"CLICCKHOUSE: Couldn't parse JSON row: %s",p);
};
}
zabbix_log(LOG_LEVEL_INFORMATION, "Added %ld values to valuecache", valuecount);
} else {
zabbix_log(LOG_LEVEL_WARNING, "WARN: Got empty responce from clickhouse");
}
zbx_free(page_r.data);
curl_slist_free_all(curl_headers);
curl_easy_cleanup(handle);
zbx_free(query);
}
return SUCCEED;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment