Skip to content

Commit

Permalink
x-pack/filebeat/input/cel: fix eval state return on error (#33996)
Browse files Browse the repository at this point in the history
Previously, when an error in evaluation or result deserialisation
occurred, the entire input activation container was returned instead of
a valid state. The result of this was that the state would nest on each
iteration of the eval event loop. In cases where errors can happen
frequently, this can result in unbounded memory use.

Also improve the robustness of the want_more flag; the old code would
continue with the loop if state["want_more"] was unset since any(nil)
does not equal false.

Relates: #33992
  • Loading branch information
efd6 authored Dec 8, 2022
1 parent 0c77112 commit f08eed8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Rename identity as identity_name when the value is a string in Azure Platform Logs. {pull}33654[33654]
- Fix input cancellation handling when HTTP client does not support contexts. {issue}33962[33962] {pull}33968[33968]
- Update mito CEL extension library to v0.0.0-20221207004749-2f0f2875e464 {pull}33974[33974]
- Fix CEL result deserialisation when evaluation fails. {issue}33992[33992] {pull}33996[33996]

*Heartbeat*
- Fix broken zip URL monitors. NOTE: Zip URL Monitors will be removed in version 8.7 and replaced with project monitors. {pull}33723[33723]
Expand Down
28 changes: 12 additions & 16 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub
log.Debugw("request state", logp.Namespace("cel"), "state", state)
metrics.executions.Add(1)
start := time.Now()
state, err = evalWith(ctx, prg, map[string]interface{}{
root: state,
})
state, err = evalWith(ctx, prg, state)
log.Debugw("response state", logp.Namespace("cel"), "state", state)
if err != nil {
switch {
Expand Down Expand Up @@ -427,9 +425,7 @@ func (input) run(env v2.Context, src *source, cursor map[string]interface{}, pub
// Replace the last known good cursor.
state["cursor"] = goodCursor

// Avoid explicit type assertion. This is safe as long as the value is
// Go-comparable.
if state["want_more"] == false {
if more, _ := state["want_more"].(bool); !more {
return nil
}
}
Expand Down Expand Up @@ -847,31 +843,31 @@ func newProgram(ctx context.Context, src, root string, client *http.Client, limi
return prg, nil
}

func evalWith(ctx context.Context, prg cel.Program, input map[string]interface{}) (map[string]interface{}, error) {
out, _, err := prg.ContextEval(ctx, input)
func evalWith(ctx context.Context, prg cel.Program, state map[string]interface{}) (map[string]interface{}, error) {
out, _, err := prg.ContextEval(ctx, map[string]interface{}{root: state})
if e := ctx.Err(); e != nil {
err = e
}
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)}
return input, fmt.Errorf("failed eval: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed eval: %v", err)}
return state, fmt.Errorf("failed eval: %w", err)
}

v, err := out.ConvertToNative(reflect.TypeOf(&structpb.Value{}))
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)}
return input, fmt.Errorf("failed proto conversion: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed proto conversion: %v", err)}
return state, fmt.Errorf("failed proto conversion: %w", err)
}
b, err := protojson.MarshalOptions{Indent: ""}.Marshal(v.(proto.Message))
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)}
return input, fmt.Errorf("failed native conversion: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed native conversion: %v", err)}
return state, fmt.Errorf("failed native conversion: %w", err)
}
var res map[string]interface{}
err = json.Unmarshal(b, &res)
if err != nil {
input["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)}
return input, fmt.Errorf("failed json conversion: %w", err)
state["events"] = map[string]interface{}{"error.message": fmt.Sprintf("failed json conversion: %v", err)}
return state, fmt.Errorf("failed json conversion: %w", err)
}
return res, nil
}
Expand Down

0 comments on commit f08eed8

Please sign in to comment.