lookup.go

package nsqd

import (
    "bytes"
    "encoding/json"
    "net"
    "os"
    "strconv"
    "time"

    "github.com/nsqio/go-nsq"
    "github.com/nsqio/nsq/internal/version"
)

func connectCallback(n *NSQD, hostname string, syncTopicChan chan *lookupPeer) func(*lookupPeer) {
    return func(lp *lookupPeer) {
        ci := make(map[string]interface{})
        ci["version"] = version.Binary
        ci["tcp_port"] = n.RealTCPAddr().Port
        ci["http_port"] = n.RealHTTPAddr().Port
        ci["hostname"] = hostname
        ci["broadcast_address"] = n.getOpts().BroadcastAddress

        cmd, err := nsq.Identify(ci)
        if err != nil {
            lp.Close()
            return
        }
        resp, err := lp.Command(cmd)
        if err != nil {
            n.logf("LOOKUPD(%s): ERROR %s - %s", lp, cmd, err)
        } else if bytes.Equal(resp, []byte("E_INVALID")) {
            n.logf("LOOKUPD(%s): lookupd returned %s", lp, resp)
        } else {
            err = json.Unmarshal(resp, &lp.Info)
            if err != nil {
                n.logf("LOOKUPD(%s): ERROR parsing response - %s", lp, resp)
            } else {
                n.logf("LOOKUPD(%s): peer info %+v", lp, lp.Info)
            }
        }

        go func() {
            syncTopicChan <- lp
        }()
    }
}

func (n *NSQD) lookupLoop() {
    var lookupPeers []*lookupPeer
    var lookupAddrs []string
    syncTopicChan := make(chan *lookupPeer)
    connect := true

    hostname, err := os.Hostname()
    if err != nil {
        n.logf("FATAL: failed to get hostname - %s", err)
        os.Exit(1)
    }

    // for announcements, lookupd determines the host automatically
    ticker := time.Tick(15 * time.Second)
    for {
        if connect {
            for _, host := range n.getOpts().NSQLookupdTCPAddresses {
                if in(host, lookupAddrs) {
                    continue
                }
                n.logf("LOOKUP(%s): adding peer", host)
                lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.getOpts().Logger,
                    connectCallback(n, hostname, syncTopicChan))
                lookupPeer.Command(nil) // start the connection
                lookupPeers = append(lookupPeers, lookupPeer)
                lookupAddrs = append(lookupAddrs, host)
            }
            n.lookupPeers.Store(lookupPeers)
            connect = false
        }

        select {
        case <-ticker:
            // send a heartbeat and read a response (read detects closed conns)
            for _, lookupPeer := range lookupPeers {
                n.logf("LOOKUPD(%s): sending heartbeat", lookupPeer)
                cmd := nsq.Ping()
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
                }
            }
        case val := <-n.notifyChan:
            var cmd *nsq.Command
            var branch string

            switch val.(type) {
            case *Channel:
                // notify all nsqlookupds that a new channel exists, or that it‘s removed
                branch = "channel"
                channel := val.(*Channel)
                if channel.Exiting() == true {
                    cmd = nsq.UnRegister(channel.topicName, channel.name)
                } else {
                    cmd = nsq.Register(channel.topicName, channel.name)
                }
            case *Topic:
                // notify all nsqlookupds that a new topic exists, or that it‘s removed
                branch = "topic"
                topic := val.(*Topic)
                if topic.Exiting() == true {
                    cmd = nsq.UnRegister(topic.name, "")
                } else {
                    cmd = nsq.Register(topic.name, "")
                }
            }

            for _, lookupPeer := range lookupPeers {
                n.logf("LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
                }
            }
        case lookupPeer := <-syncTopicChan:
            var commands []*nsq.Command
            // build all the commands first so we exit the lock(s) as fast as possible
            n.RLock()
            for _, topic := range n.topicMap {
                topic.RLock()
                if len(topic.channelMap) == 0 {
                    commands = append(commands, nsq.Register(topic.name, ""))
                } else {
                    for _, channel := range topic.channelMap {
                        commands = append(commands, nsq.Register(channel.topicName, channel.name))
                    }
                }
                topic.RUnlock()
            }
            n.RUnlock()

            for _, cmd := range commands {
                n.logf("LOOKUPD(%s): %s", lookupPeer, cmd)
                _, err := lookupPeer.Command(cmd)
                if err != nil {
                    n.logf("LOOKUPD(%s): ERROR %s - %s", lookupPeer, cmd, err)
                    break
                }
            }
        case <-n.optsNotificationChan:
            var tmpPeers []*lookupPeer
            var tmpAddrs []string
            for _, lp := range lookupPeers {
                if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) {
                    tmpPeers = append(tmpPeers, lp)
                    tmpAddrs = append(tmpAddrs, lp.addr)
                    continue
                }
                n.logf("LOOKUP(%s): removing peer", lp)
                lp.Close()
            }
            lookupPeers = tmpPeers
            lookupAddrs = tmpAddrs
            connect = true
        case <-n.exitChan:
            goto exit
        }
    }

exit:
    n.logf("LOOKUP: closing")
}

func in(s string, lst []string) bool {
    for _, v := range lst {
        if s == v {
            return true
        }
    }
    return false
}

func (n *NSQD) lookupdHTTPAddrs() []string {
    var lookupHTTPAddrs []string
    lookupPeers := n.lookupPeers.Load()
    if lookupPeers == nil {
        return nil
    }
    for _, lp := range lookupPeers.([]*lookupPeer) {
        if len(lp.Info.BroadcastAddress) <= 0 {
            continue
        }
        addr := net.JoinHostPort(lp.Info.BroadcastAddress, strconv.Itoa(lp.Info.HTTPPort))
        lookupHTTPAddrs = append(lookupHTTPAddrs, addr)
    }
    return lookupHTTPAddrs
}
				
时间: 2024-11-02 20:13:55

lookup.go的相关文章

ORACLE Index Lookup索引访问路径总结

在ORACLE中,索引访问/查找(Index Lookup)路径有五种方式,分别为INDEX UNIQUE SCAN.INDEX RANGE SCAN.INDEX FULL SCAN.INDEX FAST FULL SCAN .INDEX SKIP SCAN.下面通过一些案例介绍.总结一下这五种索引访问路径.本文是总结这方面的知识点,所以文中一些地方参考.引用了参考资料中的部分内容.详细.具体资料可以参考官方资料Index Scans 索引唯一扫描(INDEX UNIQUE SCAN)   索引

CRM JS 设置lookup字段 setSimpleLookupValue

function setSimpleLookupValue(LookupId, Type, Id, Name) { /// <summary> /// Sets the value for lookup attributes that accept only a single entity reference. /// Use of this function to set lookups that allow for multiple references, /// a.k.a 'party

关于heroku的lookup api.heroku.com on 127.0.1.1:53问题解决

在使用heroku login的时候,出现的Post https://api.heroku.com/login: dial tcp: lookup api.heroku.com on 127.0.1.1:53: cannot unmarshal DNS message 此问题应该如何解决呢? 1.十有八九是DNS的锅,如果8.8.8.8和114.114.114.114都不行的话, 请打开你的手机热点,电脑使用手机的移动网络来链接并且login,登录成功之后再换回原本网络,即可解决

C#高级编程五十四天----Lookup类和有序字典

Lookup类 Dictionary<Tkey,TValue>只为每个键支持一个值.新类Lookup<Tkey,TValue>是.NET3.5中新增的,它类似与Dictionary<Tkey,TElement>,但把键映射带一个值集上.这个类在程序及System.Core中实现,用System,Linq命名空间定义. Lookup<Tkey,TElement>的方法和属性如下表: 属性名或者方法名 说明 Count 属性Count返回集合中的元素个数 Ite

SSIS: 使用Lookup 和 Cache transformation 进行数据匹配简单介绍

本文将讲解Cache transformation的使用方式,并且用Lookup transformation进行匹配. 背景 如下图,我们的产品目标表中有些有尺寸信息有些没有.我们需要用Cache组件获取缓存信息.并且用Lookup组件进行匹配. 实际操作 1.建立以下工作流 Flat File 内容如下 打开 Cache组件,点击Edit->Columns . 我们可以看到从管道传递过来的数据. 如果 Index Position 是0 .那么表示不用lookup. 如果要把管道数据保存到文

crm2011创建Lookup类型的字段

在crm2011里面,创建lookup类型的字段不能直接创建,需要通过创建关系来创建,下面给出一个事例: using System; using Microsoft.Xrm.Sdk; using Microsoft.Xrm.Sdk.Messages; using Microsoft.Xrm.Sdk.Metadata; using Microsoft.Crm.Sdk.Messages; /// <summary> /// 创建Lookup字段 /// </summary> publi

eclipse+tomcat+maven debug的时候总是出现source not found /Edit lookup path...的问题解决方案

eclipse+tomcat+maven debug的时候总是出现source not found /Edit  lookup path...的问题解决方案 这个问题纠结好久好久.... 问题出现的环境 IED:eclipse 包管理:maven 容器:tomcat 项目debug struts中的源码 问题描述 编写好代码之后,想着跟踪一下源码看看,可是总是出现 按照以往经验,直接点击[edit.....]关联上本工程之后就可以debug.在完成关联之后,会出现下面的情况 突然出现断点指向的源

sed &amp;&amp; awk lookup 解答

[email protected]:/home/koyaku# vim lookup 2 3 #0 4 BEGIN { FS = " "; OFS = " " 5 #prompt user 6 printf("Enter a glossary term:") 7 } 8 9 #1 read local file named glossary 2 3 #0 4 BEGIN { FS = " "; OFS = " &qu

python属性查找(attribute lookup)

在Python中,属性查找(attribute lookup)是比较复杂的,特别是涉及到描述符descriptor的时候. 在上一文章末尾,给出了一段代码,就涉及到descriptor与attribute lookup的问题.而get系列函数(__get__, __getattr__, __getattribute__) 也很容易搞晕,本文就这些问题简单总结一下. 首先,我们知道: python中一切都是对象,"everything is object",包括类,类的实例,数字,模块

SSIS: Lookup组件高级用法,生成推断成员(inferred member)

将数据导入事实表如果无法匹配维度表的记录一般有两种处理方式. 一是将不匹配记录输出到一个表中待后续处理,然后重新导入.二是先生成维度Key,后续再完善维度key,本文指导各位使用第二种方式. 背景 比如下图StoreID为1的经销商不存在于我们经销商维度表中,我们现在要使用lookup组件进行匹配,并生成维度key. 操作步骤 1. 先添加一个派生列组件,将StoreID转为字符,等会儿生成键值会用到. 2. 先进行匹配一次,然后把无法匹配到的记录传到下一个Lookup 组件  Insert