Server Sent Events
SSE (Server-Sent Events) 数据格式是一种基于文本的格式,用于在服务器和客户端之间传输事件数据。SSE 数据流由一系列以换行符 \n
分隔的字段组成,每个字段都由一个字段名和字段值组成。
SSE 数据格式中常用的字段如下:
Event stream format)
一个完整的 SSE 数据流示例如下:
1 | event: myEvent |
在上面的示例中,包含了三个事件:
- 第一个事件是一个名为
myEvent
的自定义事件,事件数据为This is the first event\nwith multiple lines
。 - 第二个事件没有指定事件类型,因此为默认的
message
事件,事件数据为This is the second event
。同时指定了retry
字段,表示客户端在连接断开后每隔 10 秒尝试重新连接一次。 - 第三个事件是一个默认的
message
事件,事件数据为This is a default message event
。
SSE 数据格式的简单性和灵活性使得它成为实现服务器向客户端推送数据的一种流行方式。
x-ndjson
x-ndjson
是一种非官方的 HTTP 媒体类型,用于表示 Newline Delimited JSON (NDJSON) 格式的数据。NDJSON 是一种将多个 JSON 对象通过换行符 \n
分隔的格式,每个 JSON 对象占据一行。
NDJSON 格式示例如下:
1 | {"name":"Alice","age":25} |
在上面的示例中,有三个 JSON 对象,每个对象占据一行,并由换行符 \n
分隔。
golang示例 sse
下面的例子server.go
实现了了sse
, proxy.go
实现了代理text/event-steam
类型的接口。
1 | // server.go |
1 | // proxy.go |
运行上面的代码
1 | go run server.go |
使用客户端访问1
1 | curl -i http://localhost:9090/events |
使用客户端访问2
1 | curl http://localhost:9090/events -H 'Accept-Encoding: gzip' |
Accept-Encoding, Content-Type
Accept-Encoding: chunked
表示使用块传输,定义的是数据的传输的方式
传输格式:1
2
3
4
5
6
7
8
9
10HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked
7\r\n
Mozilla\r\n
11\r\n
Developer Network\r\n
0\r\n
\r\n
Content-Type: gzip
表示传输的内容gzip压缩的
上面的proxy.go
也可以改为每个块传输的都是可以解压的gzip数据1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78// server_gzip.go
package main
import (
"bytes"
"compress/gzip"
"fmt"
"github.com/emicklei/go-restful/v3"
"log"
"net/http"
"time"
)
func main() {
ws := new(restful.WebService)
ws.Route(ws.GET("/events").To(handleSSEGZIP))
restful.Add(ws)
//http.HandleFunc("/events", handleSSE)
fmt.Println("Server is running on http://localhost:8088")
log.Fatal(http.ListenAndServe(":8088", nil))
}
func handleSSEGZIP(req *restful.Request, w *restful.Response) {
// 设置 headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
//w.Header().Set("Transfer-Encoding", "chunked")
// 创建一个通道用于发送消息
messageChan := make(chan string)
s := "data"
// 在后台生成消息
go func() {
for i := 0; i < 5; i++ {
messageChan <- fmt.Sprintf("Message %s:%d\n", s, i)
time.Sleep(1 * time.Second)
}
messageChan <- "[DONE]"
}()
// 持续发送消息
for {
select {
case message := <-messageChan:
//count++
if message == "[DONE]" {
return
}
fmt.Printf("send message to client\n")
fmt.Printf("message: %s\n", message)
compressedData, err := compressedWithGZIP([]byte(message))
if err != nil {
panic(err)
}
//fmt.Fprintf(w, fmt.Sprintf())
fmt.Println(compressedData)
w.Write(compressedData)
w.Flush()
case <-req.Request.Context().Done():
fmt.Println("Client disconnected")
return
}
}
}
func compressedWithGZIP(data []byte) ([]byte, error) {
var buf bytes.Buffer
writer := gzip.NewWriter(&buf)
_, err := writer.Write(data)
if err != nil {
return nil, err
}
writer.Close()
return buf.Bytes(), nil
}
1 | // client_gzip.go |
Ref:
- https://datatracker.ietf.org/doc/html/rfc8895#name-server-push-server-sent-eve
- https://blog.axway.com/learning-center/apis/api-streaming/server-sent-events
- https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events-intro
- https://developer.mozilla.org/en-US/docs/Web/API/EventSource
- https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
- https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson
- https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding#chunked_encoding