golang+sse+angular的心跳机制、angullar的轮询机制、time.Duration和time.NewTicker的学习

长连接断开的原因

  • 连接超时,浏览器自动断开连接
  • 进程被杀死
  • 不可抗拒因素

根据不同情况,高效保活的方式

  • 连接超时:心跳机制
  • 进程保活
  • 断线重连

重点心跳机制

  • 产物

    • 心跳包
    • 心跳应答

轮询与心跳区别

  • 轮询一次相当于:建立一次TCP连接+断开连接
  • 心跳:在已有的连接上进行保活

心跳设计要点

  • 心跳包的规格(内容&大小)
  • 心跳发送间隔时间(按照项目的特性进行判断)
  • 断线重连机制(核心= 如何判断长连接的有效性)

心跳具体实现(基于sse的长连接)

  • 客户端做心跳机制:客户端长时间没有反应,使用心跳机制,证明客户端的存在
  • 服务端做心跳机制:服务端长时间没有反应,使用心跳机制,证明服务端还存在
  • 服务端做心跳机制

思考点:

  • 如何判断连接中断信号(单独的思考,在本次的代码中,没有用于跟心跳机制有关,以后有想法,会补上)
notify := w.(http.CloseNotifier).CloseNotify()
    // log.Println("notify:",<- notify) 会直接堵住的,因为notify它接收连接中断信号
go func(){
    // 太迷了,正确想法就是:只能接收异常的信号,就是网络中断的信号
    fmt.Println("接收连接中断信号")
    <-notify
    userData[r.RemoteAddr] = r.RemoteAddr
    offUser <- r.RemoteAddr
    log.Println(r.RemoteAddr,"just close")
}()
  • 如何将一一对应的客户端和服务端保存
// 接收发送给客户端数据
type RW struct{
    Rw http.ResponseWriter
    T time.Time
}
var rw = make(map[int64]*RW)

// 考虑使用map。记得当正确的数据发送给客户端之后要将对应的map键值删除
delete(rw,a)   // 当发送完之后,就要将这个客户端删除了。a时键值
  • 利用golang中的time.Ticker机制,监听是否有服务端等待,然后进行轮询保活。心跳机制重点(利用协程进行监听)
// 保活,心跳
    go func(){
        defer func(){
            if err := recover();err!=nil{
                fmt.Println(err)
            }
        }()
        fmt.Println("开启保活")
        keepAliveInterval := time.Duration(6000)
        fmt.Println(keepAliveInterval)
        ticker := time.NewTicker(3*time.Second)
        for {
            select{
            case <-ticker.C:
                fmt.Println("保活,心跳机制")
                t1 := time.Now()
                for _,value:= range rw{
                    fmt.Println(value)
                    if t1.Sub(value.T)>keepAliveInterval{
                        fmt.Println("进入保活")
                        f,ok:=value.Rw.(http.Flusher)
                        if !ok{
                            fmt.Fprintf(value.Rw,"不能用来做sse")
                            return
                        }
                        fmt.Fprintf(value.Rw,"data:请耐心等待,我正在努力的加载数据\n\n")
                        f.Flush()
                    }
                }
            }
        }
    }()

样例代码

server.go

package main

import(
    "fmt"
    "log"
    "time"
    "sync"
    "net/http"
)

// 接收发送给客户端数据
type RW struct{
    Rw http.ResponseWriter
    T time.Time
}

var offUser = make(chan string,0)
var userData = make(map[string]string)
var rw = make(map[int64]*RW)
var i int64 = 0
var lock sync.Mutex

func init(){
    log.SetFlags(log.Ltime|log.Lshortfile)
}

func sseService(w http.ResponseWriter,r *http.Request){
    var a int64  // 用来接收key值
    defer func(){
        if err := recover();err!=nil{
            fmt.Println(err)
        }
    }()

    lock.Lock()
    i++
    a=i
    lock.Unlock()
    // 提取get请求参数
    fmt.Println("a =",a)
    f,ok := w.(http.Flusher)
    if !ok{
        http.Error(w,"cannot support sse",http.StatusInternalServerError)
        return
    }

    // 用于监听客户端时候已经断开了连接
    notify := w.(http.CloseNotifier).CloseNotify()
    // log.Println("notify:",<- notify) 会直接堵住的,因为notify它接收网络中断信号
    go func(){
        fmt.Println("接收关闭信号")
        <-notify
        offUser <- r.RemoteAddr
        log.Println(r.RemoteAddr,"just close")
    }()

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Transfer-Encoding", "chunked")
    w.Header().Set("Access-Control-Allow-Origin","*")

    fmt.Fprintf(w,"data:welcome\n\n")
    f.Flush()

    // 将当前的w保存
    fmt.Println("心跳")
    t := time.Now()
    rr := &RW{Rw:w,T:t}
    fmt.Println("rr =",rr)
    rw[a] = rr 

    // 模拟服务端接收发送数据阻塞
    fmt.Println("模拟服务端发送数据阻塞")
    time.Sleep(time.Second*30)
    fmt.Fprintf(w,"data:12345加油\n\n")
    f.Flush()
    delete(rw,a)   // 当发送完之后,就要将这个客户端删除了
}

func testClose(w http.ResponseWriter,r *http.Request){
    fmt.Println("remoteAddr:",r.RemoteAddr)
    fmt.Println("userData:",userData)
    // 用于监听客户端时候已经断开了连接
    notify := w.(http.CloseNotifier).CloseNotify()
    go func(){
        fmt.Println("接收连接中断信号")
        <-notify
        userData[r.RemoteAddr] = r.RemoteAddr
        offUser <- r.RemoteAddr
        log.Println(r.RemoteAddr,"just close")
    }()
    time.Sleep(time.Second*1)
    fmt.Fprintln(w,"这里任意数字")
}

func main(){
    fmt.Println("sse1")

    // 获取中断的客户端
    go func(){
        fmt.Println("监听关闭的客户端")
        for{
            select{
            case user:=<-offUser:
                log.Println("userOff:",user)
            }
        }
    }()

    // 保活,心跳
    go func(){
        defer func(){
            if err := recover();err!=nil{
                fmt.Println(err)
            }
        }()
        fmt.Println("开启保活")
        keepAliveInterval := time.Duration(6000)
        fmt.Println(keepAliveInterval)
        ticker := time.NewTicker(3*time.Second)
        for {
            select{
            case <-ticker.C:
                fmt.Println("保活,心跳机制")
                t1 := time.Now()
                for _,value:= range rw{
                    fmt.Println(value)
                    if t1.Sub(value.T)>keepAliveInterval{
                        fmt.Println("进入保活")
                        f,ok:=value.Rw.(http.Flusher)
                        if !ok{
                            fmt.Fprintf(value.Rw,"不能用来做sse")
                            return
                        }
                        fmt.Fprintf(value.Rw,"data:请耐心等待,我正在努力的加载数据\n\n")
                        f.Flush()
                    }
                }
            }
        }
    }()

    http.HandleFunc("/sse",sseService)
    http.HandleFunc("/testClose",testClose)
    http.ListenAndServe(":8080",nil)
}

client(angular)

  sse(){
    let that = this
    if ("EventSource" in window){
      console.log("可以使用EventSource")
    }else{
      return
    }
    var url = "http://localhost:8080/sse?pid="+12345
    var es = new EventSource(url)
    // 监听事件
    // 连接事件
    es.onopen = function(e:any){
      console.log("我进来啦")
      console.log(e)
    }

    // message事件
    es.onmessage = function(e){
      that.Data = e.data
      if (e.data=="12345加油"){  // 后端通知前端结束发送信息
        console.log("12345加油,这是服务端正确想发送的数据")
        es.close()
      }else{
        console.log(e.data)
      }
    }
    es.addEventListener("error",(e:any)=>{
      // 这里的e要声明变量,否则回报没有readyState属性
      console.log("e.target",e.target)
      console.log("SSEERROR:",e.target.readyState)
      if(e.target.readyState == 0){
        // 重连
        console.log("Reconnecting...")
        es.close()  // 不开启服务端,直接关闭
      }
      if(e.target.readyState==2){
        // 放弃
        console.log("give up.")
      }
    },false);
  }

学习心跳机制附带的知识点

angular设置轮询

  • setInterval()方法重复调用一个函数或执行一个代码段,在每次调用之间具有固定的时间延迟
  • clearInterval()删除重复调用
 myTest = setInterval(()=>{
      var i:number = 1
      console.log("轮询还是心跳")
      if(i===4){
        return
      }
      i++
    },1500)  // 一旦实例化,就会直接运行

  test(){
   clearInterval(this.myTest)   // 清除重复运行函数
  }

time.Duration

  • Duration的基本单位是纳秒
  • 作用:打印时间时,根据最合适的时间单位打印;用于时间比较
keepAliveInterval := time.Duration(3) 

// 打印数据值
3ns

time.NewTicker

  • 创建一个轮询机制,规定隔一段时间处理一次函数
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)

go func(){
    for{
        select{
        case <-done:
            return
        case t := <-ticker.C: // 500微秒轮询一次
            fmt.Println("Tick at",t)
        }
    }
}()

time.Sleep(10*time.Second)
ticker.Stop()
done<-true
fmt.Println("ticker stopper")

总结

  • 学到一招:对于有是接口的方法:直接去看相对应实现的源代码

原文地址:https://www.cnblogs.com/MyUniverse/p/11746159.html

时间: 2024-11-08 18:54:14

golang+sse+angular的心跳机制、angullar的轮询机制、time.Duration和time.NewTicker的学习的相关文章

golang轮询机制select的理解

func main() { c := make(chan int) quit := make(chan int) go func() { for i := 0; i < 10; i++ { fmt.Println(<-c) } quit <- 0 }() fibonacci(c, quit) } //select的轮询机制 func fibonacci(c chan int, quit chan int) { x, y := 0, 1 for { select { // select轮询

OSChina客户端源码学习(3)--轮询机制的实现

主要以OSChina Android客户端源码中Notice的轮询机制进行解读. 一.基础知识 一般IM(即使通讯)的实现有两种方式:推送和轮询,推送就是服务器主动向客户端发送消息,用特定的协议比如XMPP.MQTT.另一种是轮询,实时性并不高,而且比较耗电.这种有分为两种情况:一段时间发起一次查询和死循环进行查询. 参考: http://jcodecraeer.com/a/anzhuokaifa/androidkaifa/2014/0401/1609.html 远端Service调用: a.服

OSChinaclient源代码学习(3)--轮询机制的实现

主要以OSChina Androidclient源代码中Notice的轮询机制进行解读. 一.基础知识 一般IM(即使通讯)的实现有两种方式:推送和轮询,推送就是server主动向client发送消息,用特定的协议比方XMPP.MQTT. 还有一种是轮询,实时性并不高.并且比較耗电.这样的有分为两种情况:一段时间发起一次查询和死循环进行查询. 參考: http://jcodecraeer.com/a/anzhuokaifa/androidkaifa/2014/0401/1609.html 远端S

Keepalived 之 双主模式+DNS轮询机制 实现高负载

一.Keepalived双主模式+DNS轮询机制作用 作用:在单主模式下,备机通常会以等待状态放着,不接受任何数据,导致所有数据请求只往主机-负载均衡发送,做成资源浪费:而双主模式,即创造两个VIP,两个VIP分别放在两台负载均衡的机器上,同时两台主机均为对方的备机,以作VIP的漂移,服务接管作用,加入DNS轮询机制,使客户端的域名分别依次解释到两个VIP上,形成两台负载均衡主机同时对外提供服务.同时也解决了单主模式下的单机性能屏颈. 二.网络拓扑图 三.两台负载均衡主机的Keepalived

3. 闭包_对象组合继承模式_事件轮询机制

1.谈谈闭包? (产生条件.是什么.在哪里.作用.生命周期.缺点) 产生闭包的三个条件: 函数嵌套 内部函数引用外部函数的局部变量 执行外部函数 包含被引用的局部变量的一个"对象",  通过 chrome 开发者工具可以调试查看到,就是 closure,它存在嵌套的内部函数中 作用: 延长了局部变量的存活时间, 让函数外部可以操作(读写)到函数内部的数据(变量/函数) 闭包的生命周期: 产生 :  在嵌套内部函数定义执行完时就产生了(不是在调用) 死亡 :  在嵌套的内部函数成为垃圾对

Linux中的DNS 正向解析与反向解析,轮询机制

1.正向解析 (域名转换为IP地址) 1.vim /etc/named.conf 修改如下 2.编辑dns的副配置文件vim /etc/named.rfc1912.zones 3.cd var/named/cp -p named.localhost bruce.com.zone 复制其文件包括权限属性原文件修改vim bruce.com.zone 4.修改完成后正向解析的记录加好重启服务systemctl restart named 重启服务systemctl stop firewalld 关闭

node.js的事件轮询机制

借助libuv库实现的 概括事件轮询机制:分为六个阶段1.timers 定时器阶段计时和执行到点的定时器回调函数 2.pending callbacks某些系统操作(例如TCP错误类型) 3.idle,prepare 4.poll轮询阶段(轮询队列)如果轮询队列不为空,依次同步取出轮询队列中第一个回调函数,直到轮询队列为空或者达到系统最大限制如果轮询队列为空 如果之前设置过setImmediate函数,直接进入下一个check阶段,如果之前没有设置过setImmediate函数,在当前 poll

JS中的异步以及事件轮询机制

一.JS为何是单线程的? JavaScript语言的一大特点就是单线程,也就是说,同一个时间只能做一件事.那么,为什么JavaScript不能有多个线程呢?这样能提高效率啊.(在JAVA和c#中的异步均是通过多线程实现的,没有循环队列一说,直接在子线程中完成相关的操作) JavaScript的单线程,与它的用途有关.作为浏览器脚本语言,JavaScript的主要用途是与用户互动,以及操作DOM.这决定了它只能是单线程,否则会带来很复杂的同步问题.比如,假定JavaScript同时有两个线程,一个

轮询与心跳机制

上一篇内容参见() 在上一篇文章中提到的缓存内网服务端会话信息中,外网服务端保存内网服务端会话的有效性以及平台上监控所有内网服务端的网络状况,模仿心跳机制实现,这里在做一点叙诉,关于思路和具体实现. 在很多的平台应用中,都有这样的需求,平台内包括多个子系统或者属于其管控范围内的其他平台,需要对这些系统进行统一的监控,来查看当前的运行状态或者其他运行信息,我们的应用也有这样的一个情况,需要再外网服务端(平台)上监控,其下运行的多个内网服务端的网络状况,查阅了写资料后确立了2种可实现的方式. 1:轮