From e98ccfdec8a9bd0a34251913d0ba31ab212c9b97 Mon Sep 17 00:00:00 2001 From: Marcelo Pires Date: Thu, 6 Sep 2018 18:40:00 +0200 Subject: [PATCH] fix process kill fix unsubscribe error --- test/setup_test.go | 33 +++++++++++++++++++------------- transport/websocket/websocket.go | 17 ++++++---------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/test/setup_test.go b/test/setup_test.go index 4b826f4..70df94b 100644 --- a/test/setup_test.go +++ b/test/setup_test.go @@ -1,7 +1,7 @@ package faye_test import ( - "fmt" + "github.com/pkg/errors" "github.com/thesyncim/faye" "github.com/thesyncim/faye/extensions" "github.com/thesyncim/faye/message" @@ -15,6 +15,8 @@ import ( type cancelFn func() +var unauthorizedErr = errors.New("500::unauthorized channel") + func setup(t *testing.T) (cancelFn, error) { cmd := exec.Command("npm", "start") cmd.Stdout = os.Stdout @@ -26,14 +28,14 @@ func setup(t *testing.T) (cancelFn, error) { } var cancel = func() { - exec.Command("taskkill", "/F", "/T", "/PID", fmt.Sprint(cmd.Process.Pid)).Run() + cmd.Process.Signal(os.Kill) log.Println("canceled") } go func() { select { case <-time.After(time.Second * 30): cancel() - t.Fatal("failed") + t.Fatal("test failed") os.Exit(1) } @@ -42,7 +44,7 @@ func setup(t *testing.T) (cancelFn, error) { return cancel, nil } -func TestServerSubscribeAndPublish(t *testing.T) { +func TestServerSubscribeAndPublish10Messages(t *testing.T) { shutdown, err := setup(t) if err != nil { t.Fatal(err) @@ -56,21 +58,22 @@ func TestServerSubscribeAndPublish(t *testing.T) { t.Fatal(err) } + var delivered int + var done sync.WaitGroup + done.Add(10) client.OnPublishResponse("/test", func(message *message.Message) { if !message.Successful { t.Fatalf("failed to send message with id %s", message.Id) } + delivered++ + done.Done() }) - var done sync.WaitGroup - done.Add(10) - var delivered int + go func() { client.Subscribe("/test", func(data message.Data) { if data != "hello world" { t.Fatalf("expecting: `hello world` got : %s", data) } - delivered++ - done.Done() }) }() @@ -89,11 +92,15 @@ func TestServerSubscribeAndPublish(t *testing.T) { if err != nil { t.Fatal(err) } + //try to publish one more message + id, err := client.Publish("/test", "hello world") + if err != nil { + t.Fatal(err) + } + log.Println(id) if delivered != 10 { t.Fatal("message received after client unsubscribe") } - log.Println("complete") - } func TestSubscribeUnauthorizedChannel(t *testing.T) { @@ -113,8 +120,8 @@ func TestSubscribeUnauthorizedChannel(t *testing.T) { err = client.Subscribe("/unauthorized", func(data message.Data) { t.Fatal("received message on unauthorized channel") }) - if err == nil { - t.Fatal("subscribed to an unauthorized channel") + if err.Error() != unauthorizedErr.Error() { + t.Fatalf("expecting `500::unauthorized channel` got : `%s`", err.Error()) } } diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 1018f42..5767a36 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -89,8 +89,8 @@ func (w *Websocket) readWorker() error { if !ok { panic("BUG: subscription not registered `" + msg.Subscription + "`") } - if msg.GetError() != nil { - //inject the error + if msg.GetError() == nil { + //inject the error if the server returns unsuccessful without error msg.Error = fmt.Sprintf("susbscription `%s` failed", msg.Subscription) } subscription <- msg @@ -99,16 +99,7 @@ func (w *Websocket) readWorker() error { delete(w.subs, msg.Channel) w.subsMu.Unlock() } - case transport.MetaUnsubscribe: - //handle MetaUnsubscribe resp - case transport.MetaConnect: - //handle MetaConnect resp - case transport.MetaDisconnect: - //handle MetaDisconnect resp - - case transport.MetaHandshake: - //handle MetaHandshake resp } continue @@ -277,6 +268,10 @@ func (w *Websocket) Unsubscribe(subscription string) error { delete(w.subs, subscription) } w.subsMu.Unlock() + //remove onPublishResponse handler + w.onPubResponseMu.Lock() + delete(w.onPublishResponse, subscription) + w.onPubResponseMu.Unlock() return w.sendMessage(m) }