initial test suite

This commit is contained in:
Marcelo Pires 2018-09-06 17:59:56 +02:00
parent b6e6a9cee8
commit db6b6c3848
5 changed files with 178 additions and 10 deletions

2
.gitignore vendored
View File

@ -7,6 +7,8 @@
# Test binary, build with `go test -c` # Test binary, build with `go test -c`
*.test *.test
*/*node_modules
*/*package-lock.json
# Output of the go coverage tool, specifically when used with LiteIDE # Output of the go coverage tool, specifically when used with LiteIDE
*.out *.out

15
test/package.json Normal file
View File

@ -0,0 +1,15 @@
{
"name": "server",
"version": "1.0.0",
"description": "",
"main": "server.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node server.js"
},
"author": "",
"license": "ISC",
"dependencies": {
"faye": "^1.2.4"
}
}

24
test/server.js Normal file
View File

@ -0,0 +1,24 @@
var http = require('http'),
faye = require('faye');
var server = http.createServer(),
bayeux = new faye.NodeAdapter({mount: '/faye', timeout: 45});
var unauthorized = [
'/unauthorized',
];
bayeux.addExtension({
incoming: function (message, callback) {
if (message.channel === '/meta/subscribe') {
console.log(message)
if (unauthorized.indexOf(message.subscription) >= 0) {
message.error = '500::unauthorized channel';
}
}
callback(message);
}
});
bayeux.attach(server);
server.listen(8000);

120
test/setup_test.go Normal file
View File

@ -0,0 +1,120 @@
package faye_test
import (
"fmt"
"github.com/thesyncim/faye"
"github.com/thesyncim/faye/extensions"
"github.com/thesyncim/faye/message"
"log"
"os"
"os/exec"
"sync"
"testing"
"time"
)
type cancelFn func()
func setup(t *testing.T) (cancelFn, error) {
cmd := exec.Command("npm", "start")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Start()
if err != nil {
return nil, err
}
var cancel = func() {
exec.Command("taskkill", "/F", "/T", "/PID", fmt.Sprint(cmd.Process.Pid)).Run()
log.Println("canceled")
}
go func() {
select {
case <-time.After(time.Second * 30):
cancel()
t.Fatal("failed")
os.Exit(1)
}
}()
return cancel, nil
}
func TestServerSubscribeAndPublish(t *testing.T) {
shutdown, err := setup(t)
if err != nil {
t.Fatal(err)
}
defer shutdown()
debug := extensions.NewDebugExtension(os.Stdout)
client, err := fayec.NewClient("ws://localhost:8000/faye", fayec.WithExtension(debug.InExtension, debug.OutExtension))
if err != nil {
t.Fatal(err)
}
client.OnPublishResponse("/test", func(message *message.Message) {
if !message.Successful {
t.Fatalf("failed to send message with id %s", message.Id)
}
})
var done sync.WaitGroup
done.Add(10)
var delivered int
go func() {
client.Subscribe("/test", func(data message.Data) {
if data != "hello world" {
t.Fatalf("expecting: `hello world` got : %s", data)
}
delivered++
done.Done()
})
}()
//give some time for setup
time.Sleep(time.Second)
for i := 0; i < 10; i++ {
id, err := client.Publish("/test", "hello world")
if err != nil {
t.Fatal(err)
}
log.Println(id, i)
}
done.Wait()
err = client.Unsubscribe("/test")
if err != nil {
t.Fatal(err)
}
if delivered != 10 {
t.Fatal("message received after client unsubscribe")
}
log.Println("complete")
}
func TestSubscribeUnauthorizedChannel(t *testing.T) {
shutdown, err := setup(t)
if err != nil {
t.Fatal(err)
}
defer shutdown()
debug := extensions.NewDebugExtension(os.Stdout)
client, err := fayec.NewClient("ws://localhost:8000/faye", fayec.WithExtension(debug.InExtension, debug.OutExtension))
if err != nil {
t.Fatal(err)
}
err = client.Subscribe("/unauthorized", func(data message.Data) {
t.Fatal("received message on unauthorized channel")
})
if err == nil {
t.Fatal("subscribed to an unauthorized channel")
}
}

View File

@ -20,11 +20,13 @@ func init() {
//Websocket represents an websocket transport for the faye protocol //Websocket represents an websocket transport for the faye protocol
type Websocket struct { type Websocket struct {
TransportOpts *transport.Options TransportOpts *transport.Options
conn *websocket.Conn
clientID string connMu sync.Mutex
msgID *uint64 conn *websocket.Conn
once sync.Once clientID string
advice atomic.Value //type message.Advise msgID *uint64
once sync.Once
advice atomic.Value //type message.Advise
stopCh chan error stopCh chan error
@ -46,6 +48,7 @@ func (w *Websocket) Init(options *transport.Options) error {
w.TransportOpts = options w.TransportOpts = options
w.msgID = &msgID w.msgID = &msgID
w.subs = map[string]chan *message.Message{} w.subs = map[string]chan *message.Message{}
w.onPublishResponse = map[string]func(message *message.Message){}
w.stopCh = make(chan error) w.stopCh = make(chan error)
w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil) w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil)
if err != nil { if err != nil {
@ -68,6 +71,7 @@ func (w *Websocket) readWorker() error {
} }
//dispatch //dispatch
msg := &payload[0] msg := &payload[0]
w.applyInExtensions(msg)
if msg.Advice != nil { if msg.Advice != nil {
w.handleAdvise(msg.Advice) w.handleAdvise(msg.Advice)
@ -116,14 +120,15 @@ func (w *Websocket) readWorker() error {
if transport.IsEventDelivery(msg) { if transport.IsEventDelivery(msg) {
w.subsMu.Lock() w.subsMu.Lock()
subscription := w.subs[msg.Channel] subscription, ok := w.subs[msg.Channel]
w.subsMu.Unlock() w.subsMu.Unlock()
w.applyInExtensions(msg) if ok {
if subscription != nil {
if subscription != nil { subscription <- msg
subscription <- msg }
} }
continue continue
} }
@ -145,6 +150,8 @@ func (w *Websocket) Name() string {
} }
func (w *Websocket) sendMessage(m *message.Message) error { func (w *Websocket) sendMessage(m *message.Message) error {
w.connMu.Lock()
defer w.connMu.Unlock()
w.applyOutExtensions(m) w.applyOutExtensions(m)
var payload []message.Message var payload []message.Message