基于Go的websocket消息服务

  3个月没写PHP了,这是我的第一个中小型go的websocket微服务。那么问题来了,github上那么多轮子,我为什么要自己造轮子呢?

  Why 造轮子?

  因为这样不仅能锻炼自己的技术能力,而且能帮助深入了解其中的实现原理。

  直接上流程图:

  

  

  其实其中有些难点并没有反映出来,比如历史消息数据的存储结构、病发时遇到的一些坑等。

  历史消息的存储结构 :

  即广播、组播可拆解成单播,那么代码就可以变得简单。

  但是,但是,但是,有看到 "ref"? ref表示,用户的历史消息,是否是一个引用, 类似于c/cpp的指针、地址。想一想,如果广播给1w用户,那么是不是要把一个msg push到每一个用户呢?

  答案至少有2:

  其一:push msg给everyone,优点:读取数据时很方便, 缺点:数据大量冗余,且push一瞬间io量过大,效率低;

  其二:push msg时,分别存储:广播表、组播表、单播表, 优点:分别查询性能高,无冗余 , 缺点:综合查询用户的所有历史消息时,性能差,而且redis的网络io次数较多,还有时间等排序的问题。

  综合考虑,选用第1种方案。

  问题又来了, 这个项目开发顺利不,遇到坑没?

  废话,技术的活,哪有不带坑的!

  坑1:panic中断既出 ,真tmd不是我想要的, 解决方式是:recovery   ( : P

  坑2:环境变量向内包的传递,试了几种办法不行,最后用一个包作代理,封装工厂和单例, 最好的解决了。

  

var instance *env

func NewEnv()*env {
	env := env{}
	env.init()
	env.parameters = make(map[string]interface{})
	return &env
}

func SingleEnv()*env{
	if nil == instance {
		instance = NewEnv()
	}
	return instance
}

//...

   坑3:websocket跨域问题,解决方法至少有2:可以修改默认设定

	// 临时忽略websocket跨域
	ws := websocket.Upgrader{
	}
	if model.SingleConfig().Http.EnableCrossSite {
		ws.CheckOrigin = func(r *http.Request) bool { //mock and stub
			return true
		}
	}

  或者是在nginx上加这些,相当于在同一个域,推荐用这:

nginx conf:

upstream push {
	ip_hash;
	server 127.0.0.1:9999 ;
	keepalive 60;
}
map $http_upgrade $connection_upgrade {
	default upgrade;
	‘‘      close;
}

server {
   listen 80;
   server_name dev.push.pub.sina.com.cn;

    location /push {
        proxy_http_version 1.1;
        proxy_redirect off;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        client_max_body_size 10m;
        client_body_buffer_size 128k;
        proxy_connect_timeout 300;
        proxy_send_timeout 300;
        proxy_read_timeout 300;
        proxy_pass http://push;
        fastcgi_keep_conn on;
        include        fastcgi_params;
    }

}

  坑4:go map不内建支持并发安全,这是最大的问题。解决稍有点麻烦,需要用到RWMutex锁。 我参考beego写的:

package lib

import "sync"

type RWLocker struct {
	mtx sync.RWMutex
}
func NewRWLock()*RWLocker{
	return &RWLocker{}
}

func (l *RWLocker)RLockDo(callback func()){
	l.mtx.RLock()
	defer l.mtx.RUnlock()
	callback()
}
func (l *RWLocker)RWLockDo(callback func()){
	l.mtx.Lock()
	defer l.mtx.Unlock()
	callback()
}

type Locker struct {
	mtx sync.Mutex
}
func NewLock()*Locker{
	return &Locker{}
}
func (l *Locker)LockDo(callback func()){
	l.mtx.Lock()
	defer l.mtx.Unlock()
	callback()
}

type MutexMap struct{
	m    map[interface{}]interface{}
	lock *sync.RWMutex

}
func NewMutexMap() *MutexMap {
	return &MutexMap{
		lock: new(sync.RWMutex),
		m:    make(map[interface{}]interface{}),
	}
}
func (m *MutexMap) Size() int{
	return len(m.m)
}
func (m *MutexMap) Raw() map[interface{}]interface{} {
	return m.m
}
//Get from maps return the k‘s value
func (m *MutexMap) Get(k interface{}) interface{} {
	m.lock.RLock()
	defer m.lock.RUnlock()
	if val, ok := m.m[k]; ok {
		return val
	}
	return nil
}

// Maps the given key and value. Returns false
// if the key is already in the map and changes nothing.
func (m *MutexMap) Set(k interface{}, v interface{}) bool {
	m.lock.Lock()
	defer m.lock.Unlock()
	if val, ok := m.m[k]; !ok {
		m.m[k] = v
	} else if val != v {
		m.m[k] = v
	} else {
		return false
	}
	return true
}

// Returns true if k is exist in the map.
func (m *MutexMap) Check(k interface{}) bool {
	m.lock.RLock()
	defer m.lock.RUnlock()
	if _, ok := m.m[k]; !ok {
		return false
	}
	return true
}

func (m *MutexMap) Keys(ignoreNil  bool, keys ...interface{}) []interface{}{
	m.lock.RLock()
	defer m.lock.RUnlock()
	vals := []interface{}{}
	for _,k := range keys {
		if v,ok := m.m[k]; ok {
			vals = append(vals, v)
		}else{
			if !ignoreNil {
				vals = append(vals, nil)
			}
		}
	}
	return vals
}
func (m *MutexMap) Delete(k interface{}) {
	m.lock.Lock()
	defer m.lock.Unlock()
	delete(m.m, k)
}

  

  基本的坑就是这些了,上线部署当然是jenkins+salt+rsync:

  最后,谈下,维护性、调试性。

  首先维护性:目前只遇到几次go会异常崩溃的情况,一般都是不细心或并发安全没做好,这个根据日志、race tool、strace/gdb可以搞定。

  另外,调试性的话,介于php, cpp之间,和java类似,一般能检查出问题,并打出日志,包括数组下标越界等,另外 还有pprof/strace/gdb等神器能用上,还是不错的。

  哈哈,今天就写这么多了, 要哄妹子了-----------我闺女。

  :P

  

  

  

时间: 2024-12-28 14:31:54

基于Go的websocket消息服务的相关文章

.net平台 基于 XMPP协议的即时消息服务端简单实现

.net平台 基于 XMPP协议的即时消息服务端简单实现 昨天抽空学习了一下XMPP,在网上找了好久,中文的资料太少了所以做这个简单的例子,今天才完成.公司也正在准备开发基于XMPP协议的即时通讯工具所以也算是打一个基础吧!如果你还没有了解过XMPP请先阅读附录中链接的文章,本实例是基agsXMPP上开发的,agsXMPP是C#写的支持开源XMPP协议软件,我们可以在agsXMPP上快速构建自已的即时通讯平台,我的这个例子只是修改了服务器端,因为agsXMPP本身自带的服务器端没有实现聊天功能.

滴滴出行基于RocketMQ构建企业级消息队列服务的实践

小结: 1. https://mp.weixin.qq.com/s/v6NM3UgX-qTI7yO1QPCJrw 滴滴出行基于RocketMQ构建企业级消息队列服务的实践 原创: 江海挺 阿里巴巴中间件 2018-11-01 原文地址:https://www.cnblogs.com/yuanjiangw/p/10780829.html

搭建websocket消息推送服务,必须要考虑的几个问题

近年,不论是正在快速增长的直播,远程教育以及IM聊天场景,还是在常规企业级系统中用到的系统提醒,对websocket的需求越来越大,对websocket的要求也越来越高.从早期对websocket的应用仅限于少部分功能和IM等特殊场景,逐步发展为追求支持高并发,百万.千万级每秒通讯的高可用websocket服务. 面对各种新场景对websocket功能和性能越来越高的需求,不同的团队有不同的选择,有的直接使用由专业团队开发的成熟稳定的第三方websocket服务,有些则选择自建websocket

Java消息服务初步学习(基于Spring In Action的整理)

几个名词 Java消息服务(Java Message Service)是一个Java标准,定义了使用消息代理的通用API. 消息代理(message broker):类似于邮局的作用,确保消息被投递到指定的目的地. ActiveMQ Kafka 目的地(destination) 队列(queue,点对点模型):消息可以有多个接收者,但每一条消息只能被一个接收者取走. 主题(topic,点对线模型):订阅此主题的订阅者都会接收到此消息的副本. 异步消息相较于同步消息的优点 时间:异步消息不需要等待

基于Tomcat7的websocket的应用实现

这几天想做一个单对单的在线聊天模块,所以研究了一下websocket,以下就来分享一下基于tomcat7的websocket实现的小例子. 1.WebSocket是什么 WebSocket protocol 是HTML5一种新的协议.它实现了浏览器与服务器全双工通信(full-duplex). 它摒弃了以轮询为主要方式的传统的即时通讯技术,真正实现了一个浏览器与服务器之间的全双工通讯,通过一个握手的动作,使得浏览器与服务器之间形成一条通道,实现数据的互相传送. 2.WebSocket在Tomca

消息服务框架

"一切都是消息"--MSF(消息服务框架)入门简介 "一切都是消息"--这是MSF(消息服务框架)的设计哲学. MSF的名字是 Message Service Framework 的简称,中文名称:消息服务框架,它是PDF.NET框架的一部分. 1,MSF诞生的背景 MSF最初来源于2009年,我们为某银行开发的基金投资分析系统,由于银行安全的原因并且这些投资资料属于机密资料,规定必须使用邮件系统来发送这些资料,但是邮件的收发不是直接针对人,而是两端的计算机程序.为

WebSocket消息推送

WebSocket协议是基于TCP的一种新的网络协议,应用层,是TCP/IP协议的子集. 它实现了浏览器与服务器全双工(full-duplex)通信,客户端和服务器都可以向对方主动发送和接收数据.在JS中创建WebSocket后,会有一个HTTP请求发向浏览器以发起请求.在取得服务器响应后,建立的连接会使用HTTP升级将HTTP协议转换为WebSocket协议.也就是说,使用标准的HTTP协议无法实现WebSocket,只有支持那些协议的专门浏览器才能正常工作.由于WebScoket使用了自定义

spring boot项目之webSocket消息推送

WebSocket是客户端和服务端的通信(https://www.cnblogs.com/jingmoxukong/p/7755643.html) 了解计算机网络协议的人,应该都知道:HTTP 协议是一种无状态的.无连接的.单向的应用层协议.它采用了请求/响应模型.通信请求只能由客户端发起,服务端对请求做出应答处理. 这种通信模型有一个弊端:HTTP 协议无法实现服务器主动向客户端发起消息. 这种单向请求的特点,注定了如果服务器有连续的状态变化,客户端要获知就非常麻烦.大多数 Web 应用程序将

C(++) Websocket消息推送---GoEasy

Goeasy, 它是一款第三方推送服务平台,使用它的API可以轻松搞定实时推送!个人感觉goeasy推送更稳定,推送速度快,代码简单易懂上手快浏览器兼容性:GoEasy推送支持websocket 和polling两种连接方式,从而可以支持IE6及其以上的所有版本,同时还支持其它浏览器诸如Firefox, Chrome, Safari 等等.支 持不同的开发语言:   GoEasy推送提供了Restful API接口,无论你的后台程序用的是哪种语言都可以通过RestfulAPI来实现后台实时推送.