开发者

Golang SSE 服务器端推送事件

目录
  • 写在前面(愚蠢的我犯的错误)
  • 原因
  • SSE介绍
  • golang实现方式

写在前面(愚蠢的我犯的错误)

Golang SSE 服务器端推送事件

本应该在EventStream的怎么都在响应这里出现

后面通过查找问题知道EventSream有特殊的回复格式为:

data: [返回的内容]\n\n

示例:

data: success\n\n

返回success字符串

原因

我做了一个在线点赞的实时更新的小玩意,我想着实时更新WS全双工用不着。

SSE介绍

SSE(Server-Sent Event)是一种用于客户端与服务器端实时通讯的技术。它允许服务器端发送事件到客户端,客户端可以通过 EventSource 接口来接收这些事件。通常情况下,SSE 是基于 HTTP 协议实现的,它不需要建立和维护长连接,但服务器可以长时间向客户端推送数据,而客户端只需要等待并处理收到的数据即可。

Golang实现方式

SSE核心代码

	//sse Server-Sent-Events 服务事件流
	http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {

		// 设置响应头,表明这是一个 SSE 连接
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		//设置为刷新模式
		flush, ok := w.(http.Flusher)
		flush.Flush()

		if !ok {
			//判断是否转换成功,不成功则返回错误信息
			responseInfo(http.StatusOK, "response cannot convert http.Flusher", w)
			return
		}
		
		//这里因为我创建了一个Map用来存储响应IO和Flush刷新,
		//我在其他地方可以使用遍历进行给各个通信端进行发送信息
		respFlushMap[&w] = &flush
		
		select {
		case <-r.Context().Done():
			delete(respFlushMap, &w)
			return
		}
	})

发送事件请求

func main(){
	//....
	//点赞评论
	http.HandleFunc("/favorite", favorite(client))
	//...
}

// 点赞
func favorite(client *Redis.Client) func(w http.ResponseWriter, r *http.Request) {
	var lock = sync.RWMutex{}
	return func(w http.ResponseWriter, r *http.Request) {
	
		/* 业务处理逻辑
			...python...
		*/
		
		//核心代码 将点赞信息发送到各个SSE
		for writer, flusher := range respFlushMap {
			//一定要是这个格式“data: [数据内容]\n\n”不然前端不会体现在ServeEvent中而出现在response中
			fmt.Fprintf(*writer, "data: %s@%s\n\n", member, method)
			(*flusher).pythonFlush()
		}
	}
}

全部代码,包含了一些处理逻辑,可能比较混乱建议还是看看之前的

javascript
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/go-redis/redis/v8"
	"html/template"
	"log"
	"math/rand"
	"net"
	"net/http"
	"strconv"
	"sync"
	"time"
)

var commentNodeHashRedisKey = "commentNodeHashRedisKey"
var commentNodeSorterSetRedisKey = "commentNodeSorterSetRedisKey"

var respFlushMap = make(map[*http.ResponseWriter]*http.Flusher)

type CommentNode struct {
	Content    string  `json:"content"`    //内容
	Score      float64 `json:"score"`      //点赞数
	IP         string  `json:"ip"`         //IP
	NickName   string  `json:"nickName"`   //昵称
	IsFavorite bool    `json:"isFavorite"` //是否点赞
	Member     string  `json:"member"`     //唯一值
}

func main() {

	//静态资源文件
	staticServer := http.FileServer(http.Dir("./template"))

	//处理静态资源文件
	http.Handle("/static/", http.StripPrefix("/static/", staticServer))

	//创建客户端
	client := redis.NewClient(&redis.Options{
		Addr:     "192.168.192.170:6379",
		Password: "",
		DB:       0,
	})

	//判断时候连接成功
	err := client.Ping(context.Background()).Err()
	if err != nil {
		log.Println("连接错误: ", err.Error())
	}
	log.Println("连接成功")

	//添加评论
	http.HandleFunc("/addComment", addComment(client))

	//点赞评论
	http.HandleFunc("/favorite", favorite(client))

	//sse Server-Sent-Events 服务事件流
	http.HandleFunc("/sse", func(w http.ResponseWriter, r *http.Request) {

		// 设置响应头,表明这是一个 SSE 连接
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		w.Header().Set("Connection", "keep-alive")

		//设置为刷新模式
		flush, ok := w.(http.Flusher)
		flush.Flush()

		if !ok {
			responseInfo(http.StatusOK, "response cannot convert http.Flusher", w)
			return
		}

		respFlushMap[&w] = &flush
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		select {
		case <-r.Context().Done():
			delete(respFlushMap, &w)
			return
		}
	})

	http.HandleFunc("/commentList", commentList(client))

	//主页
	http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		//python获取模板
		indexFile, err := template.ParseFiles("./template/index.html")
		if err != nil {
			log.Println(err.Error())
			resp.Write([]byte("./template/index.html not found"))
			return
		}
		//将内容输出
		indexFile.Execute(resp, nil)
	})

	//启动服务
	if err := http.ListenAndServe(":80", nil); err != nil {
		log.Println("启动服务失败!" + err.Error())
	}

}

// 添加评论
func addComment(client *redis.Client) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {

		query := r.URL.Query()
		nickname := query.Get("nickName")
		if nickname == "" {
			nickname = "逸士"
		}

		//判断内容是否为空
		content := query.Get("content")
		if content == "" {
			responseInfo(http.StatusBadRequest, "your comment content is empty", w)
			return
		}
		host, _, _ := net.SplitHostPort(r.RempythonoteAddr)

		//使用时间戳
		member := fmt.Sprint(time.Now().UnixMilli() ^ rand.Int63())
		//序列化
		comment, _ := json.Marshal(CommentNode{
			Member:   member,
			IP:       host,
			NickName: nickname,
			Content:  content,
		})

		//添加到队列中
		client.HSet(r.Context(), commentNodeHashRedisKey, member, string(comment))

		//更新排行
		client.ZAdd(r.Context(), commentNodeSorterSetRedisKey, &redis.Z{
			Score:  0,
			Member: member,
		})

		responseInfo(http.StatusOK, "add comment success", w)
	}
}

// 点赞
func favorite(client *redis.Client) func(w http.ResponseWriter, r *http.Request) {
	var lock = sync.RWMutex{}
	return func(w http.ResponseWriter, r *http.Request) {
		lock.Lock()
		defer lock.Unlock()

		//查询成员(member)是否存在
		query := r.URL.Query()
		member := query.Get("member")
		method := r.Method
		if member == "" {
			responseInfo(http.StatusBadRequest, "member cannot be empty", w)
			lock.Unlock()
			return
		}

		//获取分数
		score, err := client.ZScore(r.Context(), commentNodeSorterSetRedisKey, member).Result()
		//点赞减少
		if method == http.MethodDelete {
			score -= 2
		}
		score++

		//更新排行
		client.ZAdd(r.Context(), commentNodeSorterSetRedisKey, &redis.Z{
			Score:  score,
			Member: member,
		})

		if err != nil {
			//不存在返回错误
			responseInfo(http.StatusBadRequest, "member does not exists", w)
			return
		}

		//更新分数
		var commentNode CommentNode
		commentNodeStr, err := client.HGet(r.Context(), commentNodeHashRedisKey, member).Result()
		if err != nil {
			responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
			return
		}
		err = json.Unmarshal([]byte(commentNodeStr), &commentNode)
		if err != nil {
			responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
			return
		}

		commentNode.Score = score
		data, _ := json.Marshal(commentNode)
		if err = client.HSet(r.Context(), commentNodeHashRedisKey, member, data).Err(); err != nil {
			responseInfo(http.StatusBadRequest, "update hash scope error: "+err.Error(), w)
			return
		}

		//返回成功
		responseInfo(http.StatusOK, "favorite comment success", w)

		//将点赞信息发送到各个SSE
		for writer, flusher := range respFlushMap {
			fmt.Fprintf(*writer, "data: %s@%s\n\n", member, method)
			(*flusher).Flush()
		}
	}
}

// 评论列表
func commentList(client *redis.Client) func(resp http.ResponseWriter, req *http.Request) {
	return func(resp http.ResponseWriter, req *http.Request) {

		query := req.URL.Query()
		offset, err := strconv.Atoi(query.Get("offset"))
		if err != nil {
			offset = 100
		}

		//连接人地址
		connectionAddr := req.RemoteAddr
		log.Printf("连接人地址: %s\n", connectionAddr)

		//获取offset偏移量的排行
		result, err := client.ZRevRangeWithScores(req.Context(), commentNodeSorterSetRedisKey, 0, int64(offset-1)).Result()

		if err != nil || result == nil {
			responseInfo(http.StatusOK, fmt.Sprint("错误:", err), resp)
			return
		}

		//获取评论详细信息
		members := make([]string, 0)
		scopeMap := make(map[string]float64)
		for _, item := range result {
			members = append(members, item.Member.(string))
			scopeMap[item.Member.(string)] = item.Score
		}
		rlt, err := client.HMGet(req.Context(), commentNodeHashRedisKey, members...).Result()
		if err != nil {
			responseInfo(http.StatusInternalServerError, err.Error(), resp)
			return
		}
		data, _ := json.Marshal(rlt)

		responseInfo(http.StatusOK, string(data), resp)
	}
}

func responseInfo(code int, info string, w http.ResponseWriter) {
	w.WriteHeader(code)
	w.Write([]byte(info))
}

到此这篇关于Golang SSE 服务器端推送事件的文章就介绍到这了,更多相关Golang SSE推送内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜