Skip to content

Commit

Permalink
[receiver/sqlquery] fail on missing value for logs, collect errors (#…
Browse files Browse the repository at this point in the history
…35189)

**Description:** 

* [breaking] Fail if for log column not found in result set (for
consistency with metrics behaviour)
* Instead of fail-fast, collect all errors that occurred when
transforming row to metric or log

**Link to tracking Issue:** #35068

**Testing:** Added/updated unit tests

**Documentation:** n/a

Closes #35068
  • Loading branch information
Grandys authored Sep 27, 2024
1 parent da9d94c commit 507ec47
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 15 deletions.
27 changes: 27 additions & 0 deletions .chloggen/sqlqueryreceiver-collect-errors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sqlqueryreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fail if value for log column in result set is missing, collect errors

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35068]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
19 changes: 11 additions & 8 deletions internal/sqlquery/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package sqlquery // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"

import (
"errors"
"fmt"
"strconv"

Expand All @@ -18,36 +19,38 @@ func rowToMetric(row StringMap, cfg MetricCfg, dest pmetric.Metric, startTime pc
dest.SetUnit(cfg.Unit)
dataPointSlice := setMetricFields(cfg, dest)
dataPoint := dataPointSlice.AppendEmpty()
var errs []error
if cfg.StartTsColumn != "" {
if val, found := row[cfg.StartTsColumn]; found {
timestamp, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.StartTsColumn, val, err)
errs = append(errs, fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.StartTsColumn, val, err))
}
startTime = pcommon.Timestamp(timestamp)
} else {
return fmt.Errorf("rowToMetric: start_ts_column not found")
errs = append(errs, fmt.Errorf("rowToMetric: start_ts_column not found"))
}
}
if cfg.TsColumn != "" {
if val, found := row[cfg.TsColumn]; found {
timestamp, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.TsColumn, val, err)
errs = append(errs, fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.TsColumn, val, err))
}
ts = pcommon.Timestamp(timestamp)
} else {
return fmt.Errorf("rowToMetric: ts_column not found")
errs = append(errs, fmt.Errorf("rowToMetric: ts_column not found"))
}
}
setTimestamp(cfg, dataPoint, startTime, ts, scrapeCfg)
value, found := row[cfg.ValueColumn]
if !found {
return fmt.Errorf("rowToMetric: value_column '%s' not found in result set", cfg.ValueColumn)
errs = append(errs, fmt.Errorf("rowToMetric: value_column '%s' not found in result set", cfg.ValueColumn))
}

err := setDataPointValue(cfg, value, dataPoint)
if err != nil {
return fmt.Errorf("rowToMetric: %w", err)
errs = append(errs, fmt.Errorf("rowToMetric: %w", err))
}
attrs := dataPoint.Attributes()
for k, v := range cfg.StaticAttributes {
Expand All @@ -57,10 +60,10 @@ func rowToMetric(row StringMap, cfg MetricCfg, dest pmetric.Metric, startTime pc
if attrVal, found := row[columnName]; found {
attrs.PutStr(columnName, attrVal)
} else {
return fmt.Errorf("rowToMetric: attribute_column not found: '%s'", columnName)
errs = append(errs, fmt.Errorf("rowToMetric: attribute_column '%s' not found in result set", columnName))
}
}
return nil
return errors.Join(errs...)
}

func setTimestamp(cfg MetricCfg, dp pmetric.NumberDataPoint, startTime pcommon.Timestamp, ts pcommon.Timestamp, scrapeCfg scraperhelper.ControllerConfig) {
Expand Down
29 changes: 29 additions & 0 deletions internal/sqlquery/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,35 @@ func TestScraper_StartAndTS_ErrorOnColumnNotFound(t *testing.T) {
assert.Error(t, err)
}

func TestScraper_CollectRowToMetricsErrors(t *testing.T) {
client := &FakeDBClient{
StringMaps: [][]StringMap{{
{
"mycol": "42",
},
}},
}
scrpr := Scraper{
Client: client,
Query: Query{
Metrics: []MetricCfg{{
MetricName: "my.name",
ValueColumn: "mycol_na",
TsColumn: "Ts",
StartTsColumn: "StartTs",
AttributeColumns: []string{"attr_na"},
DataType: MetricTypeSum,
Aggregation: MetricAggregationCumulative,
}},
},
}
_, err := scrpr.Scrape(context.Background())
assert.ErrorContains(t, err, "rowToMetric: start_ts_column not found")
assert.ErrorContains(t, err, "rowToMetric: ts_column not found")
assert.ErrorContains(t, err, "rowToMetric: value_column 'mycol_na' not found in result set")
assert.ErrorContains(t, err, "rowToMetric: attribute_column 'attr_na' not found in result set")
}

func TestScraper_StartAndTS_ErrorOnParse(t *testing.T) {
client := &FakeDBClient{
StringMaps: [][]StringMap{{
Expand Down
13 changes: 10 additions & 3 deletions receiver/sqlqueryreceiver/logs_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,16 +316,23 @@ func (queryReceiver *logsQueryReceiver) storeTrackingValue(ctx context.Context,
}

func rowToLog(row sqlquery.StringMap, config sqlquery.LogsCfg, logRecord plog.LogRecord) error {
logRecord.Body().SetStr(row[config.BodyColumn])
var errs []error
value, found := row[config.BodyColumn]
if !found {
errs = append(errs, fmt.Errorf("rowToLog: body_column '%s' not found in result set", config.BodyColumn))
} else {
logRecord.Body().SetStr(value)
}
attrs := logRecord.Attributes()

for _, columnName := range config.AttributeColumns {
if attrVal, found := row[columnName]; found {
attrs.PutStr(columnName, attrVal)
} else {
return fmt.Errorf("rowToLog: attribute_column not found: '%s'", columnName)
errs = append(errs, fmt.Errorf("rowToLog: attribute_column '%s' not found in result set", columnName))
}
}
return nil
return errors.Join(errs...)
}

func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) error {
Expand Down
10 changes: 6 additions & 4 deletions receiver/sqlqueryreceiver/logs_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestLogsQueryReceiver_Collect(t *testing.T) {
)
}

func TestLogsQueryReceiver_MissingColumnInResultSetForAttributeColumn(t *testing.T) {
func TestLogsQueryReceiver_MissingColumnInResultSet(t *testing.T) {
fakeClient := &sqlquery.FakeDBClient{
StringMaps: [][]sqlquery.StringMap{
{{"col1": "42"}},
Expand All @@ -63,12 +63,14 @@ func TestLogsQueryReceiver_MissingColumnInResultSetForAttributeColumn(t *testing
query: sqlquery.Query{
Logs: []sqlquery.LogsCfg{
{
BodyColumn: "col1",
AttributeColumns: []string{"expected_column"},
BodyColumn: "expected_body_column",
AttributeColumns: []string{"expected_column", "expected_column_2"},
},
},
},
}
_, err := queryReceiver.collect(context.Background())
assert.ErrorContains(t, err, "rowToLog: attribute_column not found: 'expected_column'")
assert.ErrorContains(t, err, "rowToLog: attribute_column 'expected_column' not found in result set")
assert.ErrorContains(t, err, "rowToLog: attribute_column 'expected_column_2' not found in result set")
assert.ErrorContains(t, err, "rowToLog: body_column 'expected_body_column' not found in result set")
}

0 comments on commit 507ec47

Please sign in to comment.