
DolphinScheduler Fault Tolerance Source Code Analysis: Worker

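Contents
  • Introduction
  • Worker Fault Tolerance Source Code Analysis
    • Worker startup registration
    • Master watching the worker's ZK node state
    • Handling the failover event
  • Summary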

    Introduction

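    The previous article covered the Master's fault tolerance mechanism in DolphinScheduler. DolphinScheduler uses a decentralized architecture in which multiple Masters and multiple Workers act as peers, so the Worker's fault tolerance mechanism also deserves attention.

    As with the Master article, the source code analyzed here is version 3.1.3.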

    Worker Fault Tolerance Source Code Analysis

    Worker startup registration

    The Worker's entry point is WorkerServer. Once the Worker starts, its run method is executed:

    @PostConstruct
    public void run() {
    	this.workerRpcServer.start();
    	this.workerRpcClient.start();
    	this.taskPluginManager.loadPlugin();
    	this.workerRegistryClient.setRegistryStoppable(this);
    	this.workerRegistryClient.start();
    	this.workerManagerThread.start();
    	this.messageRetryRunner.start();
    	/*
    	 * registry hooks, which are called before the process exits
    	 */
    	Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    		if (!ServerLifeCycleManager.isStopped()) {
    			close("WorkerServer shutdown hook");
    		}
    	}));
    }
    

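    Here we only care about what this.workerRegistryClient.start() does. It registers the current worker's information in ZooKeeper and starts a heartbeat task that periodically updates that information in ZooKeeper.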

    /**
     * registry
     */
    private void registry() {
    	WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
    	String workerZKPath = workerConfig.getWorkerRegistryPath();
    	// remove before persist
    	registryClient.remove(workerZKPath);
    	registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
    	log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
    	while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), NodeType.WORKER)) {
    		ThreadUtils.sleep(SLEEP_TIME_MILLIS);
    	}
    	// sleep 1s, waiting master failover remove
    	ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
    	workerHeartBeatTask.start();
    	log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
    }
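
    A quick restart can find the worker's old ephemeral node still present until ZooKeeper expires the previous session, which is presumably why registry() removes the node before persisting it. The sketch below models this with a hypothetical in-memory registry in which ephemeral nodes belong to a session id and disappear when that session closes. It assumes create-or-update semantics where updating an existing node keeps its original owner; in ZooKeeper, a node's ephemeral owner is fixed at creation. `InMemoryRegistry`, `WorkerRegistration` and their methods are illustrative names, not DolphinScheduler or ZooKeeper APIs, and the periodic heartbeat task is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ZooKeeper: each ephemeral node remembers the
// session that created it and is deleted when that session closes.
final class InMemoryRegistry {
    private final Map<String, String> data = new ConcurrentHashMap<>();
    private final Map<String, Long> owners = new ConcurrentHashMap<>();

    void remove(String path) {
        data.remove(path);
        owners.remove(path);
    }

    // Assumed create-or-update semantics: an existing node keeps its original
    // owner, because a ZooKeeper setData never changes the ephemeral owner.
    void persistEphemeral(long sessionId, String path, String value) {
        owners.putIfAbsent(path, sessionId);
        data.put(path, value);
    }

    boolean exists(String path) {
        return data.containsKey(path);
    }

    // Session expiry drops every node the session owns. The master sees this
    // as a REMOVE event for the worker's path.
    void closeSession(long sessionId) {
        owners.entrySet().removeIf(entry -> {
            boolean owned = entry.getValue() == sessionId;
            if (owned) {
                data.remove(entry.getKey());
            }
            return owned;
        });
    }
}

final class WorkerRegistration {
    // Same order as registry(): delete any stale node, create the ephemeral
    // node that holds the heartbeat JSON, then wait until it is visible.
    static void register(InMemoryRegistry registry, long sessionId, String path, String heartBeatJson) {
        registry.remove(path);
        registry.persistEphemeral(sessionId, path, heartBeatJson);
        while (!registry.exists(path)) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

    Suppose the worker skipped remove() and re-registered under a new session. The node would still belong to the old session, and it would vanish as soon as that old session expired, even though the worker is alive.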
    

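    This is essentially the same as the master's registration flow. Next, let's look at where the worker registers.

    The worker registers at the following ZK path, which has the same parent directory, /nodes, as the master: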

    // /nodes/worker/ + ip:listenPort
    workerConfig.setWorkerRegistryPath(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerConfig.getWorkerAddress());
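
    The sketch below builds this path with a hypothetical helper, `RegistryPaths`. It assumes two things: that REGISTRY_DOLPHINSCHEDULER_WORKERS resolves to /nodes/worker, as the comment indicates, and that the worker address has the form ip:listenPort.

```java
final class RegistryPaths {
    // Assumed values, following the comment above; not copied from Constants.
    static final String NODES = "/nodes";
    static final String WORKERS = NODES + "/worker";
    static final String MASTERS = NODES + "/master";

    // Builds the worker's registration path from its address parts.
    static String workerPath(String ip, int listenPort) {
        return WORKERS + "/" + ip + ":" + listenPort;
    }
}
```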
    

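    The registered content is the current worker node's health status, including CPU, memory, load and disk information. From this information the master can tell whether the worker is healthy enough to accept and execute assigned tasks.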

    @Override
    public WorkerHeartBeat getHeartBeat() {
    	double loadAverage = OSUtils.loadAverage();
    	double cpuUsage = OSUtils.cpuUsage();
    	int maxCpuLoadAvg = workerConfig.getMaxCpuLoadAvg();
    	double reservedMemory = workerConfig.getReservedMemory();
    	double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
    	int execThreads = workerConfig.getExecThreads();
    	int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
    	int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory,
    			execThreads, workerWaitingTaskCount);
    	return WorkerHeartBeat.builder()
    			.startupTime(ServerLifeCycleManager.getServerStartupTime())
    			.reportTime(System.currentTimeMillis())
    			.cpuUsage(cpuUsage)
    			.loadAverage(loadAverage)
    			.availablePhysicalMemorySize(availablePhysicalMemorySize)
    			.maxCpuloadAvg(maxCpuLoadAvg)
    			.memoryUsage(OSUtils.memoryUsage())
    			.reservedMemory(reservedMemory)
    			.diskAvailable(OSUtils.diskAvailable())
    			.processId(processId)
    			.workerHostWeight(workerConfig.getHostWeight())
    			.workerWaitingTaskCount(this.workerWaitingTaskCount.get())
    			.workerExecThreadCount(workerConfig.getExecThreads())
    			.serverStatus(serverStatus)
    			.build();
    }
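
    The code above does not show getServerStatus. The sketch below assumes one plausible rule for turning the sampled metrics into a status:

    • abnormal when the load average exceeds maxCpuLoadAvg, or when available memory falls below reservedMemory;
    • busy when waiting tasks outnumber execution threads;
    • normal otherwise.

    `ServerStatus` and `HeartBeatStatus` are illustrative names, and the real checks may differ.

```java
enum ServerStatus { NORMAL, ABNORMAL, BUSY }

final class HeartBeatStatus {
    // Assumed rule, not the verified DolphinScheduler implementation.
    static ServerStatus serverStatus(double loadAverage, double maxCpuLoadAvg,
                                     double availableMemory, double reservedMemory,
                                     int execThreads, int waitingTaskCount) {
        // Resource exhaustion makes the node unfit for new tasks.
        if (loadAverage > maxCpuLoadAvg || availableMemory < reservedMemory) {
            return ServerStatus.ABNORMAL;
        }
        // More queued tasks than worker threads: alive but saturated.
        if (waitingTaskCount > execThreads) {
            return ServerStatus.BUSY;
        }
        return ServerStatus.NORMAL;
    }
}
```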
    

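    Master watching the worker's ZK node state

    Next, the master monitors the registered worker nodes. As described in the previous article, once the master starts and registers, it already watches the node directory. It subscribes to /nodes, so any change under its child paths /master or /worker triggers a callback: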

    //node
    registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
    

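    So when a worker's ephemeral node disappears because of a failure, the master notices the change. The master then calls back the notify method of MasterRegistryDataListener. notify uses the changed path to decide whether the event concerns a master or a worker: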

    @Override
    public void notify(Event event) {
    	final String path = event.path();
    	if (Strings.isNullOrEmpty(path)) {
    		return;
    	}
    	//monitor master
    	if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_MASTERS + Constants.SINGLE_SLASH)) {
    		handleMasterEvent(event);
    	} else if (path.startsWith(REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH)) {
    		//monitor worker
    		handleWorkerEvent(event);
    	}
    }
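
    The routing in notify is a plain prefix test. Appending SINGLE_SLASH to the directory means that only child nodes match, not the /nodes/master or /nodes/worker directory itself. The sketch below assumes the constants resolve to /nodes/master and /nodes/worker; `NodeEventRouter` and `Target` are hypothetical names.

```java
final class NodeEventRouter {
    // Assumed to match REGISTRY_DOLPHINSCHEDULER_MASTERS / _WORKERS.
    static final String MASTERS = "/nodes/master";
    static final String WORKERS = "/nodes/worker";

    enum Target { MASTER, WORKER, IGNORED }

    // The trailing "/" keeps the parent directories (and siblings such as
    // "/nodes/workers") from matching; only their children are routed.
    static Target route(String path) {
        if (path == null || path.isEmpty()) {
            return Target.IGNORED;
        }
        if (path.startsWith(MASTERS + "/")) {
            return Target.MASTER;
        }
        if (path.startsWith(WORKERS + "/")) {
            return Target.WORKER;
        }
        return Target.IGNORED;
    }
}
```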
    

    We already saw this code in the master failover article. For worker failover, it triggers the handleWorkerEvent method:

    private void handleWorkerEvent(Event event) {
    	final String path = event.path();
    	switch (event.type()) {
    		case ADD:
    			logger.info("worker node added : {}", path);
    			break;
    		case REMOVE:
    			logger.info("worker node deleted : {}", path);
    			masterRegistryClient.removeWorkerNodePath(path, NodeType.WORKER, true);
    			break;
    		default:
    			break;
    	}
    }
    

    Next, the master extracts the host of the offline worker node and hands it to the failover logic:

    public void removeWorkerNodePath(String path, NodeType nodeType, boolean failover) {
    	logger.info("{} node deleted : {}", nodeType, path);
    	try {
    		// get the host of the removed node
    		String serverHost = null;
    		if (!StringUtils.isEmpty(path)) {
    			serverHost = registryClient.getHostByEventDataPath(path);
    			if (StringUtils.isEmpty(serverHost)) {
    				logger.error("server down error: unknown path: {}", path);
    				return;
    			}
    			if (!registryClient.exists(path)) {
    				logger.info("path: {} not exists", path);
    			}
    		}
    		// failover server
    		if (failover) {
    			failoverService.failoverServerWhenDown(serverHost, nodeType);
    		}
    	} catch (Exception e) {
    		logger.error("{} server failover failed", nodeType, e);
    	}
    }
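
    The sketch below combines the REMOVE branch of handleWorkerEvent with removeWorkerNodePath. The article does not show getHostByEventDataPath, so `hostOf` assumes the host is the last path segment (ip:port). `WorkerRemovalHandler` and `EventType` are hypothetical stand-ins for the registry event type. The `Consumer` failover hook stands in for failoverService.failoverServerWhenDown.

```java
import java.util.function.Consumer;

final class WorkerRemovalHandler {
    enum EventType { ADD, REMOVE, UPDATE }

    // Assumed behaviour: "/nodes/worker/10.0.0.1:1234" -> "10.0.0.1:1234".
    static String hostOf(String path) {
        if (path == null || path.isEmpty()) {
            return null;
        }
        String host = path.substring(path.lastIndexOf('/') + 1);
        return host.isEmpty() ? null : host;
    }

    // Only REMOVE leads to failover; ADD and other types are just logged upstream.
    // Returns true when the failover hook was invoked.
    static boolean onWorkerEvent(EventType type, String path, boolean failover, Consumer<String> failoverHook) {
        if (type != EventType.REMOVE) {
            return false;
        }
        String host = hostOf(path);
        if (host == null) {
            return false; // mirrors the "unknown path" error branch
        }
        if (failover) {
            failoverHook.accept(host);
        }
        return failover;
    }
}
```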
    

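    The overall worker failover process is roughly as follows:

    1- Get the startup time of the worker node being failed over. It is used later to determine whether the worker is still offline or has already restarted.

    2- Query the task instances that need failover based on the failed worker's information, keeping only those that belong to the current master. This differs from master failover, and it is also why worker failover does not take a lock.

    3- Iterate over all task instances that need failover and fail each one over. Only tasks dispatched before the worker restarted need failover. New tasks assigned to the worker after it came back must not be failed over.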
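
    The filtering in steps 2 and 3 can be sketched as a single predicate. This is a simplification built on several assumptions:

    • the task instance is reduced to a hypothetical `Task` holding the owning master's host, a finished flag and a submit time;
    • the exact checks in DolphinScheduler's checkTaskInstanceNeedFailover, which the article does not show, may differ.

```java
import java.time.Instant;
import java.util.Optional;

final class FailoverFilter {
    // Hypothetical, trimmed-down view of a task instance.
    static final class Task {
        final String workflowHost;   // master that owns the task's workflow instance
        final boolean finished;
        final Instant submitTime;    // may be null if never recorded

        Task(String workflowHost, boolean finished, Instant submitTime) {
            this.workflowHost = workflowHost;
            this.finished = finished;
            this.submitTime = submitTime;
        }
    }

    static boolean needFailover(Optional<Instant> workerStartTime, String localMaster, Task task) {
        // Step 2: only workflows owned by this master, so masters never compete for a task.
        if (!localMaster.equalsIgnoreCase(task.workflowHost)) {
            return false;
        }
        // A finished task has nothing left to recover.
        if (task.finished) {
            return false;
        }
        // Worker still offline: every unfinished task on it is lost.
        if (!workerStartTime.isPresent()) {
            return true;
        }
        // Step 3: the worker is back; anything submitted after its restart is new work.
        return task.submitTime == null || !task.submitTime.isAfter(workerStartTime.get());
    }
}
```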

    /**
     * Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker,
     * and failover these tasks.
     * <p>
     * Note: When we do worker failover, the master will only failover the processInstance belongs to the current master.
     *
     * @param workerHost worker host
     */
    public void failoverWorker(@NonNull String workerHost) {
    	LOGGER.info("Worker[{}] failover starting", workerHost);
    	final StopWatch failoverTimeCost = StopWatch.createStarted();
    	// get the startup time of the worker to fail over; used later to tell whether it is still offline or has restarted
    	// we query the task instance from cache, so that we can directly update the cache
    	final Optional<Date> needFailoverWorkerStartTime =
    			getServerStartupTime(registryClient.getServerList(NodeType.WORKER), workerHost);
    	// query the failed worker's task instances, keeping only those owned by this master; unlike master failover, this is why no lock is needed
    	final List<TaskInstance> needFailoverTaskInstanceList = getNeedFailoverTaskInstance(workerHost);
    	if (CollectionUtils.isEmpty(needFailoverTaskInstanceList)) {
    		LOGGER.info("Worker[{}] failover finished there are no taskInstance need to failover", workerHost);
    		return;
    	}
    	LOGGER.info(
    			"Worker[{}] failover there are {} taskInstance may need to failover, will do a deep check, taskInstanceIds: {}",
    			workerHost,
    			needFailoverTaskInstanceList.size(),
    			needFailoverTaskInstanceList.stream().map(TaskInstance::getId).collect(Collectors.toList()));
    	final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
    	for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
    		LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
    		try {
    			ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent(
    					taskInstance.getProcessInstanceId(), k -> {
    						WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId(
    								taskInstance.getProcessInstanceId());
    						if (workflowExecuteRunnable == null) {
    							return null;
    						}
    						return workflowExecuteRunnable.getProcessInstance();
    					});
    			// only tasks dispatched before the worker restarted need failover; tasks assigned after it restarted are skipped
    			if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) {
    				LOGGER.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost);
    				continue;
    			}
    			LOGGER.info(
    					"Worker[{}] failover: begin to failover taskInstance, will set the status to NEED_FAULT_TOLERANCE",
    					workerHost);
    			failoverTaskInstance(processInstance, taskInstance);
    			LOGGER.info("Worker[{}] failover: Finish failover taskInstance", workerHost);
    		} catch (Exception ex) {
    			LOGGER.info("Worker[{}] failover taskInstance occur exception", workerHost, ex);
    		} finally {
    			LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    		}
    	}
    	failoverTimeCost.stop();
    	LOGGER.info("Worker[{}] failover finished, useTime:{}ms",
    			workerHost,
    			failoverTimeCost.getTime(TimeUnit.MILLISECONDS));
    }
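
    One detail of this loop is easy to miss. processInstanceCacheMap.computeIfAbsent avoids repeated cache-manager lookups for tasks of the same workflow. However, under the Map.computeIfAbsent contract, a null result is not recorded. A workflow instance that this master does not hold is therefore looked up again for each of its tasks. The hypothetical `LookupCacheDemo` below counts lookups to show this. The integer ids and string values stand in for process instances.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class LookupCacheDemo {
    // Runs the same computeIfAbsent pattern as the failover loop and returns how
    // many times the backing lookup (the cache manager in the real code) ran.
    static int countLookups(int[] processInstanceIds, Function<Integer, String> lookup) {
        Map<Integer, String> cache = new HashMap<>();
        int[] lookups = {0};
        for (int id : processInstanceIds) {
            cache.computeIfAbsent(id, key -> {
                lookups[0]++;
                return lookup.apply(key); // a null result is NOT stored in the map
            });
        }
        return lookups[0];
    }
}
```

    Run against ids {1, 1, 1, 2, 2}, where id 2 has no workflow, this performs one lookup for id 1 and two for id 2. This is harmless for correctness but worth knowing when reading the logs.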
    

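    4- For a task not executed by the master, first kill the related YARN job if masterConfig.isKillYarnJobWhenTaskFailover() is enabled. Then update the taskInstance state to TaskExecutionStatus.NEED_FAULT_TOLERANCE, build a TaskStateEvent whose status is TaskExecutionStatus.NEED_FAULT_TOLERANCE and whose type is TASK_STATE_CHANGE, and finally submit the event.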

    private void failoverTaskInstance(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
    	TaskMetrics.incTaskInstanceByState("failover");
    	boolean isMasterTask = TaskProcessorFactory.isMasterTask(taskInstance.getTaskType());
    	taskInstance.setProcessInstance(processInstance);
    	if (!isMasterTask) {
    		LOGGER.info("The failover taskInstance is not master task");
    		TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
    				.buildTaskInstanceRelatedInfo(taskInstance)
    				.buildProcessInstanceRelatedInfo(processInstance)
    				.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
    				.create();
    		if (masterConfig.isKillYarnJobWhenTaskFailover()) {
    			// only kill yarn job if exists , the local thread has exited
    			LOGGER.info("TaskInstance failover begin kill the task related yarn job");
    			ProcessUtils.killYarnJob(logClient, taskExecutionContext);
    		}
    	} else {
    		LOGGER.info("The failover taskInstance is a master task");
    	}
    	taskInstance.setState(TaskExecutionStatus.NEED_FAULT_TOLERANCE);
    	taskInstance.setFlag(Flag.NO);
    	processService.saveTaskInstance(taskInstance);
    	// submit the event
    	TaskStateEvent stateEvent = TaskStateEvent.builder()
    			.processInstanceId(processInstance.getId())
    			.taskInstanceId(taskInstance.getId())
    			.status(TaskExecutionStatus.NEED_FAULT_TOLERANCE)
    			.type(StateEventType.TASK_STATE_CHANGE)
    			.build();
    	workflowExecuteThreadPool.submitStateEvent(stateEvent);
    }
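
    The branching in failoverTaskInstance can be summarized as an ordered list of side effects. In this sketch, `FailoverSteps.plan` and the step strings are hypothetical labels for the calls shown above. They are used only so that the order and the YARN condition are easy to check. Tasks executed by the master itself presumably have no worker-side YARN job, which is why they skip the kill step.

```java
import java.util.ArrayList;
import java.util.List;

final class FailoverSteps {
    // Returns, in order, the side effects failoverTaskInstance performs.
    static List<String> plan(boolean isMasterTask, boolean killYarnJobWhenTaskFailover) {
        List<String> steps = new ArrayList<>();
        // Only worker-executed tasks can have a YARN job to clean up.
        if (!isMasterTask && killYarnJobWhenTaskFailover) {
            steps.add("killYarnJob");
        }
        steps.add("setState(NEED_FAULT_TOLERANCE)");
        steps.add("setFlag(NO)");
        steps.add("saveTaskInstance");
        // The event is submitted last, after the new state is persisted.
        steps.add("submitStateEvent(TASK_STATE_CHANGE)");
        return steps;
    }
}
```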
    

    To submit the event, the master looks up the WorkflowExecuteRunnable of the workflow instance the task belongs to and hands the event to it:

    public void submitStateEvent(StateEvent stateEvent) {
    	WorkflowExecuteRunnable workflowExecuteThread =
    			processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
    	if (workflowExecuteThread == null) {
    		logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}",
    				stateEvent);
    		return;
    	}
    	workflowExecuteThread.addStateEvent(stateEvent);
    	logger.info("Submit state event success, stateEvent: {}", stateEvent);
    }
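
    Conceptually, submitStateEvent routes each event by workflow instance id to a per-workflow queue. If this master does not hold the workflow, it drops the event with a warning. The sketch uses a hypothetical `StateEventRouter`, in which a map of ConcurrentLinkedQueues stands in for the cache manager and each runnable's event queue. Events are plain strings.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

final class StateEventRouter {
    // processInstanceId -> that workflow's pending events.
    private final Map<Integer, Queue<String>> queues = new ConcurrentHashMap<>();

    void register(int processInstanceId) {
        queues.putIfAbsent(processInstanceId, new ConcurrentLinkedQueue<>());
    }

    // Returns false (event dropped) when the workflow is not held by this master.
    boolean submit(int processInstanceId, String event) {
        Queue<String> queue = queues.get(processInstanceId);
        if (queue == null) {
            return false;
        }
        queue.add(event);
        return true;
    }

    int pending(int processInstanceId) {
        Queue<String> queue = queues.get(processInstanceId);
        return queue == null ? 0 : queue.size();
    }
}
```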
    

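    Handling the failover event

    The code above submitted an event for each task that needs failover, so some thread must process it. Let's look at the WorkflowExecuteRunnable class. submitStateEvent puts the event into this class's stateEvents queue: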

    private final ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();
    

    WorkflowExecuteRunnable is already started when the master starts. Its handleEvents method keeps taking events from stateEvents and processing them:

    /**
     * handle event
     */
    public void handleEvents() {
    	if (!isStart()) {
    		logger.info(
    				"The workflow instance is not started, will not handle its state event, current state event size: {}",
    				stateEvents);
    		return;
    	}
    	StateEvent stateEvent = null;
    	while (!this.stateEvents.isEmpty()) {
    		try {
    			stateEvent = this.stateEvents.peek();
    			LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
    					stateEvent.getTaskInstanceId());
    			// if state handle success then will remove this state, otherwise will retry this state next time.
    			// The state should always handle success except database error.
    			checkProcessInstance(stateEvent);
    			StateEventHandler stateEventHandler =
    					StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
    							.orElseThrow(() -> new StateEventHandleError(
    									"Cannot find handler for the given state event"));
    			logger.info("Begin to handle state event, {}", stateEvent);
    			if (stateEventHandler.handleStateEvent(this, stateEvent)) {
    				this.stateEvents.remove(stateEvent);
    			}
    		} catch (StateEventHandleError stateEventHandleError) {
    			logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
    			this.stateEvents.remove(stateEvent);
    			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
    		} catch (StateEventHandleException stateEventHandleException) {
    			logger.error("State event handle error, will retry this event: {}",
    					stateEvent,
    					stateEventHandleException);
    			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
    		} catch (Exception e) {
    			// we catch the exception here, since if the state event handle failed, the state event will still keep
    			// in the stateEvents queue.
    			logger.error("State event handle error, get a unknown exception, will retry this event: {}",
    					stateEvent,
    					e);
    			ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
    		} finally {
    			LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
    		}
    	}
    }
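
    The retry semantics of handleEvents depend on using peek() rather than poll(). An event leaves the queue only if it is handled successfully or fails with a StateEventHandleError. The sketch below reproduces that contract with hypothetical exception types. It adds a `maxAttempts` bound so the demo cannot loop forever; the real loop instead sleeps Constants.SLEEP_TIME_MILLIS and keeps retrying.

```java
import java.util.Queue;

final class EventLoopSketch {
    // Hypothetical stand-ins for StateEventHandleError / StateEventHandleException.
    static final class FatalEventError extends RuntimeException {
        FatalEventError(String message) { super(message); }
    }

    static final class RetryableEventException extends RuntimeException {
        RetryableEventException(String message) { super(message); }
    }

    interface Handler {
        boolean handle(String event);
    }

    // Peek first; remove only on success or on an unrecoverable error.
    // Returns the number of handling attempts made.
    static int drain(Queue<String> events, Handler handler, int maxAttempts) {
        int attempts = 0;
        while (!events.isEmpty() && attempts < maxAttempts) {
            String event = events.peek();
            attempts++;
            try {
                if (handler.handle(event)) {
                    events.remove(event);
                }
            } catch (FatalEventError e) {
                events.remove(event);   // dropped, like StateEventHandleError
            } catch (RetryableEventException e) {
                // kept at the head of the queue and retried
            } catch (RuntimeException e) {
                // unknown failure: also kept and retried
            }
        }
        return attempts;
    }
}
```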
    

    For the submitted event type, StateEventType.TASK_STATE_CHANGE, the matching StateEventHandler implementation is TaskStateEventHandler. For a task that needs failover, its handleStateEvent method mainly does the following:

     if (task.getState().isFinished()) {
    		if (completeTaskMap.containsKey(task.getTaskCode())
    				&& completeTaskMap.get(task.getTaskCode()) == task.getId()) {
    			logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
    			return true;
    		}
    		workflowExecuteRunnable.taskFinished(task);
    		if (task.getTaskGroupId() > 0) {
    			logger.info("The task instance need to release task Group: {}", task.getTaskGroupId());
    			workflowExecuteRunnable.releaseTaskGroup(task);
    		}
    		return true;
    	}
    

    The isFinished check covers the failover state, because isFailure treats NEED_FAULT_TOLERANCE as a failure:

    public boolean isFinished() {
    	return isSuccess() || isKill() || isFailure() || isPause();
    }
    public boolean isFailure() {
    	return this == TaskExecutionStatus.FAILURE || this == NEED_FAULT_TOLERANCE;
    }
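
    The trimmed-down model below keeps only a few TaskExecutionStatus constants. It shows why a failed-over task goes down the isFinished() branch even though it never actually completed. `TaskStatusSketch` is an illustrative name; the two predicates copy the logic shown above.

```java
enum TaskStatusSketch {
    SUCCESS, FAILURE, KILL, PAUSE, NEED_FAULT_TOLERANCE, RUNNING_EXECUTION;

    // NEED_FAULT_TOLERANCE counts as a failure...
    boolean isFailure() {
        return this == FAILURE || this == NEED_FAULT_TOLERANCE;
    }

    // ...and therefore as finished, which routes it to taskFinished().
    boolean isFinished() {
        return this == SUCCESS || this == KILL || isFailure() || this == PAUSE;
    }
}
```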
    

    Next, workflowExecuteRunnable.taskFinished(task) is called to handle what follows a task instance's state change. Here we only look at the branch related to failover:

    } else if (taskInstance.taskCanRetry() && !processInstance.getState().isReadyStop()) {
    			// retry task
    			logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
    			retryTaskInstance(taskInstance);
    }
    // checks for the failover state, which was set earlier
    public boolean taskCanRetry() {
    	if (this.isSubProcess()) {
    		return false;
    	}
    	if (this.getState() == TaskExecutionStatus.NEED_FAULT_TOLERANCE) {
    		return true;
    	}
    	return this.getState() == TaskExecutionStatus.FAILURE && (this.getRetryTimes() < this.getMaxRetryTimes());
    }
    /**
     * create new task instance to retry, different objects from the original
     *
     */
    private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException {
    	if (!taskInstance.taskCanRetry()) {
    		return;
    	}
    	TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
    	if (newTaskInstance == null) {
    		logger.error("Retry task fail because new taskInstance is null, task code:{}, task id:{}",
    				taskInstance.getTaskCode(),
    				taskInstance.getId());
    		return;
    	}
    	waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
    	if (!taskInstance.retryTaskIntervalOverTime()) {
    		logger.info(
    				"Failure task will be submitted, process id: {}, task instance code: {}, state: {}, retry times: {} / {}, interval: {}",
    				processInstance.getId(), newTaskInstance.getTaskCode(),
    				newTaskInstance.getState(), newTaskInstance.getRetryTimes(), newTaskInstance.getMaxRetryTimes(),
    				newTaskInstance.getRetryInterval());
    		stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance);
    		stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance);
    	} else {
    		addTaskToStandByList(newTaskInstance);
    		submitStandByTask();
    		waitToRetryTaskInstanceMap.remove(newTaskInstance.getTaskCode());
    	}
    }
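
    The retry path comes down to two decisions: whether the task may be retried at all, and whether the retry interval has already passed. In the sketch, `Status` and `Action` are hypothetical enums, and `intervalElapsed` stands in for retryTaskIntervalOverTime(), whose implementation is not shown here. Note that in taskCanRetry, a NEED_FAULT_TOLERANCE task is retryable regardless of retryTimes and maxRetryTimes. Failover therefore does not depend on the task having retries configured.

```java
final class RetryDecision {
    enum Status { FAILURE, NEED_FAULT_TOLERANCE, SUCCESS }

    enum Action { NO_RETRY, WAIT_FOR_INTERVAL, SUBMIT_NOW }

    // Same rule as taskCanRetry(): failover always retries (except sub-processes);
    // ordinary failures retry only while attempts remain.
    static boolean canRetry(boolean isSubProcess, Status state, int retryTimes, int maxRetryTimes) {
        if (isSubProcess) {
            return false;
        }
        if (state == Status.NEED_FAULT_TOLERANCE) {
            return true;
        }
        return state == Status.FAILURE && retryTimes < maxRetryTimes;
    }

    // Interval passed: straight to the standby list; otherwise parked for the
    // state-wheel thread's timeout and retry checks.
    static Action decide(boolean isSubProcess, Status state, int retryTimes, int maxRetryTimes,
                         boolean intervalElapsed) {
        if (!canRetry(isSubProcess, state, retryTimes, maxRetryTimes)) {
            return Action.NO_RETRY;
        }
        return intervalElapsed ? Action.SUBMIT_NOW : Action.WAIT_FOR_INTERVAL;
    }
}
```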
    

    Finally, the task instance that needs failover is added back to the readyToSubmitTaskQueue and submitted again:

    addTaskToStandByList(newTaskInstance);
    submitStandByTask();
    

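    From here on it is handled like any normal task: submitTaskExec submits it to a worker for execution.

    Summary

    The Worker failover flow can be summarized as follows:

    1- The Master learns which Worker node needs failover through its ZK watch.

    2- Each Master only fails over the workflow instances it schedules itself. Before failover, it compares each task instance's start time with the worker node's startup time and skips instances that started after the worker's startup time.

    3- Task instances that need failover are added back to readyToSubmitTaskQueue and submitted to run again.

    That wraps up Worker fault tolerance in DolphinScheduler.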
