From 67d710ef0dd931476508ca73991182459f7e6f29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Weing=C3=A4rtner?= Date: Thu, 23 May 2024 10:42:05 -0300 Subject: [PATCH] Automatically detect deleted resources While executing some Gnocchi optimizations (https://github.com/gnocchixyz/gnocchi/pull/1307), we noticed that some deleted/removed resources do not have the "ended_at" field with a datetime. This can cause slowness with time, as more and more "zombie" resources are left there, and this has a direct impact in the MySQL queries executed with the aggregates API. This patch introduces a new parameter called `metric_inactive_after`, which defines for how long a metric can go without receiving new datapoints until we consider it as inactive. Then, when all metrics of a resource are in inactive state, we can mark/consider the resource as removed. --- gnocchi/chef.py | 68 +++++++++++++++++++ gnocchi/cli/api.py | 5 +- gnocchi/cli/metricd.py | 10 +++ gnocchi/indexer/__init__.py | 6 +- ...n_for_truncate_inactive_metrics_process.py | 4 +- ...ec_create_last_measure_timestamp_column.py | 40 +++++++++++ gnocchi/indexer/sqlalchemy.py | 7 ++ gnocchi/indexer/sqlalchemy_base.py | 9 +++ gnocchi/opts.py | 17 ++++- gnocchi/storage/__init__.py | 3 + 10 files changed, 161 insertions(+), 8 deletions(-) create mode 100644 gnocchi/indexer/alembic/versions/f89ed2e3c2ec_create_last_measure_timestamp_column.py diff --git a/gnocchi/chef.py b/gnocchi/chef.py index 0fea47d98..a0e1b4c22 100644 --- a/gnocchi/chef.py +++ b/gnocchi/chef.py @@ -17,6 +17,7 @@ import hashlib import daiquiri +import datetime from gnocchi import carbonara from gnocchi import indexer @@ -50,6 +51,73 @@ def __init__(self, coord, incoming, index, storage): self.index = index self.storage = storage + def resource_ended_at_normalization(self, metric_inactive_after): + """Marks resources as ended at if needed. + + This method will check all metrics that have not received new + datapoints after a given period. 
The period is defined by
+        'metric_inactive_after' parameter. If all metrics of a resource are in
+        inactive state, we mark the ended_at field with a timestamp. Therefore,
+        we consider that the resource has ceased existing.
+
+        In this process we will handle only metrics that are considered as
+        inactive, according to `metric_inactive_after` parameter. Therefore,
+        we do not need to lock these metrics while processing, as they are
+        inactive, and chances are that they will not receive measures anymore.
+        We will not "undelete" the resource, if it starts receiving measures
+        again. If such a case ever arises, we will need to implement
+        some workflow to handle it.
+        """
+
+        momment_now = datetime.datetime.utcnow()
+        momment = momment_now - datetime.timedelta(
+            seconds=metric_inactive_after)
+
+        inactive_metrics = self.index.list_metrics(
+            attribute_filter={"<": {
+                "last_measure_timestamp": momment}},
+            resource_policy_filter={"==": {"ended_at": None}}
+        )
+
+        LOG.debug("Inactive metrics found for processing: [%s].",
+                  inactive_metrics)
+
+        metrics_by_resource_id = {}
+        for metric in inactive_metrics:
+            resource_id = metric.resource_id
+            if metrics_by_resource_id.get(resource_id) is None:
+                metrics_by_resource_id[resource_id] = []
+
+            metrics_by_resource_id[resource_id].append(metric)
+
+        for resource_id in metrics_by_resource_id.keys():
+            resource = self.index.get_resource(
+                "generic", resource_id, with_metrics=True)
+            resource_metrics = resource.metrics
+            resource_inactive_metrics = metrics_by_resource_id.get(resource_id)
+
+            all_metrics_are_inactive = True
+            for m in resource_metrics:
+                if m not in resource_inactive_metrics:
+                    all_metrics_are_inactive = False
+                    LOG.debug("Not all metrics of resource [%s] are inactive. "
+                              "Metric [%s] is not inactive.", resource, m)
+                    break
+
+            if all_metrics_are_inactive:
+                LOG.info("All metrics [%s] of resource [%s] are inactive. 
+                             "Therefore, we will mark it as finished with an "
+                             "ended at timestamp.", resource_metrics, resource)
+                if resource.ended_at is not None:
+                    LOG.debug(
+                        "Resource [%s] already has an ended at value.", resource)
+                else:
+                    LOG.info("Marking ended at timestamp for resource "
+                             "[%s] because all of its metrics are inactive.",
+                             resource)
+                    self.index.update_resource(
+                        "generic", resource_id, ended_at=momment_now)
+
     def clean_raw_data_inactive_metrics(self):
         """Cleans metrics raw data if they are inactive.
 
diff --git a/gnocchi/cli/api.py b/gnocchi/cli/api.py
index 623a34a98..36954bb7d 100644
--- a/gnocchi/cli/api.py
+++ b/gnocchi/cli/api.py
@@ -72,7 +72,10 @@ def api():
                  "No need to pass `--' in gnocchi-api command line anymore, "
                  "please remove")
 
-    uwsgi = spawn.find_executable("uwsgi")
+    uwsgi = conf.api.uwsgi_path
+    if not uwsgi:
+        uwsgi = spawn.find_executable("uwsgi")
+
     if not uwsgi:
         LOG.error("Unable to find `uwsgi'.\n"
                   "Be sure it is installed and in $PATH.")
diff --git a/gnocchi/cli/metricd.py b/gnocchi/cli/metricd.py
index a22e1646b..29445e273 100644
--- a/gnocchi/cli/metricd.py
+++ b/gnocchi/cli/metricd.py
@@ -278,6 +278,16 @@ def _run_job(self):
         LOG.debug("Finished the cleaning of raw data points for metrics that "
                   "are no longer receiving measures.")
 
+        if self.conf.metricd.metric_inactive_after:
+            LOG.debug("Starting resource ended at field normalization.")
+            self.chef.resource_ended_at_normalization(
+                self.conf.metricd.metric_inactive_after)
+            LOG.debug("Finished resource ended at field normalization.")
+        else:
+            LOG.debug("Resource ended at field normalization is not "
+                      "activated. 
See 'metric_inactive_after' parameter if " + "you wish to activate it.") + class MetricdServiceManager(cotyledon.ServiceManager): def __init__(self, conf): diff --git a/gnocchi/indexer/__init__.py b/gnocchi/indexer/__init__.py index 1949cd63a..7416948e8 100644 --- a/gnocchi/indexer/__init__.py +++ b/gnocchi/indexer/__init__.py @@ -446,7 +446,11 @@ def update_backwindow_changed_for_metrics_archive_policy( raise exceptions.NotImplementedError @staticmethod - def update_needs_raw_data_truncation(metric_id): + def update_needs_raw_data_truncation(metric_id, value): + raise exceptions.NotImplementedError + + @staticmethod + def update_last_measure_timestmap(metric_id): raise exceptions.NotImplementedError @staticmethod diff --git a/gnocchi/indexer/alembic/versions/18fff4509e3e_create_column_for_truncate_inactive_metrics_process.py b/gnocchi/indexer/alembic/versions/18fff4509e3e_create_column_for_truncate_inactive_metrics_process.py index d67bb6064..18a2f1910 100644 --- a/gnocchi/indexer/alembic/versions/18fff4509e3e_create_column_for_truncate_inactive_metrics_process.py +++ b/gnocchi/indexer/alembic/versions/18fff4509e3e_create_column_for_truncate_inactive_metrics_process.py @@ -13,17 +13,15 @@ # under the License. 
# -"""create metric truncation status column +"""Create metric truncation status column Revision ID: 18fff4509e3e Revises: 04eba72e4f90 Create Date: 2024-04-24 09:16:00 """ -import datetime from alembic import op -from sqlalchemy.sql import func import sqlalchemy diff --git a/gnocchi/indexer/alembic/versions/f89ed2e3c2ec_create_last_measure_timestamp_column.py b/gnocchi/indexer/alembic/versions/f89ed2e3c2ec_create_last_measure_timestamp_column.py new file mode 100644 index 000000000..c01156e19 --- /dev/null +++ b/gnocchi/indexer/alembic/versions/f89ed2e3c2ec_create_last_measure_timestamp_column.py @@ -0,0 +1,40 @@ +# Copyright 2015 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Create last measure push timestamp column + +Revision ID: f89ed2e3c2ec +Revises: 18fff4509e3e +Create Date: 2024-04-24 09:16:00 + +""" + +from alembic import op + +import datetime +import sqlalchemy + +# revision identifiers, used by Alembic. 
+revision = 'f89ed2e3c2ec'
+down_revision = '18fff4509e3e'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.add_column(
+        "metric", sqlalchemy.Column(
+            "last_measure_timestamp", sqlalchemy.DateTime,
+            nullable=False, default=datetime.datetime.utcnow))
diff --git a/gnocchi/indexer/sqlalchemy.py b/gnocchi/indexer/sqlalchemy.py
index 4028bca90..57b811ffb 100644
--- a/gnocchi/indexer/sqlalchemy.py
+++ b/gnocchi/indexer/sqlalchemy.py
@@ -1403,6 +1403,13 @@ def update_needs_raw_data_truncation(self, metrid_id, value=False):
             if session.execute(stmt).rowcount == 0:
                 raise indexer.NoSuchMetric(metrid_id)
 
+    def update_last_measure_timestmap(self, metrid_id):
+        with self.facade.writer() as session:
+            stmt = update(Metric).filter(Metric.id == metrid_id).values(
+                last_measure_timestamp=datetime.datetime.utcnow())
+            if session.execute(stmt).rowcount == 0:
+                raise indexer.NoSuchMetric(metrid_id)
+
     def update_backwindow_changed_for_metrics_archive_policy(
             self, archive_policy_name):
         with self.facade.writer() as session:
diff --git a/gnocchi/indexer/sqlalchemy_base.py b/gnocchi/indexer/sqlalchemy_base.py
index 7880b5555..23b1bacc3 100644
--- a/gnocchi/indexer/sqlalchemy_base.py
+++ b/gnocchi/indexer/sqlalchemy_base.py
@@ -14,6 +14,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
+import datetime
 
 from oslo_db.sqlalchemy import models
 import sqlalchemy
@@ -113,6 +114,14 @@ class Metric(Base, GnocchiBase, indexer.Metric):
                                              nullable=False, default=True,
                                              server_default=sqlalchemy.sql.true())
 
+    # Timestamp that represents when the last measure push was received for the
+    # given metric. This allows us to identify when a metric ceased receiving
+    # measurements; thus, if all metrics of a resource are in this situation,
+    # chances are that the resource ceased existing in the backend. 
+    last_measure_timestamp = sqlalchemy.Column(
+        "last_measure_timestamp", sqlalchemy.DateTime,
+        nullable=False, default=datetime.datetime.utcnow)
+
     def jsonify(self):
         d = {
             "id": self.id,
diff --git a/gnocchi/opts.py b/gnocchi/opts.py
index b293d9170..076061046 100644
--- a/gnocchi/opts.py
+++ b/gnocchi/opts.py
@@ -57,7 +57,6 @@ def __getitem__(self, key):
 for opt in _INCOMING_OPTS:
     opt.default = '${storage.%s}' % opt.name
 
-
 API_OPTS = (
     cfg.HostAddressOpt('host',
                        default="0.0.0.0",
@@ -73,7 +72,10 @@ def __getitem__(self, key):
   but not chunked encoding (InfluxDB)
 * http-socket/socket: support chunked encoding, but require a upstream HTTP
   Server for HTTP/1.1, keepalive and HTTP protocol correctness.
-""")
+"""),
+    cfg.StrOpt('uwsgi-path',
+               default=None,
+               help="Custom UWSGI path to avoid auto discovery of packages.")
 )
 
 
@@ -172,7 +174,16 @@ def list_opts():
                        default=10000,
                        min=1,
                        help="Number of metrics that should be deleted "
-                            "simultaneously by one janitor.")
+                            "simultaneously by one janitor."),
+               cfg.IntOpt('metric_inactive_after',
+                          default=0,
+                          help="Number of seconds to wait before we consider a "
+                               "metric inactive. An inactive metric is a metric "
+                               "that has not received new measurements for a "
+                               "given period. If all metrics of a resource are "
+                               "inactive, we mark the resource with the "
+                               "'ended_at' timestamp. 
The default is 0 (zero), " + "which means that we never execute process.") )), ("api", ( cfg.StrOpt('paste_config', diff --git a/gnocchi/storage/__init__.py b/gnocchi/storage/__init__.py index e41d150a4..7a6760489 100644 --- a/gnocchi/storage/__init__.py +++ b/gnocchi/storage/__init__.py @@ -688,6 +688,9 @@ def _map_compute_splits_operations(bound_timeserie): if metric.needs_raw_data_truncation: indexer_driver.update_needs_raw_data_truncation(metric.id) + # Mark when the metric receives its latest measures + indexer_driver.update_last_measure_timestmap(metric.id) + with self.statistics.time("splits delete"): self._delete_metric_splits(splits_to_delete) self.statistics["splits delete"] += len(splits_to_delete)