server.go 源码阅读


package pingo

import (
    "bufio"
    "bytes"
    "flag"
    "fmt"
    "io"
    "math/rand"
    "net"
    "net/rpc"
    "os"
    "path"
    "path/filepath"
    "reflect"
    "strings"
    "time"
)

// Register a new object this plugin exports. The object must be
// an exported symbol and obey all rules an object in the standard
// "rpc" module has to obey.
//注册的一个对象作为可以导出对象。这个对象必须符合RPC规则
//               - exported method of exported type
//    - two arguments, both of exported type
//    - the second argument is a pointer
//    - one return value, of type error
// Register will panic if called after Run.
//如果在运行中  注册对象 就会报错 
func Register(obj interface{}) {
    if defaultServer.running {
        panic("Do not call Register after Run")
    }
    defaultServer.register(obj)//注册可导出的对象
}

// Run will start all the necessary steps to make the plugin available.
//调用Run函数是必须的来保证插件的可用性
func Run() error {
    if !flag.Parsed() {//判断参数是否解析
        flag.Parse()//解析当前参数
    }
    return defaultServer.run()
}

// Internal object for plugin control
type PingoRpc struct{}

// Default constructor for interal object. Do not call manually.
func NewPingoRpc() *PingoRpc {
    return &PingoRpc{}
}

// Internal RPC call to shut down a plugin. Do not call manually.
func (s *PingoRpc) Exit(status int, unused *int) error {
    os.Exit(status)
    return nil
}

type config struct {
    proto   string
    addr    string
    prefix  string
    unixdir string
}

func makeConfig() *config {
    c := &config{}
    flag.StringVar(&c.proto, "pingo:proto", "unix", "Protocol to use: unix or tcp")
    flag.StringVar(&c.unixdir, "pingo:unixdir", "", "Alternative directory for unix socket")
    flag.StringVar(&c.prefix, "pingo:prefix", "pingo", "Prefix to output lines")
    return c
}

type rpcServer struct {
    *rpc.Server
    secret  string
    objs    []string
    conf    *config
    running bool
}

func newRpcServer() *rpcServer {
    rand.Seed(time.Now().UTC().UnixNano())
    r := &rpcServer{
        Server: rpc.NewServer(),
        secret: randstr(64),
        objs:   make([]string, 0),
        conf:   makeConfig(), // conf remains fixed after this point
    }
    r.register(&PingoRpc{})
    return r
}

var defaultServer = newRpcServer()

type bufReadWriteCloser struct {
    *bufio.Reader
    r io.ReadWriteCloser
}

func newBufReadWriteCloser(r io.ReadWriteCloser) *bufReadWriteCloser {
    return &bufReadWriteCloser{Reader: bufio.NewReader(r), r: r}
}

func (b *bufReadWriteCloser) Write(data []byte) (int, error) {
    return b.r.Write(data)
}

func (b *bufReadWriteCloser) Close() error {
    return b.r.Close()
}

func readHeaders(brwc *bufReadWriteCloser) ([]byte, error) {
    var buf bytes.Buffer
    var headerEnd bool

    for {
        b, err := brwc.ReadByte()
        if err != nil {
            return []byte(""), err
        }

        buf.WriteByte(b)

        if b == ‘\n‘ {
            if headerEnd {
                break
            }
            headerEnd = true
        } else {
            headerEnd = false
        }
    }

    return buf.Bytes(), nil
}

func parseHeaders(brwc *bufReadWriteCloser, m map[string]string) error {
    headers, err := readHeaders(brwc)
    if err != nil {
        return err
    }

    r := bytes.NewReader(headers)
    scanner := bufio.NewScanner(r)

    for scanner.Scan() {
        parts := strings.SplitN(scanner.Text(), ": ", 2)
        if parts[0] == "" {
            continue
        }
        m[parts[0]] = parts[1]
    }

    return nil
}

func (r *rpcServer) authConn(token string) bool {
    if token != "" && token == r.secret {
        return true
    }
    return false
}

func (r *rpcServer) serveConn(conn io.ReadWriteCloser, h meta) {
    bconn := newBufReadWriteCloser(conn)
    defer bconn.Close()

    headers := make(map[string]string)
    if err := parseHeaders(bconn, headers); err != nil {
        h.output("error", err.Error())
        return
    }

    if r.authConn(headers["Auth-Token"]) {
        r.Server.ServeConn(bconn)
    }

    return
}

func (r *rpcServer) register(obj interface{}) {
    element := reflect.TypeOf(obj).Elem() //获取对象元素类型
    r.objs = append(r.objs, element.Name()) //添加到objs中  获取元素类型名称
    r.Server.Register(obj)//调用rpc 的注册方法
}

type connection interface {
    addr() string
    retries() int
}

type tcp int

func (t *tcp) addr() string {
    if *t < 1024 {
        // Only use unprivileged ports
        *t = 1023
    }

    *t = *t + 1
    return fmt.Sprintf("127.0.0.1:%d", *t)
}

func (t *tcp) retries() int {
    return 500
}

type unix string

func (u *unix) addr() string {
    name := randstr(8)
    if *u != "" {
        name = filepath.FromSlash(path.Join(string(*u), name))
    }
    return name
}

func (u *unix) retries() int {
    return 4
}
//
func (r *rpcServer) run() error {
    var conn connection
    var err error
    var listener net.Listener

    r.running = true  //设置默认运行状态为true

    h := meta(r.conf.prefix)//设置自定数据类型  参数值为config 的前缀
    h.output("objects", strings.Join(r.objs, ", "))//方法参数 key 为常量 objects  参数 val 值为注册服务对象元素名称  。使用,链接的字符串
       //协议判断
    switch r.conf.proto {
    case "tcp":
        conn = new(tcp)
    default:
        r.conf.proto = "unix"
        conn = new(unix)
    }
        //获取尝试连接500次 但是只要有一次执行成功 立刻返回
    for i := 0; i < conn.retries(); i++ {
        r.conf.addr = conn.addr()
        listener, err = net.Listen(r.conf.proto, r.conf.addr)
        if err == nil {
            break
        }
    }

    if err != nil {
        h.output("fatal", fmt.Sprintf("%s: Could not connect in %d attemps, using %s protocol", errorCodeConnFailed, conn.retries(), r.conf.proto))
        return err
    }

    h.output("auth-token", defaultServer.secret)
    h.output("ready", fmt.Sprintf("proto=%s addr=%s", r.conf.proto, r.conf.addr))
    for {
        var conn net.Conn
        conn, err = listener.Accept()
        if err != nil {
            h.output("fatal", fmt.Sprintf("err-http-serve: %s", err.Error()))
            continue
        }
        go r.serveConn(conn, h)
    }
}
				
时间: 2024-10-02 04:26:35

server.go 源码阅读的相关文章

CI框架源码阅读笔记3 全局函数Common.php

从本篇开始,将深入CI框架的内部,一步步去探索这个框架的实现.结构和设计. Common.php文件定义了一系列的全局函数(一般来说,全局函数具有最高的加载优先权,因此大多数的框架中BootStrap引导文件都会最先引入全局函数,以便于之后的处理工作). 打开Common.php中,第一行代码就非常诡异: if ( ! defined('BASEPATH')) exit('No direct script access allowed'); 上一篇(CI框架源码阅读笔记2 一切的入口 index

Netty源码阅读(一) ServerBootstrap启动

Netty源码阅读(一) ServerBootstrap启动 转自我的Github Netty是由JBOSS提供的一个java开源框架.Netty提供异步的.事件驱动的网络应用程序框架和工具,用以快速开发高性能.高可靠性的网络服务器和客户端程序.本文讲会对Netty服务启动的过程进行分析,主要关注启动的调用过程,从这里面进一步理解Netty的线程模型,以及Reactor模式. 这是我画的一个Netty启动过程中使用到的主要的类的概要类图,当然是用到的类比这个多得多,而且我也忽略了各个类的继承关系

CI框架源码阅读笔记1 - 环境准备、基本术语和框架流程

最开始使用CI框架的时候,就打算写一个CI源码阅读的笔记系列,可惜虎头蛇尾,一直没有行动.最近项目少,总算是有了一些时间去写一些东西.于是准备将之前的一些笔记和经验记录下来,一方面权作备忘,另一方面时刻提醒自己:借鉴和学习才有出路,忘记过去意味着背叛! 基本术语说明 在本文开始之前,有必要对文中反复出现的术语做一个简单的说明,如果你对这一部分已经熟谙,完全可以略过.本文中反复出现和提及的术语包括: 前端控制器(Front Controller): 用于集中控制用户的所有请求的组件,将用户的请求发

The Open Web Interface for .NET (OWIN) 源码阅读

katana开源许久,网上仍未搜索到对其源码的阅读总结,本人在工作中正好遇到数据处理流程框架设计,想来跟服务器处理request和response差不多,遂起了阅读katana源码,并借鉴其设计的想法,磕磕碰碰,困难重重,所幸有一些收获,与大家交流交流. katana源码 https://katanaproject.codeplex.com/ owin官网 http://owin.org/ 两个最重要的数据结构 1 Environment IDictionary<string, object>

commons-io源码阅读心得

FileCleanTracker: 开启一个守护线程在后台默默的删除文件. 1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ow

Nutch源码阅读进程4---parseSegment

前面依次看了nutch的准备工作inject和generate部分,抓取的fetch部分的代码,趁热打铁,我们下面来一睹parse即页面解析部分的代码,这块代码主要是集中在ParseSegment类里面,Let‘s go~~~ 上期回顾:上回主要讲的是nutch的fetch部分的功能代码实现,主要是先将segments目录下的指定文件夹作为输入,读取里面将要爬取的url信息存入爬取队列,再根据用户输入的爬取的线程个数thread决定消费者的个数,线程安全地取出爬取队列里的url,然后在执行爬取页

Android系统源码阅读(12):InputChannel的注册过程

Android系统源码阅读(12):InputChannel的注册过程 请对照AOSP版本:6.0.1_r50. InputManager可以获得输入事件并分发,Activity需要处理这些输入事件.那么,这两者之间如何建立的连接呢?这就需要InputChannel作为桥梁建立两者之间的通道. 1. ViewRootImpl创建InputChannel 这里ViewRoot类已经消失了,由ViewRootImpl替代.Activity在创建时会将自己的DecorView设置给对应的ViewRoo

Nutch源码阅读进程3---fetch

走了一遍Inject和Generate,基本了解了nutch在执行爬取前的一些前期预热工作,包括url的过滤.规则化.分值计算以及其与mapreduce的联系紧密性等,自我感觉nutch的整个流程是很缜密的,起码从前面两个过程看是这样的. 前期回顾:上一期主要是讲解了nutch的第二个环节Generate,该环节主要完成获取将要抓取的url列表,并写入到segments目录下,其中一些细节的处理包括每个job提交前的输入输出以及执行的map和reducer类具体做了那些工作都可以参考上一篇.接下

openjdk源码阅读导航

转自:http://rednaxelafx.iteye.com/blog/1549577 这是链接帖.主体内容都在各链接中. 怕放草稿箱里过会儿又坑掉了,总之先发出来再说…回头再慢慢补充内容. 先把ItEye网站上的信息聚合起来. 近期提问帖: 阅读openjdk源代码 如何来看OpenJDK源码 如何分析OpenJDK中JVM的实现 一个个回复太麻烦了,合在一块儿写这么一篇. ================ 前言 我的VM帖的索引 高级语言虚拟机(HLLVM)群组 新浪微群“JVM源码阅读活