Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add StringSet metrics to Python SDK #31969

Merged
merged 4 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,12 @@ public static Gauge gauge(Class<?> namespace, String name) {
return new DelegatingGauge(MetricName.named(namespace, name));
}

/**
* Create a metric that can have its new value set, and is aggregated by taking the last reported
* value.
*/
/** Create a metric that accumulates and reports set of unique string values. */
public static StringSet stringSet(String namespace, String name) {
return new DelegatingStringSet(MetricName.named(namespace, name));
}

/**
* Create a metric that can have its new value set, and is aggregated by taking the last reported
* value.
*/
/** Create a metric that accumulates and reports set of unique string values. */
public static StringSet stringSet(Class<?> namespace, String name) {
return new DelegatingStringSet(MetricName.named(namespace, name));
}
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/metrics/cells.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ cdef class GaugeCell(MetricCell):
cdef readonly object data


cdef class StringSetCell(MetricCell):
cdef readonly set data

cdef inline bint _update(self, value) except -1


cdef class DistributionData(object):
cdef readonly libc.stdint.int64_t sum
cdef readonly libc.stdint.int64_t count
Expand Down
75 changes: 75 additions & 0 deletions sdks/python/apache_beam/metrics/cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,62 @@ def to_runner_api_monitoring_info_impl(self, name, transform_id):
ptransform=transform_id)


class StringSetCell(MetricCell):
"""For internal use only; no backwards-compatibility guarantees.

Tracks the current value for a StringSet metric.

Each cell tracks the state of a metric independently per context per bundle.
Therefore, each metric has a different cell in each bundle, that is later
aggregated.

This class is thread safe.
"""
def __init__(self, *args):
super().__init__(*args)
self.data = StringSetAggregator.identity_element()

def add(self, value):
self.update(value)

def update(self, value):
# type: (str) -> None
if cython.compiled:
# We will hold the GIL throughout the entire _update.
self._update(value)
else:
with self._lock:
self._update(value)

def _update(self, value):
self.data.add(value)

def get_cumulative(self):
# type: () -> set
with self._lock:
return set(self.data)

def combine(self, other):
# type: (StringSetCell) -> StringSetCell
combined = StringSetAggregator().combine(self.data, other.data)
result = StringSetCell()
result.data = combined
return result

def to_runner_api_monitoring_info_impl(self, name, transform_id):
from apache_beam.metrics import monitoring_infos

return monitoring_infos.user_set_string(
name.namespace,
name.name,
self.get_cumulative(),
ptransform=transform_id)

def reset(self):
# type: () -> None
self.data = StringSetAggregator.identity_element()


class DistributionResult(object):
"""The result of a Distribution metric."""
def __init__(self, data):
Expand Down Expand Up @@ -553,3 +609,22 @@ def combine(self, x, y):
def result(self, x):
# type: (GaugeData) -> GaugeResult
return GaugeResult(x.get_cumulative())


class StringSetAggregator(MetricAggregator):
@staticmethod
def identity_element():
# type: () -> set
return set()

def combine(self, x, y):
# type: (set, set) -> set
if len(x) == 0:
return y
elif len(y) == 0:
return x
else:
return set.union(x, y)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if it makes sense to use return set.union(x, y) unconditionally. Could there be any concerns about side effects due to Aggregation returning a mutable collection that can be modified elsewhere after aggregation was computed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this mirrors #31803 . I wonder in most cases the aggregation encounters one empty and one non-empty cells, so a short cut optimization is reasonable here.

Could there be any concerns about side effects due to Aggregation returning a mutable collection

This is a good point. I was hesitating if I should return set / frozenset . However in Python frozenset isn't instance of set (isinstance(frozenset(), set) is False) while Java ImmutableSet is Set, I ended up with set. Do you think we should make the result of StringSet query a frozenset instance everywhere?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it depends on the lifecycle of these sets; given Java implementation, my guess is that it is probably not a concern.


def result(self, x):
return x
24 changes: 24 additions & 0 deletions sdks/python/apache_beam/metrics/cells_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from apache_beam.metrics.cells import DistributionData
from apache_beam.metrics.cells import GaugeCell
from apache_beam.metrics.cells import GaugeData
from apache_beam.metrics.cells import StringSetCell
from apache_beam.metrics.metricbase import MetricName


Expand Down Expand Up @@ -169,5 +170,28 @@ def test_start_time_set(self):
self.assertGreater(mi.start_time.seconds, 0)


class TestStringSetCell(unittest.TestCase):
def test_not_leak_mutable_set(self):
c = StringSetCell()
c.add('test')
c.add('another')
s = c.get_cumulative()
self.assertEqual(s, set(('test', 'another')))
s.add('yet another')
self.assertEqual(c.get_cumulative(), set(('test', 'another')))

def test_combine_appropriately(self):
s1 = StringSetCell()
s1.add('1')
s1.add('2')

s2 = StringSetCell()
s2.add('1')
s2.add('3')

result = s2.combine(s1)
self.assertEqual(result.data, set(('1', '2', '3')))


if __name__ == '__main__':
unittest.main()
20 changes: 18 additions & 2 deletions sdks/python/apache_beam/metrics/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from apache_beam.metrics.cells import CounterCell
from apache_beam.metrics.cells import DistributionCell
from apache_beam.metrics.cells import GaugeCell
from apache_beam.metrics.cells import StringSetCell
from apache_beam.runners.worker import statesampler
from apache_beam.runners.worker.statesampler import get_current_tracker

Expand Down Expand Up @@ -259,6 +260,12 @@ def get_gauge(self, metric_name):
GaugeCell,
self.get_metric_cell(_TypedMetricName(GaugeCell, metric_name)))

def get_string_set(self, metric_name):
# type: (MetricName) -> StringSetCell
return cast(
StringSetCell,
self.get_metric_cell(_TypedMetricName(StringSetCell, metric_name)))

def get_metric_cell(self, typed_metric_name):
# type: (_TypedMetricName) -> MetricCell
cell = self.metrics.get(typed_metric_name, None)
Expand Down Expand Up @@ -292,7 +299,13 @@ def get_cumulative(self):
v in self.metrics.items() if k.cell_type == GaugeCell
}

return MetricUpdates(counters, distributions, gauges)
string_sets = {
MetricKey(self.step_name, k.metric_name): v.get_cumulative()
for k,
v in self.metrics.items() if k.cell_type == StringSetCell
}

return MetricUpdates(counters, distributions, gauges, string_sets)

def to_runner_api(self):
return [
Expand Down Expand Up @@ -344,7 +357,8 @@ def __init__(
self,
counters=None, # type: Optional[Dict[MetricKey, int]]
distributions=None, # type: Optional[Dict[MetricKey, DistributionData]]
gauges=None # type: Optional[Dict[MetricKey, GaugeData]]
gauges=None, # type: Optional[Dict[MetricKey, GaugeData]]
string_sets=None, # type: Optional[Dict[MetricKey, set]]
):
# type: (...) -> None

Expand All @@ -354,7 +368,9 @@ def __init__(
counters: Dictionary of MetricKey:MetricUpdate updates.
distributions: Dictionary of MetricKey:MetricUpdate objects.
gauges: Dictionary of MetricKey:MetricUpdate objects.
string_sets: Dictionary of MetricKey:MetricUpdate objects.
"""
self.counters = counters or {}
self.distributions = distributions or {}
self.gauges = gauges or {}
self.string_sets = string_sets or {}
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/metrics/execution_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

# pytype: skip-file

import functools
import unittest

from apache_beam.metrics.execution import MetricKey
Expand Down Expand Up @@ -88,24 +89,32 @@ def test_get_cumulative_or_updates(self):
distribution = mc.get_distribution(
MetricName('namespace', 'name{}'.format(i)))
gauge = mc.get_gauge(MetricName('namespace', 'name{}'.format(i)))
str_set = mc.get_string_set(MetricName('namespace', 'name{}'.format(i)))

counter.inc(i)
distribution.update(i)
gauge.set(i)
str_set.add(str(i % 7))
all_values.append(i)

# Retrieve ALL updates.
cumulative = mc.get_cumulative()
self.assertEqual(len(cumulative.counters), 10)
self.assertEqual(len(cumulative.distributions), 10)
self.assertEqual(len(cumulative.gauges), 10)
self.assertEqual(len(cumulative.string_sets), 10)

self.assertEqual(
set(all_values), {v
for _, v in cumulative.counters.items()})
self.assertEqual(
set(all_values), {v.value
for _, v in cumulative.gauges.items()})
self.assertEqual({str(i % 7)
for i in all_values},
functools.reduce(
set.union,
(v for _, v in cumulative.string_sets.items())))


if __name__ == '__main__':
Expand Down
31 changes: 29 additions & 2 deletions sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from apache_beam.metrics.metricbase import Distribution
from apache_beam.metrics.metricbase import Gauge
from apache_beam.metrics.metricbase import MetricName
from apache_beam.metrics.metricbase import StringSet

if TYPE_CHECKING:
from apache_beam.metrics.execution import MetricKey
Expand Down Expand Up @@ -115,6 +116,23 @@ def gauge(
namespace = Metrics.get_namespace(namespace)
return Metrics.DelegatingGauge(MetricName(namespace, name))

@staticmethod
def string_set(
namespace: Union[Type, str], name: str) -> 'Metrics.DelegatingStringSet':
"""Obtains or creates a String set metric.

String set metrics are restricted to string values.

Args:
namespace: A class or string that gives the namespace to a metric
name: A string that gives a unique name to a metric

Returns:
A StringSet object.
"""
namespace = Metrics.get_namespace(namespace)
return Metrics.DelegatingStringSet(MetricName(namespace, name))

class DelegatingCounter(Counter):
"""Metrics Counter that Delegates functionality to MetricsEnvironment."""
def __init__(
Expand All @@ -138,11 +156,18 @@ def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self.set = MetricUpdater(cells.GaugeCell, metric_name) # type: ignore[assignment]

class DelegatingStringSet(StringSet):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self.add = MetricUpdater(cells.StringSetCell, metric_name) # type: ignore[assignment]


class MetricResults(object):
COUNTERS = "counters"
DISTRIBUTIONS = "distributions"
GAUGES = "gauges"
STRINGSETS = "string_sets"

@staticmethod
def _matches_name(filter: 'MetricsFilter', metric_key: 'MetricKey') -> bool:
Expand Down Expand Up @@ -207,11 +232,13 @@ def query(
{
"counters": [MetricResult(counter_key, committed, attempted), ...],
"distributions": [MetricResult(dist_key, committed, attempted), ...],
"gauges": [] // Empty list if nothing matched the filter.
"gauges": [], // Empty list if nothing matched the filter.
"string_sets": [] [MetricResult(string_set_key, committed, attempted),
...]
}

The committed / attempted values are DistributionResult / GaugeResult / int
objects.
/ set objects.
"""
raise NotImplementedError

Expand Down
16 changes: 15 additions & 1 deletion sdks/python/apache_beam/metrics/metricbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@
from typing import Optional

__all__ = [
'Metric', 'Counter', 'Distribution', 'Gauge', 'Histogram', 'MetricName'
'Metric',
'Counter',
'Distribution',
'Gauge',
'StringSet',
'Histogram',
'MetricName'
]


Expand Down Expand Up @@ -138,6 +144,14 @@ def set(self, value):
raise NotImplementedError


class StringSet(Metric):
"""StringSet Metric interface.

Reports set of unique string values during pipeline execution.."""
def add(self, value):
raise NotImplementedError


class Histogram(Metric):
"""Histogram Metric interface.

Expand Down
Loading
Loading