Skip to content

Commit

Permalink
Merge pull request #52 from jehiah/exported_config_52
Browse files Browse the repository at this point in the history
config: export & list defaults
  • Loading branch information
mreiferson committed Jun 16, 2014
2 parents c40f474 + 5d77a1d commit 23d7999
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 273 deletions.
327 changes: 150 additions & 177 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,116 +9,91 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"time"
"unsafe"
)

// Config is a struct of NSQ options
//
// (see Config.Set() for available parameters)
// The only valid way to create a Config is via NewConfig, using a struct literal will panic.
// After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no
// longer mutable (they are copied).
//
// Use Set(key string, value interface{}) as an alternate way to set parameters
type Config struct {
sync.RWMutex
initOnce sync.Once

readTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m"`
writeTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m"`

lookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"5s" max:"5m"`
lookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1"`

maxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m"`
defaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m"`
backoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m"`

maxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535"`
lowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m"`

clientID string `opt:"client_id"`
hostname string `opt:"hostname"`
userAgent string `opt:"user_agent"`

heartbeatInterval time.Duration `opt:"heartbeat_interval"`
sampleRate int32 `opt:"sample_rate" min:"0" max:"99"`

tlsV1 bool `opt:"tls_v1"`
tlsConfig *tls.Config `opt:"tls_config"`

deflate bool `opt:"deflate"`
deflateLevel int `opt:"deflate_level" min:"1" max:"9"`
snappy bool `opt:"snappy"`

outputBufferSize int64 `opt:"output_buffer_size"`
outputBufferTimeout time.Duration `opt:"output_buffer_timeout"`

maxInFlight int `opt:"max_in_flight" min:"0"`

maxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m"`

authSecret string `opt:"auth_secret"`
initialized bool

// Deadlines for network reads and writes
ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`

// Duration between polling lookupd for new producers, and fractional jitter to add to
// the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
// restart at the same time
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"5s" max:"5m" default:"60s"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`

// Maximum duration when REQueueing (for doubling of deferred requeue)
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
// Unit of time for calculating consumer backoff
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`

// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
// Amount of time in seconds to wait for a message from a producer when in a state where RDY
// counts are re-distributed (ie. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`

// Identifiers sent to nsqd representing this client
// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
ClientID string `opt:"client_id"` // deprecated (defaults: short hostname)
Hostname string `opt:"hostname"`
UserAgent string `opt:"user_agent"`

// Duration of time between heartbeats. This must be less than ReadTimeout
HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
// Integer percentage to sample the channel (requires nsqd 0.2.25+)
SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`

// TLS Settings
TlsV1 bool `opt:"tls_v1"`
TlsConfig *tls.Config `opt:"tls_config"`

// Compression Settings
Deflate bool `opt:"deflate"`
DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"`
Snappy bool `opt:"snappy"`

// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
//
// WARNING: configuring clients with an extremely low
// (< 25ms) output_buffer_timeout has a significant effect
// on nsqd CPU usage (particularly with > 50 clients connected).
OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`

// Maximum number of messages to allow in flight (concurrency knob)
MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`

// Maximum amount of time to backoff when processing fails 0 == no backoff
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`

// secret for nsqd authentication (requires nsqd 0.2.29+)
AuthSecret string `opt:"auth_secret"`
}

// NewConfig returns a new default configuration.
//
// "read_timeout": 60s (min: 100ms, max: 5m) (time.Duration)
// "write_timeout": 1s (min: 100ms, max: 5m) (time.Duration)
// "lookupd_poll_interval": 60s (min: 5s, max: 5m) (time.Duration)
// "lookupd_poll_jitter": 0.3 (min: 0.0, max: 1.0) (float)
// "max_requeue_delay": 15m (min: 0, max: 60m) (time.Duration)
// "default_requeue_delay": 90s (min: 0, max: 60m) (time.Duration)
// "backoff_multiplier": 1s (min: 0, max: 60m) (time.TIme)
// "max_attempts": 5 (min: 0, max: 65535) (int)
// "low_rdy_idle_timeout": 10s (min: 1s, max: 5m) (time.Duration)
// "client_id": "<short host name>" (string)
// "hostname": os.Hostname() (string)
// "user_agent": "go-nsq/<version>" (string)
// "heartbeat_interval": 30s (time.Duration)
// "sample_rate": 0 (min: 0, max: 99) (int)
// "tls_v1": false (bool)
// "tls_config": nil (*tls.Config)
// "deflate": false (bool)
// "deflate_level": 6 (min: 1, max: 9) (int)
// "snappy": false (bool)
// "output_buffer_size": 16384 (int)
// "output_buffer_timeout": 250ms (time.Duration)
// "max_in_flight": 1 (int)
// "max_backoff_duration": 120s (time.Duration)
// "auth_secret": "" (string)
// NewConfig returns a new default nsq configuration.
//
// See Config.Set() for a description of these parameters.
// This must be used to initialize Config structs. Values can be set directly, or through Config.Set()
func NewConfig() *Config {
conf := &Config{}
conf.initialize()
return conf
}

// initialize is used to ensure that a Config has a baseline set of defaults
// despite how it might have been insantiated
func (c *Config) initialize() {
c.initOnce.Do(func() {
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("ERROR: unable to get hostname %s", err.Error())
}
c.maxInFlight = 1
c.maxAttempts = 5
c.lookupdPollInterval = 60 * time.Second
c.lookupdPollJitter = 0.3
c.lowRdyIdleTimeout = 10 * time.Second
c.defaultRequeueDelay = 90 * time.Second
c.maxRequeueDelay = 15 * time.Minute
c.backoffMultiplier = time.Second
c.maxBackoffDuration = 120 * time.Second
c.readTimeout = DefaultClientTimeout
c.writeTimeout = time.Second
c.deflateLevel = 6
c.outputBufferSize = 16 * 1024
c.outputBufferTimeout = 250 * time.Millisecond
c.heartbeatInterval = DefaultClientTimeout / 2
c.clientID = strings.Split(hostname, ".")[0]
c.hostname = hostname
c.userAgent = fmt.Sprintf("go-nsq/%s", VERSION)
})
c := &Config{}
c.initialized = true
if err := c.setDefaults(); err != nil {
panic(err.Error())
}
return c
}

// Set takes an option as a string and a value as an interface and
Expand All @@ -140,86 +115,9 @@ func (c *Config) initialize() {
// 1 (an int where 1 == true and 0 == false)
//
// It returns an error for an invalid option or value.
//
// read_timeout (time.Duration): the deadline set for network reads
// (min: 100ms, max: 5m)
//
// write_timeout (time.Duration): the deadline set for network writes
// (min: 100ms, max: 5m)
//
// lookupd_poll_interval (time.Duration): duration between polling lookupd for new
// (min: 5s, max: 5m)
//
// lookupd_poll_jitter (float): fractional amount of jitter to add to the lookupd pool loop,
// this helps evenly distribute requests even if multiple
// consumers restart at the same time.
// (min: 0.0, max: 1.0)
//
// max_requeue_delay (time.Duration): the maximum duration when REQueueing
// (for doubling of deferred requeue)
// (min: 0, max: 60m)
//
// default_requeue_delay (time.Duration): the default duration when REQueueing
// (min: 0, max: 60m)
//
// backoff_multiplier (time.Duration): the unit of time for calculating consumer backoff
// (min: 0, max: 60m)
//
// max_attempts (int): maximum number of times this consumer will attempt to process a message
// (min: 0, max: 65535)
//
// low_rdy_idle_timeout (time.Duration): the amount of time in seconds to wait for a message
// from a producer when in a state where RDY counts
// are re-distributed (ie. max_in_flight < num_producers)
// (min: 1s, max: 5m)
//
// client_id (string): an identifier sent to nsqd representing the client
// (defaults: short hostname)
//
// hostname (string): an identifier sent to nsqd representing the host
// (defaults: long hostname)
//
// user_agent (string): an identifier of the agent for this client (in the spirit of HTTP)
// (default: "<client_library_name>/<version>")
//
// heartbeat_interval (time.Duration): duration of time between heartbeats
//
// sample_rate (int): integer percentage to sample the channel (requires nsqd 0.2.25+)
// (min: 0, max: 99)
//
// tls_v1 (bool): negotiate TLS
//
// tls_config (*tls.Config): client TLS configuration
//
// deflate (bool): negotiate Deflate compression
//
// deflate_level (int): the compression level to negotiate for Deflate
// (min: 1, max: 9)
//
// snappy (bool): negotiate Snappy compression
//
// output_buffer_size (int): size of the buffer (in bytes) used by nsqd for
// buffering writes to this connection
//
// output_buffer_timeout (time.Duration): timeout (in ms) used by nsqd before flushing buffered
// writes (set to 0 to disable).
//
// WARNING: configuring clients with an extremely low
// (< 25ms) output_buffer_timeout has a significant effect
// on nsqd CPU usage (particularly with > 50 clients connected).
//
// max_in_flight (int): the maximum number of messages to allow in flight (concurrency knob)
//
// max_backoff_duration (time.Duration): the maximum amount of time to backoff when processing fails
// 0 == no backoff
//
// auth_secret (string): secret for nsqd authentication (requires nsqd 0.2.29+)
//
func (c *Config) Set(option string, value interface{}) error {
c.Lock()
defer c.Unlock()

c.initialize()
c.assertInitialized()

val := reflect.ValueOf(c).Elem()
typ := val.Type()
Expand Down Expand Up @@ -261,6 +159,81 @@ func (c *Config) Set(option string, value interface{}) error {
return fmt.Errorf("invalid option %s", option)
}

func (c *Config) assertInitialized() {
if !c.initialized {
panic("Config{} must be created with NewConfig()")
}
}

// Validate checks that all values are within specified min/max ranges
func (c *Config) Validate() error {

c.assertInitialized()

val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)

min := field.Tag.Get("min")
max := field.Tag.Get("max")

if min == "" && max == "" {
continue
}

value := val.FieldByName(field.Name)

if min != "" {
coercedMinVal, _ := coerce(min, field.Type)
if valueCompare(value, coercedMinVal) == -1 {
return fmt.Errorf("invalid %s ! %v < %v",
field.Name, value.Interface(), coercedMinVal.Interface())
}
}
if max != "" {
coercedMaxVal, _ := coerce(max, field.Type)
if valueCompare(value, coercedMaxVal) == 1 {
return fmt.Errorf("invalid %s ! %v > %v",
field.Name, value.Interface(), coercedMaxVal.Interface())
}
}
}

if c.HeartbeatInterval > c.ReadTimeout {
return fmt.Errorf("HeartbeatInterval %v must be less than ReadTimeout %v", c.HeartbeatInterval, c.ReadTimeout)
}
return nil
}

func (c *Config) setDefaults() error {
val := reflect.ValueOf(c).Elem()
typ := val.Type()
for i := 0; i < typ.NumField(); i++ {
field := typ.Field(i)
opt := field.Tag.Get("opt")
defaultVal := field.Tag.Get("default")
if defaultVal == "" || opt == "" {
continue
}

if err := c.Set(opt, defaultVal); err != nil {
return err
}
}

hostname, err := os.Hostname()
if err != nil {
log.Fatalf("ERROR: unable to get hostname %s", err.Error())
}

c.ClientID = strings.Split(hostname, ".")[0]
c.Hostname = hostname
c.UserAgent = fmt.Sprintf("go-nsq/%s", VERSION)

return nil
}

// because Config contains private structs we can't use reflect.Value
// directly, instead we need to "unsafely" address the variable
func unsafeValueOf(val reflect.Value) reflect.Value {
Expand Down
12 changes: 12 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,15 @@ func TestConfigSet(t *testing.T) {
t.Errorf("Error setting `tls_v1` config: %v", err)
}
}

func TestConfigValidate(t *testing.T) {
c := NewConfig()
if err := c.Validate(); err != nil {
t.Error("initialized config is invalid")
}
c.DeflateLevel = 100
if err := c.Validate(); err == nil {
t.Error("no error set for invalid value")
}

}
Loading

0 comments on commit 23d7999

Please sign in to comment.