paho.mqtt.golang--option.go 源码解析

broker: 指 mqtt 服务器

package mqtt

import (
	"crypto/tls"
	"net/http"
	"net/url"
	"regexp"
	"strings"
	"time"
)

// 允许在重新连接之前更新用户名和密码。函数应该返回当前的用户名和密码。
type CredentialsProvider func() (username string, password string)

// MessageHandler 是一种回调类型,
// 可以设置为在发布到订阅了客户端的主题的消息到达时执行。
type MessageHandler func(Client, Message)

// ConnectionLostHandler是一种回调类型,可以设置为在与MQTT代理意外断开连接时执行。
// 调用Disconnect或ForceDisconnect导致的断开连接不会导致执行OnConnectionLost回调。
type ConnectionLostHandler func(Client, error)

// OnConnectHandler是一个回调,当客户端状态从未连接/已断开变为已连接时,将调用此回调。
// 即 初始连接时和重新连接时执行此函数
type OnConnectHandler func(Client)

// 回调函数
// 初始连接丢失后,在重新连接之前调用
type ReconnectHandler func(Client, *ClientOptions)

// ClientOptions contains configurable options for an Client.
type ClientOptions struct {
	Servers                 []*url.URL    // mqtt服务器(broker)的地址
	ClientID                string  // 连接 broker 的客户端的唯一标识符
	Username                string  // 连接 broker 的用户名, 如果 broker 开启了用户名和密码验证时使用
	Password                string // 连接 broker 的密码
	CredentialsProvider     CredentialsProvider //  自定义function,  返回值为: (username, password)
	CleanSession            bool // 是否开启下线后清楚 session
	Order                   bool // 是否顺序发送, true: 同步顺序发送消息, false: 异步发送消息. 发送的消息有可能乱序
	WillEnabled             bool
	WillTopic               string // 订阅的主题
	WillPayload             []byte
	WillQos                 byte  //消息类型 0, 1, 2
	WillRetained            bool  //是否保留消息
	ProtocolVersion         uint  // Mqtt 版本号 值=3 使用MQTT 3.1, 值=4使用 MQTT 3.1.1
	protocolVersionExplicit bool //是否设定了 mqtt 版本
	TLSConfig               *tls.Config  // tls 加密配置
	KeepAlive               int64  //向代理发送PING请求之前客户端应等待的时间(以秒为单位)
	PingTimeout             time.Duration //向代理发送PING请求之后客户端确定连接丢失之前等待的时间(以秒为单位)。默认值为10秒。
	ConnectTimeout          time.Duration //在尝试打开与MQTT服务器的连接之前客户端超时和错误尝试之前等待的时间。持续时间为0永远不会超时。默认30秒。当前仅可用于TCP / TLS连接。
	MaxReconnectInterval    time.Duration //连接断开后两次尝试重新连接之间等待的最长时间
	AutoReconnect           bool  //SetAutoReconnect设置在丢失连接时是否应使用自动重新连接逻辑,即使禁用该连接仍会调用ConnectionLostHandler
	ConnectRetryInterval    time.Duration //如果ConnectRetry为TRUE,则在最初连接时两次连接尝试之间要等待的时间
        //配置在失败的情况下connect函数是否将自动重试连接(当为true时,Connect函数返回的令牌在连接建立或被取消之前不会完成)
        // 如果ConnectRetry为true,则应在OnConnect处理程序中请求订阅
        //将其设置为TRUE允许在建立连接之前发布消息
	ConnectRetry            bool
	Store                   Store // 提供消息持久性的实现
	DefaultPublishHandler   MessageHandler
	OnConnect               OnConnectHandler //可以设置为在发布到订阅了客户端的主题的消息到达时执行。
	OnConnectionLost        ConnectionLostHandler //与MQTT代理意外断开连接时执行
	OnReconnecting          ReconnectHandler //初始连接丢失后,在重新连接之前调用
	WriteTimeout            time.Duration //发布消息阻塞时间, 0: 永远不超时
	MessageChannelDepth     uint // 作废.
	ResumeSubs              bool  // 将在连接时启用存储的(取消)订阅消息的恢复,但如果CleanSession为false则不重新连接。否则,这些消息将被丢弃。
	HTTPHeaders             http.Header //websocket 使用
}

// NewClientOptions will create a new ClientClientOptions type with some
// default values.
//   Port: 1883
//   CleanSession: True
//   Order: True
//   KeepAlive: 30 (seconds)
//   ConnectTimeout: 30 (seconds)
//   MaxReconnectInterval 10 (minutes)
//   AutoReconnect: True
func NewClientOptions() *ClientOptions {
	o := &ClientOptions{
		Servers:                 nil,
		ClientID:                "",
		Username:                "",
		Password:                "",
		CleanSession:            true,
		Order:                   true,
		WillEnabled:             false,
		WillTopic:               "",
		WillPayload:             nil,
		WillQos:                 0,
		WillRetained:            false,
		ProtocolVersion:         0,
		protocolVersionExplicit: false,
		KeepAlive:               30,
		PingTimeout:             10 * time.Second,
		ConnectTimeout:          30 * time.Second,
		MaxReconnectInterval:    10 * time.Minute,
		AutoReconnect:           true,
		ConnectRetryInterval:    30 * time.Second,
		ConnectRetry:            false,
		Store:                   nil,
		OnConnect:               nil,
		OnConnectionLost:        DefaultConnectionLostHandler,
		WriteTimeout:            0, // 0 represents timeout disabled
		ResumeSubs:              false,
		HTTPHeaders:             make(map[string][]string),
	}
	return o
}

// AddBroker adds a broker URI to the list of brokers to be used. The format should be
// scheme://host:port
// Where "scheme" is one of "tcp", "ssl", or "ws", "host" is the ip-address (or hostname)
// and "port" is the port on which the broker is accepting connections.
//
// Default values for hostname is "127.0.0.1", for schema is "tcp://".
//
// An example broker URI would look like: tcp://foobar.com:1883
func (o *ClientOptions) AddBroker(server string) *ClientOptions {
	re := regexp.MustCompile(`%(25)?`)
	if len(server) > 0 && server[0] == ‘:‘ {
		server = "127.0.0.1" + server
	}
	if !strings.Contains(server, "://") {
		server = "tcp://" + server
	}
	server = re.ReplaceAllLiteralString(server, "%25")
	brokerURI, err := url.Parse(server)
	if err != nil {
		ERROR.Println(CLI, "Failed to parse %q broker address: %s", server, err)
		return o
	}
	o.Servers = append(o.Servers, brokerURI)
	return o
}

// SetResumeSubs will enable resuming of stored (un)subscribe messages when connecting
// but not reconnecting if CleanSession is false. Otherwise these messages are discarded.
func (o *ClientOptions) SetResumeSubs(resume bool) *ClientOptions {
	o.ResumeSubs = resume
	return o
}

// SetClientID will set the client id to be used by this client when
// connecting to the MQTT broker. According to the MQTT v3.1 specification,
// a client id must be no longer than 23 characters.
func (o *ClientOptions) SetClientID(id string) *ClientOptions {
	o.ClientID = id
	return o
}

// SetUsername will set the username to be used by this client when connecting
// to the MQTT broker. Note: without the use of SSL/TLS, this information will
// be sent in plaintext across the wire.
func (o *ClientOptions) SetUsername(u string) *ClientOptions {
	o.Username = u
	return o
}

// SetPassword will set the password to be used by this client when connecting
// to the MQTT broker. Note: without the use of SSL/TLS, this information will
// be sent in plaintext across the wire.
func (o *ClientOptions) SetPassword(p string) *ClientOptions {
	o.Password = p
	return o
}

// SetCredentialsProvider will set a method to be called by this client when
// connecting to the MQTT broker that provide the current username and password.
// Note: without the use of SSL/TLS, this information will be sent
// in plaintext across the wire.
func (o *ClientOptions) SetCredentialsProvider(p CredentialsProvider) *ClientOptions {
	o.CredentialsProvider = p
	return o
}

// SetCleanSession will set the "clean session" flag in the connect message
// when this client connects to an MQTT broker. By setting this flag, you are
// indicating that no messages saved by the broker for this client should be
// delivered. Any messages that were going to be sent by this client before
// diconnecting previously but didn‘t will not be sent upon connecting to the
// broker.
func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions {
	o.CleanSession = clean
	return o
}

// SetOrderMatters will set the message routing to guarantee order within
// each QoS level. By default, this value is true. If set to false,
// this flag indicates that messages can be delivered asynchronously
// from the client to the application and possibly arrive out of order.
func (o *ClientOptions) SetOrderMatters(order bool) *ClientOptions {
	o.Order = order
	return o
}

// SetTLSConfig will set an SSL/TLS configuration to be used when connecting
// to an MQTT broker. Please read the official Go documentation for more
// information.
func (o *ClientOptions) SetTLSConfig(t *tls.Config) *ClientOptions {
	o.TLSConfig = t
	return o
}

// SetStore will set the implementation of the Store interface
// used to provide message persistence in cases where QoS levels
// QoS_ONE or QoS_TWO are used. If no store is provided, then the
// client will use MemoryStore by default.
func (o *ClientOptions) SetStore(s Store) *ClientOptions {
	o.Store = s
	return o
}

// SetKeepAlive will set the amount of time (in seconds) that the client
// should wait before sending a PING request to the broker. This will
// allow the client to know that a connection has not been lost with the
// server.
func (o *ClientOptions) SetKeepAlive(k time.Duration) *ClientOptions {
	o.KeepAlive = int64(k / time.Second)
	return o
}

// SetPingTimeout will set the amount of time (in seconds) that the client
// will wait after sending a PING request to the broker, before deciding
// that the connection has been lost. Default is 10 seconds.
func (o *ClientOptions) SetPingTimeout(k time.Duration) *ClientOptions {
	o.PingTimeout = k
	return o
}

// SetProtocolVersion sets the MQTT version to be used to connect to the
// broker. Legitimate values are currently 3 - MQTT 3.1 or 4 - MQTT 3.1.1
func (o *ClientOptions) SetProtocolVersion(pv uint) *ClientOptions {
	if (pv >= 3 && pv <= 4) || (pv > 0x80) {
		o.ProtocolVersion = pv
		o.protocolVersionExplicit = true
	}
	return o
}

// UnsetWill will cause any set will message to be disregarded.
func (o *ClientOptions) UnsetWill() *ClientOptions {
	o.WillEnabled = false
	return o
}

// SetWill accepts a string will message to be set. When the client connects,
// it will give this will message to the broker, which will then publish the
// provided payload (the will) to any clients that are subscribed to the provided
// topic.
func (o *ClientOptions) SetWill(topic string, payload string, qos byte, retained bool) *ClientOptions {
	o.SetBinaryWill(topic, []byte(payload), qos, retained)
	return o
}

// SetBinaryWill accepts a []byte will message to be set. When the client connects,
// it will give this will message to the broker, which will then publish the
// provided payload (the will) to any clients that are subscribed to the provided
// topic.
func (o *ClientOptions) SetBinaryWill(topic string, payload []byte, qos byte, retained bool) *ClientOptions {
	o.WillEnabled = true
	o.WillTopic = topic
	o.WillPayload = payload
	o.WillQos = qos
	o.WillRetained = retained
	return o
}

// SetDefaultPublishHandler sets the MessageHandler that will be called when a message
// is received that does not match any known subscriptions.
func (o *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions {
	o.DefaultPublishHandler = defaultHandler
	return o
}

// SetOnConnectHandler sets the function to be called when the client is connected. Both
// at initial connection time and upon automatic reconnect.
func (o *ClientOptions) SetOnConnectHandler(onConn OnConnectHandler) *ClientOptions {
	o.OnConnect = onConn
	return o
}

// SetConnectionLostHandler will set the OnConnectionLost callback to be executed
// in the case where the client unexpectedly loses connection with the MQTT broker.
func (o *ClientOptions) SetConnectionLostHandler(onLost ConnectionLostHandler) *ClientOptions {
	o.OnConnectionLost = onLost
	return o
}

// SetReconnectingHandler sets the OnReconnecting callback to be executed prior
// to the client attempting a reconnect to the MQTT broker.
func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptions {
	o.OnReconnecting = cb
	return o
}

// SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a
// timeout error. A duration of 0 never times out. Default 30 seconds
func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions {
	o.WriteTimeout = t
	return o
}

// SetConnectTimeout limits how long the client will wait when trying to open a connection
// to an MQTT server before timing out and erroring the attempt. A duration of 0 never times out.
// Default 30 seconds. Currently only operational on TCP/TLS connections.
func (o *ClientOptions) SetConnectTimeout(t time.Duration) *ClientOptions {
	o.ConnectTimeout = t
	return o
}

// SetMaxReconnectInterval sets the maximum time that will be waited between reconnection attempts
// when connection is lost
func (o *ClientOptions) SetMaxReconnectInterval(t time.Duration) *ClientOptions {
	o.MaxReconnectInterval = t
	return o
}

// SetAutoReconnect sets whether the automatic reconnection logic should be used
// when the connection is lost, even if disabled the ConnectionLostHandler is still
// called
func (o *ClientOptions) SetAutoReconnect(a bool) *ClientOptions {
	o.AutoReconnect = a
	return o
}

// SetConnectRetryInterval sets the time that will be waited between connection attempts
// when initially connecting if ConnectRetry is TRUE
func (o *ClientOptions) SetConnectRetryInterval(t time.Duration) *ClientOptions {
	o.ConnectRetryInterval = t
	return o
}

// SetConnectRetry sets whether the connect function will automatically retry the connection
// in the event of a failure (when true the token returned by the Connect function will
// not complete until the connection is up or it is cancelled)
// If ConnectRetry is true then subscriptions should be requested in OnConnect handler
// Setting this to TRUE permits mesages to be published before the connection is established
func (o *ClientOptions) SetConnectRetry(a bool) *ClientOptions {
	o.ConnectRetry = a
	return o
}

// SetMessageChannelDepth DEPRECATED The value set here no longer has any effect, this function
// remains so the API is not altered.
func (o *ClientOptions) SetMessageChannelDepth(s uint) *ClientOptions {
	o.MessageChannelDepth = s
	return o
}

// SetHTTPHeaders sets the additional HTTP headers that will be sent in the WebSocket
// opening handshake.
func (o *ClientOptions) SetHTTPHeaders(h http.Header) *ClientOptions {
	o.HTTPHeaders = h
	return o
}

原文地址:https://www.cnblogs.com/xuange306/p/12630601.html

时间: 2024-08-30 15:32:25

paho.mqtt.golang--option.go 源码解析的相关文章

Flume-ng源码解析之Channel组件

如果还没看过Flume-ng源码解析之启动流程,可以点击Flume-ng源码解析之启动流程 查看 1 接口介绍 组件的分析顺序是按照上一篇中启动顺序来分析的,首先是Channel,然后是Sink,最后是Source,在开始看组件源码之前我们先来看一下两个重要的接口,一个是LifecycleAware ,另一个是NamedComponent 1.1 LifecycleAware @[email protected] interface LifecycleAware {  public void s

GlusterFS源码解析 —— GlusterFS 配置

GlusterFS 源码安装方式:http://blog.csdn.net/wangyuling1234567890/article/details/25519261 1.GlusterFS的典型架构图 2.GlusterFS常用translators(中继) 2.1.1.    storage/posix type storage/posix storage/posix的作用是指定一个本地目录给GlusterFS内的一个卷使用. 配置例子: volume posix-example type

Bootstrap 源码解析

Bootstrap 源码解析 1.Bootstrap的作用域 2.Bootstrap的类定义 3.Bootstrap的插件定义 4.Bootstrap的事件代理 5.Bootstrap的对象数据缓存 6.Bootstrap的防冲突 7.作用域外如何使用Button类 8.Bootstrap的单元测试 Bootstrap的作用域 Bootstrap每个插件都定义在下面这段作用域代码中: 请看<IIFE>和<严格模式>编译环境. 在插件的作用域之外,全局范围执行代码的第一行,检测了jQ

【Android】IntentService &amp; HandlerThread源码解析

一.前言 在学习Service的时候,我们一定会知道IntentService:官方文档不止一次强调,Service本身是运行在主线程中的(详见:[Android]Service),而主线程中是不适合进行耗时任务的,因而官方文档叮嘱我们一定要在Service中另开线程进行耗时任务处理.IntentService正是为这个目的而诞生的一个优雅设计,让程序员不用再管理线程的开启和允许. 至于介绍HandlerThread,一方面是因为IntentService的实现中使用到了HandlerThrea

Netty5源码解析

Netty5源码解析 今天让我来总结下netty5的服务端代码. 服务端(ServerBootstrap) 示例代码如下: import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel

Django-Filter源码解析一

Django Filter源码解析 最近在看Django-FIlter项目的源码,学习一下别人的开发思想: 整体介绍 首先,我从其中一个测试用例作为入口,开始了debug之路,一点一点的断点,分析它的执行顺序,如图: ok,下面从代码的层面进行分析: url url(r'^books/$', FilterView.as_view(model=Book)), view函数,这里的实现方式应该是借鉴了Django中自带的ListView,其同样的继承了MultipleObjectTemplateRe

Log4Qt快速入门——Log4Qt日志格式化源码解析

Log4Qt快速入门--Log4Qt日志格式化源码解析 一.Layout 1.Layout简介 Log4Qt提供了多种Layout对象,用于格式化日志输出,指定日志级别.线程名称.Logger名称.日期时间等信息.Layout类是Log4Qt API中的抽象类.PatternLayout:根据一个模式字符串输出日志事件:SimpleLayout:输出日志事件的级别和消息:TTCCLayout:输出日志事件的时间.线程名称.Logger名称和嵌套的诊断上下文信息.PatternLayout和TTC

.NET Core实战项目之CMS 第三章 入门篇-源码解析配置文件及依赖注入

作者:依乐祝 原文链接:https://www.cnblogs.com/yilezhu/p/9998021.html 写在前面 上篇文章我给大家讲解了ASP.NET Core的概念及为什么使用它,接着带着你一步一步的配置了.NET Core的开发环境并创建了一个ASP.NET Core的mvc项目,同时又通过一个实战教你如何在页面显示一个Content的列表.不知道你有没有跟着敲下代码,千万不要做眼高手低的人哦.这篇文章我们就会设计一些复杂的概念了,因为要对ASP.NET Core的启动及运行原

netty服务端启动--ServerBootstrap源码解析

netty服务端启动--ServerBootstrap源码解析 前面的第一篇文章中,我以spark中的netty客户端的创建为切入点,分析了netty的客户端引导类Bootstrap的参数设置以及启动过程.显然,我们还有另一个重要的部分--服务端的初始化和启动过程没有探究,所以这一节,我们就来从源码层面详细分析一下netty的服务端引导类ServerBootstrap的启动过程. spark中netty服务端的创建 我们仍然以spark中对netty的使用为例,以此为源码分析的切入点,首先我们看

Spring Security 解析(七) —— Spring Security Oauth2 源码解析

Spring Security 解析(七) -- Spring Security Oauth2 源码解析 ??在学习Spring Cloud 时,遇到了授权服务oauth 相关内容时,总是一知半解,因此决定先把Spring Security .Spring Security Oauth2 等权限.认证相关的内容.原理及设计学习并整理一遍.本系列文章就是在学习的过程中加强印象和理解所撰写的,如有侵权请告知. 项目环境: JDK1.8 Spring boot 2.x Spring Security