Skip to content

Commit

Permalink
Test code to exercise dropped events
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Edgar committed Sep 30, 2019
1 parent 2b9db9a commit 5ddd0ec
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 60 deletions.
42 changes: 27 additions & 15 deletions examples/newclient/newclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,20 @@ const (
defaultTcpsPort = 8081
)

var (
skipVerify, compress bool
port int
addr, caFile, certFile, keyFile, realm, scheme, serType string
)

func NewClient(logger *log.Logger) (*client.Client, error) {
var (
skipVerify, compress bool

port int
ParseArgs()

return MyNewClient(logger)
}

addr, caFile, certFile, keyFile, realm, scheme, serType string
)
func ParseArgs() {
flag.StringVar(&addr, "addr", "",
fmt.Sprintf("router address. (default %q or %q for network or unix socket)", defaultAddr, defaultUnix))
flag.IntVar(&port, "port", 0,
Expand All @@ -55,6 +61,9 @@ func NewClient(logger *log.Logger) (*client.Client, error) {
"private key file with PEM encoded data")
flag.BoolVar(&compress, "compress", false, "enable websocket compression")
flag.Parse()
}

func MyNewClient(logger *log.Logger) (*client.Client, error) {

// Get requested serialization.
serialization := client.JSON
Expand All @@ -70,11 +79,12 @@ func NewClient(logger *log.Logger) (*client.Client, error) {
"invalid serialization, muse be one of: json, msgpack, cbor")
}

if addr == "" {
cli_addr := addr
if cli_addr == "" {
if scheme == "unix" {
addr = defaultUnix
cli_addr = defaultUnix
} else {
addr = defaultAddr
cli_addr = defaultAddr
}
}

Expand Down Expand Up @@ -144,32 +154,34 @@ func NewClient(logger *log.Logger) (*client.Client, error) {
if port == 0 {
port = defaultWsPort
}
addr = fmt.Sprintf("ws://%s:%d/ws", addr, port)
cli_addr = fmt.Sprintf("ws://%s:%d/ws", cli_addr, port)
case "https", "wss":
if port == 0 {
port = defaultWssPort
}
addr = fmt.Sprintf("wss://%s:%d/ws", addr, port)
cli_addr = fmt.Sprintf("wss://%s:%d/ws", cli_addr, port)
case "tcp":
if port == 0 {
port = defaultTcpPort
}
addr = fmt.Sprintf("tcp://%s:%d/", addr, port)
cli_addr = fmt.Sprintf("tcp://%s:%d/", cli_addr, port)
case "tcps":
if port == 0 {
port = defaultTcpsPort
}
addr = fmt.Sprintf("tcps://%s:%d/", addr, port)
cli_addr = fmt.Sprintf("tcps://%s:%d/", cli_addr, port)
case "unix":
addr = fmt.Sprintf("unix://%s", addr)
cli_addr = fmt.Sprintf("unix://%s", cli_addr)
default:
return nil, errors.New("scheme must be one of: http, https, ws, wss, tcp, tcps, unix")
}
cli, err = client.ConnectNet(addr, cfg)
logger.Println("Connecting to", cli_addr, "using", serType, "serialization")

cli, err = client.ConnectNet(cli_addr, cfg)
if err != nil {
return nil, err
}

logger.Println("Connected to", addr, "using", serType, "serialization")
logger.Println("Connected to", cli_addr, "using", serType, "serialization")
return cli, nil
}
85 changes: 43 additions & 42 deletions examples/pubsub/publisher/publisher.go
Original file line number Diff line number Diff line change
@@ -1,64 +1,65 @@
package main

import (
"fmt"
"log"
"os"
"sync"
"sync/atomic"

"github.com/gammazero/nexus/examples/newclient"
"github.com/gammazero/nexus/wamp"
)

const exampleTopic = "example.hello"
const exampleCountTopic = "example.eventCount"

func main() {
logger := log.New(os.Stdout, "PUBLISHER> ", 0)
// Connect publisher client with requested socket type and serialization.
publisher, err := newclient.NewClient(logger)
if err != nil {
logger.Fatal(err)
}
defer publisher.Close()

// Publish to topic.
args := wamp.List{"hello world"}
err = publisher.Publish(exampleTopic, nil, args, nil)
if err != nil {
logger.Fatalf("publish error: %s", err)
}
newclient.ParseArgs()

// Publish more events to topic.
args = wamp.List{"how are you today"}
err = publisher.Publish(exampleTopic, nil, args, nil)
if err != nil {
logger.Fatalf("publish error: %s", err)
}
max := 20
msgCount := 20
fmt.Printf("PUBLISHER %d clients\n", max)

// Publish events only to sessions 42, 1138, 1701.
args = wamp.List{"for your eyes only"}
opts := wamp.Dict{wamp.WhitelistKey: wamp.List{42, 1138, 1701}}
err = publisher.Publish(exampleTopic, opts, args, nil)
if err != nil {
logger.Fatalf("publish error: %s", err)
}
wg := sync.WaitGroup{}
wg.Add(max)

args = wamp.List{"testing 1"}
err = publisher.Publish(exampleTopic, nil, args, nil)
if err != nil {
logger.Fatalf("publish error: %s", err)
}
var success uint64
var failure uint64

args = wamp.List{"testing 2"}
err = publisher.Publish(exampleTopic, nil, args, nil)
if err != nil {
logger.Fatalf("publish error: %s", err)
}
for ii := 1; ii <= max; ii++ {
go func(loop int) {

logger := log.New(os.Stdout, fmt.Sprintf("PUBLISHER %d > ", loop), 0)
// Connect publisher client with requested socket type and serialization.
publisher, err := newclient.MyNewClient(logger)
if err != nil {
logger.Fatal(err)
}
defer publisher.Close()

// Publish to topic.
for jj := 1; jj <= msgCount; jj++ {
args := wamp.List{ fmt.Sprintf("%d:%d", loop, jj, ) }
err = publisher.Publish(exampleCountTopic, nil, args, nil)
if err != nil {
logger.Printf("message %d error: %s\n", jj, err)
atomic.AddUint64(&failure, 1)
} else {
//logger.Printf("message %d: success\n", jj)
atomic.AddUint64(&success, 1)
}
}

args = wamp.List{"testing 3"}
err = publisher.Publish(exampleTopic, nil, args, nil)
if err != nil {
logger.Fatalf("publish error: %s", err)
logger.Printf("Sent %d messages to %s\n", msgCount, exampleCountTopic)
wg.Done()

}(ii)
}

logger.Println("Published messages to", exampleTopic)
publisher.Close()
fmt.Printf("Waiting PUBLISHER %d clients\n", max)
wg.Wait()
fmt.Printf("Done PUBLISHER %d clients %d success %d failure\n", max, success, failure)

}
46 changes: 43 additions & 3 deletions examples/pubsub/subscriber/subscriber.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package main

import (
"fmt"
"github.com/gammazero/nexus/examples/newclient"
"github.com/gammazero/nexus/wamp"
"log"
"os"
"os/signal"

"github.com/gammazero/nexus/examples/newclient"
"github.com/gammazero/nexus/wamp"
"sync"
)

const exampleTopic = "example.hello"
const exampleCountTopic = "example.eventCount"

func main() {
logger := log.New(os.Stdout, "SUBSCRIBER> ", 0)
Expand All @@ -35,6 +37,44 @@ func main() {
}
logger.Println("Subscribed to", exampleTopic)


// Define function to handle events count received.
mapCounts := sync.Map{}
evtCountHandler := func(args wamp.List, kwargs wamp.Dict, details wamp.Dict) {
var client, msgCount int
received, ok := (args[0]).(string)
if !ok {
logger.Printf("Event format unexpected: %s", args[0])
return
}
count, err := fmt.Sscanf(received, "%d:%d",&client, &msgCount)
if err != nil || count != 2 {
logger.Printf("Event format error %s count %d\n", err, count)
return
}
value, found := mapCounts.Load(client)
expected := 1
if found {
expected = value.(int) + 1
}
if msgCount != expected {
logger.Printf("Client %d Error %d expected %d\n", client, msgCount, expected)
return
}
mapCounts.Store(client, msgCount)
logger.Printf("Client %d Success %d \n", client, msgCount)

}



err = subscriber.Subscribe(exampleCountTopic, evtCountHandler, nil)
if err != nil {
logger.Fatal("subscribe error:", err)
}
logger.Println("Subscribed to", exampleCountTopic)


// Wait for CTRL-c or client close while handling events.
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
Expand Down

0 comments on commit 5ddd0ec

Please sign in to comment.