diff --git a/client.go b/client.go index 3ebcbaa..4a20f53 100644 --- a/client.go +++ b/client.go @@ -7,8 +7,8 @@ import ( ) type options struct { - inExt message.Extension - outExt message.Extension + inExt []message.Extension + outExt []message.Extension transport transport.Transport } @@ -60,13 +60,19 @@ func NewClient(url string, opts ...Option) (*Client, error) { func WithOutExtension(extension message.Extension) Option { return func(o *options) { - o.outExt = extension + o.outExt = append(o.outExt, extension) + } +} +func WithExtension(inExt message.Extension, outExt message.Extension) Option { + return func(o *options) { + o.inExt = append(o.inExt, inExt) + o.outExt = append(o.outExt, outExt) } } func WithInExtension(extension message.Extension) Option { return func(o *options) { - o.inExt = extension + o.inExt = append(o.inExt, extension) } } diff --git a/extensions/debug.go b/extensions/debug.go new file mode 100644 index 0000000..20d3ffd --- /dev/null +++ b/extensions/debug.go @@ -0,0 +1,31 @@ +package extensions + +import ( + "encoding/json" + "github.com/thesyncim/faye/message" + "io" + "log" +) + +func debugJson(v interface{}) string { + b, _ := json.MarshalIndent(v, "", " ") + return string(b) +} + +type DebugExtension struct { + in *log.Logger + out *log.Logger +} + +func NewDebugExtension(out io.Writer) *DebugExtension { + li := log.New(out, "InMsg", 0) + lo := log.New(out, "outMsg", 0) + return &DebugExtension{in: li, out: lo} +} + +func (d *DebugExtension) InExtension(m *message.Message) { + d.in.Println(debugJson(m)) +} +func (d *DebugExtension) OutExtension(m *message.Message) { + d.out.Println(debugJson(m)) +} diff --git a/transport/transport.go b/transport/transport.go index 480894a..8ce3bab 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -6,8 +6,8 @@ import "github.com/thesyncim/faye/message" type Options struct { Url string - InExt message.Extension - OutExt message.Extension + InExt []message.Extension + OutExt []message.Extension //todo dial timeout //todo read/write deadline } diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 708d168..23b27c1 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -1,11 +1,11 @@ package websocket import ( - "encoding/json" + "fmt" "github.com/gorilla/websocket" "github.com/thesyncim/faye/message" "github.com/thesyncim/faye/transport" - "log" + "strconv" "sync" "sync/atomic" @@ -17,13 +17,6 @@ func init() { transport.RegisterTransport(&Websocket{}) } -var Debug = true - -func debugJson(v interface{}) string { - b, _ := json.MarshalIndent(v, "", " ") - return string(b) -} - type Websocket struct { TransportOpts *transport.Options conn *websocket.Conn @@ -60,7 +53,7 @@ func (w *Websocket) readWorker() error { return err } //dispatch - msg := payload[0] + msg := &payload[0] if transport.IsControlMsg(msg.Channel) { //handle it @@ -71,11 +64,14 @@ func (w *Websocket) readWorker() error { w.subsMu.Lock() subscription, ok := w.subs[msg.Subscription] w.subsMu.Unlock() - log.Println(debugJson(msg)) if !ok { panic("BUG: subscription not registered `" + msg.Subscription + "`") } - subscription <- &msg + if msg.GetError() != nil { + //inject the error + msg.Error = fmt.Sprintf("susbscription `%s` failed", msg.Subscription) + } + subscription <- msg close(subscription) w.subsMu.Lock() delete(w.subs, msg.Channel) @@ -100,8 +96,10 @@ func (w *Websocket) readWorker() error { subscription := w.subs[msg.Channel] w.subsMu.Unlock() + w.applyInExtensions(msg) + if subscription != nil { - subscription <- &msg + subscription <- msg } } } @@ -111,11 +109,10 @@ func (w *Websocket) Name() string { } func (w *Websocket) sendMessage(m *message.Message) error { + w.applyOutExtensions(m) + var payload []message.Message payload = append(payload, *m) - if Debug { - log.Println("sending request", debugJson(payload)) - } return w.conn.WriteJSON(payload) } func (w *Websocket) nextMsgID() string { @@ -141,11 +138,9 @@ func (w *Websocket) Handshake() (err error) { if err = w.conn.ReadJSON(&hsResps); err != nil { return err } - if Debug { - log.Println("handshake response", debugJson(hsResps)) - } - resp := hsResps[0] + resp := &hsResps[0] + w.applyInExtensions(resp) if resp.GetError() != nil { return err } @@ -160,7 +155,7 @@ func (w *Websocket) Connect() error { ConnectionType: transportName, Id: w.nextMsgID(), } - //todo verify if extensions are applied on connect,verify if hs is complete + //todo expect connect resp from server go w.readWorker() return w.sendMessage(&m) } @@ -172,9 +167,6 @@ func (w *Websocket) Subscribe(subscription string, onMessage func(message *messa Subscription: subscription, Id: w.nextMsgID(), } - if w.TransportOpts.OutExt != nil { - w.TransportOpts.OutExt(m) - } if err := w.sendMessage(m); err != nil { return err @@ -198,9 +190,27 @@ func (w *Websocket) Subscribe(subscription string, onMessage func(message *messa } func (w *Websocket) Unsubscribe(subscription string) error { - panic("not implemented") + //https://docs.cometd.org/current/reference/#_bayeux_meta_unsubscribe + m := &message.Message{ + Channel: transport.Unsubscribe, + Subscription: subscription, + ClientId: w.clientID, + Id: w.nextMsgID(), + } + return w.sendMessage(m) } func (w *Websocket) Publish(subscription string, message *message.Message) error { panic("not implemented") } + +func (w *Websocket) applyOutExtensions(m *message.Message) { + for i := range w.TransportOpts.OutExt { + w.TransportOpts.OutExt[i](m) + } +} +func (w *Websocket) applyInExtensions(m *message.Message) { + for i := range w.TransportOpts.InExt { + w.TransportOpts.InExt[i](m) + } +}