(reify DistributedRPC$Iface
  (^String execute
    [this ^String function ^String args]
    (log-debug "Received DRPC request for " function " " args " at " (System/currentTimeMillis))
    ;; The request id comes from a shared counter that wraps around at 1000000000.
    (let [id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
          ;; Zero permits: .acquire below blocks until another thread calls .release.
          ^Semaphore sem (Semaphore. 0)
          req (DRPCRequest. args id)
          ^ConcurrentLinkedQueue queue (acquire-queue request-queues function)]
      ;; Record the bookkeeping state for this request, keyed by id.
      (swap! id->start assoc id (current-time-secs))
      (swap! id->sem assoc id sem)
      (swap! id->function assoc id function)
      (swap! id->request assoc id req)
      ;; Enqueue the request on the queue for this function.
      (.add queue req)
      (log-debug "Waiting for DRPC result for " function " " args " at " (System/currentTimeMillis))
      (.acquire sem)
      (log-debug "Acquired DRPC result for " function " " args " at " (System/currentTimeMillis))
      (let [result (@id->result id)]
        (cleanup id)
        (log-debug "Returning DRPC result for " function " " args " at " (System/currentTimeMillis))
        ;; A stored exception is rethrown; a missing result is reported as a timeout.
        (if (instance? DRPCExecutionException result)
          (throw result)
          (if (nil? result)
            (throw (DRPCExecutionException. "Request timed out"))
            result)))))
A few key points:
1. reify:
"Essentially, it is a way to create objects that satisfy any protocol (or implement methods of any interface or Object). This makes it analogous to anonymous inner classes in Java." (quoted from Clojure Programming)
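To make the comparison with anonymous inner classes concrete, here is a minimal sketch of reify against a standard Java interface. It is not taken from the Storm source: make-greeter and greeter are hypothetical names, and java.util.concurrent.Callable was chosen only because it has a single method.

```clojure
;; Hypothetical example, not from the Storm source: an anonymous
;; implementation of java.util.concurrent.Callable built with reify.
(import 'java.util.concurrent.Callable)

(defn make-greeter
  "Returns an object implementing Callable; its call method closes over name."
  [name]
  (reify Callable
    ;; The first parameter of every reify method is the object itself.
    (call [this]
      (str "Hello, " name))))

(def greeter (make-greeter "DRPC"))

(.call ^Callable greeter)       ; returns "Hello, DRPC"
(instance? Callable greeter)    ; returns true
```

As with an anonymous inner class in Java, the reified object captures local values from the enclosing scope (here, name) and has no class name of its own that you refer to.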
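2. ^String
A type hint: it tells the Clojure compiler which Java type to expect, so method calls on the hinted value can be resolved without reflection.
3. ^String execute
Here ^String is the return-type hint for the execute method.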
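The two positions of the hint can be seen side by side in a small sketch. EchoService is a hypothetical interface defined here with definterface as a stand-in for DistributedRPC$Iface, and the method body is invented for illustration.

```clojure
;; Hypothetical interface standing in for DistributedRPC$Iface; the name
;; EchoService and the behaviour below are assumptions made for this sketch.
(definterface EchoService
  (^String execute [^String function ^String args]))

(def echo
  (reify EchoService
    ;; ^String before the method name hints the return type;
    ;; ^String before a parameter hints that parameter's type.
    (^String execute [this ^String function ^String args]
      ;; Because function is hinted, .toUpperCase is resolved at
      ;; compile time rather than through reflection.
      (str (.toUpperCase function) ":" args))))

(.execute ^EchoService echo "exclaim" "hello")   ; returns "EXCLAIM:hello"
```

When hints are supplied in a reify method, they have to agree with the signature declared by the interface, which is why the hints in the excerpt line up with the Java signature of execute.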
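4. Summary
The code here simply implements the DistributedRPC$Iface interface. The interface defines a single method, execute, which takes two String arguments (hinted with ^String) and returns a String (also hinted with ^String).
For the DistributedRPC$Iface interface, see https://storm.incubator.apache.org/apidocs/. The documented signature of execute is:
String execute(String functionName, String funcArgs) throws DRPCExecutionException, org.apache.thrift.TException
which matches the implementation here exactly.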