-
Notifications
You must be signed in to change notification settings - Fork 2
/
handler.go
185 lines (165 loc) · 4.53 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package sse
import (
"net/http"
"sync"
)
// HandlerConfig provides a means of passing configuration to NewHandler.
// Every field is optional from the compiler's point of view; the function
// fields are only invoked when they are non-nil.
type HandlerConfig struct {
// NumEventsToKeep is the maximum number of most recently sent events that
// are retained so they can be replayed to clients that reconnect with a
// Last-Event-ID header.
NumEventsToKeep int
// ChannelBufferSize is the capacity of each client's event channel. A
// client whose buffer is full when Send is called is assumed to be dead
// and is disconnected.
ChannelBufferSize int
// ConnectedFn, if provided, is invoked when a client connects. The return
// value of this function is associated with the client and is passed to
// InitFn and FilterFn.
ConnectedFn func(*http.Request) any
// InitFn, if provided, is invoked right before a client enters the event
// loop and sends any events that it returns to the client. This is useful,
// for example, if you are synchronizing application state. The single
// parameter is equal to the value returned by ConnectedFn. The returned
// events are written as-is; FilterFn is not consulted for them.
InitFn func(any) []*Event
// FilterFn, if provided, is invoked when an event is being sent to a
// client to determine if it should actually be sent. The first parameter
// is equal to the value returned by ConnectedFn and the return value
// should be set to true to send the event. It is applied both to replayed
// events and to events delivered through Send.
FilterFn func(any, *Event) bool
}
// DefaultHandlerConfig provides a set of defaults. NewHandler uses this
// pointer directly (without copying) when it is given a nil configuration, so
// changes made to it are visible to every handler created that way.
var DefaultHandlerConfig = &HandlerConfig{
// Keep the ten most recent events for reconnecting clients.
NumEventsToKeep: 10,
// Allow four undelivered events per client before it is dropped.
ChannelBufferSize: 4,
}
// Handler provides an http.Handler that can be used for sending events to any
// number of connected clients. Create instances with NewHandler; the zero
// value has a nil configuration and a nil channel map.
type Handler struct {
// mutex guards eventQueue, eventChans and isClosed.
mutex sync.Mutex
// waitGroup counts the ServeHTTP calls currently streaming to a client so
// that Close can wait for them to return.
waitGroup sync.WaitGroup
// cfg is the configuration supplied to NewHandler (or the default).
cfg *HandlerConfig
// eventQueue holds the most recently sent events, oldest first, for
// replay to reconnecting clients.
eventQueue []*Event
// eventChans has one key per connected client; Send delivers events by
// writing to each key. The map value is currently always nil.
eventChans map[chan *Event]any
// isClosed is set by Close; once set, new connections are refused.
isClosed bool
}
// NewHandler returns a Handler ready to accept clients. A nil cfg selects
// DefaultHandlerConfig; otherwise the supplied configuration is used as-is
// (the pointer is retained, not copied).
func NewHandler(cfg *HandlerConfig) *Handler {
	handler := &Handler{
		cfg:        DefaultHandlerConfig,
		eventChans: map[chan *Event]any{},
	}
	// Only override the default when the caller actually supplied a config.
	if cfg != nil {
		handler.cfg = cfg
	}
	return handler
}
// ServeHTTP implements http.Handler. It registers the client, replays queued
// events that follow the one named by the Last-Event-ID request header, writes
// the events returned by InitFn, and then streams every event passed to Send
// until the client disconnects, Send drops the client for being too slow, or
// Close is called.
//
// If the handler has already been closed the client receives a 503 Service
// Unavailable response. ServeHTTP panics if w does not implement http.Flusher,
// because events could not be pushed to the client as they occur.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Determine if there is a value to associate with this client
	var v any
	if h.cfg.ConnectedFn != nil {
		v = h.cfg.ConnectedFn(r)
	}

	// We need to be able to flush the writer after each chunk
	f, ok := w.(http.Flusher)
	if !ok {
		panic("http.ResponseWriter does not implement http.Flusher")
	}

	lastEventID := r.Header.Get("Last-Event-ID")

	// Register the channel and snapshot the events to replay in a single
	// critical section. Doing both atomically guarantees that every event is
	// either in the snapshot (sent before registration) or delivered through
	// the channel (sent after), never both, so a reconnecting client cannot
	// receive the same event twice.
	h.mutex.Lock()
	if h.isClosed {
		h.mutex.Unlock()
		http.Error(
			w,
			http.StatusText(http.StatusServiceUnavailable),
			http.StatusServiceUnavailable,
		)
		return
	}
	h.waitGroup.Add(1)
	defer h.waitGroup.Done()
	eventChan := make(chan *Event, h.cfg.ChannelBufferSize)
	h.eventChans[eventChan] = nil
	var missed []*Event
	if lastEventID != "" {
		missed = h.eventsAfterLocked(lastEventID)
	}
	h.mutex.Unlock()

	// Write the response headers
	w.Header().Set("Cache-Control", "no-store")
	w.Header().Set("Content-Type", "text/event-stream")
	w.WriteHeader(http.StatusOK)

	// Write errors below are not inspected: a failed write is expected to
	// coincide with the request context being cancelled, which ends the loop.

	// Replay the events the client missed, if it asked for them on
	// initialization
	if lastEventID != "" {
		for _, e := range missed {
			if h.cfg.FilterFn == nil || h.cfg.FilterFn(v, e) {
				w.Write(e.Bytes())
			}
		}
		f.Flush()
	}

	// Send messages received from InitFn (if provided); these are not
	// passed through FilterFn
	if h.cfg.InitFn != nil {
		for _, e := range h.cfg.InitFn(v) {
			w.Write(e.Bytes())
		}
		f.Flush()
	}

	// Write events as they come in
	for {
		select {
		case e, open := <-eventChan:
			if !open {
				// The server closed our channel (slow client or
				// shutdown); whoever closed it owns the map entry
				return
			}
			if h.cfg.FilterFn == nil || h.cfg.FilterFn(v, e) {
				w.Write(e.Bytes())
				f.Flush()
			}
		case <-r.Context().Done():
			// Client disconnected, remove this channel from the map
			h.mutex.Lock()
			delete(h.eventChans, eventChan)
			h.mutex.Unlock()
			return
		}
	}
}

// eventsAfterLocked returns a copy of the queued events that follow the most
// recent event whose ID equals lastEventID. If no queued event has that ID
// the entire queue is returned. The caller must hold h.mutex.
func (h *Handler) eventsAfterLocked(lastEventID string) []*Event {
	start := 0
	for i := len(h.eventQueue) - 1; i >= 0; i-- {
		if h.eventQueue[i].ID == lastEventID {
			start = i + 1
			break
		}
	}
	// Copy so the result stays valid after the lock is released.
	return append([]*Event(nil), h.eventQueue[start:]...)
}
// Send sends the provided event to all connected clients and records it in
// the queue of recent events that is replayed to reconnecting clients. Any
// client whose channel buffer is full is forcibly disconnected rather than
// allowed to block the sender.
//
// Send is a no-op once the handler has been closed.
func (h *Handler) Send(e *Event) {
	h.mutex.Lock()
	defer h.mutex.Unlock()

	// After Close every client channel has been closed, and a send on a
	// closed channel panics even inside a select with a default case. No new
	// clients are accepted either, so the event can simply be dropped.
	if h.isClosed {
		return
	}

	for c := range h.eventChans {
		select {
		case c <- e:
		default:
			// Buffer full: treat the client as dead. Closing the channel
			// tells its ServeHTTP loop to return.
			close(c)
			delete(h.eventChans, c)
		}
	}

	// Remember the event for replay, discarding the oldest one when the
	// queue exceeds its configured size.
	h.eventQueue = append(h.eventQueue, e)
	if len(h.eventQueue) > h.cfg.NumEventsToKeep {
		h.eventQueue = h.eventQueue[1:]
	}
}
// Close shuts down all of the event channels and waits for the ServeHTTP
// calls serving them to return. Connections attempted after Close are
// refused. Calling Close more than once is safe.
func (h *Handler) Close() {
	h.mutex.Lock()
	h.isClosed = true
	for c := range h.eventChans {
		close(c)
		// Forget the channel as soon as it is closed so that nothing can
		// close it or send on it a second time, either of which would
		// panic. Deleting the current key while ranging over a map is safe.
		delete(h.eventChans, c)
	}
	h.mutex.Unlock()

	// Wait outside the lock: exiting handlers may need the mutex to
	// unregister themselves.
	h.waitGroup.Wait()
}