A Look at Flink's log.file Configuration

This article examines how Flink's log.file setting is used.

log4j.properties

flink-release-1.6.2/flink-dist/src/main/flink-bin/conf/log4j.properties

# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
  • Here the log.file system property supplies the value of log4j.appender.file.file, which is the path of the file appender's log file.
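The following minimal Java sketch shows what `${log.file}` in log4j.properties amounts to. The placeholder is replaced by the value of the `log.file` JVM system property, which is usually passed as `-Dlog.file=<path>` when the JVM is launched. The class `PlaceholderDemo` and its regex-based `substitute` method are hypothetical and are not log4j's implementation. Expanding an unset property to an empty string is an assumption of this sketch.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of "${key}" substitution against system properties.
// This is NOT log4j's code; it only models the idea behind log4j.appender.file.file=${log.file}.
class PlaceholderDemo {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${key} with props.getProperty(key); unset keys become "" (assumption). */
    static String substitute(String value, Properties props) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String resolved = props.getProperty(m.group(1), "");
            m.appendReplacement(out, Matcher.quoteReplacement(resolved));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

For example, calling `substitute("${log.file}", System.getProperties())` on a JVM started with `-Dlog.file=/tmp/jobmanager.log` returns `/tmp/jobmanager.log`.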

MiniCluster

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java

	/**
	 * Starts the mini cluster, based on the configured properties.
	 *
	 * @throws Exception This method passes on any exception that occurs during the startup of
	 *                   the mini cluster.
	 */
	public void start() throws Exception {
		synchronized (lock) {
			checkState(!running, "FlinkMiniCluster is already running");

			LOG.info("Starting Flink Mini Cluster");
			LOG.debug("Using configuration {}", miniClusterConfiguration);

			final Configuration configuration = miniClusterConfiguration.getConfiguration();
			final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
			final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
			final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

			try {
				initializeIOFormatClasses(configuration);

				LOG.info("Starting Metrics Registry");
				metricRegistry = createMetricRegistry(configuration);
				this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
					metricRegistry,
					"localhost");

				final RpcService jobManagerRpcService;
				final RpcService resourceManagerRpcService;
				final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];

				// bring up all the RPC services
				LOG.info("Starting RPC Service(s)");

				// we always need the 'commonRpcService' for auxiliary calls
				commonRpcService = createRpcService(configuration, rpcTimeout, false, null);

				// TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
				final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
				metricRegistry.startQueryService(actorSystem, null);

				if (useSingleRpcService) {
					for (int i = 0; i < numTaskManagers; i++) {
						taskManagerRpcServices[i] = commonRpcService;
					}

					jobManagerRpcService = commonRpcService;
					resourceManagerRpcService = commonRpcService;

					this.resourceManagerRpcService = null;
					this.jobManagerRpcService = null;
					this.taskManagerRpcServices = null;
				}
				else {
					// start a new service per component, possibly with custom bind addresses
					final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
					final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
					final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();

					jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
					resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);

					for (int i = 0; i < numTaskManagers; i++) {
						taskManagerRpcServices[i] = createRpcService(
								configuration, rpcTimeout, true, taskManagerBindAddress);
					}

					this.jobManagerRpcService = jobManagerRpcService;
					this.taskManagerRpcServices = taskManagerRpcServices;
					this.resourceManagerRpcService = resourceManagerRpcService;
				}

				// create the high-availability services
				LOG.info("Starting high-availability services");
				haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
					configuration,
					commonRpcService.getExecutor());

				blobServer = new BlobServer(configuration, haServices.createBlobStore());
				blobServer.start();

				heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

				// bring up the ResourceManager(s)
				LOG.info("Starting ResourceManger");
				resourceManagerRunner = startResourceManager(
					configuration,
					haServices,
					heartbeatServices,
					metricRegistry,
					resourceManagerRpcService,
					new ClusterInformation("localhost", blobServer.getPort()),
					jobManagerMetricGroup);

				blobCacheService = new BlobCacheService(
					configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
				);

				// bring up the TaskManager(s) for the mini cluster
				LOG.info("Starting {} TaskManger(s)", numTaskManagers);
				taskManagers = startTaskManagers(
					configuration,
					haServices,
					heartbeatServices,
					metricRegistry,
					blobCacheService,
					numTaskManagers,
					taskManagerRpcServices);

				// starting the dispatcher rest endpoint
				LOG.info("Starting dispatcher rest endpoint.");

				dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
					jobManagerRpcService,
					DispatcherGateway.class,
					DispatcherId::fromUuid,
					20,
					Time.milliseconds(20L));
				final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
					jobManagerRpcService,
					ResourceManagerGateway.class,
					ResourceManagerId::fromUuid,
					20,
					Time.milliseconds(20L));

				this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
					RestServerEndpointConfiguration.fromConfiguration(configuration),
					dispatcherGatewayRetriever,
					configuration,
					RestHandlerConfiguration.fromConfiguration(configuration),
					resourceManagerGatewayRetriever,
					blobServer.getTransientBlobService(),
					WebMonitorEndpoint.createExecutorService(
						configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
						configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
						"DispatcherRestEndpoint"),
					new AkkaQueryServiceRetriever(
						actorSystem,
						Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
					haServices.getWebMonitorLeaderElectionService(),
					new ShutDownFatalErrorHandler());

				dispatcherRestEndpoint.start();

				restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());

				// bring up the dispatcher that launches JobManagers when jobs submitted
				LOG.info("Starting job dispatcher(s) for JobManger");

				this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");

				final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);

				dispatcher = new StandaloneDispatcher(
					jobManagerRpcService,
					Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
					configuration,
					haServices,
					resourceManagerRunner.getResourceManageGateway(),
					blobServer,
					heartbeatServices,
					jobManagerMetricGroup,
					metricRegistry.getMetricQueryServicePath(),
					new MemoryArchivedExecutionGraphStore(),
					Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
					new ShutDownFatalErrorHandler(),
					dispatcherRestEndpoint.getRestBaseUrl(),
					historyServerArchivist);

				dispatcher.start();

				resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
				dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();

				resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
				dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
			}
			catch (Exception e) {
				// cleanup everything
				try {
					close();
				} catch (Exception ee) {
					e.addSuppressed(ee);
				}
				throw e;
			}

			// create a new termination future
			terminationFuture = new CompletableFuture<>();

			// now officially mark this as running
			running = true;

			LOG.info("Flink Mini Cluster started successfully");
		}
	}
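  • This method creates, in order, metricRegistry, commonRpcService, jobManagerRpcService, resourceManagerRpcService, haServices, blobServer, heartbeatServices, resourceManagerRunner, blobCacheService, taskManagers, dispatcherGatewayRetriever, dispatcherRestEndpoint, dispatcher, resourceManagerLeaderRetriever and dispatcherLeaderRetriever. The component that matters for log files is dispatcherRestEndpoint. It is a DispatcherRestEndpoint, and the method starts it with dispatcherRestEndpoint.start().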
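MiniCluster.start() uses a common startup pattern. It brings components up one at a time inside a try block. If anything fails, it cleans up, rethrows the original exception, and attaches any secondary failure with addSuppressed. The sketch below shows that pattern on its own. `OrderedStartup` and its `Component` interface are hypothetical. MiniCluster cleans up by calling its own close(); this simplified version instead closes only the components that have already started, newest first.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of "start in order, clean up on failure"; not Flink code.
class OrderedStartup {

    /** Stand-in for RpcService, BlobServer, etc.: something that can be started and closed. */
    interface Component extends AutoCloseable {
        void start() throws Exception;
    }

    private final Deque<Component> started = new ArrayDeque<>();

    void startAll(List<Component> components) throws Exception {
        try {
            for (Component c : components) {
                c.start();
                started.push(c); // remember it so it can be closed if a later step fails
            }
        } catch (Exception e) {
            // clean up everything that already started, newest first
            while (!started.isEmpty()) {
                try {
                    started.pop().close();
                } catch (Exception ee) {
                    e.addSuppressed(ee); // the original failure stays the primary exception
                }
            }
            throw e;
        }
    }
}
```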

RestServerEndpoint

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/RestServerEndpoint.java

	/**
	 * Starts this REST server endpoint.
	 *
	 * @throws Exception if we cannot start the RestServerEndpoint
	 */
	public final void start() throws Exception {
		synchronized (lock) {
			Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");

			log.info("Starting rest endpoint.");

			final Router router = new Router();
			final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

			List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = initializeHandlers(restAddressFuture);

			/* sort the handlers such that they are ordered the following:
			 * /jobs
			 * /jobs/overview
			 * /jobs/:jobid
			 * /jobs/:jobid/config
			 * /:*
			 */
			Collections.sort(
				handlers,
				RestHandlerUrlComparator.INSTANCE);

			handlers.forEach(handler -> {
				log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL());
				registerHandler(router, handler);
			});

			ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel ch) {
					RouterHandler handler = new RouterHandler(router, responseHeaders);

					// SSL should be the first handler in the pipeline
					if (sslEngineFactory != null) {
						ch.pipeline().addLast("ssl",
							new RedirectingSslHandler(restAddress, restAddressFuture, sslEngineFactory));
					}

					ch.pipeline()
						.addLast(new HttpServerCodec())
						.addLast(new FileUploadHandler(uploadDir))
						.addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
						.addLast(new ChunkedWriteHandler())
						.addLast(handler.getName(), handler)
						.addLast(new PipelineErrorHandler(log, responseHeaders));
				}
			};

			NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-server-netty-boss"));
			NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("flink-rest-server-netty-worker"));

			bootstrap = new ServerBootstrap();
			bootstrap
				.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.childHandler(initializer);

			log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
			final ChannelFuture channel;
			if (restBindAddress == null) {
				channel = bootstrap.bind(restBindPort);
			} else {
				channel = bootstrap.bind(restBindAddress, restBindPort);
			}
			serverChannel = channel.syncUninterruptibly().channel();

			final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
			final String advertisedAddress;
			if (bindAddress.getAddress().isAnyLocalAddress()) {
				advertisedAddress = this.restAddress;
			} else {
				advertisedAddress = bindAddress.getAddress().getHostAddress();
			}
			final int port = bindAddress.getPort();

			log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

			final String protocol;

			if (sslEngineFactory != null) {
				protocol = "https://";
			} else {
				protocol = "http://";
			}

			restBaseUrl = protocol + advertisedAddress + ':' + port;

			restAddressFuture.complete(restBaseUrl);

			state = State.RUNNING;

			startInternal();
		}
	}
  • This method calls initializeHandlers to obtain the ChannelInboundHandlers. The subclass DispatcherRestEndpoint implements initializeHandlers.
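According to the comment inside start(), handlers are sorted so that literal paths come before parameterized ones and the catch-all /:* comes last. The router should try the most specific match first. The simplified comparator below reproduces that ordering for the five example URLs. `UrlOrder` is a hypothetical class, not Flink's RestHandlerUrlComparator, whose exact rules may differ.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical segment-aware comparator: literal < ":param" < "*"/":*",
// compared segment by segment, with a prefix sorting before its extensions.
class UrlOrder implements Comparator<String> {

    private static int rank(String segment) {
        if (segment.equals("*") || segment.equals(":*")) {
            return 2; // catch-all: must be tried last
        }
        return segment.startsWith(":") ? 1 : 0; // path parameter vs. literal segment
    }

    @Override
    public int compare(String a, String b) {
        String[] sa = a.split("/");
        String[] sb = b.split("/");
        for (int i = 0; i < Math.min(sa.length, sb.length); i++) {
            int byRank = Integer.compare(rank(sa[i]), rank(sb[i]));
            if (byRank != 0) {
                return byRank;
            }
            int byText = sa[i].compareTo(sb[i]);
            if (byText != 0) {
                return byText;
            }
        }
        return Integer.compare(sa.length, sb.length); // "/jobs" before "/jobs/overview"
    }

    static List<String> sorted(List<String> urls) {
        List<String> copy = new ArrayList<>(urls);
        copy.sort(new UrlOrder());
        return copy;
    }
}
```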

DispatcherRestEndpoint

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java

	@Override
	protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
		List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(restAddressFuture);

		// Add the Dispatcher specific handlers

		final Time timeout = restConfiguration.getTimeout();

		JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
			restAddressFuture,
			leaderRetriever,
			timeout,
			responseHeaders,
			executor,
			clusterConfiguration);

		if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
			try {
				webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
					leaderRetriever,
					restAddressFuture,
					timeout,
					responseHeaders,
					uploadDir,
					executor,
					clusterConfiguration);

				// register extension handlers
				handlers.addAll(webSubmissionExtension.getHandlers());
			} catch (FlinkException e) {
				if (log.isDebugEnabled()) {
					log.debug("Failed to load web based job submission extension.", e);
				} else {
					log.info("Failed to load web based job submission extension. " +
						"Probable reason: flink-runtime-web is not in the classpath.");
				}
			}
		} else {
			log.info("Web-based job submission is not enabled.");
		}

		handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));

		return handlers;
	}
  • The method first calls the parent class's initializeHandlers. The parent here is WebMonitorEndpoint, which is a direct subclass of RestServerEndpoint; DispatcherRestEndpoint in turn extends WebMonitorEndpoint.
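Handler setup follows the template method pattern. RestServerEndpoint.start() only knows the abstract initializeHandlers(), and each subclass extends its parent's list through super.initializeHandlers(). Below is a stripped-down model of that chain. The `*Sketch` classes are hypothetical, and plain strings stand in for Tuple2<RestHandlerSpecification, ChannelInboundHandler>.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of RestServerEndpoint -> WebMonitorEndpoint -> DispatcherRestEndpoint.
abstract class RestServerEndpointSketch {
    final List<String> registered = new ArrayList<>();

    /** Implemented by subclasses; the base class never knows the concrete handlers. */
    protected abstract List<String> initializeHandlers();

    final void start() {
        registered.addAll(initializeHandlers());
    }
}

class WebMonitorEndpointSketch extends RestServerEndpointSketch {
    @Override
    protected List<String> initializeHandlers() {
        List<String> handlers = new ArrayList<>();
        handlers.add("JobManager log");
        handlers.add("JobManager stdout");
        handlers.add("TaskManager log");
        handlers.add("TaskManager stdout");
        return handlers;
    }
}

class DispatcherRestEndpointSketch extends WebMonitorEndpointSketch {
    @Override
    protected List<String> initializeHandlers() {
        List<String> handlers = super.initializeHandlers(); // inherit the WebMonitorEndpoint handlers
        handlers.add("JobSubmitHandler");                   // then add the dispatcher-specific one
        return handlers;
    }
}
```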

WebMonitorEndpoint

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java

	@Override
	protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);

		final Time timeout = restConfiguration.getTimeout();

		//......

		// TODO: Remove once the Yarn proxy can forward all REST verbs
		handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler));
		handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler));

		handlers.add(Tuple2.of(shutdownHandler.getMessageHeaders(), shutdownHandler));

		//......

		// load the log and stdout file handler for the main cluster component
		final WebMonitorUtils.LogFileLocation logFileLocation = WebMonitorUtils.LogFileLocation.find(clusterConfiguration);

		final ChannelInboundHandler logFileHandler = createStaticFileHandler(
			restAddressFuture,
			timeout,
			logFileLocation.logFile);

		final ChannelInboundHandler stdoutFileHandler = createStaticFileHandler(
			restAddressFuture,
			timeout,
			logFileLocation.stdOutFile);

		handlers.add(Tuple2.of(LogFileHandlerSpecification.getInstance(), logFileHandler));
		handlers.add(Tuple2.of(StdoutFileHandlerSpecification.getInstance(), stdoutFileHandler));

		// TaskManager log and stdout file handler

		final Time cacheEntryDuration = Time.milliseconds(restConfiguration.getRefreshInterval());

		final TaskManagerLogFileHandler taskManagerLogFileHandler = new TaskManagerLogFileHandler(
			restAddressFuture,
			leaderRetriever,
			timeout,
			responseHeaders,
			TaskManagerLogFileHeaders.getInstance(),
			resourceManagerRetriever,
			transientBlobService,
			cacheEntryDuration);

		final TaskManagerStdoutFileHandler taskManagerStdoutFileHandler = new TaskManagerStdoutFileHandler(
			restAddressFuture,
			leaderRetriever,
			timeout,
			responseHeaders,
			TaskManagerStdoutFileHeaders.getInstance(),
			resourceManagerRetriever,
			transientBlobService,
			cacheEntryDuration);

		handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler));
		handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler));

		//......

	}

	@Nonnull
	private ChannelInboundHandler createStaticFileHandler(
			CompletableFuture<String> restAddressFuture,
			Time timeout,
			File fileToServe) {

		if (fileToServe == null) {
			return new ConstantTextHandler("(file unavailable)");
		} else {
			try {
				return new StaticFileServerHandler<>(
					leaderRetriever,
					restAddressFuture,
					timeout,
					fileToServe);
			} catch (IOException e) {
				log.info("Cannot load log file handler.", e);
				return new ConstantTextHandler("(log file unavailable)");
			}
		}
	}
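  • It initializes a series of ChannelInboundHandlers and registers them in handlers.
  • For the JobManager file handlers, it first calls WebMonitorUtils.LogFileLocation.find(clusterConfiguration) to build logFileLocation. It then uses logFileLocation.logFile and logFileLocation.stdOutFile to construct logFileHandler and stdoutFileHandler, which serve downloads of the log file and the stdout file respectively. If the file is null, createStaticFileHandler returns a ConstantTextHandler that answers "(file unavailable)" instead.
  • For the TaskManager file handlers, it constructs a TaskManagerLogFileHandler and a TaskManagerStdoutFileHandler to serve downloads of the log and stdout files.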
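createStaticFileHandler() never lets a missing log file break endpoint startup; it falls back to a constant text response. The sketch below shows that fallback. `Responder` is a hypothetical stand-in for a Netty ChannelInboundHandler. Reading the whole file into a String simplifies what StaticFileServerHandler actually does.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical sketch of the "serve the file, or a placeholder text" decision.
class StaticFileFallback {

    interface Responder {
        String respond();
    }

    static Responder forFile(File fileToServe) {
        if (fileToServe == null) {
            return () -> "(file unavailable)"; // plays the role of ConstantTextHandler
        }
        final File canonical;
        try {
            canonical = fileToServe.getCanonicalFile(); // the real handler also canonicalizes rootPath
        } catch (IOException e) {
            return () -> "(log file unavailable)";
        }
        return () -> {
            try {
                return new String(Files.readAllBytes(canonical.toPath()), StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }
}
```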

JobManager FileHandler

WebMonitorUtils.LogFileLocation.find

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java

	/**
	 * Singleton to hold the log and stdout file.
	 */
	public static class LogFileLocation {

		public final File logFile;
		public final File stdOutFile;

		private LogFileLocation(File logFile, File stdOutFile) {
			this.logFile = logFile;
			this.stdOutFile = stdOutFile;
		}

		/**
		 * Finds the Flink log directory using log.file Java property that is set during startup.
		 */
		public static LogFileLocation find(Configuration config) {
			final String logEnv = "log.file";
			String logFilePath = System.getProperty(logEnv);

			if (logFilePath == null) {
				LOG.warn("Log file environment variable '{}' is not set.", logEnv);
				logFilePath = config.getString(WebOptions.LOG_PATH);
			}

			// not configured, cannot serve log files
			if (logFilePath == null || logFilePath.length() < 4) {
				LOG.warn("JobManager log files are unavailable in the web dashboard. " +
					"Log file location not found in environment variable '{}' or configuration key '{}'.",
					logEnv, WebOptions.LOG_PATH);
				return new LogFileLocation(null, null);
			}

			String outFilePath = logFilePath.substring(0, logFilePath.length() - 3).concat("out");

			LOG.info("Determined location of main cluster component log file: {}", logFilePath);
			LOG.info("Determined location of main cluster component stdout file: {}", outFilePath);

			return new LogFileLocation(resolveFileLocation(logFilePath), resolveFileLocation(outFilePath));
		}

		/**
		 * Verify log file location.
		 *
		 * @param logFilePath Path to log file
		 * @return File or null if not a valid log file
		 */
		private static File resolveFileLocation(String logFilePath) {
			File logFile = new File(logFilePath);
			return (logFile.exists() && logFile.canRead()) ? logFile : null;
		}
	}
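  • It first reads the log.file system property. If that is not set, it logs the warning "Log file environment variable 'log.file' is not set." The message calls log.file an environment variable, but the code reads it with System.getProperty, so it is a JVM system property.
  • Without log.file, it reads WebOptions.LOG_PATH (web.log.path) from Flink's Configuration. If that is also missing, or logFilePath.length() is less than 4, it logs the warning "JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'." and returns a LogFileLocation with both files set to null.
  • The length check guards the next step. outFilePath is built with logFilePath.substring(0, logFilePath.length() - 3).concat("out"), which replaces the last three characters (normally "log") with "out". resolveFileLocation then checks both logFilePath and outFilePath and returns null for any file that does not exist or is not readable. The method returns the resulting LogFileLocation.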
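The lookup order and the stdout-path derivation fit in a few lines of code. In this sketch a Map<String, String> replaces the Flink Configuration (an assumption), and the existence/readability check from resolveFileLocation is omitted. `JobManagerLogLocation` is a hypothetical name.

```java
import java.util.Map;

// Hypothetical re-creation of the path logic in WebMonitorUtils.LogFileLocation.find().
class JobManagerLogLocation {

    /** Returns {logPath, stdoutPath}, or null when no usable path is configured. */
    static String[] find(Map<String, String> config) {
        String logFilePath = System.getProperty("log.file");   // 1st: JVM system property
        if (logFilePath == null) {
            logFilePath = config.get("web.log.path");          // 2nd: WebOptions.LOG_PATH
        }
        if (logFilePath == null || logFilePath.length() < 4) {
            return null;                                       // dashboard shows no JobManager logs
        }
        // Drops the last three characters and appends "out",
        // i.e. it assumes a three-letter extension such as "log".
        String outFilePath = logFilePath.substring(0, logFilePath.length() - 3).concat("out");
        return new String[] {logFilePath, outFilePath};
    }
}
```

This rule only works when the extension has exactly three letters. With log.file unset and web.log.path=/tmp/jobmanager.log, the stdout path becomes /tmp/jobmanager.out as intended. With /tmp/jm.logfile, however, it becomes /tmp/jm.logfout, which resolveFileLocation would presumably reject as nonexistent.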

StaticFileServerHandler

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java

/**
 * Simple file server handler that serves requests to web frontend's static files, such as
 * HTML, CSS, or JS files.
 *
 * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
 * example.</p>
 */
@ChannelHandler.Sharable
public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> {

	/** Timezone in which this server answers its "if-modified" requests. */
	private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT");

	/** Date format for HTTP. */
	public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";

	/** Be default, we allow files to be cached for 5 minutes. */
	private static final int HTTP_CACHE_SECONDS = 300;

	// ------------------------------------------------------------------------

	/** The path in which the static documents are. */
	private final File rootPath;

	public StaticFileServerHandler(
			GatewayRetriever<? extends T> retriever,
			CompletableFuture<String> localJobManagerAddressFuture,
			Time timeout,
			File rootPath) throws IOException {

		super(localJobManagerAddressFuture, retriever, timeout, Collections.emptyMap());

		this.rootPath = checkNotNull(rootPath).getCanonicalFile();
	}

	// ------------------------------------------------------------------------
	//  Responses to requests
	// ------------------------------------------------------------------------

	@Override
	protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, T gateway) throws Exception {
		final HttpRequest request = routedRequest.getRequest();
		final String requestPath;

		// make sure we request the "index.html" in case there is a directory request
		if (routedRequest.getPath().endsWith("/")) {
			requestPath = routedRequest.getPath() + "index.html";
		}
		// in case the files being accessed are logs or stdout files, find appropriate paths.
		else if (routedRequest.getPath().equals("/jobmanager/log") || routedRequest.getPath().equals("/jobmanager/stdout")) {
			requestPath = "";
		} else {
			requestPath = routedRequest.getPath();
		}

		respondToRequest(channelHandlerContext, request, requestPath);
	}

	//......

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		if (ctx.channel().isActive()) {
			logger.error("Caught exception", cause);
			sendError(ctx, INTERNAL_SERVER_ERROR);
		}
	}
}
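  • For /jobmanager/log and /jobmanager/stdout, respondAsLeader resets requestPath to an empty string and then calls respondToRequest, which serves the file relative to rootPath. For these two handlers rootPath is the log or stdout file itself, so the empty path resolves to exactly that file.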
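The sketch below shows the path mapping in respondAsLeader(). It assumes the request path is then resolved relative to rootPath; the sketch uses java.nio Path.resolve for that step because the Flink handler's own resolution code is elided above. `RequestPathMapping` is a hypothetical name.

```java
import java.nio.file.Path;

// Hypothetical sketch of the request-path mapping in StaticFileServerHandler.respondAsLeader().
class RequestPathMapping {

    static String toRequestPath(String routedPath) {
        if (routedPath.endsWith("/")) {
            return routedPath + "index.html";   // directory request
        } else if (routedPath.equals("/jobmanager/log") || routedPath.equals("/jobmanager/stdout")) {
            return "";                          // serve rootPath itself (the log/stdout file)
        } else {
            return routedPath;
        }
    }

    /** Resolves relative to rootPath; Path.resolve("") returns rootPath unchanged. */
    static Path resolve(Path rootPath, String routedPath) {
        String requestPath = toRequestPath(routedPath);
        // strip a leading '/' so the path is resolved under rootPath, not the filesystem root
        return rootPath.resolve(requestPath.startsWith("/") ? requestPath.substring(1) : requestPath);
    }
}
```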

TaskManager FileHandler

TaskManagerLogFileHandler

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogFileHandler.java

/**
 * Rest handler which serves the log files from {@link TaskExecutor}.
 */
public class TaskManagerLogFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

	public TaskManagerLogFileHandler(
			@Nonnull CompletableFuture<String> localAddressFuture,
			@Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
			@Nonnull Time timeout,
			@Nonnull Map<String, String> responseHeaders,
			@Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
			@Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
			@Nonnull TransientBlobService transientBlobService,
			@Nonnull Time cacheEntryDuration) {
		super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
	}

	@Override
	protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
		return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.LOG, timeout);
	}
}
  • Its requestFileUpload calls ResourceManager.requestTaskManagerFileUpload through the ResourceManagerGateway and passes FileType.LOG.

TaskManagerStdoutFileHandler.requestFileUpload

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerStdoutFileHandler.java

/**
 * Rest handler which serves the stdout file of the {@link TaskExecutor}.
 */
public class TaskManagerStdoutFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

	public TaskManagerStdoutFileHandler(
			@Nonnull CompletableFuture<String> localAddressFuture,
			@Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
			@Nonnull Time timeout,
			@Nonnull Map<String, String> responseHeaders,
			@Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
			@Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
			@Nonnull TransientBlobService transientBlobService,
			@Nonnull Time cacheEntryDuration) {
		super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
	}

	@Override
	protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
		return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.STDOUT, timeout);
	}
}
  • Its requestFileUpload likewise calls ResourceManager.requestTaskManagerFileUpload, but passes FileType.STDOUT.
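The two TaskManager handlers differ only in the FileType constant they pass along. The model below uses hypothetical, simplified types. The real handlers extend AbstractTaskManagerFileHandler and return TransientBlobKey futures; here a blob key is just a String.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model: one abstract hook, two subclasses that differ only in FileType.
class TaskManagerFileHandlers {

    enum FileType { LOG, STDOUT }

    /** Simplified stand-in for ResourceManagerGateway. */
    interface ResourceManagerGateway {
        CompletableFuture<String> requestTaskManagerFileUpload(String taskManagerId, FileType fileType);
    }

    abstract static class AbstractFileHandler {
        protected abstract CompletableFuture<String> requestFileUpload(ResourceManagerGateway rm, String taskManagerId);
    }

    static final class LogFileHandler extends AbstractFileHandler {
        @Override
        protected CompletableFuture<String> requestFileUpload(ResourceManagerGateway rm, String taskManagerId) {
            return rm.requestTaskManagerFileUpload(taskManagerId, FileType.LOG);
        }
    }

    static final class StdoutFileHandler extends AbstractFileHandler {
        @Override
        protected CompletableFuture<String> requestFileUpload(ResourceManagerGateway rm, String taskManagerId) {
            return rm.requestTaskManagerFileUpload(taskManagerId, FileType.STDOUT);
        }
    }
}
```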

ResourceManager.requestTaskManagerFileUpload

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/resourcemanager/ResourceManager.java

	@Override
	public CompletableFuture<TransientBlobKey> requestTaskManagerFileUpload(ResourceID taskManagerId, FileType fileType, Time timeout) {
		log.debug("Request file {} upload from TaskExecutor {}.", fileType, taskManagerId);

		final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(taskManagerId);

		if (taskExecutor == null) {
			log.debug("Requested file {} upload from unregistered TaskExecutor {}.", fileType, taskManagerId);
			return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
		} else {
			return taskExecutor.getTaskExecutorGateway().requestFileUpload(fileType, timeout);
		}
	}
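  • ResourceManager.requestTaskManagerFileUpload looks up the registered TaskExecutor by its ResourceID and delegates to TaskExecutor.requestFileUpload through its TaskExecutorGateway. If the TaskExecutor is not registered, it returns a future completed exceptionally with an UnknownTaskExecutorException.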
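Here the ResourceManager only routes the request. It finds the TaskExecutor registration by ResourceID and either delegates or fails fast. The sketch below uses simplified, hypothetical types: a Function stands in for the TaskExecutorGateway, and IllegalStateException stands in for UnknownTaskExecutorException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of ResourceManager.requestTaskManagerFileUpload(); not Flink code.
class ResourceManagerSketch {

    /** Registered TaskExecutors keyed by resource id; each maps a file type to an upload future. */
    final Map<String, Function<String, CompletableFuture<String>>> taskExecutors = new HashMap<>();

    CompletableFuture<String> requestTaskManagerFileUpload(String taskManagerId, String fileType) {
        Function<String, CompletableFuture<String>> gateway = taskExecutors.get(taskManagerId);
        if (gateway == null) {
            // Java 8 equivalent of FutureUtils.completedExceptionally(...)
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("Unknown TaskExecutor " + taskManagerId));
            return failed;
        }
        return gateway.apply(fileType); // the actual upload happens on the TaskExecutor
    }
}
```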

TaskExecutor.requestFileUpload

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

	@Override
	public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, Time timeout) {
		log.debug("Request file {} upload.", fileType);

		final String filePath;

		switch (fileType) {
			case LOG:
				filePath = taskManagerConfiguration.getTaskManagerLogPath();
				break;
			case STDOUT:
				filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
				break;
			default:
				filePath = null;
		}

		if (filePath != null && !filePath.isEmpty()) {
			final File file = new File(filePath);

			if (file.exists()) {
				final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
				final TransientBlobKey transientBlobKey;
				try (FileInputStream fileInputStream = new FileInputStream(file)) {
					transientBlobKey = transientBlobService.putTransient(fileInputStream);
				} catch (IOException e) {
					log.debug("Could not upload file {}.", fileType, e);
					return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
				}

				return CompletableFuture.completedFuture(transientBlobKey);
			} else {
				log.debug("The file {} does not exist on the TaskExecutor {}.", fileType, getResourceID());
				return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " does not exist on the TaskExecutor."));
			}
		} else {
			log.debug("The file {} is unavailable on the TaskExecutor {}.", fileType, getResourceID());
			return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " is not available on the TaskExecutor."));
		}
	}
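  • TaskExecutor.requestFileUpload chooses filePath by fileType. For LOG it uses taskManagerConfiguration.getTaskManagerLogPath(), and for STDOUT it uses taskManagerConfiguration.getTaskManagerStdoutPath(). If the file exists, it uploads the content to the TransientBlobCache via putTransient and returns the resulting TransientBlobKey. Otherwise, or if the upload fails, the future completes exceptionally with a FlinkException.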
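The file content does not travel back through the RPC call; only a TransientBlobKey does. Since the REST handler is given a transientBlobService (see the WebMonitorEndpoint code), it presumably fetches the bytes through that service. The sketch below models only the TaskExecutor side. The in-memory map stands in for TransientBlobCache.putTransient(), and IOException stands in for FlinkException. Both substitutions are assumptions made to keep the example self-contained.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of TaskExecutor.requestFileUpload(); not Flink code.
class TaskExecutorUploadSketch {

    enum FileType { LOG, STDOUT }

    private final String logPath;     // plays the role of getTaskManagerLogPath()
    private final String stdoutPath;  // plays the role of getTaskManagerStdoutPath()
    final Map<String, byte[]> blobStore = new ConcurrentHashMap<>();

    TaskExecutorUploadSketch(String logPath, String stdoutPath) {
        this.logPath = logPath;
        this.stdoutPath = stdoutPath;
    }

    CompletableFuture<String> requestFileUpload(FileType fileType) {
        final String filePath;
        switch (fileType) {
            case LOG:
                filePath = logPath;
                break;
            case STDOUT:
                filePath = stdoutPath;
                break;
            default:
                filePath = null;
        }
        if (filePath == null || filePath.isEmpty()) {
            return failed("The file " + fileType + " is not available on the TaskExecutor.", null);
        }
        File file = new File(filePath);
        if (!file.exists()) {
            return failed("The file " + fileType + " does not exist on the TaskExecutor.", null);
        }
        try {
            // store the content under a fresh key; only the key goes back to the caller
            String key = UUID.randomUUID().toString();
            blobStore.put(key, Files.readAllBytes(file.toPath()));
            return CompletableFuture.completedFuture(key);
        } catch (IOException e) {
            return failed("Could not upload file " + fileType + '.', e);
        }
    }

    private static CompletableFuture<String> failed(String message, Throwable cause) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IOException(message, cause));
        return future;
    }
}
```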

TaskManagerRunner.startTaskManager

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java

	public static TaskExecutor startTaskManager(
			Configuration configuration,
			ResourceID resourceID,
			RpcService rpcService,
			HighAvailabilityServices highAvailabilityServices,
			HeartbeatServices heartbeatServices,
			MetricRegistry metricRegistry,
			BlobCacheService blobCacheService,
			boolean localCommunicationOnly,
			FatalErrorHandler fatalErrorHandler) throws Exception {

		checkNotNull(configuration);
		checkNotNull(resourceID);
		checkNotNull(rpcService);
		checkNotNull(highAvailabilityServices);

		LOG.info("Starting TaskManager with ResourceID: {}", resourceID);

		InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());

		TaskManagerServicesConfiguration taskManagerServicesConfiguration =
			TaskManagerServicesConfiguration.fromConfiguration(
				configuration,
				remoteAddress,
				localCommunicationOnly);

		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
			taskManagerServicesConfiguration,
			resourceID,
			rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io.
			EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
			EnvironmentInformation.getMaxJvmHeapMemory());

		TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
			metricRegistry,
			taskManagerServices.getTaskManagerLocation(),
			taskManagerServices.getNetworkEnvironment());

		TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);

		return new TaskExecutor(
			rpcService,
			taskManagerConfiguration,
			highAvailabilityServices,
			taskManagerServices,
			heartbeatServices,
			taskManagerMetricGroup,
			blobCacheService,
			fatalErrorHandler);
	}
  • TaskManagerRunner.startTaskManager builds taskManagerConfiguration with TaskManagerConfiguration.fromConfiguration(configuration) and passes it to the TaskExecutor constructor.

TaskManagerConfiguration.fromConfiguration

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java

	public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
		int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

		if (numberSlots == -1) {
			numberSlots = 1;
		}

		//......

		final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
		final String taskManagerStdoutPath;

		if (taskManagerLogPath != null) {
			final int extension = taskManagerLogPath.lastIndexOf('.');

			if (extension > 0) {
				taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
			} else {
				taskManagerStdoutPath = null;
			}
		} else {
			taskManagerStdoutPath = null;
		}

		return new TaskManagerConfiguration(
			numberSlots,
			tmpDirPaths,
			timeout,
			finiteRegistrationDuration,
			initialRegistrationPause,
			maxRegistrationPause,
			refusedRegistrationPause,
			configuration,
			exitOnOom,
			FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
			alwaysParentFirstLoaderPatterns,
			taskManagerLogPath,
			taskManagerStdoutPath);
	}
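  • TaskManagerConfiguration.fromConfiguration first reads taskManagerLogPath from Flink's Configuration under ConfigConstants.TASK_MANAGER_LOG_PATH_KEY (taskmanager.log.path) and falls back to the log.file system property if that key is not set. If taskManagerLogPath is not null, it builds taskManagerStdoutPath by replacing everything from the last '.' onward with ".out". If the path contains no '.' (or the '.' is the first character), taskManagerStdoutPath is null.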
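The TaskManager side derives the stdout path differently from LogFileLocation.find: it cuts at the last '.' instead of dropping exactly three characters. In the sketch below a Map replaces Flink's Configuration (an assumption), and `TaskManagerLogPaths` is a hypothetical name.

```java
import java.util.Map;

// Hypothetical re-creation of the path logic in TaskManagerConfiguration.fromConfiguration().
class TaskManagerLogPaths {

    /** taskmanager.log.path wins; otherwise fall back to the log.file system property. */
    static String logPath(Map<String, String> config) {
        String fromConfig = config.get("taskmanager.log.path");
        return fromConfig != null ? fromConfig : System.getProperty("log.file");
    }

    /** Replaces everything from the last '.' with ".out"; null if there is no usable '.'. */
    static String stdoutPath(String taskManagerLogPath) {
        if (taskManagerLogPath == null) {
            return null;
        }
        int extension = taskManagerLogPath.lastIndexOf('.');
        return extension > 0 ? taskManagerLogPath.substring(0, extension) + ".out" : null;
    }
}
```

Because lastIndexOf('.') scans the whole path, a file name without an extension can pick up a dot from a directory name. For example, /opt/flink-1.6.2/log/taskmanager yields /opt/flink-1.6.out.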

Summary

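  • Flink's log4j.properties configures a file appender whose path comes from the log.file system property.
  • When Flink's MiniCluster starts, it creates a DispatcherRestEndpoint whose start method calls initializeHandlers to set up a series of handlers. For the JobManager file handlers, WebMonitorUtils.LogFileLocation.find(clusterConfiguration) determines logFileLocation. It reads the log.file system property first and, if that is absent, falls back to WebOptions.LOG_PATH (web.log.path) in Flink's Configuration. Two StaticFileServerHandlers are then created from logFileLocation.logFile and logFileLocation.stdOutFile.
  • For the TaskManager file handlers, a TaskManagerLogFileHandler and a TaskManagerStdoutFileHandler serve the log and stdout downloads. Both call ResourceManager.requestTaskManagerFileUpload and differ only in fileType (LOG vs. STDOUT), and ResourceManager.requestTaskManagerFileUpload completes the transfer through TaskExecutor.requestFileUpload. When TaskManagerRunner.startTaskManager creates the TaskExecutor, it builds a TaskManagerConfiguration. That configuration reads ConfigConstants.TASK_MANAGER_LOG_PATH_KEY (taskmanager.log.path) from Flink's Configuration first and falls back to the log.file system property. The precedence is therefore reversed between the two sides: the JobManager prefers log.file over web.log.path, while the TaskManager prefers taskmanager.log.path over log.file.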

Original article: https://www.cnblogs.com/qwangxiao/p/10005208.html

Time: 2024-08-27 20:56:24
