从官方的Programming Guides中看到的
我理解streaming中的checkpoint有两种,一种指的是metadata的checkpoint,用于恢复你的streaming;一种是rdd的checkpoint的;下面的代码指的是第一种:
// Function to create and setup a new StreamingContext def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // new context val lines = ssc.socketTextStream(...) // create DStreams ... ssc.checkpoint(checkpointDirectory) // set checkpoint directory ssc } // Get StreamingContext from checkpoint data or create a new one val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start() context.awaitTermination()
时间: 2024-10-10 18:03:24