Skip to content

Commit

Permalink
transport/websocket: add heartbeat parameter for connection keep-alive
Browse files Browse the repository at this point in the history
Signed-off-by: hr567 <[email protected]>
  • Loading branch information
hr567 committed Nov 27, 2024
1 parent d8934cf commit 6201daa
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 22 deletions.
2 changes: 2 additions & 0 deletions infra/conf/transport_internet.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type WebSocketConfig struct {
Path string `json:"path"`
Headers map[string]string `json:"headers"`
AcceptProxyProtocol bool `json:"acceptProxyProtocol"`
Heartbeat uint32 `json:"heartbeat"`
}

// Build implements Buildable.
Expand Down Expand Up @@ -178,6 +179,7 @@ func (c *WebSocketConfig) Build() (proto.Message, error) {
Header: c.Headers,
AcceptProxyProtocol: c.AcceptProxyProtocol,
Ed: ed,
Heartbeat: c.Heartbeat,
}
return config, nil
}
Expand Down
2 changes: 1 addition & 1 deletion transport/internet/splithttp/browser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string)
return nil, dummyAddr, dummyAddr, err
}

return websocket.NewConnection(conn, dummyAddr, nil), conn.RemoteAddr(), conn.LocalAddr(), nil
return websocket.NewConnection(conn, dummyAddr, nil, 0), conn.RemoteAddr(), conn.LocalAddr(), nil
}

func (c *BrowserDialerClient) SendUploadRequest(ctx context.Context, url string, payload io.ReadWriteCloser, contentLength int64) error {
Expand Down
44 changes: 27 additions & 17 deletions transport/internet/websocket/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions transport/internet/websocket/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ message Config {
map<string, string> header = 3;
bool accept_proxy_protocol = 4;
uint32 ed = 5;
uint32 heartbeat = 6;
}
13 changes: 12 additions & 1 deletion transport/internet/websocket/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,18 @@ type connection struct {
remoteAddr net.Addr
}

func NewConnection(conn *websocket.Conn, remoteAddr net.Addr, extraReader io.Reader) *connection {
func NewConnection(conn *websocket.Conn, remoteAddr net.Addr, extraReader io.Reader, heartbeat uint32) *connection {
if heartbeat != 0 {
go func() {
for {
time.Sleep(time.Duration(heartbeat) * time.Second)
if err := conn.WriteControl(websocket.PingMessage, []byte(""), time.Time{}); err != nil {
break
}
}
}()
}

return &connection{
conn: conn,
remoteAddr: remoteAddr,
Expand Down
4 changes: 2 additions & 2 deletions transport/internet/websocket/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func dialWebSocket(ctx context.Context, dest net.Destination, streamSettings *in
return nil, err
}

return NewConnection(conn, conn.RemoteAddr(), nil), nil
return NewConnection(conn, conn.RemoteAddr(), nil, wsSettings.Heartbeat), nil
}

header := wsSettings.GetRequestHeader()
Expand All @@ -117,7 +117,7 @@ func dialWebSocket(ctx context.Context, dest net.Destination, streamSettings *in
return nil, errors.New("failed to dial to (", uri, "): ", reason).Base(err)
}

return NewConnection(conn, conn.RemoteAddr(), nil), nil
return NewConnection(conn, conn.RemoteAddr(), nil, wsSettings.Heartbeat), nil
}

type delayDialConn struct {
Expand Down
2 changes: 1 addition & 1 deletion transport/internet/websocket/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}
}

h.ln.addConn(NewConnection(conn, remoteAddr, extraReader))
h.ln.addConn(NewConnection(conn, remoteAddr, extraReader, h.ln.config.Heartbeat))
}

type Listener struct {
Expand Down

0 comments on commit 6201daa

Please sign in to comment.