http sse/x-ndjson

Server Sent Events

SSE (Server-Sent Events) 数据格式是一种基于 UTF-8 文本的格式,用于服务器向客户端单向推送事件数据。SSE 数据流由一系列以换行符 \n 分隔的字段行组成,每行由字段名和字段值组成,两者之间用冒号 : 分隔;一个空行表示当前事件结束,客户端在遇到空行时才会派发该事件。

SSE 数据格式中常用的字段如下:
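- event: 事件类型;省略时默认为 message 事件。
- data: 事件数据;同一事件中的多行 data 会用换行符 \n 拼接起来。
- id: 事件 ID;客户端断线重连时会通过 Last-Event-ID 请求头把最后收到的 ID 带回服务器。
- retry: 重连等待时间(毫秒)。
- 以冒号 : 开头的行是注释,客户端会忽略它(常用作心跳)。
- 每个事件以一个空行结束。

(参见 MDN 文档中的 "Event stream format" 一节)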

一个完整的 SSE 数据流示例如下:

event: myEvent
id: 1
data: This is the first event
data: with multiple lines

id: 2
retry: 10000
data: This is the second event

event: message
data: This is a default message event

在上面的示例中,包含了三个事件:

  1. 第一个事件是一个名为 myEvent 的自定义事件,事件数据为 This is the first event\nwith multiple lines
  2. 第二个事件没有指定事件类型,因此为默认的 message 事件,事件数据为 This is the second event。同时指定了 retry 字段,表示连接断开后客户端等待 10000 毫秒(10 秒)再尝试重新连接。
  3. 第三个事件显式指定了事件类型 message(与默认类型相同),事件数据为 This is a default message event

SSE 数据格式的简单性和灵活性使得它成为实现服务器向客户端推送数据的一种流行方式。

x-ndjson

application/x-ndjson 是一种非官方(未在 IANA 注册)的 HTTP 媒体类型,用于表示 Newline Delimited JSON (NDJSON) 格式的数据。NDJSON 用换行符 \n 分隔多个 JSON 对象,每个 JSON 对象占据一行,因此接收方可以逐行解析,适合流式传输。

x-ndjson spec

NDJSON 格式示例如下:

{"name":"Alice","age":25}
{"name":"Bob","age":30}
{"name":"Charlie","age":35}

在上面的示例中,有三个 JSON 对象,每个对象占据一行,并由换行符 \n 分隔。

golang示例 sse

下面的例子中,server.go 实现了 SSE 服务端,proxy.go 实现了对 text/event-stream 类型接口的代理。

// server.go
package main

import (
"fmt"
"github.com/emicklei/go-restful/v3"
"log"
"net/http"
"time"
)

// main mounts the /events SSE route on go-restful's default container and
// listens on :8088. Passing a nil handler to ListenAndServe means
// http.DefaultServeMux serves the routes that restful.Add registered.
func main() {
	const addr = ":8088"

	service := new(restful.WebService)
	service.Route(service.GET("/events").To(handleSSE))
	restful.Add(service)

	fmt.Println("Server is running on http://localhost:8088")
	log.Fatal(http.ListenAndServe(addr, nil))
}

// handleSSE streams five events ("Message message:0" .. "Message message:4"),
// one per second, followed by a final "[DONE]" event, in text/event-stream
// format. It returns after sending "[DONE]", when a write fails, or as soon
// as the client disconnects.
func handleSSE(req *restful.Request, w *restful.Response) {
	// SSE response headers.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	ctx := req.Request.Context()

	// Unbuffered channel carrying messages from the producer goroutine.
	messageChan := make(chan string)
	s := "message"

	// Producer: every send also watches ctx, so once the handler has returned
	// (e.g. on client disconnect) the goroutine exits instead of blocking
	// forever on the unbuffered channel.
	go func() {
		send := func(msg string) bool {
			select {
			case messageChan <- msg:
				return true
			case <-ctx.Done():
				return false
			}
		}
		for i := 0; i < 5; i++ {
			if !send(fmt.Sprintf("Message %s:%d", s, i)) {
				return
			}
			time.Sleep(1 * time.Second)
		}
		send("[DONE]")
	}()

	// Relay messages to the client until "[DONE]" or disconnect.
	for {
		select {
		case message := <-messageChan:
			fmt.Println("send message to client")
			// The blank line ("\n\n") terminates the SSE event; with a single
			// "\n" an EventSource client merges every data line into one event
			// that is never dispatched.
			if _, err := fmt.Fprintf(w, "data: %s\n\n", message); err != nil {
				log.Printf("Error writing SSE: %v", err)
				return
			}
			w.Flush()
			if message == "[DONE]" {
				return
			}
		case <-ctx.Done():
			fmt.Println("Client disconnected")
			return
		}
	}
}
// proxy.go
package main

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"mime"
	"net/http"
	"time"

	"github.com/emicklei/go-restful/v3"
	"github.com/go-resty/resty/v2"
)

// Listen address of this proxy and the upstream SSE server it forwards to.
const (
	proxyAddr = ":9090"
	targetURL = "http://localhost:8088" // upstream server address
)

// main serves the /events proxy route on proxyAddr from a dedicated
// go-restful container. ContentEncodingEnabled(true) turns on go-restful's
// response compression for this route, negotiated from the request's
// Accept-Encoding header.
func main() {
	container := restful.NewContainer()

	service := new(restful.WebService)
	service.Route(service.GET("/events").To(handleProxy).ContentEncodingEnabled(true))
	container.Add(service)

	fmt.Printf("Proxy server is running on http://localhost%s\n", proxyAddr)
	log.Fatal(http.ListenAndServe(proxyAddr, container))
}

func handleProxy(req *restful.Request, resp *restful.Response) {
client := resty.New()
client.SetTimeout(100000 * time.Second)

// 创建一个新的请求
restyReq := client.R()

// 复制原始请求的 headers
for name, values := range req.Request.Header {
for _, value := range values {
restyReq.Header.Add(name, value)
}
}

// 设置目标 URL
url := targetURL + req.Request.URL.Path
if req.Request.URL.RawQuery != "" {
url += "?" + req.Request.URL.RawQuery
}

// 发送请求到目标服务器
fmt.Printf("url: %v\n", url)
restyResp, err := restyReq.
SetDoNotParseResponse(true).
Execute(req.Request.Method, url)
if err != nil {
resp.WriteErrorString(http.StatusInternalServerError, "Error forwarding request: "+err.Error())
return
}
defer restyResp.RawBody().Close()

// 复制响应 headers
for name, values := range restyResp.Header() {
for _, value := range values {
resp.Header().Add(name, value)
}
}

// 设置状态码
fmt.Printf("statusCode: %v\n", restyResp.StatusCode())
fmt.Printf("resp header: %#v\n", resp.Header())
//resp.Header().Set("Transfer-Encoding", "chunked")
resp.WriteHeader(restyResp.StatusCode())

// 检查是否是 SSE
if isSSE(restyResp.Header()) {
handleSSEPROXY(resp, restyResp.RawBody())
} else {
// 普通 HTTP 请求,直接复制响应体
io.Copy(resp, restyResp.RawBody())
}
}

// isSSE reports whether header declares a text/event-stream body. The media
// type is matched case-insensitively, and parameters such as
// "; charset=utf-8" are ignored.
func isSSE(header http.Header) bool {
	// ParseMediaType returns the media type lowercased. Its error is
	// deliberately ignored: for a missing or malformed value it returns an
	// empty media type, and for malformed parameters only it still returns the
	// media type itself. The comparison below handles both cases correctly.
	mediaType, _, _ := mime.ParseMediaType(header.Get("Content-Type"))
	return mediaType == "text/event-stream"
}

// handleSSEPROXY relays an SSE body to w one line at a time and flushes after
// every line, so each line reaches the client as soon as the upstream sends
// it. It returns when the body ends, a read fails, or a write fails.
func handleSSEPROXY(w *restful.Response, body io.Reader) {
	reader := bufio.NewReader(body)
	fmt.Printf("header: %v\n", w.Header())

	for {
		fmt.Printf("try to read...\n")
		line, readErr := reader.ReadBytes('\n')

		// ReadBytes returns whatever it read before an error, so forward
		// those bytes first; otherwise a final line without a trailing '\n'
		// would be silently dropped at EOF.
		if len(line) > 0 {
			n, err := w.Write(line)
			fmt.Printf("write n: %v\n", n)
			fmt.Printf("line: %v\n", string(line))
			if err != nil {
				log.Printf("Error writing SSE: %v", err)
				return
			}
			w.Flush()
		}

		switch {
		case readErr == io.EOF:
			log.Printf("eof proxy")
			return
		case readErr != nil:
			log.Printf("Error reading SSE: %v", readErr)
			return
		}
	}
}

运行上面的代码

go run server.go
go run proxy.go

使用客户端访问1

curl -i http://localhost:9090/events 

# 可以看到`Transfer-Encoding: chunked`
# 可以发现client可以一段一段的接收数据,而不必等数据全部传输完

使用客户端访问2

curl http://localhost:9090/events -H 'Accept-Encoding: gzip'

// 可以看到`Transfer-Encoding: chunked`、`Content-Encoding: gzip`
// 但要等所有数据传输完成后,客户端才能解压出内容。
// 原因是请求带了'Accept-Encoding: gzip',而该路由开启了 ContentEncodingEnabled(true),proxy.go 中的 w.Write(line) 会先经过 gzip 压缩器:
// 第一次 write 只会立即写出 10 个字节的 gzip 头,之后压缩后的数据被压缩器缓冲起来,直到响应结束才在后面的 chunk 中一并发出;客户端在拿到这些压缩数据之前解不出任何一行(也就读不到'\n'),所以会一直阻塞到全部数据传输完成。
// n, err := w.Write(line)

Transfer-Encoding, Content-Encoding

Transfer-Encoding: chunked 表示使用分块传输,它定义的是报文主体在连接上的传输方式(每块先给出十六进制的长度,再给出数据,以长度为 0 的块结束),与内容本身是否压缩无关。
传输格式:

HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked

7\r\n
Mozilla\r\n
11\r\n
Developer Network\r\n
0\r\n
\r\n

Content-Encoding: gzip 表示传输的内容经过了 gzip 压缩;客户端通过请求头 Accept-Encoding: gzip 声明自己能接受这种编码。

也可以让每个块传输的都是能够单独解压的 gzip 数据。下面以服务端 server_gzip.go 为例:每条消息单独压缩成一个完整的 gzip 成员后写出并 flush;client_gzip.go 再把这些首尾相接的 gzip 成员当作一个流依次解压。

// server_gzip.go
package main

import (
"bytes"
"compress/gzip"
"fmt"
"github.com/emicklei/go-restful/v3"
"log"
"net/http"
"time"
)

// main mounts the gzip-per-message /events route on go-restful's default
// container and listens on :8088; the nil handler passed to ListenAndServe
// makes http.DefaultServeMux serve the routes that restful.Add registered.
func main() {
	service := new(restful.WebService)
	service.Route(service.GET("/events").To(handleSSEGZIP))
	restful.Add(service)

	const addr = ":8088"
	fmt.Println("Server is running on http://localhost:8088")
	log.Fatal(http.ListenAndServe(addr, nil))
}

// handleSSEGZIP streams five lines ("Message data:0\n" .. "Message data:4\n"),
// one per second. Each line is compressed into its own complete gzip member
// and flushed, so a client can decompress each line as it arrives. The
// "[DONE]" sentinel only ends the loop and is not sent. The handler also
// returns on client disconnect or on a compression or write failure.
//
// NOTE(review): no Content-Encoding header is set, so the body is raw gzip
// bytes labelled text/event-stream; clients must decompress it themselves.
func handleSSEGZIP(req *restful.Request, w *restful.Response) {
	// SSE response headers.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	ctx := req.Request.Context()

	// Unbuffered channel carrying messages from the producer goroutine.
	messageChan := make(chan string)
	s := "data"

	// Producer: every send also watches ctx, so the goroutine exits instead
	// of blocking forever once the handler has returned.
	go func() {
		send := func(msg string) bool {
			select {
			case messageChan <- msg:
				return true
			case <-ctx.Done():
				return false
			}
		}
		for i := 0; i < 5; i++ {
			if !send(fmt.Sprintf("Message %s:%d\n", s, i)) {
				return
			}
			time.Sleep(1 * time.Second)
		}
		send("[DONE]")
	}()

	for {
		select {
		case message := <-messageChan:
			if message == "[DONE]" {
				return
			}
			fmt.Printf("send message to client\n")
			fmt.Printf("message: %s\n", message)
			compressedData, err := compressedWithGZIP([]byte(message))
			if err != nil {
				// Don't panic inside an HTTP handler; just end this response.
				log.Printf("Error compressing message: %v", err)
				return
			}
			fmt.Println(compressedData)
			if _, err := w.Write(compressedData); err != nil {
				log.Printf("Error writing message: %v", err)
				return
			}
			w.Flush()
		case <-ctx.Done():
			fmt.Println("Client disconnected")
			return
		}
	}
}

// compressedWithGZIP returns data compressed as one complete gzip member:
// header, deflate stream, and CRC-32/size trailer. Because every result is a
// self-contained member, concatenating several results yields a valid
// multi-member gzip stream.
func compressedWithGZIP(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	writer := gzip.NewWriter(&buf)
	if _, err := writer.Write(data); err != nil {
		return nil, err
	}
	// Close flushes the remaining compressed bytes and writes the trailer.
	// Its error must be checked, or a truncated member could be returned as
	// if it were valid.
	if err := writer.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// client_gzip.go
package main

import (
"bufio"
"compress/gzip"
"fmt"
"github.com/go-resty/resty/v2"
"log"
"time"
)

// main requests the gzip-per-message endpoint, decompresses the body as one
// continuous stream of concatenated gzip members, and prints every decoded
// line as soon as it is available. It returns when the stream ends (logged
// as "Error reading: EOF") or on any request, gzip, or read error.
func main() {
	client := resty.New()
	// Very long timeout, presumably so the streaming response is never cut
	// off mid-stream.
	client.SetTimeout(20000000 * time.Second)

	resp, err := client.R().
		SetDoNotParseResponse(true).
		Get("http://localhost:8088/events")
	if err != nil {
		log.Printf("Error sending request: %v", err)
		return
	}
	// With SetDoNotParseResponse the caller owns the raw body and must close it.
	body := resp.RawBody()
	defer body.Close()

	gzipReader, err := gzip.NewReader(body)
	if err != nil {
		// Without a valid gzip header there is nothing to decode; continuing
		// would call methods on a nil *gzip.Reader and panic.
		fmt.Printf("make newReader err=%v\n", err)
		return
	}
	defer gzipReader.Close()
	reader := bufio.NewReader(gzipReader)

	// Read the decompressed stream line by line.
	for {
		fmt.Printf("try to read....xxx\n")
		line, err := reader.ReadBytes('\n')
		fmt.Printf("try to read2....xxx\n")

		// ReadBytes returns any bytes read before an error; print them first
		// so a final line without '\n' is not lost.
		if len(line) > 0 {
			fmt.Printf("line: %v", string(line))
		}
		if err != nil {
			log.Printf("Error reading: %v", err)
			return
		}
	}
}

Ref:

  1. https://datatracker.ietf.org/doc/html/rfc8895#name-server-push-server-sent-eve
  2. https://blog.axway.com/learning-center/apis/api-streaming/server-sent-events
  3. https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events-intro
  4. https://developer.mozilla.org/en-US/docs/Web/API/EventSource
  5. https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
  6. https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson
  7. https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding#chunked_encoding