海豚调度dolphinscheduler目前是 Apache 顶级项目,作为国内优秀的开源项目,它的架构设计理念会有许多值得我们学习和借鉴。
海豚调度dolphinscheduler是分布式易扩展的可视化DAG工作流任务调度系统
本文会包含如下内容:
本篇文章适合人群:架构师、技术专家以及对任务调度超级感兴趣的高级工程师
本文以海豚1.3.5的源代码进行分析。
SiftingAppender是Logback Appender的实现类,可以包含根据MDC值动态构建的appender。
SiftingAppender将Logback委托的日志事件根据接口Discriminator实现类的getDiscriminatingValue的返回值选择对应的appender, 然后由对应的appender写到不同的日志文件中。
源代码如下: 详细
ch.qos.logback.core.sift.SiftingAppenderBase 97行
注意:在获取appender的getOrCreate方法是一个同步方法,估计在日志量很大时,可能存在性能问题
@Override
protected void append(E event) {
if (!isStarted()) {
return;
}
String discriminatingValue = discriminator.getDiscriminatingValue(event);
long timestamp = getTimestamp(event);
Appender<E> appender = appenderTracker.getOrCreate(discriminatingValue, timestamp);
// marks the appender for removal as specified by the user
if (eventMarksEndOfLife(event)) {
appenderTracker.endOfLife(discriminatingValue);
}
appenderTracker.removeStaleComponents(timestamp);
appender.doAppend(event);
}配置示例
通过Discriminator定义Discriminator接口的实现类,及设置用于指定鉴别值(discriminating value)的key或变量名
通过sift配置动态构建appender的配置
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<!-- <filter>
<level>INFO</level>
</filter> -->
<filter class="org.apache.dolphinscheduler.server.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="org.apache.dolphinscheduler.server.log.TaskLogAppender">
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
</appender>
</sift>
</appender>实现此接口是为了针对一个给定的日志事件计算discriminating值,discriminating值用来区分不同的日志事件。
SiftingAppender根据discriminating值来创建或选择已经存在的appender,如果在配置时,不指定具体class时,默认使用MDCBasedDiscriminator。
MDCBasedDiscriminator本质上是根据配置中指定的key,在MDC(Mapped Diagnostic Context,映射调试上下文)中获取对应的值,如果不存在,则返回默认值
MDC本质上一个ThreadLocal<Map<String, String>>,所以需要在写日志前,需要通过MDC.put("keyxx", discriminating-value),列如程序中不同线程的日志输出到不同的LOGER, discriminating-value填写线程ID即可。
如果需要根据业务需要,如海豚调度中,将不同任务的日志输出到不同的文件中,则需要实现此接口。
以下是Discriminator的类图

海豚调度是基于Logback日志框架的SiftingAppender,根据Task的流程定义ID、流程实例ID、任务实例ID来创建logger,并在Discriminator的接口实现类中将流程定义ID、流程实例ID、任务实例ID转化为目录
流程定义ID/流程实例ID/任务实例ID.log, 以实现将不同Task的日志输出到不同的日志文件。
任务是在WorkerServer中执行的,对应的日志配置在logback-worker.xml中,如下:
<!--workerserver的stdout配置及-->
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<!-- <filter>
<level>INFO</level>
</filter> -->
<filter class="org.apache.dolphinscheduler.server.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.server.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="org.apache.dolphinscheduler.server.log.TaskLogAppender">
<file>${log.base}/${taskAppId}.log</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
</appender>
</sift>
</appender>
<!--workerserver的WORKERLOGFILE配置及-->
<root level="INFO">
<appender-ref ref="TASKLOGFILE"/>
<appender-ref ref="WORKERLOGFILE"/>
</root>在task中指定logger的过程如下:
// 1 custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
//2 submit task
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger));
//3 指定Task的logger
task = TaskManager.newTask(taskExecutionContext, taskLogger);
/**
* constructor
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
protected AbstractTask(TaskExecutionContext taskExecutionContext, Logger logger) {
this.taskExecutionContext = taskExecutionContext;
this.logger = logger;
}源代码如下,将logger名称中的流程定义ID、流程实例ID、任务实例ID转换为目录,这样不同任务的日志就输出到不同的日志文件中
/**
* logger name should be like:
* Task Logger name should be like: Task-{processDefinitionId}-{processInstanceId}-{taskInstanceId}
*/
@Override
public String getDiscriminatingValue(ILoggingEvent event) {
String loggerName = event.getLoggerName()
.split(Constants.EQUAL_SIGN)[1];
String prefix = LoggerUtils.TASK_LOGGER_INFO_PREFIX + "-";
if (loggerName.startsWith(prefix)) {
return loggerName.substring(prefix.length(),
loggerName.length() - 1).replace("-","/");
} else {
return "unknown_task";
}
}