fastdfs是一个轻量级的分布式文件系统,主要由 tracker server, storage server 以及client组成,这里主要涉及两点 :
1)客户端上传文件流程和协议分析
2)实现一个简单的文件上传函数
一: 文件上传的基本流程
fastdfs中上传一个文件,主要涉及以下几个步骤:
1)上传连接请求,客户端会向tracker server发出上传文件的请求
2)tracker收到请求后,返回storage server的ip和端口
3)客户端连接storage,并且上传文件
4)文件上传完成后,storage返回路径信息
以下具体分析文件上传过程中的协议和各种操作
fastdfs协议头部:
typedef struct
{
char pkg_len[FDFS_PROTO_PKG_LEN_SIZE]; //body length, not including header(8个字节)
char cmd; //command code TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE
char status; //status code for response
} TrackerHeader;
fastdfs协议的头部是由10个字节大小的结构体构成,
发送:发送数据时,先发送TrackerHeader到服务器,随后发送具体的数据
接受:接受数据时,先接受sizeof(TrackerHeader)大小的报文头部,随后接受pkg_len长度的报文体
status: 发送的时候设置为0
cmd: 命令
pkg_len:一个int64_t的整型,除去TrackerHeader长度的报文长度
二: 客户端向tracker server发送获取storage地址请求
#define TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE 101
// 协议头
// pkg_len | cmd | status
// 8 bytes | 1 bytes | 1 bytes
//向tracker server请求storage server cmd
#define TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE 101
TrackerHeader header;//协议头部
memset(&header, 0, sizeof(TrackerHeader));
header.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE;
//向tracker server 请求 storage,tcpsenddata 返回非0,表示发送成功
if(tcpsenddata(sockfd, &header, sizeof(TrackerHeader), 10, &count) != 0)
{
fprintf(stderr, "tcpsenddata error: %s\n", strerror(errno));
return 1;
}
else//请求发送成功,等待tracker回复
{
//接收头部,头部是一个TrackerHeader类型,10个字节
TrackerHeader resp;
if((ret_code = tcprecvdata(sockfd, &resp, sizeof(TrackerHeader), 10, &count)) != 0)
{
fprintf(stderr, "tcprecvdata error: %s\n", strerror(ret_code));
return 1;
}
//开始接收报文体
//int64_t read_int64(const char *buff)
//{
// unsigned char *p;
// p = (unsigned char *)buff;
// return (((int64_t)(*p)) << 56) | \
// (((int64_t)(*(p+1))) << 48) | \
// (((int64_t)(*(p+2))) << 40) | \
// (((int64_t)(*(p+3))) << 32) | \
// (((int64_t)(*(p+4))) << 24) | \
// (((int64_t)(*(p+5))) << 16) | \
// (((int64_t)(*(p+6))) << 8) | \
// ((int64_t)(*(p+7)));
//}
int size = read_int64(resp.pkg_len);//获取报体长度
char *buf = (char*)calloc(size + 1, sizeof(char));
if((ret_code = tcprecvdata(sockfd, buf, size, 10, &count) != 0))
{
fprintf(stderr, "tcprecvdata error: %s\n", strerror(ret_code));
return 1;
}
// 报文体
// group_name |ip |port |storage_index
// 16 bytes |16 bytes |8 bytes |
//#define TRACKER_QUERY_STORAGE_STORE_BODY_LEN 40
if(count != TRACKER_QUERY_STORAGE_STORE_BODY_LEN)
{
fprintf(stderr, "invalid message");
return 1;
}
//group name
//#define FDFS_GROUP_NAME_MAX_LEN 16
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1] = {0};
memcpy(group_name, buf, FDFS_GROUP_NAME_MAX_LEN);
group_name[FDFS_GROUP_NAME_MAX_LEN] = ‘\0‘;
//ip: port
//#define IP_ADDRESS_SIZE 16
//port:8 bytes
char ip[IP_ADDRESS_SIZE + 1] = {0};
memcpy(ip, buf + FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1);
char szPort[8] = {0};
memcpy(szPort, buf + FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1, 8);
ip[IP_ADDRESS_SIZE] = ‘\0‘;
int port = read_int64(szPort);
//storage index;
char *storage_index = buf + FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE;
三:以上步骤完成后,获取storage的ip 和 port后,就可以上传文件了
在官方的客户端中,文件操作有upload,download, append,delete等,这里只涉及upload
上传文件中,官方给出了三种方式
1)通过buffer上传,即将文件读取进内存,然后在发送
2)使用sendfile,sendfile是Linux提供的一个库函数
3)通过回调函数的方式
这里主要涉及的是第一种,通过buffer上传的方式
文件上传协议:
//文件上传协议头部
10 bytes | 1 bytes | 8 bytes | 6 bytes |
TrackerHeader | storage_index | 文件长度 | 文件名或者全为0) |
//storage_index 是客户端向tracker server申请storage index时候返回的结果
//文件名 如果不为空,那么取前6位,或者可以全部设置为0
//上传完成 storage回复客户端协议
10 bytes | 16 bytes | TrackerHeader.pkg_len - 16bytes
TrackerHeader | groupname | remote file name
void uploadfile(int sockfd, const char *filepath, char *storage_index)
{
char out_buf[512];
TrackerHeader *pHeader;
char *p = out_buf;
char *buf = NULL;
//TrackerHeader 10 bytes
//文件上传协议头部
//10 bytes | 1 bytes | 8 bytes | 6 bytes |
//TrackerHeader | storage_index | 文件长度 | 文件名或者全为0) |
pHeader = (TrackerHeader*)out_buf;
p += sizeof(TrackerHeader);
//storage index 1 bytes
*p++ = *storage_index;
//filesize 8bytes
long int filesize = 0;
int ret = 0;
//读取文件到buf,并且返回文件长度 filesize
if((ret = getfilebuf(&buf, &filesize, filepath) != 0))
{
fprintf(stderr, "getfilebuf failed: %s\n", strerror(ret));
return;
}
//void write_int64(int64_t n, char *buff)
//{
// unsigned char *p;
// p = (unsigned char *)buff;
// *p++ = (n >> 56) & 0xFF;
// *p++ = (n >> 48) & 0xFF;
// *p++ = (n >> 40) & 0xFF;
// *p++ = (n >> 32) & 0xFF;
// *p++ = (n >> 24) & 0xFF;
// *p++ = (n >> 16) & 0xFF;
// *p++ = (n >> 8) & 0xFF;
// *p++ = n & 0xFF;
//}
write_int64(filesize, p);
//#define FDFS_PROTO_PKG_LEN_SIZE 8
p += FDFS_PROTO_PKG_LEN_SIZE;
//ext_name
//#define FDFS_FILE_EXT_NAME_MAX_LEN 6
memset(p, 0, FDFS_FILE_EXT_NAME_MAX_LEN);
p += FDFS_FILE_EXT_NAME_MAX_LEN;
//set TrackerHeader
write_int64(p - out_buf + filesize - sizeof(TrackerHeader), pHeader->pkg_len);
//#define STORAGE_PROTO_CMD_UPLOAD_FILE 11
pHeader->cmd = STORAGE_PROTO_CMD_UPLOAD_FILE;
pHeader->status = 0;
//发送报文头部
int count;
int ret_code = 0;
if((ret_code = tcpsenddata(sockfd, out_buf, p - out_buf, 10, &count) != 0)) {
fprintf(stderr, "tcpsenddata failed: %s\n", strerror(errno));
return;
}
//发送报文体,具体文件数据
if((ret_code = tcpsenddata(sockfd, buf, filesize, 10, &count)) != 0) {
fprintf(stderr, "tcpsenddata body failed: %s\n", strerror(errno));
return;
}
//接收storage server回复
//上传完成 storage回复客户端协议
//10 bytes | 16 bytes | TrackerHeader.pkg_len - 16bytes
//TrackerHeader | groupname | remote file name
TrackerHeader resp;
if((ret_code = tcprecvdata(sockfd, &resp, sizeof(TrackerHeader), 1000, &count)) != 0) {
fprintf(stderr, "tcprecvdata failed: %s\n", strerror(ret_code));
return;
}
if(count != sizeof(TrackerHeader)) {
fprintf(stderr, "invalid header");
return;
}
int64_t bodylen = read_int64(resp.pkg_len);
//接收报文体
char *in_buf = (char*)calloc(bodylen + 1, sizeof(char));
if((ret_code = tcprecvdata(sockfd, in_buf, bodylen, 10, &count)) != 0)
{
fprintf(stderr, "read body failed: %s\n", strerror(ret_code));
return;
}
//groupname
//#define FDFS_GROUP_NAME_MAX_LEN 16
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
memcpy(group_name, in_buf, FDFS_GROUP_NAME_MAX_LEN);
group_name[FDFS_GROUP_NAME_MAX_LEN] = ‘\0‘;
//remote filename
char remote_filename[bodylen - FDFS_GROUP_NAME_MAX_LEN + 1];
memcpy(remote_filename, in_buf + FDFS_GROUP_NAME_MAX_LEN, bodylen - FDFS_GROUP_NAME_MAX_LEN + 1);
cout << "groupname: " << group_name << endl;
cout << "remote_filename: " << remote_filename << endl;
char httpaddr[128] = {0};
sprintf(httpaddr, "http://106.75.129.177:8080/%s/%s", group_name, remote_filename);
cout << "httpaddr: " << httpaddr << endl;//http地址
}
以下附上完整代码, ubuntu14位, 编译器 g++,测试已通过
#include <iostream>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
using namespace std;
#define FDFS_GROUP_NAME_MAX_LEN 16
#define FDFS_PROTO_PKG_LEN_SIZE 8
#define IP_ADDRESS_SIZE 16
//cmd
#define TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE 101
#define STORAGE_PROTO_CMD_UPLOAD_FILE 11
#define TRACKER_QUERY_STORAGE_STORE_BODY_LEN (FDFS_GROUP_NAME_MAX_LEN \
+ IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE + 1)
#define FDFS_FILE_EXT_NAME_MAX_LEN 6
typedef struct {
char pkg_len[FDFS_PROTO_PKG_LEN_SIZE];
char cmd;
char status;
}TrackerHeader;
//set socketfd nonblocking
int setnonblocking(int sockfd);
int tcprecvdata(int sockfd, void *data, const int size, const int timeout_ms, int *count);
int tcpsenddata(int sockfd, void *data, const int size,const int timeout_ms, int *count);
int64_t read_int64(const char* buf);
void write_int64(int64_t n, char* buf);
void uploadfile(int sockfd, const char *filepath, char *storage_index);
int getfilebuf(char **buf, long int *filesize, const char* filepath);
//apply storage address from tracker server
int main() {
const char *ip = "127.0.0.1";
uint16_t port = 22122;
int ret_code = 0;
int sockfd = -1;
int count = 0;
//connect tracker server
if((sockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
{
fprintf(stderr, "socket errnor: %s\n", strerror(errno));
return 1;
}
if((ret_code = setnonblocking(sockfd)) != 0)
{
fprintf(stderr, "setnonblocking error: %s\n", strerror(ret_code));
return 1;
}
struct sockaddr_in addr;
addr.sin_addr.s_addr = inet_addr(ip);
addr.sin_port = htons(port);
addr.sin_family = AF_INET;
socklen_t len = sizeof(struct sockaddr);
if(connect(sockfd, (struct sockaddr*)&addr, len) < 0)
{
fprintf(stderr, "connect error: %s\n", strerror(errno));
return 1;
}
TrackerHeader header;
memset(&header, 0, sizeof(TrackerHeader));
header.cmd = TRACKER_PROTO_CMD_SERVICE_QUERY_STORE_WITHOUT_GROUP_ONE;
if(tcpsenddata(sockfd, &header, sizeof(TrackerHeader), 10, &count) != 0)
{
fprintf(stderr, "tcpsenddata error: %s\n", strerror(errno));
return 1;
}
else
{
//recv header
TrackerHeader resp;
if((ret_code = tcprecvdata(sockfd, &resp, sizeof(TrackerHeader), 10, &count)) != 0)
{
fprintf(stderr, "tcprecvdata error: %s\n", strerror(ret_code));
return 1;
}
cout << "recv header: " << count << endl;
//read body;
int size = read_int64(resp.pkg_len);
char *buf = (char*)calloc(size + 1, sizeof(char));
if((ret_code = tcprecvdata(sockfd, buf, size, 10, &count) != 0))
{
fprintf(stderr, "tcprecvdata error: %s\n", strerror(ret_code));
return 1;
}
//body
// group_name |ip |port |storage_index
// 16bytes |16bytes |8bytes |
cout << "read body: " << count << endl;
if(count != TRACKER_QUERY_STORAGE_STORE_BODY_LEN)
{
fprintf(stderr, "invalid message");
return 1;
}
//group name
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1] = {0};
memcpy(group_name, buf, FDFS_GROUP_NAME_MAX_LEN);
group_name[FDFS_GROUP_NAME_MAX_LEN] = ‘\0‘;
cout << "group name: " << group_name << endl;
//ip: port
char ip[IP_ADDRESS_SIZE + 1] = {0};
memcpy(ip, buf + FDFS_GROUP_NAME_MAX_LEN, IP_ADDRESS_SIZE - 1);
char szPort[8] = {0};
memcpy(szPort, buf + FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1, 8);
ip[IP_ADDRESS_SIZE] = ‘\0‘;
int port = read_int64(szPort);
cout << "address: " << ip << ":" << port << endl;
//storage index;
char *storage_index = buf + FDFS_GROUP_NAME_MAX_LEN + IP_ADDRESS_SIZE - 1 + FDFS_PROTO_PKG_LEN_SIZE;
cout << "storage_index: " << storage_index << endl;
free(buf);
//connect storage server
sockaddr_in st_addr;
st_addr.sin_addr.s_addr = inet_addr(ip);
st_addr.sin_family = AF_INET;
st_addr.sin_port = htons(port);
int storage_fd = socket(AF_INET, SOCK_STREAM, 0);
if(storage_fd < 0) {
fprintf(stderr, "socket failed: %s\n", strerror(errno));
return 1;
}
socklen_t len2 = sizeof(sockaddr_in);
if(connect(storage_fd, (struct sockaddr*)&st_addr, len2) < 0) {
fprintf(stderr, "connect failed: %s\n", strerror(errno));
return 1;
}
uploadfile(storage_fd, "1.jpg", storage_index);
}
return 0;
}
int getfilebuf(char **buf, long int *filesize, const char *filepath)
{
int ret_code = 0;
FILE *fp = fopen(filepath, "rb+");
if(fp == NULL)
{
ret_code = errno;
return ret_code;
}
//get filesize;
fseek(fp, 0, SEEK_END);
*filesize = ftell(fp);
fseek(fp, 0, SEEK_SET);
cout << "get filesize: " <<*filesize << endl;
//malloc buf
*buf = (char*)calloc(*filesize + 1, sizeof(char));
if(*buf == NULL) {
ret_code = errno;
return ret_code;
}
int read_bytes = 0;
int left_bytes = *filesize;
char *p = *buf;
while(left_bytes > 0) {
read_bytes = fread(p, sizeof(char), left_bytes, fp);
left_bytes -= read_bytes;
p += read_bytes;
}
return ret_code;
}
void uploadfile(int sockfd, const char *filepath, char *storage_index)
{
char out_buf[512];
TrackerHeader *pHeader;
char *p = out_buf;
char *buf = NULL;
//TrackerHeader 10 bytes
pHeader = (TrackerHeader*)out_buf;
p += sizeof(TrackerHeader);
//storage index 1 bytes
*p++ = *storage_index;
//filesize 8bytes
long int filesize = 0;
int ret = 0;
if((ret = getfilebuf(&buf, &filesize, filepath) != 0))
{
fprintf(stderr, "getfilebuf failed: %s\n", strerror(ret));
return;
}
printf("filesize: %ld\n", filesize);
write_int64(filesize, p);
p += FDFS_PROTO_PKG_LEN_SIZE;
//ext_name
memset(p, 0, FDFS_FILE_EXT_NAME_MAX_LEN);
p += FDFS_FILE_EXT_NAME_MAX_LEN;
//set TrackerHeader
write_int64(p - out_buf + filesize - sizeof(TrackerHeader), pHeader->pkg_len);
pHeader->cmd = STORAGE_PROTO_CMD_UPLOAD_FILE;
pHeader->status = 0;
//send header
int count;
int ret_code = 0;
if((ret_code = tcpsenddata(sockfd, out_buf, p - out_buf, 10, &count) != 0)) {
fprintf(stderr, "tcpsenddata failed: %s\n", strerror(errno));
return;
}
//send body
if((ret_code = tcpsenddata(sockfd, buf, filesize, 10, &count)) != 0) {
fprintf(stderr, "tcpsenddata body failed: %s\n", strerror(errno));
return;
}
//recv response
TrackerHeader resp;
if((ret_code = tcprecvdata(sockfd, &resp, sizeof(TrackerHeader), 1000, &count)) != 0) {
fprintf(stderr, "tcprecvdata failed: %s\n", strerror(ret_code));
return;
}
if(count != sizeof(TrackerHeader)) {
fprintf(stderr, "invalid header");
return;
}
int64_t bodylen = read_int64(resp.pkg_len);
char *in_buf = (char*)calloc(bodylen + 1, sizeof(char));
if((ret_code = tcprecvdata(sockfd, in_buf, bodylen, 10, &count)) != 0)
{
fprintf(stderr, "read body failed: %s\n", strerror(ret_code));
return;
}
//groupname
char group_name[FDFS_GROUP_NAME_MAX_LEN + 1];
memcpy(group_name, in_buf, FDFS_GROUP_NAME_MAX_LEN);
group_name[FDFS_GROUP_NAME_MAX_LEN] = ‘\0‘;
//remote filename
char remote_filename[bodylen - FDFS_GROUP_NAME_MAX_LEN + 1];
memcpy(remote_filename, in_buf + FDFS_GROUP_NAME_MAX_LEN, bodylen - FDFS_GROUP_NAME_MAX_LEN + 1);
cout << "groupname: " << group_name << endl;
cout << "remote_filename: " << remote_filename << endl;
char httpaddr[128] = {0};
sprintf(httpaddr, "http://127.0.0.1:8080/%s/%s", group_name, remote_filename);
cout << "httpaddr" << httpaddr << endl;
}
void write_int64(int64_t n, char *buff)
{
unsigned char *p;
p = (unsigned char *)buff;
*p++ = (n >> 56) & 0xFF;
*p++ = (n >> 48) & 0xFF;
*p++ = (n >> 40) & 0xFF;
*p++ = (n >> 32) & 0xFF;
*p++ = (n >> 24) & 0xFF;
*p++ = (n >> 16) & 0xFF;
*p++ = (n >> 8) & 0xFF;
*p++ = n & 0xFF;
}
int64_t read_int64(const char *buff)
{
unsigned char *p;
p = (unsigned char *)buff;
return (((int64_t)(*p)) << 56) | (((int64_t)(*(p+1))) << 48) | (((int64_t)(*(p+2))) << 40) | (((int64_t)(*(p+3))) << 32) | (((int64_t)(*(p+4))) << 24) | (((int64_t)(*(p+5))) << 16) | (((int64_t)(*(p+6))) << 8) | ((int64_t)(*(p+7)));
}
int setnonblocking(int sockfd)
{
int ret_code = 0;
if(fcntl(sockfd, F_SETFD, O_NONBLOCK) < 0) {
ret_code = errno;
}
return ret_code;
}
int tcpsenddata(int sockfd, void *data, const int size,const int timeout_ms, int *count)
{
int left_bytes = size;
int write_bytes = 0;
int ret_code = 0;
int res = 0;
char *p = (char*)data;
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(sockfd, &rfds);
while(left_bytes > 0) {
write_bytes = send(sockfd, p, left_bytes, 0);
if(write_bytes > 0)
{
left_bytes -= write_bytes;
p += write_bytes;
continue;
}
else if(write_bytes < 0)
{
if(!(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
{
ret_code = errno == 0 ? errno : EINTR;
break;
}
}
else
{
ret_code = ENOTCONN;
break;
}
if(timeout_ms <= 0)
{
res = select(sockfd + 1, &rfds, NULL, NULL, NULL);
}
else
{
struct timeval tv;
tv.tv_usec = timeout_ms;
tv.tv_sec = 0;
res = select(sockfd + 1, &rfds, NULL, NULL, &tv);
}
if(res == 0)
{
ret_code = ETIMEDOUT;
break;
}
if(res < 0)
{
if(errno == EINTR)
{
continue;
}
ret_code = errno == 0 ? errno : EINTR;
}
}
if(count != NULL)
{
*count = size - left_bytes;
}
return ret_code;
}
int tcprecvdata(int sockfd, void *data, const int size, const int timeout_ms, int *count) {
int left_bytes = size;
int read_bytes = 0;
int ret_code = 0;
int res = 0;
char *p = (char*)data;
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(sockfd, &rfds);
while(left_bytes > 0) {
read_bytes = recv(sockfd, p, left_bytes, 0);
if(read_bytes > 0)
{
left_bytes -= read_bytes;
p += read_bytes;
continue;
}
else if(read_bytes < 0)
{
if(!(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
{
ret_code = errno != 0 ? errno : EINTR;
break;
}
}
else
{
ret_code = ENOTCONN;
break;
}
if(timeout_ms <= 0)
{
res = select(sockfd + 1, &rfds, NULL, NULL, NULL);
}
else
{
struct timeval tv;
tv.tv_usec = timeout_ms;
tv.tv_sec = 0;
res = select(sockfd + 1, &rfds, NULL, NULL, &tv);
}
if(res == 0)
{
ret_code = ETIMEDOUT;
break;
}
if(res < 0)
{
if(errno == EINTR)
{
continue;
}
ret_code = errno == 0 ? errno : EINTR;
break;
}
}
if(count != NULL)
{
*count = size - left_bytes;
}
return ret_code;
}