BitTorrent协议java实现分析

BitTorrent学习

最近做的项目里,利用开源的NanoHttpd搭建了嵌入安卓的服务器。最近要做的一部分工作,就是基于BitTorrent协议的WIFI下载实现。

协议介绍

普通的HTTP/FTP下载使用TCP/IP协议,BitTorrent协议是架构于TCP/IP协议之上的一个P2P文件传输协议,处于TCP/IP结构的应用层。BitTorrent协议本身也包含了很多具体的内容协议和扩展协议,并在不断扩充中。

根据BitTorrent协议,文件发布者会根据要发布的文件生成提供一个.torrent文件,即种子文件,也简称为“种子”。

种子文件本质上是文本文件,包含Tracker信息和文件信息两部分。

Tracker信息主要是BT下载中需要用到的Tracker服务器的地址和针对Tracker服务器的设置,文件信息是根据对目标文件的计算生成的。

文件信息的主要原理是需要把提供下载的文件虚拟分成大小相等的块,块大小必须为2k的整数次方(由于是虚拟分块,硬盘上并不产生各个块文件),并把每个块的索引信息和Hash验证码写入种子文件中;所以,种子文件就是被下载文件的“索引”。下载者要下载文件内容,需要先得到相应的种子文件。

下载时,BT客户端首先解析种子文件得到Tracker地址,然后连接Tracker服务器。Tracker服务器回应下载者的请求,提供下载者其他下载者(包括发布者)的IP。下载者再连接其他下载者,根据种子文件,两者分别告知对方自己已经有的块,然后交换对方所没有的数据。

下载者每得到一个块,需要算出下载块的Hash验证码与种子文件中的对比,如果一样则说明块正确,不一样则需要重新下载这个块。这种规定是为了解决下载内容准确性的问题。

Tracker 服务器

tracker服务器是BT下载中必须的角色。一个BTclient在下载开始以及下载进行的过程中,要不停的与tracker服务器进行通信,以报告自己的信息,并获取其它下载client的信息。

Tracker 服务器的过程

  • client向tracker发一个HTTP的GET请求,并把它自己的信息放在GET的参数中;这个请求的大致意思是:我是xxx(一个唯一的id),我想下载yyy文件,我的ip是aaa,我用的端口是bbb。。。
  • tracker对所有下载者的信息进行维护,当它收到一个请求后,首先把对方的信息记录下来(如果已经记录在案,那么就检查是否需要更新),然后将一部分(并非全部,根据设置的参数已经下载者的请求)参与下载同一个文件(一个tracker服务器可能同时维护多个文件的下载)的下载者的信息返回给对方。
  • Client在收到tracker的响应后,就能获取其它下载者的信息,那么它就可以根据这些信息,与其它下载者建立连接,从它们那里下载文件片断。

BitTorrent协议执行过程

根据BitTorrent协议,文件发布者会根据要发布的文件生成提供一个种子文件。下载者要下载文件内容,需要先得到相应的种子文件,然后使用BT客户端软件进行下载。

下载时,**BT客户端首先解析种子文件得到Tracker地址,然后连接Tracker服务器。

下载者每得到一个块,需要算出下载块的Hash验证码与种子文件中的对比,如果一样则说明块正确,不一样则需要重新下载这个块。这种规定是为了解决下载内容准确性的问题。**

Bencode

Bencode(发音为 Bee-Encode)是 BitTorrent 用在传输数据结构的编码方式。支持四种编码方式:

  • 字符串
  • 整数
  • 串列
  • 字典表

Bencode编码规则

  • 一个整型数会以十进制数编码并括在i和e之间,不允许前导零(但0依然为整数0),负数在编码后直接加前导负号,不允许负零。如整型数“42”编码为“i42e”,数字“0”编码为“i0e”, “-42”编码为“i-42e”。
  • 一个字节的字符串(只是一个字节的字符串,不一定是一个方块字)会以(长度):(内容)编码,长度的值和数字编码方法一样,只是不允许负数;内容就是字符串的内容,如字符串“spam”就会编码为“4:spam”
  • 线性表会以l和e括住来编码,其中的内容为Bencode四种编码格式所组成的编码字符串,如包含和字符串“spam”数字“42”的线性表会被编码为“l4:spami42ee”,注意分隔符要对应配对。

    分析:

    字符串spam编码后为 4:spam ,42编码后为 i42e,变形表以l和e括住他们俩,即为l 4:spam i42e e

  • 字典表会以d和e括住来编码,字典元素的键和值必须紧跟在一起,而且所有键为字符串类型并按字典顺序排好。如键为“bar”值为字符串“spam”和键为“foo”值为整数“42”的字典表会被编码为“d3:bar4:spam3:fooi42ee”。

    分析: d 3:bar 4:spam 3:foo i42e e。d键-值 键-值e

种子文件

torrent种子文件本质上是文本文件,包含Tracker信息和文件信息两部分。Tracker信息主要是BT下载中需要用到的Tracker服务器的地址和针对Tracker服务器的设置,文件信息是根据对目标文件的计算生成的,计算结果根据BitTorrent协议内的Bencode规则进行编码。它的主要原理是需要把提供下载的文件虚拟分成大小相等的块,块大小必须为2k的整数次方(由于是虚拟分块,硬盘上并不产生各个块文件),并把每个块的索引信息和Hash验证码写入种子文件中;所以,种子文件就是被下载文件的“索引”。



种子文件包含以下数据:

  • announce tracker的URL
  • info 该条映射到一个字典,该字典的键将取决于共享的一个或多个文件:
  • name 建议保存到的文件和目录名称
  • piece length 每个文件块的字节数。通常为2^{8} = 256KB = 262144B
  • pieces 每个文件块的SHA-1的集成Hash。因为SHA-1会返回160-bit的Hash,所以pieces将会得到1个160-bit的整数倍的字符串。和一个length(相当于只有一个文件正在共享)或files(相当于当多个文件被共享):
  • length 文件的大小(以字节为单位)

    -files 一个字典的列表(每个字典对应一个文件)与以下的键:

  • path 一个对应子目录名的字符串列表,最后一项是实际的文件名称
  • length 文件的大小(以字节为单位)

开源项目学习

为了具体学习一下BitTorrent协议的实现,着手研究github上开源项目Ttorrent,github地址是https://github.com/mpetazzoni/ttorrent

该开源项目分以下包:

  • bcodec:这部分主要实现了上面所说的种子文件编码部分的BEncode和BDecode,用来分析.torrent文件,并从中获得想要的信息。
  • common.protocol:这部分是协议的实现。
  • client包,用于实现Bt协议的客户端。
  • tacker包,用于实现tracker服务器的部分。

bcoded编码解码包

这部分的实现符合上面介绍的编码和解码的规则即可,这里不深入进行学习。

protocol协议包

其中Peer类类似Bean数据,存储的是BT网络中的节点信息。 比较简单这里略过。

Torrent类是BitTorrent协议的实现。主要分为几个部分,以其中几个方法分析:

.torrent文件解析部分

public static Torrent load(File torrent, boolean seeder)

throws IOException, NoSuchAlgorithmException {

byte[] data = FileUtils.readFileToByteArray(torrent);

return new Torrent(data, seeder);

}

上面这部分从.torrent文件中读取到二进制缓存。然后送入Torrent构造方法处理:

方法比较长,取关键部分分析:

拿到二进制数据先BDecode解码,转成一个字典表,然后取出info中的信息进行加密,之后再哈希计算出签名,用于后面核对完整性:

      this.decoded = BDecoder.bdecode(
            new ByteArrayInputStream(this.encoded)).getMap();
    this.decoded_info = this.decoded.get("info").getMap();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BEncoder.bencode(this.decoded_info, baos);
    this.encoded_info = baos.toByteArray();
    this.info_hash = Torrent.hash(this.encoded_info);
    this.hex_info_hash = Torrent.byteArrayToHexString(this.info_hash);

在获取的torrent文件中,有bep0012 Multitracker Metadata Extension模式,可以解析获得多个tracker。

if (this.decoded.containsKey("announce-list")) {
    ...处理解析多个trackers
}

如果是单个tracker,则分析announce字段中的内容,比如下面torrent 文件的内容:

{

‘announce’: ‘http://bttracker.debian.org:6969/announce‘,

‘info’:

{

‘name’: ‘debian-503-amd64-CD-1.iso’,

‘piece length’: 262144,

‘length’: 678301696,

‘pieces’: ‘841ae846bc5b6d7bd6e9aa3dd9e551559c82abc1…d14f1631d776008f83772ee170c42411618190a4’

}

}

在上面的代码片段中,取出了关于下载文件信息的info。并对其解码,然后取出了announce中的tracker地址并进行添加。

else if (this.decoded.containsKey("announce")) {
            URI tracker = new URI(this.decoded.get("announce").getString());
            this.allTrackers.add(tracker);

            // Build a single-tier announce list.
            List<URI> tier = new ArrayList<URI>();
            tier.add(tracker);
            this.trackers.add(tier);
        }

之后取出其中的信息并保存在Torrent类的属性中。

然后分析我们要下载的文件,上面的torrent信息下载单文件,没有files字段。单文件下载name中存储的就是文件名,length就是文件长度。

public static class TorrentFile {

    public final File file;
    public final long size;

    public TorrentFile(File file, long size) {
        this.file = file;
        this.size = size;
    }
}

内部类TorrentFile用来存储种子文件中下载的文件的信息。

this.files.add(new TorrentFile(
            new File(this.name),
            this.decoded_info.get("length").getLong()));

将文件的信息存储在我们的files列表中,用于后面取出下载。


创建种子文件部分

create方法有多重重载,取其中一个分析:

    public static Torrent create(File source, URI announce, String createdBy)
    throws InterruptedException, IOException, NoSuchAlgorithmException {
    return Torrent.create(source, null, DEFAULT_PIECE_LENGTH,
            announce, null, createdBy);
}

其中source是种子文件中的file字段,annouce是要用到的tracker的地址,createBy是种子发布者的名字。调用其中的create方法如下:

    private static Torrent create(File parent, List<File> files, int pieceLength,
            URI announce, List<List<URI>> announceList, String createdBy)
        throws InterruptedException, IOException, NoSuchAlgorithmException{
        .....
    }

根据我们给入的参数,编码后填入种子文件的各个字段,即构造.torrent的数据结构的过程。比如以下代码片段:

torrent.put("creation date", new BEValue(new Date().getTime() / 1000));
    torrent.put("created by", new BEValue(createdBy));
    Map<String, BEValue> info = new TreeMap<String, BEValue>();
    info.put("name", new BEValue(parent.getName()));
    info.put("piece length", new BEValue(pieceLength));

向其中填入了name、peice length等字段的信息。 最后编码后新生成一个Torrent类。

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BEncoder.bencode(new BEValue(torrent), baos);
    return new Torrent(baos.toByteArray(), true);

最后通过调用save方法,将类序列化为一个.torrent文件。

public void save(OutputStream output) throws IOException {

output.write(this.getEncoded());

}


编码部分

除了上面两个部分,Torrent类中还有编码部分。比如哈希编码,BEncode编码等,比如下面的对数据块的哈希编码方法:

private static class CallableChunkHasher implements Callable<String> {

    private final MessageDigest md;
    private final ByteBuffer data;

    CallableChunkHasher(ByteBuffer buffer) throws NoSuchAlgorithmException {
        this.md = MessageDigest.getInstance("SHA-1");

        this.data = ByteBuffer.allocate(buffer.remaining());
        buffer.mark();
        this.data.put(buffer);
        this.data.clear();
        buffer.reset();
    }

    @Override
    public String call() throws UnsupportedEncodingException {
        this.md.reset();
        this.md.update(this.data.array());
        return new String(md.digest(), Torrent.BYTE_ENCODING);
    }
}

这里通过Java的Callable和Future进行操作,即新开线程对各个块的数据进行散列加密,防止阻塞。计算完成后返回结果。在后面调用的时候,可以通过ExecutorService进行调用:

    ExecutorService executor = Executors.newCachedThreadPool();
    Task task = new Task();
    Future<Integer> result = executor.submit(task);
    executor.shutdown();

上面的Task类哈希任务类CallableChunkHasher。这样,在计算文件所划分各个小区域块时,通过一个一个小块计算,获取sha-1的160为哈希值,然后存入pieces字段。方法比较长,对关键部分进行分析:

int threads = getHashingThreadsCount();
    ExecutorService executor = Executors.newFixedThreadPool(threads);

首先获取执行散列计算的线程的数目,然后声明一个ExecutorService,利用其去管理调度线程任务。之后再进行一些初始化的声明,比如:

    ByteBuffer buffer = ByteBuffer.allocate(pieceLenght);
    List<Future<String>> results = new LinkedList<Future<String>>();
    StringBuilder hashes = new StringBuilder();

其中buffer用来存储计算后的pieces散列值,results是Future类,当通过ExecutorService提交一个任务后,返回的Future对象可以用来跟踪线程的运行状况,从而可以获得计算的结果。StringBuilder用来将计算的哈希值拼接成最后的pieces字符串。

        FileInputStream fis = new FileInputStream(file);
        FileChannel channel = fis.getChannel();
        int step = 10;

读取文件,

                while (channel.read(buffer) > 0) {
                if (buffer.remaining() == 0) {
                    buffer.clear();
                    results.add(executor.submit(new CallableChunkHasher(buffer)));
                    }
                }

将文件读取到缓存,缓存满的时候开启一个用来计算散列值的线程,并添加返回的Future对象。详细实现这里不进行分析,仅知道这部分通过多线程来实现高效计算,最后返回pieces值,即各个块sha-1后的结果。

至此,协议部分分析完毕,还剩下客户端的client包和tracker服务器端的tracker包。

tracker包学习

Tracker类

Tracker类是BitTorrent协议中Tracker服务器的实现。Tracker服务器是帮助BitTorrent协议在节点与节点之间做连接的服务器。

BitTorrent客户端下载一开始就要连接到tracker,从tracker获得其他客户端IP地址后,才能连接到其他客户端下载。在传输过程中,也会一直与tracker通信,上传自己的信息,获取其它客户端的信息。

实现中,默认端口为6969,也是BitTorrent协议的默认实现端口。

Tracker类中主要有两个线程,一个tracker线程,一个collector线程。

执行start方法时,依次开启两个新的线程。

tracker线程中主要执行:

connection.connect(address);

connection是simpleframework的方法,会对指定的地址address进行监听。

collector线程中,当tracker一直运行的时候,循环执行

for (TrackedTorrent torrent : torrents.values()) {
                torrent.collectUnfreshPeers();
            } 

public void collectUnfreshPeers() {
    for (TrackedPeer peer : this.peers.values()) {
        if (!peer.isFresh()) {
            this.peers.remove(peer.getHexPeerId());
        }
    }
}

可以看到遍历tracker上已经追踪的torrent并找出不在活跃期的peer,并将其从列表中删除。

    public synchronized TrackedTorrent announce(TrackedTorrent torrent) {
    TrackedTorrent existing = this.torrents.get(torrent.getHexInfoHash());

    if (existing != null) {
        logger.warn("Tracker already announced torrent for ‘{}‘ " +
            "with hash {}.", existing.getName(), existing.getHexInfoHash());
        return existing;
    }

    this.torrents.put(torrent.getHexInfoHash(), torrent);
    logger.info("Registered new torrent for ‘{}‘ with hash {}.",
        torrent.getName(), torrent.getHexInfoHash());
    return torrent;
}

announce方法中,给入一个torrent,依据计算出的哈希值,如果tracker中不存在 则加入,如果存在则返回其中的torrent。

    public synchronized void remove(Torrent torrent) {
    if (torrent == null) {
        return;
    }

    this.torrents.remove(torrent.getHexInfoHash());
}

remove方法移除已经在tracker上发布的种子文件。

TrackedTorrent类

trackedTorrent中主要维护已经在tracker上的torrent的下载信息,即对这个torrent文件参与下载的客户端的数量、状态等信息的管理。

update方法:

public TrackedPeer update(RequestEvent event, ByteBuffer peerId,
    String hexPeerId, String ip, int port, long uploaded, long downloaded,
    long left) throws UnsupportedEncodingException {
    TrackedPeer peer;
    TrackedPeer.PeerState state = TrackedPeer.PeerState.UNKNOWN;

    if (RequestEvent.STARTED.equals(event)) {
        peer = new TrackedPeer(this, ip, port, peerId);
        state = TrackedPeer.PeerState.STARTED;
        this.addPeer(peer);
    } else if (RequestEvent.STOPPED.equals(event)) {
        peer = this.removePeer(hexPeerId);
        state = TrackedPeer.PeerState.STOPPED;
    } else if (RequestEvent.COMPLETED.equals(event)) {
        peer = this.getPeer(hexPeerId);
        state = TrackedPeer.PeerState.COMPLETED;
    } else if (RequestEvent.NONE.equals(event)) {
        peer = this.getPeer(hexPeerId);
        state = TrackedPeer.PeerState.STARTED;
    } else {
        throw new IllegalArgumentException("Unexpected announce event type!");
    }

    peer.update(state, uploaded, downloaded, left);
    return peer;
}

其中输入的event是一个枚举,用来更新节点peer的新的状态。STARTED时,创建一个新的peer节点并填入当前torrent维护的peers,其余情况直接从已经有的peers中取得peer并更新相关的状态。

getSomePeers方法:

从候选的peers中选择合适的peer返回作为answerpeer

TrackedPeer类

在tracker上的一个trackedtorrent中进行数据交换、参与活动的节点client-peer,并维护其中节点的状态的类。

TrackerService类

参考文献:https://wiki.theory.org/BitTorrentSpecification

本类主要做了处理客户端请求的响应工作,用来处理tracker HTTP协议的请求。客户端发来的请求信息帮助tracker了解种子的信息,并且作出响应帮助客户端了解参与下载的其他用户的信息。

客户端携带peerid、上传下载量等信息,服务器依据此更新已经annouce的种子信息。

peer = torrent.update(event,
            ByteBuffer.wrap(announceRequest.getPeerId()),
            announceRequest.getHexPeerId(),
            announceRequest.getIp(),
            announceRequest.getPort(),
            announceRequest.getUploaded(),
            announceRequest.getDownloaded(),
            announceRequest.getLeft());

服务器返回tracker id、peers等信息帮助客户端进行下载:

announceResponse = HTTPAnnounceResponseMessage.craft(
            torrent.getAnnounceInterval(),
            TrackedTorrent.MIN_ANNOUNCE_INTERVAL_SECONDS,
            this.version,
            torrent.seeders(),
            torrent.leechers(),
            torrent.getSomePeers(peer));
        WritableByteChannel channel = Channels.newChannel(body);
        channel.write(announceResponse.getData());

大体上的思路就是这样,具体的一些内容,比如关于url解析相应格式等还要参照项目代码。

总结

BitTorrent协议服务器端的工作主要如上,服务器和client端通过http进行通信,服务器主要维护了种子的信息,并帮助client选择下载文件的对象。

client之间通过tcp协议实现文件的下载,实现部分的代码分析在最后的包client中。

时间: 2024-11-10 13:52:47

BitTorrent协议java实现分析的相关文章

Kademlia、DHT、KRPC、BitTorrent 协议、DHT Sniffer

catalogue 0. 引言 1. Kademlia协议 2. KRPC 协议 KRPC Protocol 3. DHT 公网嗅探器实现(DHT 爬虫) 4. BitTorrent协议 5. uTP协议 6. Peer Wire协议 7. BitTorrent协议扩展与ut_metadata和ut_pex(Extension for Peers to Send Metadata Files) 8. 用P2P对等网络思想改造C/S.B/S架构的思考 0. 引言 平常我们高端用户都会用到BT工具来

java代码分析及分析工具

java代码分析及分析工具 一个项目从搭建开始,开发的初期往往思路比较清晰,代码也比较清晰.随着时间的推移,业务越来越复杂.代码也就面临着耦合,冗余,甚至杂乱,到最后谁都不敢碰. 作为一个互联网电子商务网站的业务支撑系统,业务复杂不言而喻.从09年开始一直沿用到现在,中间代码经过了多少人的手,留下了多少的坑,已经记不清楚了,谁也说不清了. 代码的维护成本越来越高.代码已经急需做调整和改善.最近项目组专门设立了一个小组,利用业余时间做代码分析的工作,目标对核心代码进行分析并进行设计重构. 代码分析

HDFS API的java代码分析与实例

HDFS API的java代码分析与实例 1.HDFS常用的方法,我已经写好,我们看一下 // Create()方法,直接在HDFS中写入一个新的文件,path为写入路径,text为写入的文本内容 public static void  Create(String path,String text) throws IOException {             Configuration conf=new Configuration();                  conf.set(

J2SE快速进阶——Java内存分析

程序的执行过程 要在Java中分析内存,我们先来了解一下程序的执行过程: 正如上图所示,大致分为3个步骤: 1.最开始,我们的程序是存在于硬盘中的,当启动运行时,程序会被加载(load)到内存中去,这里的内存可以看做我们的内存条: 2.此时,内存中除了存在刚加载的程序的代码,还存在操作系统本身的代码(好吧,此句可以当做废话→_→),操作系统会找到程序中的Main方法开始执行程序: 3.第三步就是本文的重点,系统在程序执行过程中对内存的管理.在Java中,内存大致会被分为四块--heap(栈).s

学java教程之java内存分析

学编程吧学java教程之java内存分析发布了,欢迎大家通过xuebiancheng8.com来访问 java的内存模型是java中非常重要的知识,也是面试的时候重点. java虚拟机的内存模型中和我们打交道多的分为这么几个区域 堆区,栈区,方法区. 其中方法区又分为常量池,静态区和方法区. 这几部分分别是干嘛的呢,堆区是用来存放new出来的对象的,堆区是应用程序共享的区域. 栈区又叫方法栈,程序在运行的时候,代码要在方法栈中运行,运行的代码需要放在方法栈中来执行,然后寄存器一行一行加载执行.

Java AsyncTask 分析内部实现

sdk3.0前,使用内部的线程池,多线程并发执行.线程池大小等于5,最大达128 sdk3.0后,使用默认的serial线程池,执行完一个线程,再顺序执行下一个线程.sdk4.3时 线程池大小等于5,最大达128 sdk4.4后线程池大小等于 cpu count + 1,最大值为cpu count * 2 + 1 sdk3.0后有两种线程池的实现,默认为 Serial 线程池 public static final Executor SERIAL_EXECUTOR = new SerialExe

Java性能优化指南系列(二):Java 性能分析工具

进行JAVA程序性能分析的时候,我们一般都会使用各种不同的工具.它们大部分都是可视化的,使得我们可以直观地看到应用程序的内部和运行环境到底执行了什么操作,所以性能分析(性能调优)是依赖于工具的.在第2章,我强调了基于数据驱动的性能测试是非常重要的,我们必须测试应用的性能并理解每个指标的含义.性能分析和数据驱动非常类似,为了提升应用程序的性能,我们必须获取应用运行的相关数据.如何获取这些数据并理解它们是本章的主题.[本章重点介绍JDK中提供的性能分析工具] 操作系统工具及其分析 程序分析的起点并不

协议解析Bug分析

协议解析Bug分析 源自邮件协议RPC(远程过程调用)处理的Request请求数据包的bug.        一.Bug描述 腾讯收购的Foxmail客户端可以作为outlook客户端的替代品与Exchange服务端进行交互完成邮件收发.而我们所要做的就是让邮件经过我们代理的优化处理. 这时候问题来了,Outlook客户端经由我们代理没有任何问题:但是换成Foxmail就会有错误弹窗,错误号:0x000006BE.但是如果不经过代理,Foxmail收发邮件一切正常. 很明显,是代理出了问题.  

java内存分析总结

1.自带的jconsole工具. (1)如果是从命令行启动,使 JDK 在 PATH 上,运行 jconsole 即可. (2)如果从 GUI shell 启动,找到 JDK 安装路径,打开 bin 文件夹,双击 jconsole . (3)当分析工具弹出时(取决于正在运行的 Java 版本以及正在运行的 Java 程序数量),可能会出现一个对话框,要求输入一个进程的 URL 来连接, 也可能列出许多不同     的本地 Java 进程(有时包含 JConsole 进程本身)来连接. 参照htt