switch from websockets to sse
This commit is contained in:
parent
67335c97b4
commit
534077bad3
13 changed files with 384 additions and 233 deletions
191
sse.go
Normal file
191
sse.go
Normal file
|
@ -0,0 +1,191 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/fhs/gompd/v2/mpd"
|
||||
"github.com/labstack/echo/v4"
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Event represents Server-Sent Event.
|
||||
// SSE explanation: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
|
||||
type Event struct {
|
||||
// ID is used to set the EventSource object's last event ID value.
|
||||
ID []byte
|
||||
// Data field is for the message. When the EventSource receives multiple consecutive lines
|
||||
// that begin with data:, it concatenates them, inserting a newline character between each one.
|
||||
// Trailing newlines are removed.
|
||||
Data []byte
|
||||
// Event is a string identifying the type of event described. If this is specified, an event
|
||||
// will be dispatched on the browser to the listener for the specified event name; the website
|
||||
// source code should use addEventListener() to listen for named events. The onmessage handler
|
||||
// is called if no event name is specified for a message.
|
||||
Event []byte
|
||||
// Retry is the reconnection time. If the connection to the server is lost, the browser will
|
||||
// wait for the specified time before attempting to reconnect. This must be an integer, specifying
|
||||
// the reconnection time in milliseconds. If a non-integer value is specified, the field is ignored.
|
||||
Retry []byte
|
||||
// Comment line can be used to prevent connections from timing out; a server can send a comment
|
||||
// periodically to keep the connection alive.
|
||||
Comment []byte
|
||||
}
|
||||
|
||||
// MarshalTo marshals Event to given Writer
|
||||
func (ev *Event) MarshalTo(w io.Writer) error {
|
||||
// Marshalling part is taken from: https://github.com/r3labs/sse/blob/c6d5381ee3ca63828b321c16baa008fd6c0b4564/http.go#L16
|
||||
if len(ev.Data) == 0 && len(ev.Comment) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(ev.Data) > 0 {
|
||||
if _, err := fmt.Fprintf(w, "id: %s\n", ev.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sd := bytes.Split(ev.Data, []byte("\n"))
|
||||
for i := range sd {
|
||||
if _, err := fmt.Fprintf(w, "data: %s\n", sd[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(ev.Event) > 0 {
|
||||
if _, err := fmt.Fprintf(w, "event: %s\n", ev.Event); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(ev.Retry) > 0 {
|
||||
if _, err := fmt.Fprintf(w, "retry: %s\n", ev.Retry); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(ev.Comment) > 0 {
|
||||
if _, err := fmt.Fprintf(w, ": %s\n", ev.Comment); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := fmt.Fprint(w, "\n"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// serveSSE handles sending Server-Sent-Events.
|
||||
func serveSSE(c echo.Context) error {
|
||||
// TODO: figure out how to retrieve IP from Forwarded header behind proxy: https://echo.labstack.com/docs/ip-address
|
||||
c.Logger().Printf("SSE client connected, ip: %v", c.RealIP())
|
||||
|
||||
w := c.Response()
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
|
||||
// Connect to MPD server
|
||||
mpdConn, err := mpd.Dial("tcp", "localhost:6600")
|
||||
if err != nil {
|
||||
c.Logger().Error(err)
|
||||
event := Event{
|
||||
Event: []byte("mpd"),
|
||||
Data: []byte(fmt.Sprintf("connection error: %s", err.Error())),
|
||||
}
|
||||
if err := event.MarshalTo(w); err != nil {
|
||||
return err
|
||||
}
|
||||
w.Flush()
|
||||
}
|
||||
defer mpdConn.Close()
|
||||
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
var lastJsonStatus []byte
|
||||
var lastJsonCurrentSong []byte
|
||||
var lastJsonQueue []byte
|
||||
for {
|
||||
select {
|
||||
case <-c.Request().Context().Done():
|
||||
c.Logger().Printf("SSE client disconnected, ip: %v", c.RealIP())
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
c.Logger().Printf("Getting MPD status for %v", c.RealIP())
|
||||
|
||||
status, err := mpdConn.Status()
|
||||
if err != nil {
|
||||
c.Logger().Error(err)
|
||||
}
|
||||
jsonStatus, err := json.Marshal(status)
|
||||
if err != nil {
|
||||
c.Logger().Error(err)
|
||||
}
|
||||
// Only send new event if different from last time
|
||||
if !bytes.Equal(jsonStatus, lastJsonStatus) {
|
||||
statusEvent := Event{
|
||||
Event: []byte("status"),
|
||||
Data: []byte(string(jsonStatus)),
|
||||
}
|
||||
if err := statusEvent.MarshalTo(w); err != nil {
|
||||
return err
|
||||
}
|
||||
lastJsonStatus = jsonStatus
|
||||
}
|
||||
|
||||
currentsong, err := mpdConn.CurrentSong()
|
||||
if err != nil {
|
||||
c.Logger().Error(err)
|
||||
}
|
||||
jsonCurrentSong, err := json.Marshal(currentsong)
|
||||
if err != nil {
|
||||
c.Logger().Error(err)
|
||||
}
|
||||
// Only send new event if different from last time
|
||||
if !bytes.Equal(jsonCurrentSong, lastJsonCurrentSong) {
|
||||
currentSongEvent := Event{
|
||||
Event: []byte("status"),
|
||||
Data: []byte(string(jsonCurrentSong)),
|
||||
}
|
||||
if err := currentSongEvent.MarshalTo(w); err != nil {
|
||||
return err
|
||||
}
|
||||
lastJsonCurrentSong = jsonCurrentSong
|
||||
}
|
||||
|
||||
queue, err := mpdConn.PlaylistInfo(-1, -1)
|
||||
if err != nil {
|
||||
c.Logger().Error(err)
|
||||
}
|
||||
jsonQueue, err := json.Marshal(queue)
|
||||
if err != nil {
|
||||
c.Logger().Error(err)
|
||||
}
|
||||
// Only send new event if different from last time
|
||||
if !bytes.Equal(jsonQueue, lastJsonQueue) {
|
||||
queueEvent := Event{
|
||||
Event: []byte("status"),
|
||||
Data: []byte(string(jsonQueue)),
|
||||
}
|
||||
if err := queueEvent.MarshalTo(w); err != nil {
|
||||
return err
|
||||
}
|
||||
lastJsonQueue = jsonQueue
|
||||
}
|
||||
|
||||
// Ping to prevent timeout
|
||||
pingEvent := Event{
|
||||
Comment: []byte("ping"),
|
||||
}
|
||||
if err := pingEvent.MarshalTo(w); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.Flush()
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue