该包下只有一个文件:ClientUtils.scala。它是一个object,里面封装了各种client(包括producer,consumer或admin)可能会用到的方法:
1. fetchTopicMetadata(producer版本): producer client会调用该方法来发送一个TopicMetadata请求,最后返回该请求对应的response。具体逻辑如下:
- 构造一个TopicMetadataRequest请求
- 将给定的broker列表按照随机顺序打散以防止大量的请求被路由到同一个broker上
- 遍历打乱顺序后的broker列表,尝试着为每一个broker构造一个sync producer连向它并发送请求,如果成功就退出遍历,否则就记录一个警告信息然后尝试下一个broker
- 但如果遍历完之后request都没有被成功发送,抛出异常退出,否则返回得到的response
2. fetchTopicMetadata(非producer版本):非producer客户端程序会调用该方法来发送一个元数据请求,真正的逻辑也是通过调用上一个版本的fetchTopicMetadata实现。
3. parseBrokerList:解析一组broker url,url格式为host1:port1, host2:port2,...返回一组Broker对象
4. channelToAnyBroker:创建一个阻塞通道连向任意一个broker,返回该阻塞通道
5. channelToOffsetManager:创建一个阻塞通道连向给定consumer group的offset管理器。具体逻辑就是不断地尝试向不同的broker发送ConsumerMetadataRequest获取对应的broker也就是coordinator信息;一旦获取之后比较一下是否是已创建的阻塞通道所连向的broker,如果是直接返回;否则创建一个新的阻塞队列连向这个coordinator并断开已创建的,然后返回这个coordinator
时间: 2024-10-10 01:38:42