From f893114402bf98dafc7a4f172d3af8e70b899e2d Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Tue, 28 Nov 2023 10:19:29 +0000 Subject: [PATCH] Rework nsq components to new APIs --- internal/component/input/config.go | 2 - internal/component/input/config_nsq.go | 31 ----- internal/component/output/config.go | 2 - internal/component/output/config_nsq.go | 25 ---- internal/impl/nsq/docker-compose.yaml | 28 +++++ internal/impl/nsq/input.go | 155 ++++++++++++++---------- internal/impl/nsq/integration_test.go | 52 ++++++++ internal/impl/nsq/output.go | 129 +++++++++++--------- website/docs/components/inputs/nsq.md | 25 ++-- website/docs/components/outputs/nsq.md | 17 ++- 10 files changed, 261 insertions(+), 205 deletions(-) delete mode 100644 internal/component/input/config_nsq.go delete mode 100644 internal/component/output/config_nsq.go create mode 100644 internal/impl/nsq/docker-compose.yaml create mode 100644 internal/impl/nsq/integration_test.go diff --git a/internal/component/input/config.go b/internal/component/input/config.go index 7126128ef2..08d2e01b02 100644 --- a/internal/component/input/config.go +++ b/internal/component/input/config.go @@ -17,7 +17,6 @@ type Config struct { Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"` Generate GenerateConfig `json:"generate" yaml:"generate"` Inproc InprocConfig `json:"inproc" yaml:"inproc"` - NSQ NSQConfig `json:"nsq" yaml:"nsq"` Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"` ReadUntil ReadUntilConfig `json:"read_until" yaml:"read_until"` Resource string `json:"resource" yaml:"resource"` @@ -41,7 +40,6 @@ func NewConfig() Config { Dynamic: NewDynamicConfig(), Generate: NewGenerateConfig(), Inproc: NewInprocConfig(), - NSQ: NewNSQConfig(), Plugin: nil, ReadUntil: NewReadUntilConfig(), Resource: "", diff --git a/internal/component/input/config_nsq.go b/internal/component/input/config_nsq.go deleted file mode 100644 index 5d208b8911..0000000000 --- a/internal/component/input/config_nsq.go +++ /dev/null @@ -1,31 +0,0 @@ -package input - -import ( - btls "github.com/benthosdev/benthos/v4/internal/tls" -) - -// NSQConfig contains configuration fields for the NSQ input type. -type NSQConfig struct { - Addresses []string `json:"nsqd_tcp_addresses" yaml:"nsqd_tcp_addresses"` - LookupAddresses []string `json:"lookupd_http_addresses" yaml:"lookupd_http_addresses"` - Topic string `json:"topic" yaml:"topic"` - Channel string `json:"channel" yaml:"channel"` - UserAgent string `json:"user_agent" yaml:"user_agent"` - TLS btls.Config `json:"tls" yaml:"tls"` - MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` - MaxAttempts uint16 `json:"max_attempts" yaml:"max_attempts"` -} - -// NewNSQConfig creates a new NSQConfig with default values. -func NewNSQConfig() NSQConfig { - return NSQConfig{ - Addresses: []string{}, - LookupAddresses: []string{}, - Topic: "", - Channel: "", - UserAgent: "", - TLS: btls.NewConfig(), - MaxInFlight: 100, - MaxAttempts: 5, - } -} diff --git a/internal/component/output/config.go b/internal/component/output/config.go index 214b925762..f5677b0df0 100644 --- a/internal/component/output/config.go +++ b/internal/component/output/config.go @@ -21,7 +21,6 @@ type Config struct { Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"` Fallback TryConfig `json:"fallback" yaml:"fallback"` Inproc string `json:"inproc" yaml:"inproc"` - NSQ NSQConfig `json:"nsq" yaml:"nsq"` Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"` Reject string `json:"reject" yaml:"reject"` Resource string `json:"resource" yaml:"resource"` @@ -50,7 +49,6 @@ func NewConfig() Config { Dynamic: NewDynamicConfig(), Fallback: NewTryConfig(), Inproc: "", - NSQ: NewNSQConfig(), Plugin: nil, Reject: "", Resource: "", diff --git a/internal/component/output/config_nsq.go b/internal/component/output/config_nsq.go deleted file mode 100644 index c20175b03a..0000000000 --- a/internal/component/output/config_nsq.go +++ /dev/null @@ -1,25 +0,0 @@ -package output - -import ( - btls "github.com/benthosdev/benthos/v4/internal/tls" -) - -// NSQConfig contains configuration fields for the NSQ output type. -type NSQConfig struct { - Address string `json:"nsqd_tcp_address" yaml:"nsqd_tcp_address"` - Topic string `json:"topic" yaml:"topic"` - UserAgent string `json:"user_agent" yaml:"user_agent"` - TLS btls.Config `json:"tls" yaml:"tls"` - MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"` -} - -// NewNSQConfig creates a new NSQConfig with default values. -func NewNSQConfig() NSQConfig { - return NSQConfig{ - Address: "", - Topic: "", - UserAgent: "", - TLS: btls.NewConfig(), - MaxInFlight: 64, - } -} diff --git a/internal/impl/nsq/docker-compose.yaml b/internal/impl/nsq/docker-compose.yaml new file mode 100644 index 0000000000..bdbbe7a538 --- /dev/null +++ b/internal/impl/nsq/docker-compose.yaml @@ -0,0 +1,28 @@ +# Surprisingly, there still seems to be absolutely no options available for +# running a single node set up of NSQ for testing purposes, which means it's +# extremely awkward to write real integration tests. Instead, we have this +# docker-compose set up where if you run it and then execute unit tests for this +# package it'll run them. +version: '3' +services: + nsqlookupd: + image: nsqio/nsq + command: /nsqlookupd + ports: + - "4160:4160" + - "4161" + nsqd: + image: nsqio/nsq + command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 + depends_on: + - nsqlookupd + ports: + - "4150:4150" + - "4151" + nsqadmin: + image: nsqio/nsq + command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 + depends_on: + - nsqlookupd + ports: + - "4171" diff --git a/internal/impl/nsq/input.go b/internal/impl/nsq/input.go index 356cf651cf..d58b2a9844 100644 --- a/internal/impl/nsq/input.go +++ b/internal/impl/nsq/input.go @@ -11,62 +11,70 @@ import ( "github.com/nsqio/go-nsq" - "github.com/benthosdev/benthos/v4/internal/bundle" - "github.com/benthosdev/benthos/v4/internal/component" - "github.com/benthosdev/benthos/v4/internal/component/input" - "github.com/benthosdev/benthos/v4/internal/component/input/processors" - "github.com/benthosdev/benthos/v4/internal/docs" - "github.com/benthosdev/benthos/v4/internal/log" - "github.com/benthosdev/benthos/v4/internal/message" - btls "github.com/benthosdev/benthos/v4/internal/tls" + "github.com/benthosdev/benthos/v4/public/service" ) -func init() { - err := bundle.AllInputs.Add(processors.WrapConstructor(newNSQInput), docs.ComponentSpec{ - Name: "nsq", - Summary: `Subscribe to an NSQ instance topic and channel.`, - Description: ` +const ( + niFieldNSQDAddrs = "nsqd_tcp_addresses" + niFieldLookupDAddrs = "lookupd_http_addresses" + niFieldTLS = "tls" + niFieldMaxInFlight = "max_in_flight" + niFieldTopic = "topic" + niFieldChannel = "channel" + niFieldUserAgent = "user_agent" + niFieldMaxAttempts = "max_attempts" +) + +func inputConfigSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Stable(). + Categories("Services"). + Summary(`Subscribe to an NSQ instance topic and channel.`). + Description(` ### Metadata This input adds the following metadata fields to each message: -` + "``` text" + ` +`+"``` text"+` - nsq_attempts - nsq_id - nsq_nsqd_address - nsq_timestamp -` + "```" + ` +`+"```"+` You can access these metadata fields using [function interpolation](/docs/configuration/interpolation#bloblang-queries). -`, - Config: docs.FieldComponent().WithChildren( - docs.FieldString("nsqd_tcp_addresses", "A list of nsqd addresses to connect to.").Array(), - docs.FieldString("lookupd_http_addresses", "A list of nsqlookupd addresses to connect to.").Array(), - btls.FieldSpec(), - docs.FieldString("topic", "The topic to consume from."), - docs.FieldString("channel", "The channel to consume from."), - docs.FieldString("user_agent", "A user agent to assume when connecting."), - docs.FieldInt("max_in_flight", "The maximum number of pending messages to consume at any given time."), - docs.FieldInt("max_attempts", "The maximum number of attempts to successfully consume a messages."), - ).ChildDefaultAndTypesFromStruct(input.NewNSQConfig()), - Categories: []string{ - "Services", - }, +`). + Fields( + service.NewStringListField(niFieldNSQDAddrs). + Description("A list of nsqd addresses to connect to."), + service.NewStringListField(niFieldLookupDAddrs). + Description("A list of nsqlookupd addresses to connect to."), + service.NewTLSToggledField(niFieldTLS), + service.NewStringField(niFieldTopic). + Description("The topic to consume from."), + service.NewStringField(niFieldChannel). + Description("The channel to consume from."), + service.NewStringField(niFieldUserAgent). + Description("A user agent to assume when connecting."). + Optional(), + service.NewIntField(niFieldMaxInFlight). + Description("The maximum number of pending messages to consume at any given time."). + Default(100), + service.NewIntField(niFieldMaxAttempts). + Description("The maximum number of attempts to successfully consume a messages."). + Default(5), + ) +} + +func init() { + err := service.RegisterInput("nsq", inputConfigSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + return newNSQReaderFromParsed(conf, mgr) }) if err != nil { panic(err) } } -func newNSQInput(conf input.Config, mgr bundle.NewManagement) (input.Streamed, error) { - var n input.Async - var err error - if n, err = newNSQReader(conf.NSQ, mgr); err != nil { - return nil, err - } - return input.NewAsyncReader("nsq", n, mgr) -} - type nsqReader struct { consumer *nsq.Consumer cMut sync.Mutex @@ -76,42 +84,68 @@ type nsqReader struct { tlsConf *tls.Config addresses []string lookupAddresses []string - conf input.NSQConfig - log log.Modular + topic string + channel string + userAgent string + maxInFlight int + maxAttempts uint16 + log *service.Logger internalMessages chan *nsq.Message interruptChan chan struct{} interruptOnce sync.Once } -func newNSQReader(conf input.NSQConfig, mgr bundle.NewManagement) (*nsqReader, error) { - n := nsqReader{ - conf: conf, +func newNSQReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (n *nsqReader, err error) { + n = &nsqReader{ log: mgr.Logger(), internalMessages: make(chan *nsq.Message), interruptChan: make(chan struct{}), } - for _, addr := range conf.Addresses { + + var addresses []string + if addresses, err = conf.FieldStringList(niFieldNSQDAddrs); err != nil { + return + } + for _, addr := range addresses { for _, splitAddr := range strings.Split(addr, ",") { if len(splitAddr) > 0 { n.addresses = append(n.addresses, splitAddr) } } } - for _, addr := range conf.LookupAddresses { + + if addresses, err = conf.FieldStringList(niFieldLookupDAddrs); err != nil { + return + } + for _, addr := range addresses { for _, splitAddr := range strings.Split(addr, ",") { if len(splitAddr) > 0 { n.lookupAddresses = append(n.lookupAddresses, splitAddr) } } } - if conf.TLS.Enabled { - var err error - if n.tlsConf, err = conf.TLS.Get(mgr.FS()); err != nil { - return nil, err - } + + if n.tlsConf, _, err = conf.FieldTLSToggled(niFieldTLS); err != nil { + return + } + + if n.topic, err = conf.FieldString(niFieldTopic); err != nil { + return } - return &n, nil + if n.channel, err = conf.FieldString(niFieldChannel); err != nil { + return + } + n.userAgent, _ = conf.FieldString(niFieldUserAgent) + if n.maxInFlight, err = conf.FieldMaxInFlight(); err != nil { + return + } + var tmpMA int + if tmpMA, err = conf.FieldInt(niFieldMaxAttempts); err != nil { + return + } + n.maxAttempts = uint16(tmpMA) + return } func (n *nsqReader) HandleMessage(message *nsq.Message) error { @@ -134,16 +168,16 @@ func (n *nsqReader) Connect(ctx context.Context) (err error) { } cfg := nsq.NewConfig() - cfg.UserAgent = n.conf.UserAgent - cfg.MaxInFlight = n.conf.MaxInFlight - cfg.MaxAttempts = n.conf.MaxAttempts + cfg.UserAgent = n.userAgent + cfg.MaxInFlight = n.maxInFlight + cfg.MaxAttempts = n.maxAttempts if n.tlsConf != nil { cfg.TlsV1 = true cfg.TlsConfig = n.tlsConf } var consumer *nsq.Consumer - if consumer, err = nsq.NewConsumer(n.conf.Topic, n.conf.Channel, cfg); err != nil { + if consumer, err = nsq.NewConsumer(n.topic, n.channel, cfg); err != nil { return } @@ -181,6 +215,7 @@ func (n *nsqReader) read(ctx context.Context) (*nsq.Message, error) { case msg = <-n.internalMessages: return msg, nil case <-ctx.Done(): + return nil, ctx.Err() case <-n.interruptChan: for _, m := range n.unAckMsgs { m.Requeue(-1) @@ -188,26 +223,24 @@ func (n *nsqReader) read(ctx context.Context) (*nsq.Message, error) { } n.unAckMsgs = nil _ = n.disconnect() - return nil, component.ErrTypeClosed + return nil, service.ErrEndOfInput } - return nil, component.ErrTimeout } -func (n *nsqReader) ReadBatch(ctx context.Context) (message.Batch, input.AsyncAckFn, error) { +func (n *nsqReader) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { msg, err := n.read(ctx) if err != nil { return nil, nil, err } n.unAckMsgs = append(n.unAckMsgs, msg) - bmsg := message.QuickBatch([][]byte{msg.Body}) - part := bmsg.Get(0) + part := service.NewMessage(msg.Body) part.MetaSetMut("nsq_attempts", strconv.Itoa(int(msg.Attempts))) part.MetaSetMut("nsq_id", string(msg.ID[:])) part.MetaSetMut("nsq_timestamp", strconv.FormatInt(msg.Timestamp, 10)) part.MetaSetMut("nsq_nsqd_address", msg.NSQDAddress) - return bmsg, func(rctx context.Context, res error) error { + return part, func(rctx context.Context, res error) error { if res != nil { msg.Requeue(-1) } diff --git a/internal/impl/nsq/integration_test.go b/internal/impl/nsq/integration_test.go new file mode 100644 index 0000000000..505e1daaef --- /dev/null +++ b/internal/impl/nsq/integration_test.go @@ -0,0 +1,52 @@ +package nsq + +import ( + "net" + "testing" + "time" + + "github.com/benthosdev/benthos/v4/internal/integration" +) + +func TestIntegration(t *testing.T) { + t.Parallel() + + { + timeout := time.Second + conn, err := net.DialTimeout("tcp", "localhost:4150", timeout) + if err != nil { + t.Skip("Skipping NSQ tests as services are not running") + } + conn.Close() + } + + template := ` +output: + nsq: + nsqd_tcp_address: localhost:4150 + topic: topic-$ID + # user_agent: "" + max_in_flight: $MAX_IN_FLIGHT + +input: + nsq: + nsqd_tcp_addresses: [ localhost:4150 ] + lookupd_http_addresses: [ localhost:4160 ] + topic: topic-$ID + channel: channel-$ID + # user_agent: "" + max_in_flight: 100 + max_attempts: 5 +` + suite := integration.StreamTests( + integration.StreamTestOpenClose(), + integration.StreamTestSendBatch(10), + integration.StreamTestStreamParallel(1000), + ) + suite.Run(t, template) + + t.Run("with max in flight", func(t *testing.T) { + t.Parallel() + suite.Run(t, template, integration.StreamTestOptMaxInFlight(10)) + }) +} diff --git a/internal/impl/nsq/output.go b/internal/impl/nsq/output.go index bb2b38aa90..2df2ccc6cd 100644 --- a/internal/impl/nsq/output.go +++ b/internal/impl/nsq/output.go @@ -10,73 +10,81 @@ import ( nsq "github.com/nsqio/go-nsq" - "github.com/benthosdev/benthos/v4/internal/bloblang/field" - "github.com/benthosdev/benthos/v4/internal/bundle" - "github.com/benthosdev/benthos/v4/internal/component" "github.com/benthosdev/benthos/v4/internal/component/output" - "github.com/benthosdev/benthos/v4/internal/component/output/processors" - "github.com/benthosdev/benthos/v4/internal/docs" - "github.com/benthosdev/benthos/v4/internal/log" - "github.com/benthosdev/benthos/v4/internal/message" - btls "github.com/benthosdev/benthos/v4/internal/tls" + "github.com/benthosdev/benthos/v4/public/service" ) +const ( + noFieldNSQDAddr = "nsqd_tcp_address" + noFieldTLS = "tls" + noFieldTopic = "topic" + noFieldUserAgent = "user_agent" +) + +func outputConfigSpec() *service.ConfigSpec { + return service.NewConfigSpec(). + Stable(). + Categories("Services"). + Summary(`Publish to an NSQ topic.`). + Description(output.Description(true, false, `The `+"`topic`"+` field can be dynamically set using function interpolations described [here](/docs/configuration/interpolation#bloblang-queries). When sending batched messages these interpolations are performed per message part.`)). + Fields( + service.NewStringField(noFieldNSQDAddr). + Description("The address of the target NSQD server."), + service.NewInterpolatedStringField(noFieldTopic). + Description("The topic to publish to."), + service.NewStringField(noFieldUserAgent). + Description("A user agent to assume when connecting."). + Optional(), + service.NewTLSToggledField(noFieldTLS), + service.NewOutputMaxInFlightField(), + ) +} + func init() { - err := bundle.AllOutputs.Add(processors.WrapConstructor(newNSQOutput), docs.ComponentSpec{ - Name: "nsq", - Summary: `Publish to an NSQ topic.`, - Description: output.Description(true, false, `The `+"`topic`"+` field can be dynamically set using function interpolations described [here](/docs/configuration/interpolation#bloblang-queries). When sending batched messages these interpolations are performed per message part.`), - Config: docs.FieldComponent().WithChildren( - docs.FieldString("nsqd_tcp_address", "The address of the target NSQD server."), - docs.FieldString("topic", "The topic to publish to.").IsInterpolated(), - docs.FieldString("user_agent", "A user agent string to connect with."), - btls.FieldSpec(), - docs.FieldInt("max_in_flight", "The maximum number of messages to have in flight at a given time. Increase this to improve throughput."), - ).ChildDefaultAndTypesFromStruct(output.NewNSQConfig()), - Categories: []string{ - "Services", - }, + err := service.RegisterOutput("nsq", outputConfigSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Output, int, error) { + wtr, err := newNSQWriterFromParsed(conf, mgr) + if err != nil { + return nil, 0, err + } + mIF, err := conf.FieldMaxInFlight() + if err != nil { + return nil, 0, err + } + return wtr, mIF, nil }) if err != nil { panic(err) } } -func newNSQOutput(conf output.Config, mgr bundle.NewManagement) (output.Streamed, error) { - w, err := newNSQWriter(conf.NSQ, mgr) - if err != nil { - return nil, err - } - return output.NewAsyncWriter("nsq", conf.NSQ.MaxInFlight, w, mgr) -} - type nsqWriter struct { - log log.Modular + log *service.Logger - topicStr *field.Expression + address string + topicStr *service.InterpolatedString + tlsConf *tls.Config + userAgent string - tlsConf *tls.Config connMut sync.RWMutex producer *nsq.Producer - - conf output.NSQConfig } -func newNSQWriter(conf output.NSQConfig, mgr bundle.NewManagement) (*nsqWriter, error) { - n := nsqWriter{ - log: mgr.Logger(), - conf: conf, +func newNSQWriterFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (n *nsqWriter, err error) { + n = &nsqWriter{ + log: mgr.Logger(), } - var err error - if n.topicStr, err = mgr.BloblEnvironment().NewField(conf.Topic); err != nil { - return nil, fmt.Errorf("failed to parse topic expression: %v", err) + + if n.address, err = conf.FieldString(noFieldNSQDAddr); err != nil { + return } - if conf.TLS.Enabled { - if n.tlsConf, err = conf.TLS.Get(mgr.FS()); err != nil { - return nil, err - } + if n.topicStr, err = conf.FieldInterpolatedString(noFieldTopic); err != nil { + return nil, err + } + if n.tlsConf, _, err = conf.FieldTLSToggled(noFieldTLS); err != nil { + return } - return &n, nil + n.userAgent, _ = conf.FieldString(noFieldUserAgent) + return } func (n *nsqWriter) Connect(ctx context.Context) error { @@ -84,13 +92,13 @@ func (n *nsqWriter) Connect(ctx context.Context) error { defer n.connMut.Unlock() cfg := nsq.NewConfig() - cfg.UserAgent = n.conf.UserAgent + cfg.UserAgent = n.userAgent if n.tlsConf != nil { cfg.TlsV1 = true cfg.TlsConfig = n.tlsConf } - producer, err := nsq.NewProducer(n.conf.Address, cfg) + producer, err := nsq.NewProducer(n.address, cfg) if err != nil { return err } @@ -101,26 +109,29 @@ func (n *nsqWriter) Connect(ctx context.Context) error { return err } n.producer = producer - n.log.Infof("Sending NSQ messages to address: %s\n", n.conf.Address) + n.log.Infof("Sending NSQ messages to address: %s", n.address) return nil } -func (n *nsqWriter) WriteBatch(ctx context.Context, msg message.Batch) error { +func (n *nsqWriter) Write(ctx context.Context, msg *service.Message) error { n.connMut.RLock() prod := n.producer n.connMut.RUnlock() if prod == nil { - return component.ErrNotConnected + return service.ErrNotConnected } - return output.IterateBatchedSend(msg, func(i int, p *message.Part) error { - topicStr, err := n.topicStr.String(i, msg) - if err != nil { - return fmt.Errorf("topic interpolation error: %w", err) - } - return prod.Publish(topicStr, p.AsBytes()) - }) + topicStr, err := n.topicStr.TryString(msg) + if err != nil { + return fmt.Errorf("topic interpolation error: %w", err) + } + + mBytes, err := msg.AsBytes() + if err != nil { + return err + } + return prod.Publish(topicStr, mBytes) } func (n *nsqWriter) Close(context.Context) error { diff --git a/website/docs/components/inputs/nsq.md b/website/docs/components/inputs/nsq.md index 7fac266bf6..1f411a0ec6 100644 --- a/website/docs/components/inputs/nsq.md +++ b/website/docs/components/inputs/nsq.md @@ -29,11 +29,11 @@ Subscribe to an NSQ instance topic and channel. input: label: "" nsq: - nsqd_tcp_addresses: [] - lookupd_http_addresses: [] - topic: "" - channel: "" - user_agent: "" + nsqd_tcp_addresses: [] # No default (required) + lookupd_http_addresses: [] # No default (required) + topic: "" # No default (required) + channel: "" # No default (required) + user_agent: "" # No default (optional) max_in_flight: 100 max_attempts: 5 ``` @@ -46,8 +46,8 @@ input: input: label: "" nsq: - nsqd_tcp_addresses: [] - lookupd_http_addresses: [] + nsqd_tcp_addresses: [] # No default (required) + lookupd_http_addresses: [] # No default (required) tls: enabled: false skip_cert_verify: false @@ -55,9 +55,9 @@ input: root_cas: "" root_cas_file: "" client_certs: [] - topic: "" - channel: "" - user_agent: "" + topic: "" # No default (required) + channel: "" # No default (required) + user_agent: "" # No default (optional) max_in_flight: 100 max_attempts: 5 ``` @@ -87,7 +87,6 @@ A list of nsqd addresses to connect to. Type: `array` -Default: `[]` ### `lookupd_http_addresses` @@ -95,7 +94,6 @@ A list of nsqlookupd addresses to connect to. Type: `array` -Default: `[]` ### `tls` @@ -243,7 +241,6 @@ The topic to consume from. Type: `string` -Default: `""` ### `channel` @@ -251,7 +248,6 @@ The channel to consume from. Type: `string` -Default: `""` ### `user_agent` @@ -259,7 +255,6 @@ A user agent to assume when connecting. Type: `string` -Default: `""` ### `max_in_flight` diff --git a/website/docs/components/outputs/nsq.md b/website/docs/components/outputs/nsq.md index 531ffeafc2..517c370473 100644 --- a/website/docs/components/outputs/nsq.md +++ b/website/docs/components/outputs/nsq.md @@ -29,9 +29,9 @@ Publish to an NSQ topic. output: label: "" nsq: - nsqd_tcp_address: "" - topic: "" - user_agent: "" + nsqd_tcp_address: "" # No default (required) + topic: "" # No default (required) + user_agent: "" # No default (optional) max_in_flight: 64 ``` @@ -43,9 +43,9 @@ output: output: label: "" nsq: - nsqd_tcp_address: "" - topic: "" - user_agent: "" + nsqd_tcp_address: "" # No default (required) + topic: "" # No default (required) + user_agent: "" # No default (optional) tls: enabled: false skip_cert_verify: false @@ -75,7 +75,6 @@ The address of the target NSQD server. Type: `string` -Default: `""` ### `topic` @@ -84,15 +83,13 @@ This field supports [interpolation functions](/docs/configuration/interpolation# Type: `string` -Default: `""` ### `user_agent` -A user agent string to connect with. +A user agent to assume when connecting. Type: `string` -Default: `""` ### `tls`