一些tcp通讯代码

1,nginx-lua

需要设置nginx配置文件

    resolver 223.5.5.5 223.6.6.6;
    lua_package_path "/usr/local/nginx/conf/lua-resty-http/lib/?.lua;/usr/local/nginx/conf/lua-resty-string/lib/?.lua;;";
    init_worker_by_lua_file /usr/local/nginx/conf/init.lua;

代码

local sleepTime = 5
local remoteIp = "127.0.0.1"
local remotePort = 9527

local function getLocalClient()
    return "I am Client One" , nil
end

local function getSessionKey(client)
    local http = require("resty.http")
    local httpc = http.new()

    local res, err = httpc:request_uri("http://www.ciaos.com/service/token", {
        method = "POST",
        body = "param=" .. ngx.encode_base64(client),
        headers = {
                ["Content-Type"] = "application/x-www-form-urlencoded",
        }
    })
    if not res or res.status ~= 200 then
        return nil, false
    else
    local body = res.body
    local skey = ngx.decode_base64(body)
        return skey, nil
    end
end

local function encryptMessage(skey, msg)
    local aes = require "resty.aes"
    local str = require "resty.string"
    local aes_128_cbc_with_iv = assert(aes:new(skey, nil, aes.cipher(128,"cbc"), {iv="1234567890123456"}))
    local encrypted = ngx.encode_base64(aes_128_cbc_with_iv:encrypt(msg))
    return encrypted
end

local function decryptMessage(skey, msg)
    local aes = require "resty.aes"
    local str = require "resty.string"
    local aes_128_cbc_with_iv = assert(aes:new(skey, nil, aes.cipher(128,"cbc"), {iv="1234567890123456"}))
    local decrypted = aes_128_cbc_with_iv:decrypt(ngx.decode_base64(msg))
    return decrypted
end

local function modifySleeptime()
    sleepTime = sleepTime * 2
    if sleepTime > 600 then
        sleepTime = 5
    end
end

local function sendMessage(sock, skey, msg, isconn)
    enmsg = encryptMessage(skey, msg)
    if isconn then
        msg = msg .. " " .. enmsg
    else
        msg = enmsg
    end
    local len = string.len(msg)
    local res={0,0,0,0}
    local num = len
    local n = math.ceil(select(2,math.frexp(num))/8)
    for k=n,1,-1 do
        local mul=2^(8*(k-1))
        res[k]=math.floor(num/mul)
            num=num-res[k]*mul
    end

    sock:send(string.char(res[1]) .. string.char(res[2]) .. string.char(res[3]) .. string.char(res[4]) .. msg)
end

local function recvMessage(sock, skey)
    sock:settimeout(600000)
    local line, err, partial = sock:receive(4)
    if not line then
        return nil, false
    end

    local len = string.byte(line,1) + string.byte(line,2)*256 + string.byte(line,3)*256*256 + string.byte(line,4)*256*256*256
    local msg, err, partial = sock:receive(len)
    if not msg then
        return nil, false
    end
    msg = decryptMessage(skey, msg)
    return msg, nil
end

local function handle(sock, client, skey)
    sendMessage(sock, skey, client, true)
    while true do
        if ngx.worker.exiting() then
            return "exit"
        end

        local msg, err = recvMessage(sock, skey)
        if not msg then
            sock:close()
            return "receive timeout or connection closed"
        else
            msg = string.gsub(msg, "^%s*(.-)%s*$", "%1")

            local http = require("resty.http")
            local httpc = http.new()
            local res, err = httpc:request_uri("http://inner.ciaos.net"..msg, {
                method = "GET",
                headers = {
                        ["Content-Type"] = "application/x-www-form-urlencoded",
                }
            })
            local result
            if not res then
                result = err
            else
                result = res.body
            end
            sendMessage(sock, skey, result, false)
        end
    end
end

local function loop()
    local client, err = getLocalClient()
    if not client then
        ngx.timer.at(sleepTime, loop)
        modifySleeptime()
        return
    end
    local skey, err = getSessionKey(client)
    if not skey then
        ngx.timer.at(sleepTime, loop)
        modifySleeptime()
        return
    end

        while true do
        if ngx.worker.exiting() then
            break
        end
            local sock = ngx.socket.tcp()
            local ok, err = sock:connect(remoteIp, remotePort)
            if not ok then
                ngx.timer.at(sleepTime, loop)
                modifySleeptime()
                return
            else
            local err = handle(sock, client, skey)
            if err then
                ngx.timer.at(sleepTime, loop)
                modifySleeptime()
                break
            end
        end
    end
end

ngx.timer.at(3, loop)

golang(需要依赖gotcp项目)

代码

// server/server.go

package main

import (
    "encoding/binary"
    "fmt"
    "log"
    "net"
    "os"
    "os/signal"
    "runtime"
    "syscall"
    "time"

    "auth"
    "gotcp"
    "proto"
)

var clientMap map[uint32]*gotcp.Conn

type Callback struct{}

func (this *Callback) OnConnect(c *gotcp.Conn) bool {
    addr := c.GetRawConn().RemoteAddr()
    c.PutExtraData(addr)
    fmt.Println("OnConnect: ", addr)
    return true
}

func (this *Callback) OnMessage(c *gotcp.Conn, p gotcp.Packet) bool {
    rcPacket := p.(*proto.RcPacket)

    var msgtype string
    if c.GetFirstPackageFlag() {
        msgtype = "auth"

        c.SetFirstPackageFlag()
        uclientid, err := auth.GetClientId(rcPacket.GetBody())
        if err != nil {
            fmt.Printf("OnMessage:  %v MsgType[%v] MsgLen[%v] MsgBody[%v] AuthError[%v]\n", c.GetRawConn().RemoteAddr(), msgtype, rcPacket.GetLength(), string(rcPacket.GetBody()), err.Error())
            c.Close()
            return false
        }
        c.SetClientId(uclientid)
        clientMap[uclientid] = c
    } else {
        msgtype = "resp"

        c.GetRelateConn().AsyncWritePacket(proto.NewRcPacket(rcPacket.Serialize(), true), time.Second)
        c.Cmutex.Unlock()
    }

    fmt.Printf("OnMessage:  %v ClientId[%v] MsgType[%v] MsgLen[%v] MsgBody[%v]\n", c.GetRawConn().RemoteAddr(), c.GetClientId(), msgtype, rcPacket.GetLength(), string(rcPacket.GetBody()))

    return true
}

func (this *Callback) OnClose(c *gotcp.Conn) {
    fmt.Println("OnClose:", c.GetExtraData())
    if c.GetRelateConn() != nil {
        c.GetRelateConn().Close()
    }
    delete(clientMap, c.GetClientId())
}

type InnerCallback struct{}

func (this *InnerCallback) OnConnect(c *gotcp.Conn) bool {
    addr := c.GetRawConn().RemoteAddr()
    c.PutExtraData(addr)
    fmt.Println("Inner OnConnect:", addr)
    return true
}

func (this *InnerCallback) OnMessage(c *gotcp.Conn, p gotcp.Packet) bool {
    rcPacket := p.(*proto.RcPacket)

    clientid := binary.LittleEndian.Uint32(rcPacket.GetBody()[0:4])
    fmt.Printf("OnMessage:  %v ClientId[%v] MsgType[%v] MsgLen[%v] MsgBody[%v]\n", c.GetRawConn().RemoteAddr(), clientid, "req", rcPacket.GetLength(), string(rcPacket.GetBody()))
    if conn, ok := clientMap[clientid]; ok {
        conn.Cmutex.Lock()
        conn.SetRelateConn(c)
        conn.AsyncWritePacket(proto.NewRcPacket(rcPacket.GetBody()[4:], false), time.Second)
    } else {
        c.Close()
        return false
    }
    return true
}

func (this *InnerCallback) OnClose(c *gotcp.Conn) {
    fmt.Println("Inner OnClose:", c.GetExtraData())
}

func outer() *gotcp.Server {
    // creates a tcp listener
    tcpAddr, err := net.ResolveTCPAddr("tcp4", ":9527")
    checkError(err)
    listener, err := net.ListenTCP("tcp", tcpAddr)
    checkError(err)

    // creates a server
    config := &gotcp.Config{
        PacketSendChanLimit:    5,
        PacketReceiveChanLimit: 5,
    }
    srv := gotcp.NewServer(config, &Callback{}, &proto.RcProtocol{})

    // starts service
    go srv.Start(listener, time.Second)
    fmt.Println("listening:", listener.Addr())

    return srv
}

func inner() *gotcp.Server {
    // creates a tcp listener
    tcpAddr, err := net.ResolveTCPAddr("tcp4", "127.0.0.1:9528")
    checkError(err)
    listener, err := net.ListenTCP("tcp", tcpAddr)
    checkError(err)

    // creates a server
    config := &gotcp.Config{
        PacketSendChanLimit:    20,
        PacketReceiveChanLimit: 20,
    }
    srv := gotcp.NewServer(config, &InnerCallback{}, &proto.RcProtocol{})

    // starts service
    go srv.Start(listener, time.Second)
    fmt.Println("listening:", listener.Addr())

    return srv
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())

    clientMap = make(map[uint32]*gotcp.Conn)

    innerSvr := inner()
    outerSvr := outer()

    // catchs system signal
    chSig := make(chan os.Signal)
    signal.Notify(chSig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Println("Signal: ", <-chSig)

    // stops service
    innerSvr.Stop()
    outerSvr.Stop()
}

func checkError(err error) {
    if err != nil {
        log.Fatal(err)
    }
}
// auth/auth.go

package auth

import (
    "io/ioutil"
    "net/http"
    "strconv"
    "strings"
)

func GetClientId(message []byte) (uint32, error) {
    res, err := http.Post("http://www.ciaos.com/service/auth", "application/x-www-form-urlencoded", strings.NewReader("token="+string(message)))
    if err != nil {
        return 0, err
    }
    result, err := ioutil.ReadAll(res.Body)
    res.Body.Close()
    if err != nil {
        return 0, err
    }
    clientid, err := strconv.Atoi(string(result))
    uclientid := uint32(clientid)
    return uclientid, nil
}

gotcp中conn.go文件修改如下

type Conn struct {
    srv               *Server
    conn              *net.TCPConn  // the raw connection
    extraData         interface{}   // to save extra data
    closeOnce         sync.Once     // close the conn, once, per instance
    closeFlag         int32         // close flag
    closeChan         chan struct{} // close chanel
    packetSendChan    chan Packet   // packet send chanel
    packetReceiveChan chan Packet   // packet receive chanel

    isFirstPackage bool       // first packet
    clientId       uint32     // remote client id
    relateConn     *Conn      // the relate controll connection
    Cmutex         sync.Mutex // mutex
}

php代码如下

<?php if ( ! defined(‘BASEPATH‘)) exit(‘No direct script access allowed‘);

class Service extends CI_Controller {

    private function genToken($clientid)
    {
        return "abcdefghijklmnop";
    }

    private function getClientId($client)
    {
        return 1002;
    }

    //Get Token
    public function token()
    {
        $param = $this->input->post("param");
        if($param)
        {
            $client = base64_decode($param);
            $clientid = self::getClientId($client);

            $token = self::genToken($clientid);

            $this->redis->select(1);
            $this->redis->setex($clientid, 1800, $token);
            echo base64_encode($token);
        }
        else{
            show_404();
        }
    }

    //Auth Token (From Inner Server)
    public function auth()
    {
        $token = $this->input->post("token");
        if(!$token){
            show_404();
            return;
        }
        $info = explode(‘ ‘,$token);

        $uid = self::getClientId($info[0]);

        $this->redis->select(1);
        $stoken = $this->redis->get($uid);
        if(is_null($stoken)){
            echo "false";
            return;
        }
        $message = substr($token, strlen($info[0])+1);
        $message = str_replace(‘ ‘,‘+‘,$message);
        $client = openssl_decrypt(base64_decode($message), ‘aes-128-cbc‘, $stoken, true, ‘1234567890123456‘);
        if($info[0] == $client){
            echo $uid;
        } else{
            echo false;
        }
    }

    //Controll Message
    public function console()
    {
        $id = $this->input->get("id");
        $command = $this->input->get("cmd");
        if(is_null($command) or $command == false or $id === false or is_null($id)){
            echo "invalid cmd";
            return;
        }
        $port = 9528;
        $ip = "127.0.0.1";
        $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
        if ($socket < 0) {
            echo "internal server error:1";
            return;
        }

        $result = socket_connect($socket, $ip, $port);
        if (!$result) {
            echo "internal server error:2";
            return;
        }

        $this->redis->select(1);
        $stoken = $this->redis->get($id);
        if(is_null($stoken)){
            echo "internal server error:3";
            return;
        }
        $message = openssl_encrypt($command,‘aes-128-cbc‘,$stoken, true,‘1234567890123456‘);
        $message = base64_encode($message);
        $len = strlen($message)+4;
        $bin_head = pack("ii", $len, $id);
        socket_write($socket, $bin_head, strlen($bin_head));
        $in = $message;
        if(!socket_write($socket, $in, strlen($in))) {
            echo "internal server error:4";
            return;
        }

        $out = socket_read($socket, 4);
        if($out && strlen($out)>0){
            $data = unpack("i", $out);
            $out = socket_read($socket, $data[1]);
            if($out){
                $message = openssl_decrypt(base64_decode($out), ‘aes-128-cbc‘, $stoken, true, ‘1234567890123456‘);
                echo $message;
                socket_close($socket);
                return;
            }
        }
        socket_close($socket);
        echo "internal server error:5";
    }
}

/* End of file welcome.php */
/* Location: ./application/controllers/welcome.php */
时间: 2024-08-29 08:28:12

一些tcp通讯代码的相关文章

ActiveMQ之 TCP通讯机制

ActiveMQ支持多种通讯协议TCP/UDP等,我们选取最常用的TCP来分析ActiveMQ的通讯机制.首先我们来明确一个概念:  客户(Client):消息的生产者.消费者对ActiveMQ来说都叫作客户.  消息中介(Message broker):接收消息并进行相关处理后分发给消息的消费者. 为了能清楚的描述出ActiveMQ的核心通讯机制,我们选择3个部分来进行说明,它们分别是建立链接.关闭链接.心跳. 一.Client跟activeMQ的TCP通讯的初始化过程分析如下:  (1) A

C#中的TCP通讯与UDP通讯

最近做了一个项目,主要是给Unity3D和实时数据库做通讯接口.虽然方案一直在变:从开始的UDP通讯变为TCP通讯,然后再变化为UDP通讯;然后通讯的对象又发生改变,由与数据库的驱动进行通讯(主动推送数据给驱动,数据库中数据发生变化把数据以UDP报文形式发送客户端)改为与实时数据库进行直接通讯(自己发送报文修改数据库中的数据,自己请求需要获取的数据并接收自己请求的数据):现在这个项目基本完结,由于这个过程既接触到了UDP又接触到了TCP,现在对其进行一番总结. 阅读目录 TCP通讯协议与UDP通

解决Linux 下server和client 通过TCP通讯:accept成功接收却报错的问题

今天在写简单的TCP通讯例子的时候,遇到了一个问题:server 和client能够连接成功,并且client也能够正常发送,但server就是接收不到,在网上搜索一番后,终于解决了问题.在这里整理如下: 大家要注意的是,一个server端可以连接多个client端,server端的accept()函数负责等待并接收client的连接请求,而且accept()函数将不同client端的sockfd作为返回值.为了保证接收到对应的client端数据,所以在client连接成功且使用recv()函数

&amp;#39;IOKING&amp;#39; TCP Transmission Server Engine (&amp;#39;云猴&amp;#39;&amp;#169;TCP通讯server引擎)(预告版)

关键词: IOKING IOCP TCP  Transmission Server Engine Lock Free Interlocked 云猴完毕portTCP通讯server引擎 无锁 原子锁(函数) 'IOKING' TCP Transmission Server Engine ('云猴'?TCP通讯server引擎)(预告版) 下载连接: http://download.csdn.net/detail/guestcode/7474171 补充: 无锁消息引擎已经完毕: http://b

GCDAsyncSocket类库,IOS下TCP通讯使用心得

关于在IOS下使用Socket进行通讯的技术文章也许诺很久了,今日又是一个还债的日子,网上虽然很多介绍过AsyncSocket或GCDAsyncSocket的文章,但其实就那么一两篇大部分都是转载,于是我义正言辞.慷慨激昂的批判他们这种不负责任的态度,学习,不是给自己学的,是要和大家分享的.技术的共享有利于整体行业的进步,也可以使自身更深入全面的了解. 之前的文章中我们讲到过TCP通讯协议,并且也对其进行了较为详细的介绍和描述,关于TCP通讯的原理此处我们不再赘述,如有需要的看官可自行翻阅本人所

QTcpSocket-Qt使用Tcp通讯实现服务端和客户端

版权声明:若无来源注明,Techie亮博客文章均为原创. 转载请以链接形式标明本文标题和地址: 本文标题:QTcpSocket-Qt使用Tcp通讯实现服务端和客户端     本文地址:http://techieliang.com/2017/12/530/ 文章目录 1. 基本功能  1.1. pro文件配置  1.2. QTcpServer服务端建立  1.3. 客户端建立  1.4. 消息收发 2. 其他  2.1. 实现单服务器多客户端通讯  2.2. 关于QTcpServer  2.3.

异常和TCP通讯

第七章 异常处理 * 异常处理机制中的try-catch * 语法: * try{ * 代码片段 * }catch(XXXException e){ * 当try中的代码片段出现了XXXException后的处理代码 * } * try中的代码片段报错行以下的代码都不会运行* 应当有一个好习惯,在最后一个catch处捕获Exception* 这样能避免因为一个未捕获的异常导致程序中断 * finally * finally块是异常处理机制的最后一块,只能跟在最后一个catch之后或直接跟在try

[Modbus] 如何看懂 Modbus TCP 通讯协定

最近笔者接了一个工控系统,其中PLC所使用的是Modbus TCP的通讯协定. 由于这个部分因为从来没有接触过,所以花了一点时间研究. 趁著现在记忆犹新,赶紧写下来,方便日后对照,也让读者们能够快速看懂Modbus TCP协定. Modbus为工业上常用的通讯协定之一,也是目前工业领域通讯协定常用的标准协定. 一般来说,Modbus主要可以再细分为两种协定(Modbus?RTU?.Modbus ASCII .Modbus TCP) Modbus?RTU?是一种为使用二进制表示法来进行数据的传递与

推荐一款开源的C#TCP通讯框架

原来收费的TCP通讯框架开源了,这是一款国外的开源TCP通信框架,使用了一段时间,感觉不错,介绍给大家 框架名称是networkcomms 作者开发了5年多,目前已经停止开发,对于中小型的应用场景,够用了. 框架的地址是: https://github.com/MarcFletcher/NetworkComms.Net 界面如下: 点那个Download就可以下载源码了 下载之后,解压缩之后的文件列表如下: 可以用VS打开NetworkCommsDotNet工程文件了 用 vs2010打开,可能