Windows下基于UDP的可靠传输协议实现

前言:在某互联网公司实习了好几个月,有一个月都是在做基于UDP协议的应用层软件开发,目的是要用在流媒体服务器上,传输高清视频图像帧。整个开发过程,从0到最后完成了几百兆以上的大文件可靠传输,但效率方面还需要进一步提升。UDP网络传输协议部分编程,由于存在丢包问题,确实有点复杂,现在分享一下自己的开发经验。

#ifndef UDPNONBLOCKINGOUTPUT
#define UDPNONBLOCKINGOUTPUT
#include "winsock.h"
#include <iostream>
#include <stdio.h>
#include <windows.h>
#include <tchar.h>
#include <time.h>
#include <vector>
#pragma comment(lib,"wsock32.lib")   

#define SERVER_PORT 2000
#define CLIENT_PORT 3000
#define BEGIN_NUM 19900711
#define DATA_NUM  20160113
#define CHECK_NUM 20161817
#define END_CHECK_NUM 20160114
#define END_NUM   11700991
#define END_ALL   20160115
#define BLOCK_DATA_SIZE (10 * 1024)
#define FILE_HEAD  4
#define BLOCK_HEAD 4
#define DESTADDRESS "192.168.27.170"
#define FILENAME "D:\\48M.mp4"
//UDT包头
typedef struct _UDP_HEAD_
{
	UINT     m_PacketType;			 //包类型 1数据包 2确认包
	UINT     m_PacketLen;			 //本次数据帧长度(不包括包头大小,仅仅数据部分大小)
	UINT     m_PacketId;             //包序列号
	UINT     m_Check;                //效验和 以上字段的相加
}UDP_HEAD;

class CUdpNonBlockingOutput
{
public:
	CUdpNonBlockingOutput()
	{
		m_flagNonBlocking = 0;
		InitSocket();
	}
	~CUdpNonBlockingOutput()
	{
		closesocket(m_socketClient);
		WSACleanup();
	}

//	static DWORD WINAPI ThreadFun(LPVOID pM);

	void InitSocket();

    void SendKeyPackage(char *eachBuf, UINT size);

    void RecvKeyPackage(char *eachBuf, UINT size);

	void SendFile();

	void SendMsg();

//	void RecvMsg(int size);

private:

	char m_RecvBuff[1024];
	char m_SendBuff[1024];

    sockaddr_in Client;
	sockaddr_in ClientAddr;
    SOCKET m_socketServer;
	SOCKET m_socketClient;

	int m_len;
	int m_flagNonBlocking;
	DWORD  dwFileSize;
	BYTE * fileData;

    HANDLE m_Mutex;

	static std::vector<UINT> m_lackPackageNum;
};
#endif
#include "UdpNonBlockingOutput.h"

void CUdpNonBlockingOutput::InitSocket()
{
	WSADATA WSAData;
	if (WSAStartup(MAKEWORD(2, 2), &WSAData) != 0)
	{
		printf("sock init failed!\n");
		return;
	}
    m_socketServer = socket(AF_INET, SOCK_DGRAM, 0);//创建socket
	m_socketClient = socket(AF_INET, SOCK_DGRAM, 0);//创建socket
	if (m_socketClient == SOCKET_ERROR)
	{
		printf("sock create failed\n");
		WSACleanup();
		return;
	}

	ClientAddr.sin_family = AF_INET;
	ClientAddr.sin_addr.s_addr = htonl(INADDR_ANY);
	ClientAddr.sin_port = htons(CLIENT_PORT);
    if (bind(m_socketServer, (struct sockaddr*)&ClientAddr, sizeof(struct sockaddr_in)) == SOCKET_ERROR)//绑定
	{
		printf("sock bind error!\n");
		closesocket(m_socketClient);
		WSACleanup();
		return;
	}

	Client.sin_family = AF_INET;
    Client.sin_addr.s_addr = inet_addr(DESTADDRESS);
    Client.sin_port = htons(SERVER_PORT);
	m_len = sizeof(Client);
    return;
}

std::vector<UINT> CUdpNonBlockingOutput::m_lackPackageNum;

void CUdpNonBlockingOutput::SendKeyPackage(char *eachBuf,UINT size)
{
    char recvKeyWord[4] = { 0 }, keyWord[4] = { 0 }, charFileSize[4] = { 0 };
    memcpy(keyWord, eachBuf, FILE_HEAD); //拷贝前4个字节
    unsigned long ul = 1;//------------------设置非阻塞模式
    int ret;
    ret = ioctlsocket(m_socketServer, FIONBIO, (unsigned long *)&ul);
    if (ret == SOCKET_ERROR)
    {
        printf("unblock failed!\n");
    }
    sendto(m_socketClient, (char*)eachBuf, size, 0, (SOCKADDR*)&Client, m_len);

//    DWORD lTimeOut = 1;
//   setsockopt(m_socketServer, SOL_SOCKET, SO_RCVTIMEO, (char *)&lTimeOut, sizeof(DWORD));//设置超时重发

    for (int i = 0, startClock, endClock; i < 3; i++)
    {
        startClock = clock();
        while (1)
        {
            if (recvfrom(m_socketServer, recvKeyWord, FILE_HEAD, 0, (struct sockaddr*)&ClientAddr, &m_len) > 0)
            {
                if (!strcmp(recvKeyWord, keyWord))
                {
                    return;
                }
            }
            endClock = clock();
            if ((endClock - startClock) > 5)
                break;
        }
        std::cout << "123";
        sendto(m_socketClient, (char*)eachBuf, size, 0, (SOCKADDR*)&Client, m_len);
    }
}

void CUdpNonBlockingOutput::RecvKeyPackage(char *eachBuf,UINT size)
{
    char recvKeyWord[4] = { 0 };
    unsigned long ul = 1;
    int ret;
    ret = ioctlsocket(m_socketServer, FIONBIO, (unsigned long *)&ul);//设置非阻塞模式
    if (ret == SOCKET_ERROR)
    {
        printf("unblock failed!\n");
    }
    int startClock = clock(), endClock;//接收关键包
    while (1)
    {
        endClock = clock();
        if ((endClock - startClock) > 5)
            break;
        if (recvfrom(m_socketServer, eachBuf, size, 0, (struct sockaddr*)&ClientAddr, &m_len) > 0)
        {
            memcpy(recvKeyWord, eachBuf, FILE_HEAD);
            break;
        }
    }
    startClock = clock();
    while (1)
    {
        if (sendto(m_socketClient, recvKeyWord, sizeof(recvKeyWord), 0, (struct sockaddr*)&Client, m_len) > 0)
            break;
        endClock = clock();
        if ((endClock - startClock) > 5)
            break;
    }
}

//void CUdpNonBlockingOutput::RecvMsg(int size)
//{
//    recvfrom(m_socketServer, m_RecvBuff, BLOCK_HEAD + 1, 0, (struct sockaddr*)&Client, &m_len);
//	int DataPos = 0;
//	for (int i = 0; i < 4; i++)
//	{
//		DataPos += ((UCHAR)m_RecvBuff[i]) << (8 * (4 - i - 1)); //获取数据包序列号
//	}
//	m_lackPackageNum.push_back(DataPos);
//	std::cout << DataPos << std::endl;
//}
//
////定时检测线程
//DWORD WINAPI CUdpNonBlockingOutput::ThreadFun(LPVOID lpParameter)
//{
//	printf("开启线程ID号为%d的子线程\n", GetCurrentThreadId());
//	CUdpNonBlockingOutput *pObj = (CUdpNonBlockingOutput *)lpParameter;
//	while (1)
//	{
//        //WaitForSingleObject(pObj->m_Mutex, INFINITE);
//		pObj->RecvMsg(BLOCK_HEAD);
//       // ReleaseMutex(pObj->m_Mutex);
//	}
//	return 0;
//}

void CUdpNonBlockingOutput::SendFile()
{
    //通信、数据、服务、业务模型
	//1、获得文件的大小
	int start, end;
	HANDLE  hFile;
	DWORD   dwHighSize, dwBytesRead;
    hFile = CreateFile(_T(FILENAME), GENERIC_READ, FILE_SHARE_READ, NULL,
		OPEN_EXISTING, FILE_FLAG_SEQUENTIAL_SCAN, NULL);
	dwFileSize = GetFileSize(hFile, &dwHighSize);
	std::cout << "dwFileSize=" << dwFileSize << std::endl;
	//2、读文件内容到 BYTE * fileData 中
	BOOL bSuccess;
	fileData = (BYTE*)malloc(dwFileSize);
	bSuccess = ReadFile(hFile, fileData, dwFileSize, &dwBytesRead, NULL);
	CloseHandle(hFile);

	//3、判断文件是否成功读取
	if (!bSuccess || (dwBytesRead != dwFileSize))
	{
		std::cout << "读取失败" << std::endl;;
		free(fileData);
		return;
	}
	//开启一个线程
    /*  m_Mutex = CreateMutex(NULL, TRUE, NULL);
      HANDLE ThreadHandle = CreateThread(NULL, 0, &CUdpNonBlockingOutput::ThreadFun, (LPVOID)this, NULL, NULL);
      CloseHandle(ThreadHandle);*/

	//4、根据文件大小和设定的包大小,判断是一次发送还是分包发送
	DWORD  retval = 0;
	UINT   DataPos = 0;
	BYTE   *eachBuf = new BYTE[BLOCK_DATA_SIZE + 2 * FILE_HEAD];
	memset(eachBuf, 0, BLOCK_DATA_SIZE + 2 * FILE_HEAD);
	eachBuf[DataPos++] = BEGIN_NUM >> 24 & 0xff;//文件起始标识符
	eachBuf[DataPos++] = BEGIN_NUM >> 16 & 0xff;
	eachBuf[DataPos++] = BEGIN_NUM >> 8 & 0xff;
	eachBuf[DataPos++] = BEGIN_NUM & 0xff;
	eachBuf[DataPos++] = dwFileSize >> 24 & 0xff;
	eachBuf[DataPos++] = dwFileSize >> 16 & 0xff;
	eachBuf[DataPos++] = dwFileSize >> 8 & 0xff;
	eachBuf[DataPos++] = dwFileSize & 0xff;

	//5、若待发送的文件小于分包大小,则一次发完
    //start = clock();
	if (BLOCK_DATA_SIZE >= dwFileSize)
	{
		//二次封装要发送的数据包 //1、-----做关键包发送------
		memcpy(&eachBuf[DataPos], fileData, dwFileSize);//报文格式:报文起始标识符(4)+文件大小(4)+数据(dwFileSize)
        retval = sendto(m_socketClient, (char*)eachBuf, dwFileSize + 2 * FILE_HEAD, 0, (SOCKADDR*)&Client, m_len);
	}
	//6、分步发送
	else
	{
		DWORD n = dwFileSize / BLOCK_DATA_SIZE;  //共需要几次全额发送
		DWORD yu = dwFileSize % BLOCK_DATA_SIZE; //最后剩下的字 符大小
        std::cout << "n=" << n << "yu=" << yu << "retval=" << retval << std::endl;
		//6.1 首先发送文件起始标识符、文件大小,连发三次,建立UDP连接	//报文格式:文件起始位(4)+文件大小(4)
        //2、-----做关键包发送------
        SendKeyPackage((char*)eachBuf, 2 * FILE_HEAD);
        //sendto(m_socketClient, (char*)eachBuf, 2 * FILE_HEAD, MSG_DONTROUTE, (SOCKADDR*)&Client, m_len);

		//DWORD lTimeOut = 50;
		//setsockopt(m_socketClient, SOL_SOCKET, SO_RCVTIMEO, (char *)&lTimeOut, sizeof(DWORD));//设置超时重发
        //recvfrom(m_socketServer, m_RecvBuff, BLOCK_DATA_SIZE + 2 * FILE_HEAD, 0, (struct sockaddr*)&ClientAddr, &m_len);
		for (int i = 0; i < n; i++)
		{
			DataPos = 0;
            eachBuf[DataPos++] = DATA_NUM >> 24 & 0xff;//文件起始标识符
            eachBuf[DataPos++] = DATA_NUM >> 16 & 0xff;
            eachBuf[DataPos++] = DATA_NUM >> 8 & 0xff;
            eachBuf[DataPos++] = DATA_NUM & 0xff;
			eachBuf[DataPos++] = i >> 24 & 0xff;
			eachBuf[DataPos++] = i >> 16 & 0xff;
			eachBuf[DataPos++] = i >> 8 & 0xff;
			eachBuf[DataPos++] = i & 0xff;
			//std::cout << DataPos << std::endl;
			memcpy(&eachBuf[DataPos], fileData + i*BLOCK_DATA_SIZE, BLOCK_DATA_SIZE);//报文格式:数据包序列号(4)+数据(BLOCK_DATA_SIZE)
            if (sendto(m_socketClient, (char*)eachBuf, BLOCK_DATA_SIZE + 2 * BLOCK_HEAD, 0, (SOCKADDR*)&Client, m_len) < 0)
                std::cout << "Sendto error!!!";
		}
		DataPos = 0;
		eachBuf[DataPos++] = END_NUM >> 24 & 0xff;//文件结束标识符
		eachBuf[DataPos++] = END_NUM >> 16 & 0xff;
		eachBuf[DataPos++] = END_NUM >> 8 & 0xff;
		eachBuf[DataPos++] = END_NUM & 0xff;
		eachBuf[DataPos++] = n >> 24 & 0xff;
		eachBuf[DataPos++] = n >> 16 & 0xff;
		eachBuf[DataPos++] = n >> 8 & 0xff;
		eachBuf[DataPos++] = n & 0xff;
		memcpy(&eachBuf[DataPos], fileData + n*BLOCK_DATA_SIZE, yu);//报文格式:文件结束标识符(4)+ 数据包序列号(4)+数据(yu),断开UDP连接
		//3、-----做关键包发送------
        SendKeyPackage((char*)eachBuf, yu + FILE_HEAD + BLOCK_HEAD);
        //sendto(m_socketClient, (char*)eachBuf, yu + FILE_HEAD + BLOCK_HEAD, 0, (SOCKADDR*)&Client, m_len);

        int flag_recv=1, flag_status = 0; char charFileSize[4] = { 0 };
        char *RecvLackNumBuf = new char[2 * 1024 + BLOCK_HEAD + FILE_HEAD+1];//4、-----做关键包接收------
        while (flag_recv)
        {
            start = clock();
            //接收下一步 操作 命令 重传机制
            memset(RecvLackNumBuf, 0, 2 * 1024 + BLOCK_HEAD + FILE_HEAD + 1);
            int ret = recvfrom(m_socketServer, RecvLackNumBuf, 2 * 1024 + 2 * FILE_HEAD + 1, 0, (struct sockaddr*)&ClientAddr, &m_len);
            if (ret > 0)
            {
                std::cout << "Check Recv:" << ret <<std::endl;
               // break;
            }
            //end = clock();
            flag_status = 0;
            memcpy(charFileSize, RecvLackNumBuf, FILE_HEAD); //拷贝前4个字节
            for (int i = 0; i < 4; i++)
            {
                flag_status += ((UCHAR)charFileSize[i]) << (8 * (4 - i - 1)); //获取文件标识符
            }
            memcpy(charFileSize, RecvLackNumBuf + FILE_HEAD, FILE_HEAD); //拷贝第5-8个字节
            dwFileSize = 0;
            for (int i = 0; i < 4; i++)
            {
                dwFileSize += ((UCHAR)charFileSize[i]) << (8 * (4 - i - 1)); //获取文件大小
            }
            if (flag_status == END_ALL)
            {
                std::cout << "end" << std::endl;
                memcpy(charFileSize, RecvLackNumBuf, FILE_HEAD);//关键数据包接收、回复 、
                sendto(m_socketClient, charFileSize, sizeof(charFileSize), 0, (struct sockaddr*)&Client, m_len);

                flag_recv = 0;
                break;
            }
            else if (flag_status == CHECK_NUM)
            {
                std::cout << "check" << std::endl;
                memcpy(charFileSize, RecvLackNumBuf, FILE_HEAD);//关键数据包接收、回复 、
                sendto(m_socketClient, charFileSize, sizeof(charFileSize), 0, (struct sockaddr*)&Client, m_len);

                if (dwFileSize > 510)
                    dwFileSize = 510;//需要改进,代码不规范,每次重发包数目 最大为510个
                for (int i = 0; i < dwFileSize; i++)
                {
                    UINT DataPos = 0;
                    memcpy(charFileSize, RecvLackNumBuf + i* FILE_HEAD + 2 * BLOCK_HEAD, FILE_HEAD); //拷贝第9个字节后面
                    for (int i = 0; i < 4; i++)
                    {
                        DataPos += ((UCHAR)charFileSize[i]) << (8 * (4 - i - 1)); //获取文件大小
                    }
                    UINT Num = 0;
                    eachBuf[Num++] = DATA_NUM >> 24 & 0xff;//文件起始标识符
                    eachBuf[Num++] = DATA_NUM >> 16 & 0xff;
                    eachBuf[Num++] = DATA_NUM >> 8 & 0xff;
                    eachBuf[Num++] = DATA_NUM & 0xff;
                    eachBuf[Num++] = DataPos >> 24 & 0xff;
                    eachBuf[Num++] = DataPos >> 16 & 0xff;
                    eachBuf[Num++] = DataPos >> 8 & 0xff;
                    eachBuf[Num++] = DataPos & 0xff;
                    memcpy(&eachBuf[Num], fileData + DataPos*BLOCK_DATA_SIZE, BLOCK_DATA_SIZE);//报文格式:数据类型(4) +数据包序列号(4)+数据(BLOCK_DATA_SIZE)
                    if (sendto(m_socketClient, (char*)eachBuf, BLOCK_DATA_SIZE + FILE_HEAD + BLOCK_HEAD + 1, 0, (SOCKADDR*)&Client, m_len) < 0)
                    {
                        std::cout << "Check sendto error!!!" << std::endl;
                        break;
                    }
                }
                UINT Num = 0;
                eachBuf[Num++] = END_CHECK_NUM >> 24 & 0xff;//重发校验结束标识符
                eachBuf[Num++] = END_CHECK_NUM >> 16 & 0xff;
                eachBuf[Num++] = END_CHECK_NUM >> 8 & 0xff;
                eachBuf[Num++] = END_CHECK_NUM & 0xff;
                eachBuf[Num++] = END_CHECK_NUM >> 24 & 0xff;//重发校验结束标识符
                eachBuf[Num++] = END_CHECK_NUM >> 16 & 0xff;
                eachBuf[Num++] = END_CHECK_NUM >> 8 & 0xff;
                eachBuf[Num++] = END_CHECK_NUM & 0xff;
                for (int i = 0; i < 3; i++)//5、-----做关键包发送------
                if(sendto(m_socketClient, (char*)eachBuf, FILE_HEAD + BLOCK_HEAD, 0, (SOCKADDR*)&Client, m_len) < 0)
                {
                        std::cout << "Check sendto error!!!" << std::endl;
                        break;
                 }
            }

            //flag_recv = 0;
        }

        std::cout << "flag_status:" << flag_status << "-" << "dwFileSize:" << dwFileSize << std::endl;
        //Sleep(500);
		end = clock();
		std::cout << "耗时:" << end - start << std::endl;
	}
}
#ifndef UDPNONBLOCKINGINPUT
#define UDPNONBLOCKINGINPUT
#include "winsock.h"
#include <iostream>
#include <process.h>
#include "time.h"
#include "windows.h"
#include <stdio.h>
#include <vector>
#include <algorithm>
#pragma comment(lib,"wsock32.lib")   

#define SERVER_PORT 2000
#define CLIENT_PORT 3000
#define BEGIN_NUM 19900711
#define DATA_NUM  20160113
#define CHECK_NUM 20161817
#define END_CHECK_NUM 20160114
#define END_NUM   11700991
#define END_ALL   20160115
#define BLOCK_DATA_SIZE (10 * 1024)
#define FILE_HEAD  4
#define BLOCK_HEAD 4
#define DESTADDRESS "192.168.27.170"
#define FILENAME "D:\\file.MP4"
//每个包的协议头:数据类型(4字节,起始标识、终结标识、数据标识)+数据帧长度(4)+包序列号(4)

//UDT包头
typedef struct _UDP_HEAD_
{
	UINT     m_PacketType;			 //包类型 1数据包 2确认包
	UINT     m_PacketLen;			 //本次数据帧长度(不包括包头大小,仅仅数据部分大小)
	UINT     m_PacketId;             //包序列号
	UINT     m_Check;                //效验和 以上字段的相加
}UDP_HEAD;

class CUdpNonBlockingInput
{
public:
	CUdpNonBlockingInput()
	{
		m_flag_status = 0;
		m_dwFileSize = 0;
		m_flagNonBlocking = 0;
		InitSocket();
	}
	~CUdpNonBlockingInput()
	{
		closesocket(m_socketServer);
		WSACleanup();
	}

	//static DWORD WINAPI ThreadFun(LPVOID pM);

	void InitSocket();

    void SendKeyPackage(char *eachBuf, UINT size);

    void RecvKeyPackage(char *eachBuf, UINT size);

	void StartRevDataes();

	void SendLackPackageNum(std::vector<UINT> m_lackPackageNum);
private:

	char RecvBuff[1024];
	char SendBuff[1024];

	sockaddr_in Server;
	sockaddr_in ServerAddr;
	SOCKET m_socketServer;
	SOCKET m_socketClient;

	int m_len;
	int m_flagNonBlocking;
	DWORD  m_dwFileSize;
	UINT m_prePackageNum;
	UINT m_flag_status;
	HANDLE m_Mutex;

	static std::vector<UINT> m_lackPackageNum;
};

#endif
#include "UdpNonBlockingInput.h"

void CUdpNonBlockingInput::InitSocket()
{
	WSADATA WSAData;
	if (WSAStartup(MAKEWORD(2, 2), &WSAData) != 0)
	{
		printf("sock init failed!\n");
		return;
	}
	m_socketServer = socket(AF_INET, SOCK_DGRAM, 0);//创建socket
	m_socketClient = socket(AF_INET, SOCK_DGRAM, 0);//创建socket
	if (m_socketServer == SOCKET_ERROR)
	{
		printf("sock create failed\n");
		WSACleanup();
		return;
	}

	ServerAddr.sin_family = AF_INET;
	ServerAddr.sin_addr.s_addr = htonl(INADDR_ANY);
	ServerAddr.sin_port = htons(SERVER_PORT);
	if (bind(m_socketServer, (struct sockaddr*)&ServerAddr, sizeof(struct sockaddr_in)) == SOCKET_ERROR)//绑定
	{
		printf("sock bind error!\n");
		closesocket(m_socketServer);
		WSACleanup();
		return;
	}
	//////////////////////////////////////////////////////////////////////////
	Server.sin_family = AF_INET;
	Server.sin_addr.s_addr = inet_addr(DESTADDRESS);
	Server.sin_port = htons(CLIENT_PORT);
	m_len = sizeof(Server);
	return;
}

void CUdpNonBlockingInput::SendKeyPackage(char *eachBuf,UINT size)
{
    char recvKeyWord[4] = { 0 }, keyWord[4] = { 0 }, charFileSize[4] = { 0 };
    memcpy(keyWord, eachBuf, FILE_HEAD); //拷贝前4个字节
    sendto(m_socketClient, (char*)eachBuf, size, 0, (SOCKADDR*)&Server, m_len);

    //    DWORD lTimeOut = 1;
    //   setsockopt(m_socketServer, SOL_SOCKET, SO_RCVTIMEO, (char *)&lTimeOut, sizeof(DWORD));//设置超时重发

    for (int i = 0, startClock, endClock; i < 100; i++)
    {
        startClock = clock();
        while (1)
        {
            if (recvfrom(m_socketServer, recvKeyWord, FILE_HEAD, 0, (struct sockaddr*)&ServerAddr, &m_len) < 0)
                return;
            else
            {
                if (!strcmp(recvKeyWord, keyWord))
                {
                    return;
                }
            }
            endClock = clock();
            if ((endClock - startClock) > 5)
                break;
        }
        sendto(m_socketClient, (char*)eachBuf, size, 0, (SOCKADDR*)&Server, m_len);
    }
}

void CUdpNonBlockingInput::RecvKeyPackage(char *eachBuf, UINT size)
{
    char recvKeyWord[4] = { 0 };
    int startClock = clock(), endClock;//接收关键包
    while (1)
    {
        if (recvfrom(m_socketServer, eachBuf, size, 0, (struct sockaddr*)&ServerAddr, &m_len) > 0)
        {
            memcpy(recvKeyWord, eachBuf, FILE_HEAD);
            break;
        }
        endClock = clock();
        if ((endClock - startClock) > 5)
            break;
    }
    startClock = clock();
    while (1)
    {
        if (sendto(m_socketClient, recvKeyWord, sizeof(recvKeyWord), 0, (struct sockaddr*)&Server, m_len) > 0)
            break;
        endClock = clock();
        if ((endClock - startClock) > 5)
            break;
    }

}

std::vector<UINT> CUdpNonBlockingInput::m_lackPackageNum;

void CUdpNonBlockingInput::SendLackPackageNum(std::vector<UINT> m_lackPackageNum)
{
	int startClock = clock(),endClock;
	if (m_lackPackageNum.size() > 0)
	{

    char *SendLackNumBuf = new char[2 * 1024 + BLOCK_HEAD + FILE_HEAD];
	int DataPos = 0;
    SendLackNumBuf[DataPos++] = CHECK_NUM >> 24 & 0xff;//报文格式:校验标识符CHECK_NUM + 包大小FileSize
    SendLackNumBuf[DataPos++] = CHECK_NUM >> 16 & 0xff;
    SendLackNumBuf[DataPos++] = CHECK_NUM >> 8 & 0xff;
    SendLackNumBuf[DataPos++] = CHECK_NUM & 0xff;
    SendLackNumBuf[DataPos++] = m_lackPackageNum.size() >> 24 & 0xff;
    SendLackNumBuf[DataPos++] = m_lackPackageNum.size() >> 16 & 0xff;
    SendLackNumBuf[DataPos++] = m_lackPackageNum.size() >> 8 & 0xff;
    SendLackNumBuf[DataPos++] = m_lackPackageNum.size() & 0xff;

    for (int i = 0; i < m_lackPackageNum.size(); i++)
    {
        if (DataPos >(2 * 1024))//防止数组下标越界 重发的数据包不会超过510个
            break;
        SendLackNumBuf[DataPos++] = m_lackPackageNum[i] >> 24 & 0xff;//报文格式:数据标识符+包大小+数据内容
        SendLackNumBuf[DataPos++] = m_lackPackageNum[i] >> 16 & 0xff;
        SendLackNumBuf[DataPos++] = m_lackPackageNum[i] >> 8 & 0xff;
        SendLackNumBuf[DataPos++] = m_lackPackageNum[i] & 0xff;
    }
    SendKeyPackage(SendLackNumBuf, 2 * 1024 + BLOCK_HEAD + FILE_HEAD + 1);
    //sendto(m_socketClient, SendLackNumBuf, 2 * 1024 + BLOCK_HEAD + FILE_HEAD + 1, 0, (struct sockaddr*)&Server, m_len);

	if (SendLackNumBuf)
	{
		delete SendLackNumBuf;
		SendLackNumBuf = NULL;
	}
	}
	endClock = clock();
    std::cout << "重发耗时:" << endClock - startClock << "ms" << m_lackPackageNum.size() << std::endl;
}
////定时检测线程
//DWORD WINAPI CUdpNonBlockingInput::ThreadFun(LPVOID lpParameter)
//{
//	printf("开启线程ID号为%d的子线程\n", GetCurrentThreadId());
//	CUdpNonBlockingInput *pObj = (CUdpNonBlockingInput *)lpParameter;
//	WaitForSingleObject(pObj->m_Mutex, INFINITE);
//	pObj->SendLackPackageNum(m_lackPackageNum);
//	ReleaseMutex(pObj->m_Mutex);
//	return 0;
//}

void CUdpNonBlockingInput::StartRevDataes()
{
	//创建接收缓存区 、创建文件
	//开启一个线程
	//m_Mutex = CreateMutex(NULL, TRUE, NULL);
	//HANDLE ThreadHandle = CreateThread(NULL, 0, &CUdpNonBlockingInput::ThreadFun, (LPVOID)this, NULL, NULL);
	//CloseHandle(ThreadHandle);
	char *eachBuf = new char[BLOCK_DATA_SIZE + 3 * FILE_HEAD];//开启接收缓存区
	memset(eachBuf, 0, BLOCK_DATA_SIZE + 2 * FILE_HEAD + 1);
	FILE *fp;//创建文件
	unsigned int RecvNum = 0, flag_recv = 1;
	m_prePackageNum = 0;//当前序列号
	m_lackPackageNum.clear();//待确认的序列号队列
    fp = fopen(FILENAME, "wb");
	//sendto(m_socketClient, "Recv", sizeof("Revc") + 1, 0, (struct sockaddr*)&Server, m_len);
	//1、读取第一组数据,获取文件大小,建立UDP连接--------1、做关键包接收
    RecvKeyPackage(eachBuf, BLOCK_DATA_SIZE + 2 * FILE_HEAD);
	//recvfrom(m_socketServer, eachBuf, BLOCK_DATA_SIZE + 2 * FILE_HEAD, 0, (struct sockaddr*)&ServerAddr, &m_len);
	//m_from.sin_addr.S_un.S_addr = inet_addr(inet_ntoa(m_from.sin_addr));//String转换成char型函数c_str()
	char charFileSize[4] = { 0 };
	memcpy(charFileSize, eachBuf, FILE_HEAD); //拷贝前4个字节
	for (int i = 0; i < 4; i++)
	{
		m_flag_status += ((UCHAR)charFileSize[i]) << (8 * (4 - i - 1)); //获取文件起始符
	}
	memcpy(charFileSize, eachBuf + FILE_HEAD, FILE_HEAD); //拷贝第5-8个字节
	for (int i = 0; i < 4; i++)
	{
		m_dwFileSize += ((UCHAR)charFileSize[i]) << (8 * (4 - i - 1)); //获取文件大小
	}
	int startClock = clock(), endClock;
	if (m_flag_status == BEGIN_NUM)//成功建立UDP连接
	{
		//2、根据文件大小,判断是一次接收完
		if (m_dwFileSize <= BLOCK_DATA_SIZE)
		{
			fwrite(eachBuf + 2 * FILE_HEAD, m_dwFileSize, 1, fp);
			fclose(fp);
			delete eachBuf;
		}
		else//还是分步接收
		{
			//开辟接收内存
			char *FileBuffer = new char[m_dwFileSize];
			memset(FileBuffer, 0, m_dwFileSize);
			DWORD n = m_dwFileSize / BLOCK_DATA_SIZE;  //共需要几次全额发送
			DWORD yu = m_dwFileSize % BLOCK_DATA_SIZE; //最后剩下的字符大小
			//DWORD lTimeOut = 2;
			//setsockopt(m_socketServer, SOL_SOCKET, SO_RCVTIMEO, (char *)&lTimeOut, sizeof(DWORD));//设置接收超时 2ms
            unsigned int DataPos = 0, flag_check = 0;
			flag_recv = 1;
			//---
            unsigned long ul = 1;
            ioctlsocket(m_socketServer, FIONBIO, (unsigned long *)&ul);//设置0阻塞模式
			while (flag_recv)
			{
				if (recvfrom(m_socketServer, eachBuf, BLOCK_DATA_SIZE + 2 * BLOCK_HEAD + 1, 0, (struct sockaddr*)&ServerAddr, &m_len) == -1)
				{
					//std::cout << "Recv error!!!";
					//break;
				}
				else
				RecvNum++;
				memcpy(charFileSize, eachBuf, FILE_HEAD); //拷贝前4个字节
				m_flag_status = 0;
				for (int i = 0; i < 4; i++)
				{
					m_flag_status += ((UCHAR)charFileSize[i]) << (8 * (4 - i - 1)); //获取文件标识符
				}
				switch (m_flag_status)
				{
				case BEGIN_NUM:break;
				case DATA_NUM: {
								   DataPos = 0;
								   for (int i = 0; i < 4; i++)
								   {
									   DataPos += ((UCHAR)eachBuf[i+FILE_HEAD]) << (8 * (4 - i - 1)); //获取数据包序列号
								   }
								   if (DataPos >= 0 && DataPos <= n)
								   {
                                       if (flag_check == 0)
                                       {
	                                        if ((DataPos - m_prePackageNum) > 1)//获取缺失的分包序列号
					                        {
						                        for (int i = DataPos - m_prePackageNum - 1; i > 0; i--)
						                        {
							                        m_lackPackageNum.push_back(DataPos - i);
						                        }
					                        }
                                       }
                                       else//去掉收到的分包序列号
                                       {
                                          //std:: cout << 8;
                                           m_lackPackageNum.erase(remove(m_lackPackageNum.begin(), m_lackPackageNum.end(), DataPos), m_lackPackageNum.end());
                                       }
									   m_prePackageNum = DataPos;
									   //std::cout << DataPos << std::endl;
									   memcpy(FileBuffer + DataPos*BLOCK_DATA_SIZE, eachBuf + BLOCK_HEAD + FILE_HEAD, BLOCK_DATA_SIZE);
								   }
								   break;
								}
				case END_NUM:  {
                                   std::cout << "END_NUM:" << RecvNum << std::endl;
                                   flag_check = 1;//丢包序号队列
								   memcpy(FileBuffer + n*BLOCK_DATA_SIZE, eachBuf + FILE_HEAD + BLOCK_HEAD, yu);

                                   char recvKeyWord[4] = { 0 };
                                   memcpy(recvKeyWord, eachBuf, FILE_HEAD);//关键数据包接收、回复 、
                                   sendto(m_socketClient, recvKeyWord, sizeof(recvKeyWord), 0, (struct sockaddr*)&Server, m_len);

                                   if (m_lackPackageNum.size() > 0)//判断是否丢包
                                   {
                                       /*for (int i = 0; i < m_lackPackageNum.size(); i++)
                                           std::cout << m_lackPackageNum[i] << std::endl;*/
                                       //for (int i = 0; i < 3;i++)
                                       SendLackPackageNum(m_lackPackageNum);//2、-----做关键包发送------
                                   }
                                   else//发送结束标识符
                                   {
                                       std::cout << "SUCESS!!!" << std::endl;
                                       if (eachBuf)
                                       {
                                           delete eachBuf;
                                           eachBuf = NULL;
                                       }
                                       fwrite(FileBuffer, m_dwFileSize, 1, fp);
                                       fclose(fp);
                                       char SendCheckCountBuf[4];
                                       DataPos = 0;
                                       SendCheckCountBuf[DataPos++] = END_ALL >> 24 & 0xff;//报文格式:校验标识符CHECK_NUM + 包大小FileSize
                                       SendCheckCountBuf[DataPos++] = END_ALL >> 16 & 0xff;
                                       SendCheckCountBuf[DataPos++] = END_ALL >> 8 & 0xff;
                                       SendCheckCountBuf[DataPos++] = END_ALL & 0xff;//3、-----做关键包发送------
                                       SendKeyPackage(SendCheckCountBuf, BLOCK_HEAD + 1);
                                       // sendto(m_socketClient, SendCheckCountBuf, BLOCK_HEAD + 1, 0, (struct sockaddr*)&Server, m_len);
                                       flag_recv = 0;
                                   }
								   break;
							    }
                case END_CHECK_NUM:{
                                       std::cout << "END_CHECK_NUM" << std::endl;
                                       flag_check = 1;//丢包序号队列
                                       if (m_lackPackageNum.size() > 0)//判断是否丢包
                                       {
                                           SendLackPackageNum(m_lackPackageNum);//4、-----做关键包发送------
                                       }
                                       else//发送结束标识符
                                       {
                                           if (eachBuf)
                                           {
                                               delete eachBuf;
                                               eachBuf = NULL;
                                           }
                                           fwrite(FileBuffer, m_dwFileSize, 1, fp);
                                           fclose(fp);
                                           char SendCheckCountBuf[4];
                                           DataPos = 0;
                                           SendCheckCountBuf[DataPos++] = END_ALL >> 24 & 0xff;//报文格式:校验标识符CHECK_NUM + 包大小FileSize
                                           SendCheckCountBuf[DataPos++] = END_ALL >> 16 & 0xff;
                                           SendCheckCountBuf[DataPos++] = END_ALL >> 8 & 0xff;
                                           SendCheckCountBuf[DataPos++] = END_ALL & 0xff;
                                           //5、-----做关键包发送------
                                           SendKeyPackage(SendCheckCountBuf, BLOCK_HEAD + 1);
                                           //sendto(m_socketClient, SendCheckCountBuf, BLOCK_HEAD + 1, 0, (struct sockaddr*)&Server, m_len);
                                           flag_recv = 0;
                                           std::cout << "SUCESS!!!" << std::endl;
                                       }
                                       break;
                                   }
				}
			}
		}
	}
	else
	{
		return;
	}
	endClock = clock();
	std::cout << "收到数据:" << RecvNum << "-" << m_dwFileSize << "耗时:" << endClock - startClock << std::endl;
}
时间: 2024-10-05 02:07:00

Windows下基于UDP的可靠传输协议实现的相关文章

基于位图索引的无线传感器网络可靠传输协议_爱学术

[摘要]根据无线传感器网络中大量上行数据流实时传输的需求,提出一种基于位图索引的可靠传输协议(BRDT).该协议使用迭代方式完成一组大量数据的传输.每次迭代的上行数据流采用无重传传输以减少延迟,并采用可靠后项传输传送带有错误标识的位图索引以恢复丢失数据.在已有低功耗无线图像传感器网络Z-EYE系统中实现并进行验证,结果表明,与RMST协议相比,BRDT对下层的服务需求少,在网络质量较差的情况下,BRDT的性能明显优于NACK重传方法,在同等测试条件下,BRDT的传输效率优于PSFQ协议. [作者

qt在windows下的udp通信(最简单)

qt编程:windows下的udp通信 本文博客链接:http://blog.csdn.net/jdh99,作者:jdh,转载请注明. 环境: 主机:win7 开发环境:qt 功能: 用udp进行收发通信 界面: 源代码: LssHost.pro: [cpp] view plain copy #------------------------------------------------- # # Project created by QtCreator 2013-09-22T09:36:44

windows下基于sublime text3的nodejs环境搭建

第一步:先安装sublime text3.详细教程可自行百度,这边不具体介绍了. 第二步.安装nodejs插件,有两种方式 第一种方式:直接下载https://github.com/tanepiper/SublimeText-Nodejs 压缩包,压缩后重命名为Nodejs放到package目录下.package打开方式:Preferences-> Browser Package 第二种方式:通过package control install package的方式下载 第三步:安装nodejs.

Windows下基于eclipse的Spark应用开发环境搭建

原创文章,转载请注明: 转载自www.cnblogs.com/tovin/p/3822985.html 一.软件下载 maven下载安装 :http://10.100.209.243/share/soft/apache-maven-3.2.1-bin.zip       jdk下载安装:          http://10.100.209.243/share/soft/jdk-7u60-windows-i586.exe(32位)         http://10.100.209.243/sh

Windows下基于http的git服务器搭建-gitstack

版权声明:若无来源注明,Techie亮博客文章均为原创. 转载请以链接形式标明本文标题和地址: 本文标题:Windows下基于http的git服务器搭建-gitstack     本文地址:http://techieliang.com/2017/12/514/ 文章目录 1. 下载安装 2. 注意  2.1. 关于Python冲突问题  2.2. gitstack密码重置 3. gitstack与wamp冲突 1. 下载安装 官网下载即可 安装流程也很简洁方便.安装步骤 安装完成后可通过管理地址

Windows下基于IIS服务的SSL服务器的配置

Windows下基于IIS服务的SSL服务器的配置 实验环境 Windows Server 2008 R1(CA) Windows Server 2008 R2(web服务器) Windows 7 x64(客户端) 3台虚拟机打开桥接模式,保证能够相互ping通 实验原理 CA(根CA)负责为服务器颁发证书使得服务器证书可信. 服务器下载IIS组建,向CA申请一个SSL证书,并且将此证书与本机IP绑定.最后打开SSL服务. 客户端信任CA,因此可以安全地访问服务器网址. 实验步骤 安装证书服务

Windows下基于TCP协议的大文件传输(流形式)

简单实现TCP下的大文件高效传输 在TCP下进行大文件传输,不像小文件那样直接打包个BUFFER发送出去,因为文件比较大可能是1G,2G或更大,第一效率问题,第二TCP粘包问题.针对服务端的设计来说就更需要严紧些.下面介绍简单地实现大文件在TCP的传输应用. 粘包出现原因:在流传输中出现,UDP不会出现粘包,因为它有消息边界(参考Windows 网络编程) 1 发送端需要等缓冲区满才发送出去,造成粘包 2 接收方不及时接收缓冲区的包,造成多个包接收 解决办法: 为了避免粘包现象,可采取以下几种措

Windows下基于socket多线程并发通信的实现

本文介绍了在Windows 操作系统下基于TCP/IP 协议Socket 套接口的通信机制以及多线程编程知识与技巧,并给出多线程方式实现多用户与服务端(C/S)并发通信模型的详细算法,最后展现了用C++编写的多用户与服务器通信的应用实例并附有程序. 关键词:Windows:套接字:多线程:并发服务器: Socket 是建立在传输层协议(主要是TCP 和UDP)上的一种套接字规范,最初由美国加州Berkley 大学提出,为UNIX 系统开发的网络通信接口,它定义了两台计算机之间通信的规范,sock

局域网内基于UDP的文件传输小工具

一.目标 基于UDP来设计一个文件传输的小工具,以此来了解UDP协议的特点. 内容: 基本要求: 输入对方IP地址,对方如果存在,可以给对方发送文件. 扩展要求: (1)工具具有友好的用户界面: (2)局域网内使用该工具的两台主机可以互相进行文字通信: (3)自动按照一定频率扫描局域网内其他开启了该工具的主机,并在界面上用列表显示出来: (4)传输文件和文字内容时,直接选择界面上已经显示出来的存活主机,不需要手动输入IP地址. 二.设计实现 环境: Ubuntu14.04+Gcc4.8+Ecli