Skip to content

Commit

Permalink
Move nanomsg components to new APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Nov 27, 2023
1 parent 6a2162f commit fd2c1ed
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 206 deletions.
2 changes: 0 additions & 2 deletions internal/component/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ type Config struct {
Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"`
Generate GenerateConfig `json:"generate" yaml:"generate"`
Inproc InprocConfig `json:"inproc" yaml:"inproc"`
Nanomsg NanomsgConfig `json:"nanomsg" yaml:"nanomsg"`
NSQ NSQConfig `json:"nsq" yaml:"nsq"`
Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"`
ReadUntil ReadUntilConfig `json:"read_until" yaml:"read_until"`
Expand All @@ -42,7 +41,6 @@ func NewConfig() Config {
Dynamic: NewDynamicConfig(),
Generate: NewGenerateConfig(),
Inproc: NewInprocConfig(),
Nanomsg: NewNanomsgConfig(),
NSQ: NewNSQConfig(),
Plugin: nil,
ReadUntil: NewReadUntilConfig(),
Expand Down
21 changes: 0 additions & 21 deletions internal/component/input/config_nanomsg.go

This file was deleted.

2 changes: 0 additions & 2 deletions internal/component/output/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type Config struct {
Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"`
Fallback TryConfig `json:"fallback" yaml:"fallback"`
Inproc string `json:"inproc" yaml:"inproc"`
Nanomsg NanomsgConfig `json:"nanomsg" yaml:"nanomsg"`
NSQ NSQConfig `json:"nsq" yaml:"nsq"`
Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"`
Reject string `json:"reject" yaml:"reject"`
Expand Down Expand Up @@ -51,7 +50,6 @@ func NewConfig() Config {
Dynamic: NewDynamicConfig(),
Fallback: NewTryConfig(),
Inproc: "",
Nanomsg: NewNanomsgConfig(),
NSQ: NewNSQConfig(),
Plugin: nil,
Reject: "",
Expand Down
21 changes: 0 additions & 21 deletions internal/component/output/config_nanomsg.go

This file was deleted.

153 changes: 81 additions & 72 deletions internal/impl/nanomsg/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,109 +3,124 @@ package nanomsg
import (
"context"
"errors"
"fmt"
"net/url"
"strings"
"sync"
"time"

"go.nanomsg.org/mangos/v3"
"go.nanomsg.org/mangos/v3/protocol/pub"
"go.nanomsg.org/mangos/v3/protocol/pull"
"go.nanomsg.org/mangos/v3/protocol/push"
"go.nanomsg.org/mangos/v3/protocol/sub"

"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/input"
"github.com/benthosdev/benthos/v4/internal/component/input/processors"
"github.com/benthosdev/benthos/v4/internal/docs"
"github.com/benthosdev/benthos/v4/internal/log"
"github.com/benthosdev/benthos/v4/internal/message"
"github.com/benthosdev/benthos/v4/public/service"

// Import all transport types.
_ "go.nanomsg.org/mangos/v3/transport/all"
)

const (
niFieldURLs = "urls"
niFieldBind = "bind"
niFieldSocketType = "socket_type"
niFieldSubFilters = "sub_filters"
niFieldPollTimeout = "poll_timeout"
)

func inputConfigSpec() *service.ConfigSpec {
return service.NewConfigSpec().
Stable().
Categories("Network").
Summary(`Consumes messages via Nanomsg sockets (scalability protocols).`).
Description(`Currently only PULL and SUB sockets are supported.`).
Fields(
service.NewURLListField(niFieldURLs).
Description("A list of URLs to connect to (or as). If an item of the list contains commas it will be expanded into multiple URLs."),
service.NewBoolField(niFieldBind).
Description("Whether the URLs provided should be connected to, or bound as.").
Default(true),
service.NewStringEnumField(niFieldSocketType, "PULL", "SUB").
Description("The socket type to use.").
Default("PULL"),
service.NewStringListField(niFieldSubFilters).
Description("A list of subscription topic filters to use when consuming from a SUB socket. Specifying a single sub_filter of `''` will subscribe to everything.").
Default([]any{}),
service.NewDurationField(niFieldPollTimeout).
Description("The period to wait until a poll is abandoned and reattempted.").
Advanced().
Default("5s"),
)
}

func init() {
err := bundle.AllInputs.Add(processors.WrapConstructor(newNanomsgInput), docs.ComponentSpec{
Name: "nanomsg",
Summary: `Consumes messages via Nanomsg sockets (scalability protocols).`,
Description: `Currently only PULL and SUB sockets are supported.`,
Config: docs.FieldComponent().WithChildren(
docs.FieldURL("urls", "A list of URLs to connect to (or as). If an item of the list contains commas it will be expanded into multiple URLs.").Array(),
docs.FieldBool("bind", "Whether the URLs provided should be connected to, or bound as."),
docs.FieldString("socket_type", "The socket type to use.").HasOptions("PULL", "SUB"),
docs.FieldString("sub_filters", "A list of subscription topic filters to use when consuming from a SUB socket. Specifying a single sub_filter of `''` will subscribe to everything.").Array(),
docs.FieldString("poll_timeout", "The period to wait until a poll is abandoned and reattempted.").Advanced(),
).ChildDefaultAndTypesFromStruct(input.NewNanomsgConfig()),
Categories: []string{
"Network",
},
err := service.RegisterInput("nanomsg", inputConfigSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
rdr, err := newNanomsgReaderFromParsed(conf, mgr)
if err != nil {
return nil, err
}
return service.AutoRetryNacks(rdr), nil
})
if err != nil {
panic(err)
}
}

func newNanomsgInput(conf input.Config, mgr bundle.NewManagement) (input.Streamed, error) {
s, err := newNanomsgReader(conf.Nanomsg, mgr.Logger())
if err != nil {
return nil, err
}
return input.NewAsyncReader("nanomsg", input.NewAsyncPreserver(s), mgr)
}

type nanomsgReader struct {
socket mangos.Socket
cMut sync.Mutex

urls []string
bind bool
socketType string
subFilters []string
pollTimeout time.Duration
repTimeout time.Duration

urls []string
conf input.NanomsgConfig
log log.Modular
log *service.Logger
}

func newNanomsgReader(conf input.NanomsgConfig, log log.Modular) (*nanomsgReader, error) {
s := nanomsgReader{
conf: conf,
log: log,
func newNanomsgReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (rdr *nanomsgReader, err error) {
rdr = &nanomsgReader{
log: mgr.Logger(),
repTimeout: time.Second * 5,
}

for _, u := range conf.URLs {
for _, splitU := range strings.Split(u, ",") {
if len(splitU) > 0 {
s.urls = append(s.urls, strings.Replace(splitU, "//*:", "//0.0.0.0:", 1))
}
}
var cURLs []*url.URL
if cURLs, err = conf.FieldURLList(niFieldURLs); err != nil {
return
}
for _, u := range cURLs {
rdr.urls = append(rdr.urls, strings.Replace(u.String(), "//*:", "//0.0.0.0:", 1))
}

if conf.SocketType == "SUB" && len(conf.SubFilters) == 0 {
return nil, errors.New("must provide at least one sub filter when connecting with a SUB socket, in order to subscribe to all messages add an empty string")
if rdr.socketType, err = conf.FieldString(niFieldSocketType); err != nil {
return
}

if tout := conf.PollTimeout; len(tout) > 0 {
var err error
if s.pollTimeout, err = time.ParseDuration(tout); err != nil {
return nil, fmt.Errorf("failed to parse poll timeout string: %v", err)
}
if rdr.subFilters, err = conf.FieldStringList(niFieldSubFilters); err != nil {
return
}

if rdr.bind, err = conf.FieldBool(niFieldBind); err != nil {
return
}

if rdr.socketType == "SUB" && len(rdr.subFilters) == 0 {
return nil, errors.New("must provide at least one sub filter when connecting with a SUB socket, in order to subscribe to all messages add an empty string")
}

return &s, nil
if rdr.pollTimeout, err = conf.FieldDuration(niFieldPollTimeout); err != nil {
return
}
return
}

func getSocketFromType(t string) (mangos.Socket, error) {
func getInputSocketFromType(t string) (mangos.Socket, error) {
switch t {
case "PULL":
return pull.NewSocket()
case "SUB":
return sub.NewSocket()
case "PUSH":
return push.NewSocket()
case "PUB":
return pub.NewSocket()
}
return nil, errors.New("invalid Scalability Protocols socket type")
}
Expand All @@ -127,12 +142,12 @@ func (s *nanomsgReader) Connect(ctx context.Context) error {
}
}()

socket, err = getSocketFromType(s.conf.SocketType)
socket, err = getInputSocketFromType(s.socketType)
if err != nil {
return err
}

if s.conf.Bind {
if s.bind {
for _, addr := range s.urls {
if err = socket.Listen(addr); err != nil {
break
Expand Down Expand Up @@ -162,35 +177,29 @@ func (s *nanomsgReader) Connect(ctx context.Context) error {
return err
}

for _, filter := range s.conf.SubFilters {
for _, filter := range s.subFilters {
if err := socket.SetOption(mangos.OptionSubscribe, []byte(filter)); err != nil {
return err
}
}

if s.conf.Bind {
s.log.Infof(
"Receiving Scalability Protocols messages at bound URLs: %s\n",
s.urls,
)
if s.bind {
s.log.Infof("Receiving Scalability Protocols messages at bound URLs: %s", s.urls)
} else {
s.log.Infof(
"Receiving Scalability Protocols messages at connected URLs: %s\n",
s.urls,
)
s.log.Infof("Receiving Scalability Protocols messages at connected URLs: %s", s.urls)
}

s.socket = socket
return nil
}

func (s *nanomsgReader) ReadBatch(ctx context.Context) (message.Batch, input.AsyncAckFn, error) {
func (s *nanomsgReader) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
s.cMut.Lock()
socket := s.socket
s.cMut.Unlock()

if socket == nil {
return nil, nil, component.ErrNotConnected
return nil, nil, service.ErrNotConnected
}
data, err := socket.Recv()
if err != nil {
Expand All @@ -199,7 +208,7 @@ func (s *nanomsgReader) ReadBatch(ctx context.Context) (message.Batch, input.Asy
}
return nil, nil, err
}
return message.QuickBatch([][]byte{data}), func(ctx context.Context, err error) error {
return service.NewMessage(data), func(ctx context.Context, err error) error {
return nil
}, nil
}
Expand Down
Loading

0 comments on commit fd2c1ed

Please sign in to comment.