一文读懂,硬核 Apache DolphinScheduler3.0 源码解析
2024-05-08 21:17:04


本文目录
  1. DolphinScheduler的硬核源码设计与策略
    1.1 分布式设计
    1.1.1 中心化
    1.1.2 去中心化
    1.2 DolphinScheduler架构设计
    1.3 容错问题
    1.3.1 宕机容错
    1.3.2 失败重试
    1.4 远程日志访问

  2. DolphinScheduler源码分析
    2.1 工程模块介绍与配置文件
    2.1.1 工程模块介绍
    2.1.2 配置文件
    2.2 Api主要任务操作接口
    2.3 Quartz架构与运行流程
    2.3.1 概念与架构
    2.3.2 初始化与执行流程
    2.3.3 集群运转
    2.4 Master启动与执行流程
    2.4.1 概念与执行逻辑
    2.4.2 集群与槽(slot)
    2.4.3 代码执行流程
    2.5 Worker启动与执行流程
    2.5.1 概念与执行逻辑
    2.5.2 代码执行流程
    2.6 RPC交互
    2.6.1 Master与Worker交互
    2.6.2 其他服务与Master交互
    2.7 负载均衡算法
    2.7.1 加权随机
    2.7.2 线性负载
    2.7.3 平滑轮询
    2.8 日志服务
    2.9 报警

1. DolphinScheduler的硬核源码设计与策略

大家既然关注DolphinScheduler,那么一定对调度系统有了一定的了解,调度所涉及的一些专有名词在这里就不做过多介绍,重点介绍一下流程定义、流程实例、任务定义、任务实例(没有"作业"这个概念确实也很新奇,可能是不想和Quartz的JobDetail重叠)。

  • 任务定义:各种类型的任务,是流程定义的关键组成,如sql、shell、spark、mr、python等;

  • 任务实例:任务的实例化,标识着具体的任务执行状态;

  • 流程定义:一组任务节点通过依赖关系建立起来的有向无环图(DAG);

  • 流程实例:通过手动启动或定时调度生成的流程实例;

  • 定时调度:系统采用Quartz分布式调度器,并同时支持cron表达式可视化的解析生成(一个最小的解析示例见本列表后的代码);
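顺带给出一个最小的Quartz cron表达式解析示例(仅作示意,表达式"0 0 2 * * ?"表示每天凌晨2点触发,与DolphinScheduler的具体实现无关):

import java.util.Date;
import org.quartz.CronExpression;

public class CronDemo {
    public static void main(String[] args) throws Exception {
        // Quartz的cron格式为"秒 分 时 日 月 周 [年]",下面表示每天凌晨2点触发
        CronExpression cron = new CronExpression("0 0 2 * * ?");
        // 计算当前时间之后的下一次触发时间
        Date next = cron.getNextValidTimeAfter(new Date());
        System.out.println("next fire time: " + next);
    }
}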

1.1 分布式设计

分布式系统的架构设计基本分为中心化和去中心化两种,各有优劣,可根据各自的业务场景选择。

1.1.1 中心化

中心化设计比较简单,集群中的节点按照角色可以分为Master和Slave两种,如下图:

Master: Master的角色主要负责任务分发并监督Slave的健康状态,可以动态地将任务均衡到Slave上,使Slave节点不至于处于"忙死"或"闲死"的状态。

中心化设计存在一些问题 。

第一点 ,一旦Master出现了问题,则群龙无首 ,整个集群就会崩溃。

为了解决这个问题,大多数Master/Slave架构模式都采用了主备Master的设计方案,可以是热备或者冷备,也可以是自动切换或手动切换,而且越来越多的新系统都开始具备自动选举切换Master的能力,以提升系统的可用性 。

第二点 ,如果Scheduler在Master上 ,虽然可以支持一个DAG中不同的任务运行在不同的机器上,但是会产生Master的过负载。如果Scheduler在Slave上,一个DAG中所有的任务都只能在某一台机器上进行作业提交 ,在并行任务比较多的时候 ,Slave的压力可能会比较大 。

xxl-job就是采用这种设计方式,但也存在相应的问题:管理器(admin)宕机时集群会崩溃;Scheduler在管理器上,管理器负责所有任务的校验和分发,存在过载的风险,需要开发者自行设计方案解决。

1.1.2 去中心化

在去中心化设计里 ,通常没有Master/Slave的概念 ,所有的角色都是一样的 ,地位是平等的,去中心化设计的核心设计在于整个分布式系统中不存在一个区别于其他节点的“管理者”,因此不存在单点故障问题 。

但由于不存在"管理者"节点,所以每个节点都需要跟其他节点通信才能得到必需的机器信息,而分布式系统通信的不可靠性,则大大增加了上述功能的实现难度。实际上,真正去中心化的分布式系统并不多见。

反而动态中心化的分布式系统正在不断涌现。在这种架构下,集群中的管理者是被动态选择出来的,而不是预置的;当集群发生故障的时候,集群的节点会自发地举行"会议"来选举新的管理者去主持工作。

选举策略一般都是基于Raft算法实现的。针对Raft算法,目前社区也有相应的PR,还没有合并。

  • PR链接:https://github.com/apache/dolphinscheduler/issues/10874

  • 动态展示见链接 :http://thesecretlivesofdata.com/

DolphinScheduler的去中心化是Master/Worker注册到注册中心 ,实现Master集群和Worker集群无中心。

1.2 DolphinScheduler架构设计

随手借用一张官网的系统架构图,可以看到调度系统采用去中心化设计,由UI、API、MasterServer、Zookeeper、WorkerServer、Alert等几部分组成。

API: API接口层 ,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务 。接口包括工作流的创建、定义  、查询 、修改 、发布 、下线 、手工启动、停止 、暂停、恢复、从该节点开始执行等等。

MasterServer: MasterServer采用分布式无中心设计理念 ,MasterServer集成了Quartz,主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态 。MasterServer服务启动时向Zookeeper注册临时节点 ,通过监听Zookeeper临时节点变化来进行容错处理。
WorkerServer: WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。

ZooKeeper: ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错。另外系统还基于ZooKeeper进行事件监听和分布式锁 。

Alert:提供告警相关接口 ,接口主要包括两种类型的告警数据的存储、查询和通知功能 ,支持丰富的告警插件自由拓展配置 。
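上面提到MasterServer和WorkerServer都通过ZooKeeper做集群管理、事件监听和分布式锁。DolphinScheduler实际是通过自身的注册中心模块(RegistryClient)来加锁的,这里仅用Curator的InterProcessMutex给出ZooKeeper互斥锁一般用法的最小示意,连接串与锁路径均为假设值:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZkLockDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        // 同一路径上的锁在整个集群内互斥,路径仅为示意
        InterProcessMutex lock = new InterProcessMutex(client, "/dolphinscheduler/lock/demo");
        lock.acquire();
        try {
            // 持有锁期间执行需要全局互斥的逻辑,例如刷新Master节点列表、执行容错
        } finally {
            lock.release();
        }
        client.close();
    }
}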

1.3 容错问题

容错分为服务宕机容错和任务失败重试,其中服务宕机容错又分为Master容错和Worker容错两种情况。

1.3.1 宕机容错

服务容错设计依赖于ZooKeeper的Watcher机制 ,实现原理如图 :

其中Master监控其他Master和Worker的目录,如果监听到remove事件,则会根据具体的业务逻辑进行流程实例容错或者任务实例容错。下面的容错流程图相对官方文档里的流程图更直观一些,大家可以参考一下。

Master容错流程图

ZooKeeper Master容错完成之后则重新由DolphinScheduler中Scheduler线程调度,遍历 DAG 找到“正在运行”和“提交成功”的任务,对“正在运行”的任务监控其任务实例的状态  ,对“提交成功”的任务需要判断Task Queue中是否已经存在 ,如果存在则同样监控任务实例的状态,如果不存在则重新提交任务实例 。

Worker容错流程图

Master Scheduler线程一旦发现任务实例为"需要容错"状态,则接管任务并进行重新提交。注意由于"网络抖动"可能会使得节点短时间内失去和ZooKeeper的心跳,从而发生节点的remove事件。

对于这种情况 ,我们使用最简单的方式,那就是节点一旦和ZooKeeper发生超时连接 ,则直接将Master或Worker服务停掉 。
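这一策略可以用注册中心的连接状态监听来示意:一旦会话状态变为LOST(即超时),就直接停止本服务进程,由其他节点的容错流程接管。下面是基于Curator的一个简化草图,stopServer为假设的方法名,DolphinScheduler实际是通过Registry模块的监听回调来停止服务的:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;

public class SessionTimeoutListener implements ConnectionStateListener {
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        // 会话丢失意味着临时节点已被删除,继续运行可能导致任务被重复调度
        if (newState == ConnectionState.LOST) {
            stopServer("registry session timeout");
        }
    }

    private void stopServer(String cause) {
        // 假设的停止逻辑:记录原因后退出进程,由容错流程接管该节点上的实例
        System.err.println("stop server, cause: " + cause);
        System.exit(1);
    }
}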

1.3.2 失败重试

这里首先要区分任务失败重试  、流程失败恢复、流程失败重跑的概念 :

  1. 任务失败重试是任务级别的,是调度系统自动进行的,比如一个Shell任务设置重试次数为3次,那么在Shell任务运行失败后会自动再最多尝试运行3次。

  2. 流程失败恢复是流程级别的,是手动进行的,恢复只能从失败的节点开始执行或从当前节点开始执行。

  3. 流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点开始执行。

接下来说正题,我们将工作流中的任务节点分为两种类型。

  1. 一种是业务节点 ,这种节点都对应一个实际的脚本或者处理语句,比如Shell节点 、MR节点 、Spark节点 、依赖节点等 。

  2. 还有一种是逻辑节点,这种节点不做实际的脚本或语句处理,只是整个流程流转的逻辑处理,比如子流程节点等。

每一个业务节点都可以配置失败重试的次数 ,当该任务节点失败  ,会自动重试 ,直到成功或者超过配置的重试次数。逻辑节点不支持失败重试 。但是逻辑节点里的任务支持重试 。

如果工作流中有任务失败达到最大重试次数,工作流就会失败停止,失败的工作流可以手动进行重跑操作或者流程恢复操作 。

1.4 远程日志访问

由于Web(UI)和Worker不一定在同一台机器上,所以查看日志不能像查看本地文件那样直接读取。

有两种方案 :

  1. 将日志放到ES搜索引擎上;

  2. 通过netty通信获取远程日志信息;

考虑到要尽可能保持DolphinScheduler的轻量级,所以选择了RPC实现日志信息的远程访问,具体代码实践见2.8章节。

2. DolphinScheduler源码分析

本章的主要目的是从代码层面一一介绍第一章讲解的功能。关于系统的安装在这里并不会涉及,安装运行请大家自行探索。

2.1 工程模块介绍与配置文件

2.1.1 工程模块介绍
  • dolphinscheduler-alert 告警模块,提供告警服务;

  • dolphinscheduler-api web应用模块 ,提供 Rest Api 服务,供 UI 进行调用;

  • dolphinscheduler-common 通用的常量枚举、工具类、数据结构或者基类;

  • dolphinscheduler-dao 提供数据库访问等操作;

  • dolphinscheduler-remote 基于netty的客户端 、服务端 ;

  • dolphinscheduler-server 日志与心跳服务 ;

  • dolphinscheduler-log-server LoggerServer 用于Rest Api通过RPC查看日志;

  • dolphinscheduler-master MasterServer服务 ,主要负责 DAG 的切分和任务状态的监控 ;

  • dolphinscheduler-worker WorkerServer服务 ,主要负责任务的提交、执行和任务状态的更新;

  • dolphinscheduler-service service模块 ,包含Quartz 、Zookeeper、日志客户端访问服务 ,便于server模块和api模块调用 ;

  • dolphinscheduler-ui 前端模块;

2.1.2 配置文件

dolphinscheduler-common common.properties

#本地工作目录,用于存放临时文件
data.basedir.path=/tmp/dolphinscheduler
#资源文件存储类型:HDFS,S3,NONE
resource.storage.type=NONE
#资源文件存储路径
resource.upload.path=/dolphinscheduler
#hadoop是否开启kerberos权限
hadoop.security.authentication.startup.state=false
#kerberos配置目录
java.security.krb5.conf.path=/opt/krb5.conf
#kerberos登录用户
login.user.keytab.username=hdfs-mycluster@ESZ.COM
#kerberos登录用户keytab
login.user.keytab.path=/opt/hdfs.headless.keytab
#kerberos过期时间,整数,单位为小时
kerberos.expire.time=2
#如果存储类型为HDFS,需要配置拥有对应操作权限的用户
hdfs.root.user=hdfs
#请求地址:如果resource.storage.type=S3,该值类似为s3a://dolphinscheduler;如果resource.storage.type=HDFS,且hadoop配置了HA,需要复制core-site.xml和hdfs-site.xml文件到conf目录
fs.defaultFS=hdfs://mycluster:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
#resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
#yarn resourcemanager地址,如果resourcemanager开启了HA,输入HA的IP地址(以逗号分隔);如果resourcemanager为单节点,该值为空即可
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
#如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可;如果resourcemanager为单节点,你需要将ds1配置为resourcemanager对应的hostname
yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
#job history status url when application number threshold is reached (default 10000, maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s
#datasource encryption enable
datasource.encryption.enable=false
#datasource encryption salt
datasource.encryption.salt=!@#$%^&*
#data quality option
data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar
#data-quality.error.output.path=/tmp/data-quality-error-data
#Network IP gets priority, default inner outer
#Whether hive SQL is executed in the same session
support.hive.oneSession=false
#use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
sudo.enable=true
#network interface preferred like eth0, default: empty
#dolphin.scheduler.network.interface.preferred=
#network IP gets priority, default: inner outer
#dolphin.scheduler.network.priority.strategy=default
#system env path
#dolphinscheduler.env.path=dolphinscheduler_env.sh
#是否处于开发模式
development.state=false
#rpc port
alert.rpc.port=50052
#Url endpoint for zeppelin RESTful API
zeppelin.rest.url=http://localhost:8080

dolphinscheduler-api application.yaml

server:port:12345servlet:session:timeout:120mcontext-path:/dolphinscheduler/compression:enabled:truemime-types:text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xmljetty:max-http-form-post-size:5000000spring:application:name:api-serverbanner:charset:UTF-8jackson:time-zone:UTCdate-format:"yyyy-MM-ddHH:mm:ss"servlet:multipart:max-file-size:1024MBmax-request-size:1024MBmessages:basename:i18n/messagesdatasource:#driver-class-name:org.postgresql.Driver#url:jdbc:postgresql://127.0.0.1:5432/dolphinschedulerdriver-class-name:com.mysql.jdbc.Driverurl:jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNullusername:rootpassword:roothikari:connection-test-query:select1minimum-idle:5auto-commit:truevalidation-timeout:3000pool-name:DolphinSchedulermaximum-pool-size:50connection-timeout:30000idle-timeout:600000leak-detection-threshold:0initialization-fail-timeout:1quartz:auto-startup:falsejob-store-type:jdbcjdbc:initialize-schema:neverproperties:org.quartz.threadPool:threadPriority:5org.quartz.jobStore.isClustered:trueorg.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTXorg.quartz.scheduler.instanceId:AUTOorg.quartz.jobStore.tablePrefix:QRTZ_org.quartz.jobStore.acquireTriggersWithinLock:trueorg.quartz.scheduler.instanceName:DolphinSchedulerorg.quartz.threadPool.class:org.quartz.simpl.SimpleThreadPoolorg.quartz.jobStore.useProperties:falseorg.quartz.threadPool.makeThreadsDaemons:trueorg.quartz.threadPool.threadCount:25org.quartz.jobStore.misfireThreshold:60000org.quartz.scheduler.makeSchedulerThreadDaemon:true#org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.PostgreSQLDelegateorg.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegateorg.quartz.jobStore.clusterCheckinInterval:5000management:endpoints:web:exposure:include:'*'metrics:tags:application:${ spring.application.name}registry:type:zookeeperzookeeper:namespace:dolphinscheduler#connect-string:localhost:2181connect-string:10.255.158.70:2181retry-policy:base-sleep-time:60msmax-sleep:300msmax-retries:5session-timeout:30sconnection-timeout:9sblock-until-connected:600msdigest:~audit:enabled:falsemetrics:enabled:truepython-gateway:#Weatherenablepythongatewayserverornot.Thedefaultvalueistrue.enabled:true#TheaddressofPythongatewayserverstart.Setitsvalueto`0.0.0.0`ifyourPythonAPIrunindifferent#betweenPythongatewayserver.Itcouldbebespecifictootheraddresslike`127.0.0.1`or`localhost`gateway-server-address:0.0.0.0#TheportofPythongatewayserverstart.DefinewhichportyoucouldconnecttoPythongatewayserverfrom#PythonAPIside.gateway-server-port:25333#TheaddressofPythoncallbackclient.python-address:127.0.0.1#TheportofPythoncallbackclient.python-port:25334#Closeconnectionofsocketserverifnootherrequestacceptafterxmilliseconds.Definevalueis(0=infinite),#andsocketserverwouldnevercloseeventhoughnorequestsacceptconnect-timeout:0#Closeeachactiveconnectionofsocketserverifpythonprogramnotactiveafterxmilliseconds.Definevalueis#(0=infinite),andsocketserverwouldnevercloseeventhoughnorequestsacceptread-timeout:0#Overridebyprofile---spring:config:activate:on-profile:mysqldatasource:driver-class-name:com.mysql.jdbc.Driverurl:jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8quartz:properties:org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate

dolphinscheduler-master application.yaml

spring:banner:charset:UTF-8application:name:master-serverjackson:time-zone:UTCdate-format:"yyyy-MM-ddHH:mm:ss"cache:#defaultenablecache,youcandisableby`type:none`type:nonecache-names:-tenant-user-processDefinition-processTaskRelation-taskDefinitioncaffeine:spec:maximumSize=100,expireAfterWrite=300s,recordStatsdatasource:#driver-class-name:org.postgresql.Driver#url:jdbc:postgresql://127.0.0.1:5432/dolphinschedulerdriver-class-name:com.mysql.jdbc.Driverurl:jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNullusername:rootpassword:hikari:connection-test-query:select1minimum-idle:5auto-commit:truevalidation-timeout:3000pool-name:DolphinSchedulermaximum-pool-size:50connection-timeout:30000idle-timeout:600000leak-detection-threshold:0initialization-fail-timeout:1quartz:job-store-type:jdbcjdbc:initialize-schema:neverproperties:org.quartz.threadPool:threadPriority:5org.quartz.jobStore.isClustered:trueorg.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTXorg.quartz.scheduler.instanceId:AUTOorg.quartz.jobStore.tablePrefix:QRTZ_org.quartz.jobStore.acquireTriggersWithinLock:trueorg.quartz.scheduler.instanceName:DolphinSchedulerorg.quartz.threadPool.class:org.quartz.simpl.SimpleThreadPoolorg.quartz.jobStore.useProperties:falseorg.quartz.threadPool.makeThreadsDaemons:trueorg.quartz.threadPool.threadCount:25org.quartz.jobStore.misfireThreshold:60000org.quartz.scheduler.makeSchedulerThreadDaemon:true#org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.PostgreSQLDelegateorg.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegateorg.quartz.jobStore.clusterCheckinInterval:5000registry:type:zookeeperzookeeper:namespace:dolphinscheduler#connect-string:localhost:2181connect-string:10.255.158.70:2181retry-policy:base-sleep-time:60msmax-sleep:300msmax-retries:5session-timeout:30sconnection-timeout:9sblock-until-connected:600msdigest:~master:listen-port:5678#masterfetchcommandnumfetch-command-num:10#masterprepareexecutethreadnumbertolimithandlecommandsinparallelpre-exec-threads:10#masterexecutethreadnumbertolimitprocessinstancesinparallelexec-threads:100#masterdispatchtasknumberperbatchdispatch-task-number:3#masterhostselectortoselectasuitableworker,defaultvalue:LowerWeight.Optionalvaluesincluderandom,round_robin,lower_weighthost-selector:lower_weight#masterheartbeatinterval,theunitissecondheartbeat-interval:10#mastercommittaskretrytimestask-commit-retry-times:5#mastercommittaskinterval,theunitismillisecondtask-commit-interval:1000state-wheel-interval:5#mastermaxcpuloadavg,onlyhigherthanthesystemcpuloadaverage,masterservercanschedule.defaultvalue-1:thenumberofcpucores*2max-cpu-load-avg:-1#masterreservedmemory,onlylowerthansystemavailablememory,masterservercanschedule.defaultvalue0.3,theunitisGreserved-memory:0.3#failoverinterval,theunitisminutefailover-interval:10#killyarnjonwhenfailovertaskInstance,defaulttruekill-yarn-job-when-task-failover:trueserver:port:5679management:endpoints:web:exposure:include:'*'metrics:tags:application:${ 
spring.application.name}metrics:enabled:true#Overridebyprofile---spring:config:activate:on-profile:mysqldatasource:driver-class-name:com.mysql.jdbc.Driverurl:jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNullquartz:properties:org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate

dolphinscheduler-worker application.yaml

spring:banner:charset:UTF-8application:name:worker-serverjackson:time-zone:UTCdate-format:"yyyy-MM-ddHH:mm:ss"datasource:#driver-class-name:org.postgresql.Driver#url:jdbc:postgresql://127.0.0.1:5432/dolphinschedulerdriver-class-name:com.mysql.jdbc.Driverurl:jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNullusername:root#password:rootpassword:hikari:connection-test-query:select1minimum-idle:5auto-commit:truevalidation-timeout:3000pool-name:DolphinSchedulermaximum-pool-size:50connection-timeout:30000idle-timeout:600000leak-detection-threshold:0initialization-fail-timeout:1registry:type:zookeeperzookeeper:namespace:dolphinscheduler#connect-string:localhost:2181connect-string:10.255.158.70:2181retry-policy:base-sleep-time:60msmax-sleep:300msmax-retries:5session-timeout:30sconnection-timeout:9sblock-until-connected:600msdigest:~worker:#workerlistenerportlisten-port:1234#workerexecutethreadnumbertolimittaskinstancesinparallelexec-threads:100#workerheartbeatinterval,theunitissecondheartbeat-interval:10#workerhostweighttodispatchtasks,defaultvalue100host-weight:100#workertenantautocreatetenant-auto-create:true#workermaxcpuloadavg,onlyhigherthanthesystemcpuloadaverage,workerservercanbedispatchedtasks.defaultvalue-1:thenumberofcpucores*2max-cpu-load-avg:-1#workerreservedmemory,onlylowerthansystemavailablememory,workerservercanbedispatchedtasks.defaultvalue0.3,theunitisGreserved-memory:0.3#defaultworkergroupsseparatedbycomma,like'worker.groups=default,test'groups:-default#alertserverlistenhostalert-listen-host:localhostalert-listen-port:50052server:port:1235management:endpoints:web:exposure:include:'*'metrics:tags:application:${ spring.application.name}metrics:enabled:true

2.2 Api主要任务操作接口

其他业务接口可以不用关注,只需要关注最主要的流程上线功能接口,从此接口可以发散出所有与任务调度相关的代码。

接口 :
/dolphinscheduler/projects/{projectCode}/schedules/{id}/online

此接口会将定义的流程提交到Quartz调度框架;

publicMap<String,Object>setScheduleState(UserloginUser,longprojectCode,Integerid,ReleaseStatescheduleStatus){ Map<String,Object>result=newHashMap<>();Projectproject=projectMapper.queryByCode(projectCode);//checkprojectauthbooleanhasProjectAndPerm=projectService.hasProjectAndPerm(loginUser,project,result);if(!hasProjectAndPerm){ returnresult;}//checkscheduleexistsSchedulescheduleObj=scheduleMapper.selectById(id);if(scheduleObj==null){ putMsg(result,Status.SCHEDULE_CRON_NOT_EXISTS,id);returnresult;}//checkschedulereleasestateif(scheduleObj.getReleaseState()==scheduleStatus){ logger.info("schedulereleaseisalready{ },needn'ttochangescheduleid:{ }from{ }to{ }",scheduleObj.getReleaseState(),scheduleObj.getId(),scheduleObj.getReleaseState(),scheduleStatus);putMsg(result,Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE,scheduleStatus);returnresult;}ProcessDefinitionprocessDefinition=processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());if(processDefinition==null||projectCode!=processDefinition.getProjectCode()){ putMsg(result,Status.PROCESS_DEFINE_NOT_EXIST,String.valueOf(scheduleObj.getProcessDefinitionCode()));returnresult;}List<ProcessTaskRelation>processTaskRelations=processTaskRelationMapper.queryByProcessCode(projectCode,scheduleObj.getProcessDefinitionCode());if(processTaskRelations.isEmpty()){ putMsg(result,Status.PROCESS_DAG_IS_EMPTY);returnresult;}if(scheduleStatus==ReleaseState.ONLINE){ //checkprocessdefinitionreleasestateif(processDefinition.getReleaseState()!=ReleaseState.ONLINE){ logger.info("notreleaseprocessdefinitionid:{ },name:{ }",processDefinition.getId(),processDefinition.getName());putMsg(result,Status.PROCESS_DEFINE_NOT_RELEASE,processDefinition.getName());returnresult;}//checksubprocessdefinitionreleasestateList<Long>subProcessDefineCodes=newArrayList<>();processService.recurseFindSubProcess(processDefinition.getCode(),subProcessDefineCodes);if(!subProcessDefineCodes.isEmpty()){ List<ProcessDefinition>subProcessDefinitionList=processDefinitionMapper.queryByCodes(subProcessDefineCodes);if(subProcessDefinitionList!=null&&!subProcessDefinitionList.isEmpty()){ for(ProcessDefinitionsubProcessDefinition:subProcessDefinitionList){ /***ifthereisnoonlineprocess,exitdirectly*/if(subProcessDefinition.getReleaseState()!=ReleaseState.ONLINE){ logger.info("notreleaseprocessdefinitionid:{ },name:{ }",subProcessDefinition.getId(),subProcessDefinition.getName());putMsg(result,Status.PROCESS_DEFINE_NOT_RELEASE,String.valueOf(subProcessDefinition.getId()));returnresult;}}}}}//checkmasterserverexistsList<Server>masterServers=monitorService.getServerListFromRegistry(true);if(masterServers.isEmpty()){ putMsg(result,Status.MASTER_NOT_EXISTS);returnresult;}//setstatusscheduleObj.setReleaseState(scheduleStatus);scheduleMapper.updateById(scheduleObj);try{ switch(scheduleStatus){ caseONLINE:logger.info("Callmasterclientsetscheduleonline,projectid:{ },flowid:{ },host:{ }",project.getId(),processDefinition.getId(),masterServers);setSchedule(project.getId(),scheduleObj);break;caseOFFLINE:logger.info("Callmasterclientsetscheduleoffline,projectid:{ },flowid:{ },host:{ }",project.getId(),processDefinition.getId(),masterServers);deleteSchedule(project.getId(),id);break;default:putMsg(result,Status.SCHEDULE_STATUS_UNKNOWN,scheduleStatus.toString());returnresult;}}catch(Exceptione){ 
result.put(Constants.MSG,scheduleStatus==ReleaseState.ONLINE?"setonlinefailure":"setofflinefailure");thrownewServiceException(result.get(Constants.MSG).toString(),e);}putMsg(result,Status.SUCCESS);returnresult;}publicvoidsetSchedule(intprojectId,Scheduleschedule){ logger.info("setschedule,projectid:{ },scheduleId:{ }",projectId,schedule.getId());quartzExecutor.addJob(ProcessScheduleJob.class,projectId,schedule);}publicvoidaddJob(Class<?extendsJob>clazz,intprojectId,finalScheduleschedule){ StringjobName=this.buildJobName(schedule.getId());StringjobGroupName=this.buildJobGroupName(projectId);Map<String,Object>jobDataMap=this.buildDataMap(projectId,schedule);StringcronExpression=schedule.getCrontab();StringtimezoneId=schedule.getTimezoneId();/***transformfromserverdefaulttimezonetoscheduletimezone*e.g.serverdefaulttimezoneis`UTC`*usersetaschedulewithstartTime`2022-04-2810:00:00`,timezoneis`Asia/Shanghai`,*apiskiptotransformitandsaveintodatabasesdirectly,startTime`2022-04-2810:00:00`,timezoneis`UTC`,whichactuallyadded8hours,*sowhenaddjobtoquartz,itshouldrecoverbytransformtimezone*/DatestartDate=DateUtils.transformTimezoneDate(schedule.getStartTime(),timezoneId);DateendDate=DateUtils.transformTimezoneDate(schedule.getEndTime(),timezoneId);lock.writeLock().lock();try{ JobKeyjobKey=newJobKey(jobName,jobGroupName);JobDetailjobDetail;//addatask(ifthistaskalreadyexists,returnthistaskdirectly)if(scheduler.checkExists(jobKey)){ jobDetail=scheduler.getJobDetail(jobKey);jobDetail.getJobDataMap().putAll(jobDataMap);}else{ jobDetail=newJob(clazz).withIdentity(jobKey).build();jobDetail.getJobDataMap().putAll(jobDataMap);scheduler.addJob(jobDetail,false,true);logger.info("Addjob,jobname:{ },groupname:{ }",jobName,jobGroupName);}TriggerKeytriggerKey=newTriggerKey(jobName,jobGroupName);/**InstructstheSchedulerthatuponamis-fire*situation,theCronTriggerwantstohaveit's*next-fire-timeupdatedtothenexttimeinthescheduleafterthe*currenttime(takingintoaccountanyassociatedCalendar),*butitdoesnotwanttobefirednow.*/CronTriggercronTrigger=newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate).withSchedule(cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing().inTimeZone(DateUtils.getTimezone(timezoneId))).forJob(jobDetail).build();if(scheduler.checkExists(triggerKey)){ //updateProcessInstanceschedulertriggerwhenschedulercyclechangesCronTriggeroldCronTrigger=(CronTrigger)scheduler.getTrigger(triggerKey);StringoldCronExpression=oldCronTrigger.getCronExpression();if(!StringUtils.equalsIgnoreCase(cronExpression,oldCronExpression)){ //reschedulejobtriggerscheduler.rescheduleJob(triggerKey,cronTrigger);logger.info("reschedulejobtrigger,triggerName:{ },triggerGroupName:{ },cronExpression:{ },startDate:{ },endDate:{ }",jobName,jobGroupName,cronExpression,startDate,endDate);}}else{ scheduler.scheduleJob(cronTrigger);logger.info("schedulejobtrigger,triggerName:{ },triggerGroupName:{ },cronExpression:{ },startDate:{ },endDate:{ }",jobName,jobGroupName,cronExpression,startDate,endDate);}}catch(Exceptione){ thrownewServiceException("addjobfailed",e);}finally{ lock.writeLock().unlock();}}2.3.1 概念与架构

Quartz 框架主要包括如下几个部分:

  • SchedulerFactory:任务调度工厂 ,主要负责管理任务调度器;

  • Scheduler  :任务调度器 ,主要负责任务调度 ,以及操作任务的相关接口;

  • Job :任务接口,实现类包含具体任务业务代码;

  • JobDetail:用于定义作业的实例;

  • Trigger:任务触发器 ,主要存放 Job 执行的时间策略 。例如多久执行一次 ,什么时候执行,以什么频率执行等等;

  • JobBuilder :用于定义/构建 JobDetail 实例 ,用于定义作业的实例。

  • TriggerBuilder :用于定义/构建触发器实例;

  • Calendar:Trigger 扩展对象 ,可以排除或者包含某个指定的时间点(如排除法定节假日);

  • JobStore:存储作业和任务调度期间的状态;

Scheduler的生命期,从SchedulerFactory创建它时开始,到Scheduler调用shutdown()方法时结束。

Scheduler 被创建后,可以增加、删除和列举 Job 和 Trigger ,以及执行其它与调度相关的操作(如暂停 Trigger) 。但Scheduler 只有在调用 start() 方法后 ,才会真正地触发 trigger(即执行 job)
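结合上面的概念,给出一个最小的Quartz使用示意:用JobBuilder/TriggerBuilder构建JobDetail与CronTrigger,交给Scheduler调度。示例与DolphinScheduler无关,HelloJob为假设的任务类,仅演示各组件如何协作:

import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;

import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzDemo {

    // Job接口的实现类,包含具体任务业务代码
    public static class HelloJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            System.out.println("hello quartz: " + context.getJobDetail().getKey());
        }
    }

    public static void main(String[] args) throws Exception {
        // SchedulerFactory负责创建Scheduler
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        // JobDetail定义作业实例,CronTrigger定义触发策略(这里是每分钟执行一次)
        JobDetail job = newJob(HelloJob.class).withIdentity("demoJob", "demoGroup").build();
        CronTrigger trigger = newTrigger().withIdentity("demoTrigger", "demoGroup")
                .withSchedule(cronSchedule("0 * * * * ?")).forJob(job).build();
        scheduler.scheduleJob(job, trigger);
        // 只有调用start()之后,Trigger才会真正被触发
        scheduler.start();
    }
}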

2.3.2 初始化与执行流程

Quartz的基本原理就是通过Scheduler来调度被JobDetail和Trigger定义的安装Job接口规范实现的自定义任务业务对象 ,来完成任务的调度。基本逻辑如下图:

代码时序图如下 :

基本内容就是初始化任务调度容器Scheduler,以及容器所需的线程池、数据交互对象JobStore,还有用来处理Job接口具体业务实现类的任务处理线程QuartzSchedulerThread。

DolphinScheduler的业务类是ProcessScheduleJob,主要功能就是根据调度信息往command表(t_ds_command)中写数据。
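ProcessScheduleJob的核心逻辑可以简化为:在execute回调里读取JobDataMap中的调度信息,组装一条命令写入t_ds_command表,再由Master扫描消费。下面是一个去掉细节的草图,其中的key名称和insertCommand方法都是为了说明流程而假设的,并非源码原样:

import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;

// 示意:Quartz触发时只负责"落库",真正的任务执行由Master/Worker完成
public class ProcessScheduleJobSketch implements Job {
    @Override
    public void execute(JobExecutionContext context) {
        JobDataMap dataMap = context.getJobDetail().getJobDataMap();
        int projectId = dataMap.getInt("projectId");    // 假设的key
        int scheduleId = dataMap.getInt("scheduleId");  // 假设的key
        // 组装命令并写入t_ds_command表(此处用伪方法示意)
        insertCommand(projectId, scheduleId, context.getScheduledFireTime());
    }

    private void insertCommand(int projectId, int scheduleId, java.util.Date scheduledFireTime) {
        // 假设的持久化逻辑:实际由ProcessService通过Mapper完成
        System.out.printf("insert into t_ds_command: project=%d, schedule=%d, fireTime=%s%n",
                projectId, scheduleId, scheduledFireTime);
    }
}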

2.3.3 集群运转

需要注意的是:

  1. 当Quartz采用集群形式部署的时候,存储介质不能使用内存的形式,也就是不能使用RAMJobStore。

  2. Quartz集群对需要被调度的Trigger实例的扫描,是通过数据库行锁(LOCK_TRIGGER_ACCESS)来完成的,保障同一时刻此扫描过程只能被一个Quartz实例执行。代码如下:

publicList<OperableTrigger>acquireNextTriggers(finallongnoLaterThan,finalintmaxCount,finallongtimeWindow)throwsJobPersistenceException{ StringlockName;if(isAcquireTriggersWithinLock()||maxCount>1){ lockName=LOCK_TRIGGER_ACCESS;}else{ lockName=null;}returnexecuteInNonManagedTXLock(lockName,newTransactionCallback<List<OperableTrigger>>(){ publicList<OperableTrigger>execute(Connectionconn)throwsJobPersistenceException{ returnacquireNextTrigger(conn,noLaterThan,maxCount,timeWindow);}},newTransactionValidator<List<OperableTrigger>>(){ publicBooleanvalidate(Connectionconn,List<OperableTrigger>result)throwsJobPersistenceException{ try{ List<FiredTriggerRecord>acquired=getDelegate().selectInstancesFiredTriggerRecords(conn,getInstanceId());Set<String>fireInstanceIds=newHashSet<String>();for(FiredTriggerRecordft:acquired){ fireInstanceIds.add(ft.getFireInstanceId());}for(OperableTriggertr:result){ if(fireInstanceIds.contains(tr.getFireInstanceId())){ returntrue;}}returnfalse;}catch(SQLExceptione){ thrownewJobPersistenceException("errorvalidatingtriggeracquisition",e);}}});}
  3. 集群失败实例恢复时需要注意,各个实例只恢复自己对应的异常实例,因为数据库中保存了调度容器的instanceId信息。代码如下:

protectedvoidclusterRecover(Connectionconn,List<SchedulerStateRecord>failedInstances)throwsJobPersistenceException{ if(failedInstances.size()>0){ longrecoverIds=System.currentTimeMillis();logWarnIfNonZero(failedInstances.size(),"ClusterManager:detected"+failedInstances.size()+"failedorrestartedinstances.");try{ for(SchedulerStateRecordrec:failedInstances){ getLog().info("ClusterManager:Scanningforinstance\""+rec.getSchedulerInstanceId()+"\"'sfailedin-progressjobs.");List<FiredTriggerRecord>firedTriggerRecs=getDelegate().selectInstancesFiredTriggerRecords(conn,rec.getSchedulerInstanceId());intacquiredCount=0;intrecoveredCount=0;intotherCount=0;Set<TriggerKey>triggerKeys=newHashSet<TriggerKey>();for(FiredTriggerRecordftRec:firedTriggerRecs){ TriggerKeytKey=ftRec.getTriggerKey();JobKeyjKey=ftRec.getJobKey();triggerKeys.add(tKey);//releaseblockedtriggers..if(ftRec.getFireInstanceState().equals(STATE_BLOCKED)){ getDelegate().updateTriggerStatesForJobFromOtherState(conn,jKey,STATE_WAITING,STATE_BLOCKED);}elseif(ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)){ getDelegate().updateTriggerStatesForJobFromOtherState(conn,jKey,STATE_PAUSED,STATE_PAUSED_BLOCKED);}//releaseacquiredtriggers..if(ftRec.getFireInstanceState().equals(STATE_ACQUIRED)){ getDelegate().updateTriggerStateFromOtherState(conn,tKey,STATE_WAITING,STATE_ACQUIRED);acquiredCount++;}elseif(ftRec.isJobRequestsRecovery()){ //handlejobsmarkedforrecoverythatwerenotfully//executed..if(jobExists(conn,jKey)){ @SuppressWarnings("deprecation")SimpleTriggerImplrcvryTrig=newSimpleTriggerImpl("recover_"+rec.getSchedulerInstanceId()+"_"+String.valueOf(recoverIds++),Scheduler.DEFAULT_RECOVERY_GROUP,newDate(ftRec.getScheduleTimestamp()));rcvryTrig.setJobName(jKey.getName());rcvryTrig.setJobGroup(jKey.getGroup());rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);rcvryTrig.setPriority(ftRec.getPriority());JobDataMapjd=getDelegate().selectTriggerJobDataMap(conn,tKey.getName(),tKey.getGroup());jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME,tKey.getName());jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP,tKey.getGroup());jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS,String.valueOf(ftRec.getFireTimestamp()));jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS,String.valueOf(ftRec.getScheduleTimestamp()));rcvryTrig.setJobDataMap(jd);rcvryTrig.computeFirstFireTime(null);storeTrigger(conn,rcvryTrig,null,false,STATE_WAITING,false,true);recoveredCount++;}else{ getLog().warn("ClusterManager:failedjob'"+jKey+"'nolongerexists,cannotschedulerecovery.");otherCount++;}}else{ otherCount++;}//freeupstatefuljob'striggersif(ftRec.isJobDisallowsConcurrentExecution()){ getDelegate().updateTriggerStatesForJobFromOtherState(conn,jKey,STATE_WAITING,STATE_BLOCKED);getDelegate().updateTriggerStatesForJobFromOtherState(conn,jKey,STATE_PAUSED,STATE_PAUSED_BLOCKED);}}getDelegate().deleteFiredTriggers(conn,rec.getSchedulerInstanceId());//Checkifanyofthefiredtriggerswejustdeletedwerethelastfiredtrigger//recordsofaCOMPLETEtrigger.intcompleteCount=0;for(TriggerKeytriggerKey:triggerKeys){ if(getDelegate().selectTriggerState(conn,triggerKey).equals(STATE_COMPLETE)){ List<FiredTriggerRecord>firedTriggers=getDelegate().selectFiredTriggerRecords(conn,triggerKey.getName(),triggerKey.getGroup());if(firedTriggers.isEmpty()){ if(removeTrigger(conn,triggerKey)){ 
completeCount++;}}}}logWarnIfNonZero(acquiredCount,"ClusterManager:......Freed"+acquiredCount+"acquiredtrigger(s).");logWarnIfNonZero(completeCount,"ClusterManager:......Deleted"+completeCount+"completetriggers(s).");logWarnIfNonZero(recoveredCount,"ClusterManager:......Scheduled"+recoveredCount+"recoverablejob(s)forrecovery.");logWarnIfNonZero(otherCount,"ClusterManager:......Cleaned-up"+otherCount+"otherfailedjob(s).");if(!rec.getSchedulerInstanceId().equals(getInstanceId())){ getDelegate().deleteSchedulerState(conn,rec.getSchedulerInstanceId());}}}catch(Throwablee){ thrownewJobPersistenceException("Failurerecoveringjobs:"+e.getMessage(),e);}}}
2.4 Master启动与执行流程

2.4.1 概念与执行逻辑

关键概念:

Quartz相关 :

  • Scheduler(任务调度容器,一般都是StdScheduler实例) 。

  • ProcessScheduleJob(实现Quartz调度框架Job接口的业务类,专门生成DolphinScheduler数据库业务表t_ds_command的数据);

DolphinScheduler相关 :

  • NettyRemotingServer(netty服务端,包含netty服务端serverBootstrap对象与netty服务端业务处理对象serverHandler) , NettyServerHandler :(netty服务端业务处理类 :包含各类处理器以及处理器对应的执行线程池);

  • TaskPluginManager(任务插件管理器,不同类型的任务以插件的形式管理,在应用服务启动的时候,通过@AutoService把实现了TaskChannelFactory接口的工厂信息加载到数据库,再通过工厂对象加载各类TaskChannel实现类到缓存,@AutoService的加载机制可参考本列表后的示意代码);

  • MasterRegistryClient(master操作zk的客户端,封装了master对于zk的所有操作,注册 ,查询 ,删除等);

  • MasterSchedulerService(扫描服务,包含业务执行线程以及与Worker通信的netty客户端,负责任务调度业务,通过slot控制集群模式下任务不被重复调度,slot的刷新底层依赖zookeeper分布式锁);

  • WorkflowExecuteThread(真正的业务处理线程,通过槽位(slot)获取命令command,执行之前会校验slot的变化,如果发生变化则不执行;关键功能是构建任务相关的参数、定义、优先级等,然后发送到队列,供队列处理线程消费);

  • CommonTaskProcessor(普通任务处理器,实现ITaskProcessor接口;按业务分为普通、依赖、子任务、阻塞、条件等任务类型,包含了任务的提交、运行、分发、杀死等业务,同样是通过@AutoService加载的类,本质上是对这些任务操作的封装);

  • TaskPriorityQueueImpl(任务队列,负责任务队列的存储控制);

  • TaskPriorityQueueConsumer(任务队列消费线程 ,负责任务的根据负载均衡策略在worker之间分发与执行);

  • ServerNodeManager (节点信息控制器 ,负责节点注册信息更新与槽位(slot)变更 ,底层实现是zookeeper分布式锁的应用);

  • EventExecuteService(事件处理线程,通过缓存起来的任务处理线程  ,处理每个任务在处理过程中注册在线程事件队列中的事件);

  • FailoverExecuteThread(故障转移线程,包含Master和worker的);

  • MasterRegistryDataListener(托管在zk管理框架Curator上的故障监听器,负责监听worker和master注册在zk上的节点的新增和删除)。
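上面多次提到的@AutoService加载,本质是Google AutoService在编译期生成META-INF/services下的SPI描述文件,运行期再由JDK的ServiceLoader发现并实例化实现类。下面用一个与DolphinScheduler无关的最小示意说明这一机制(接口和实现类均为假设,需要引入auto-service依赖):

import java.util.ServiceLoader;
import com.google.auto.service.AutoService;

public class AutoServiceDemo {

    // 假设的插件SPI接口,对应DolphinScheduler中TaskChannelFactory扮演的角色
    public interface DemoTaskFactory {
        String getName();
    }

    // @AutoService会在编译期生成META-INF/services下对应的描述文件
    @AutoService(DemoTaskFactory.class)
    public static class ShellTaskFactory implements DemoTaskFactory {
        @Override
        public String getName() {
            return "SHELL";
        }
    }

    public static void main(String[] args) {
        // 运行期通过ServiceLoader扫描并实例化所有实现,插件管理器借此把工厂注册到缓存
        for (DemoTaskFactory factory : ServiceLoader.load(DemoTaskFactory.class)) {
            System.out.println("load task plugin: " + factory.getName());
        }
    }
}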

主节点容错代码如下,业务解释见1.3.1节中关于Master容错的说明:

private void failoverMasterWithLock(String masterHost) {
    String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
    try {
        registryClient.getLock(failoverPath);
        this.failoverMaster(masterHost);
    } catch (Exception e) {
        LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
    } finally {
        registryClient.releaseLock(failoverPath);
    }
}

/**
 * failover master
 * <p>
 * failover process instance and associated task instance
 * 故障转移流程实例和关联的任务实例
 *
 * @param masterHost master host
 */
private void failoverMaster(String masterHost) {
    if (StringUtils.isEmpty(masterHost)) {
        return;
    }
    Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
    long startTime = System.currentTimeMillis();
    List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
    LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
    List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
    for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
        if (Constants.NULL.equals(processInstance.getHost())) {
            continue;
        }
        List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
        for (TaskInstance taskInstance : validTaskInstanceList) {
            LOGGER.info("failover task instance id: {}, process instance id: {}",
                    taskInstance.getId(), taskInstance.getProcessInstanceId());
            failoverTaskInstance(processInstance, taskInstance, workerServers);
        }
        if (serverStartupTime != null && processInstance.getRestartTime() != null
                && processInstance.getRestartTime().after(serverStartupTime)) {
            continue;
        }
        LOGGER.info("failover process instance id: {}", processInstance.getId());
        // update ProcessInstance host is null and insert into command
        processInstance.setHost(Constants.NULL);
        processService.processNeedFailoverProcessInstances(processInstance);
    }
    LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
}

2.4.2 集群与槽(slot)

其实说这里采用了Zookeeper分布式锁,准确也不准确。为什么这么说?因为Slot是由Command的id对Master列表长度取模计算出来的,而Master列表的刷新由Zookeeper分布式锁来控制,Master节点对调度数据的扫描则是通过Slot来划分的。
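举个例子帮助理解(数值仅为示意):假设当前有3个Master,按在注册中心里的顺序得到slot 0、1、2,那么id为7的command只会被slot = 7 % 3 = 1的那台Master扫描到;一旦有Master上下线导致列表长度变化,各节点会在分布式锁的保护下重新计算自己的slot,所以同一条command不会被两台Master重复调度。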

具体代码如下:

Slot刷新

privatevoidupdateMasterNodes(){ MASTER_SLOT=0;MASTER_SIZE=0;this.masterNodes.clear();StringnodeLock=Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;try{ registryClient.getLock(nodeLock);Collection<String>currentNodes=registryClient.getMasterNodesDirectly();List<Server>masterNodes=registryClient.getServerList(NodeType.MASTER);syncMasterNodes(currentNodes,masterNodes);}catch(Exceptione){ logger.error("updatemasternodeserror",e);}finally{ registryClient.releaseLock(nodeLock);}}/***syncmasternodes**@paramnodesmasternodes*/privatevoidsyncMasterNodes(Collection<String>nodes,List<Server>masterNodes){ masterLock.lock();try{ Stringaddr=NetUtils.getAddr(NetUtils.getHost(),masterConfig.getListenPort());this.masterNodes.addAll(nodes);this.masterPriorityQueue.clear();this.masterPriorityQueue.putList(masterNodes);intindex=masterPriorityQueue.getIndex(addr);if(index>=0){ MASTER_SIZE=nodes.size();MASTER_SLOT=index;}else{ logger.warn("currentaddr:{ }isnotinactivemasterlist",addr);}logger.info("updatemasternodes,mastersize:{ },slot:{ },addr:{ }",MASTER_SIZE,MASTER_SLOT,addr);}finally{ masterLock.unlock();}}

Slot应用

/***1.getcommandbyslot*2.donothandlecommandifslotisempty*//***1.通过插槽获取命令*2.如果插槽为空,则不处理命令*/privatevoidscheduleProcess()throwsException{ List<Command>commands=findCommands();if(CollectionUtils.isEmpty(commands)){ //indicatethatnocommand,sleepfor1sThread.sleep(Constants.SLEEP_TIME_MILLIS);return;}List<ProcessInstance>processInstances=command2ProcessInstance(commands);if(CollectionUtils.isEmpty(processInstances)){ return;}for(ProcessInstanceprocessInstance:processInstances){ if(processInstance==null){ continue;}WorkflowExecuteThreadworkflowExecuteThread=newWorkflowExecuteThread(processInstance,processService,nettyExecutorManager,processAlertManager,masterConfig,stateWheelExecuteThread);this.processInstanceExecCacheManager.cache(processInstance.getId(),workflowExecuteThread);if(processInstance.getTimeout()>0){ stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);}workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);}}privateList<Command>findCommands(){ intpageNumber=0;intpageSize=masterConfig.getFetchCommandNum();List<Command>result=newArrayList<>();if(Stopper.isRunning()){ intthisMasterSlot=ServerNodeManager.getSlot();intmasterCount=ServerNodeManager.getMasterSize();if(masterCount>0){ result=processService.findCommandPageBySlot(pageSize,pageNumber,masterCount,thisMasterSlot);}}returnresult;}@OverridepublicList<Command>findCommandPageBySlot(intpageSize,intpageNumber,intmasterCount,intthisMasterSlot){ if(masterCount<=0){ returnLists.newArrayList();}returncommandMapper.queryCommandPageBySlot(pageSize,pageNumber*pageSize,masterCount,thisMasterSlot);}<selectid="queryCommandPageBySlot"resultType="org.apache.dolphinscheduler.dao.entity.Command">select*fromt_ds_commandwhereid%#{ masterCount}=#{ thisMasterSlot}orderbyprocess_instance_priority,idasclimit#{ limit}offset#{ offset}</select>##槽位检查privateList<ProcessInstance>command2ProcessInstance(List<Command>commands){ List<ProcessInstance>processInstances=Collections.synchronizedList(newArrayList<>(commands.size()));CountDownLatchlatch=newCountDownLatch(commands.size());for(finalCommandcommand:commands){ masterPrepareExecService.execute(()->{ try{ //slotcheckagainSlotCheckStateslotCheckState=slotCheck(command);if(slotCheckState.equals(SlotCheckState.CHANGE)||slotCheckState.equals(SlotCheckState.INJECT)){ logger.info("handlecommand{ }skip,slotcheckstate:{ }",command.getId(),slotCheckState);return;}ProcessInstanceprocessInstance=processService.handleCommand(logger,getLocalAddress(),command);if(processInstance!=null){ processInstances.add(processInstance);logger.info("handlecommand{ }end,createprocessinstance{ }",command.getId(),processInstance.getId());}}catch(Exceptione){ logger.error("handlecommanderror",e);processService.moveToErrorCommand(command,e.toString());}finally{ latch.countDown();}});}try{ //makesuretofinishhandlingcommandeachtimebeforenextscanlatch.await();}catch(InterruptedExceptione){ logger.error("countDownLatchawaiterror",e);}returnprocessInstances;}privateSlotCheckStateslotCheck(Commandcommand){ intslot=ServerNodeManager.getSlot();intmasterSize=ServerNodeManager.getMasterSize();SlotCheckStatestate;if(masterSize<=0){ state=SlotCheckState.CHANGE;}elseif(command.getId()%masterSize==slot){ state=SlotCheckState.PASS;}else{ state=SlotCheckState.INJECT;}returnstate;}2.4.3 代码执行流程

代码过于繁琐,此处不再一一粘贴代码解释各个类的功能 ,自行看代码更加清晰 。

2.5 Worker启动与执行流程

2.5.1 概念与执行逻辑
  • NettyRemotingServer(worker包含的netty服务端);

  • WorkerRegistryClient(zk客户端,封装了worker与zk相关的操作:注册、查询、删除等);

  • TaskPluginManager(任务插件管理器,封装了插件加载逻辑和任务实际执行业务的抽象) ;

  • WorkerManagerThread(任务工作线程生成器,消费netty处理器推入队列的任务信息,并生成任务执行线程提交线程池管理);

  • TaskExecuteProcessor(Netty任务执行处理器,生成master分发到worker的任务信息,并推送到队列);

  • TaskExecuteThread(任务执行线程) ;

  • TaskCallbackService(任务回调线程 ,与master包含的netty client通信);

  • AbstractTask(任务实际业务的抽象类 ,子类包含实际的任务执行业务,SqlTask ,DataXTask等) ;

  • RetryReportTaskStatusThread(不关注)

2.5.2 代码执行流程

Worker节点代码时序图如下:

代码过于繁琐,此处不再一一粘贴代码解释各个类的功能,自行看代码更加清晰。

2.6 RPC交互

因为节点和应用服务之间的RPC通信都是基于Netty实现的,Netty相关知识不在这里过多讲解,当前章节只涉及Master与Worker之间交互模式的设计与实现。

整体设计如下

2.6.1 Master与Worker交互

Master与Worker之间的业务交互是基于Netty服务端与客户端实现的RPC通信。Master和Worker启动的时候会将自己的Netty服务端信息注册到ZK相应的节点上;Master的任务分发、任务杀死等业务运行时,会拉取ZK上的Worker节点信息,根据负载均衡策略选择一个节点,构建Netty客户端与Worker的Netty服务端通信;Worker收到Master的RPC请求之后会缓存Channel信息并处理对应业务,同时Callback回调线程会获取缓存的通道来执行回调操作,这样就形成了闭环。

任务的执行、杀死,以及回调状态处理等操作,都是通过Netty客户端与服务端上绑定的Processor处理器来进行的。
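处理器的形式可以用下面的草图示意:实现remote模块的NettyRequestProcessor接口,在process回调里解析Command并执行对应业务,再注册到服务端对应的CommandType上。这里假设该接口形如void process(Channel channel, Command command),方法体里的逻辑是简化后的示意写法:

import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;

// 示意:一个处理"任务执行请求"的处理器
public class DemoTaskExecuteProcessor implements NettyRequestProcessor {
    @Override
    public void process(Channel channel, Command command) {
        // 从command.getBody()反序列化出任务上下文,再提交到执行队列(此处省略)
        byte[] body = command.getBody();
        System.out.println("receive command type: " + command.getType()
                + ", body length: " + body.length);
        // 处理完成后可通过channel.writeAndFlush(...)回写ACK或结果
    }
}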

Master部分具体代码如下:

Master启动的时候会初始化Netty服务端,将对应的请求处理器注册到NettyServerHandler并启动:

@PostConstructpublicvoidrun()throwsSchedulerException{ //initremotingserverNettyServerConfigserverConfig=newNettyServerConfig();serverConfig.setListenPort(masterConfig.getListenPort());this.nettyRemotingServer=newNettyRemotingServer(serverConfig);this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,taskExecuteResponseProcessor);this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING,taskExecuteRunningProcessor);this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE,taskKillResponseProcessor);this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST,stateEventProcessor);this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST,taskEventProcessor);this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST,taskEventProcessor);this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE,cacheProcessor);//loggerserverthis.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST,loggerRequestProcessor);this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST,loggerRequestProcessor);this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST,loggerRequestProcessor);this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST,loggerRequestProcessor);this.nettyRemotingServer.start();//installtaskpluginthis.taskPluginManager.installPlugin();//selftolerantthis.masterRegistryClient.init();this.masterRegistryClient.start();this.masterRegistryClient.setRegistryStoppable(this);this.masterSchedulerService.init();this.masterSchedulerService.start();this.eventExecuteService.start();this.failoverExecuteThread.start();this.scheduler.start();Runtime.getRuntime().addShutdownHook(newThread(()->{ if(Stopper.isRunning()){ close("shutdownHook");}}));}/***serverstart*/publicvoidstart(){ if(isStarted.compareAndSet(false,true)){ this.serverBootstrap.group(this.bossGroup,this.workGroup).channel(NettyUtils.getServerSocketChannelClass()).option(ChannelOption.SO_REUSEADDR,true).option(ChannelOption.SO_BACKLOG,serverConfig.getSoBacklog()).childOption(ChannelOption.SO_KEEPALIVE,serverConfig.isSoKeepalive()).childOption(ChannelOption.TCP_NODELAY,serverConfig.isTcpNoDelay()).childOption(ChannelOption.SO_SNDBUF,serverConfig.getSendBufferSize()).childOption(ChannelOption.SO_RCVBUF,serverConfig.getReceiveBufferSize()).childHandler(newChannelInitializer<SocketChannel>(){ @OverrideprotectedvoidinitChannel(SocketChannelch){ initNettyChannel(ch);}});ChannelFuturefuture;try{ future=serverBootstrap.bind(serverConfig.getListenPort()).sync();}catch(Exceptione){ logger.error("NettyRemotingServerbindfail{ },exit",e.getMessage(),e);thrownewRemoteException(String.format(NETTY_BIND_FAILURE_MSG,serverConfig.getListenPort()));}if(future.isSuccess()){ logger.info("NettyRemotingServerbindsuccessatport:{ }",serverConfig.getListenPort());}elseif(future.cause()!=null){ thrownewRemoteException(String.format(NETTY_BIND_FAILURE_MSG,serverConfig.getListenPort()),future.cause());}else{ thrownewRemoteException(String.format(NETTY_BIND_FAILURE_MSG,serverConfig.getListenPort()));}}}

Master的NettyExecutorManager初始化的时候会将NettyRemotingClient也初始化 ,并且会注册处理Worker回调请求的处理器 ,真正的端口绑定是在获取到执行器端口之后 :

/***constructor*/publicNettyExecutorManager(){ finalNettyClientConfigclientConfig=newNettyClientConfig();this.nettyRemotingClient=newNettyRemotingClient(clientConfig);}##注册处理worker回调的处理器@PostConstructpublicvoidinit(){ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE,taskExecuteResponseProcessor);this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING,taskExecuteRunningProcessor);this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE,taskKillResponseProcessor);}publicNettyRemotingClient(finalNettyClientConfigclientConfig){ this.clientConfig=clientConfig;if(NettyUtils.useEpoll()){ this.workerGroup=newEpollEventLoopGroup(clientConfig.getWorkerThreads(),newThreadFactory(){ privatefinalAtomicIntegerthreadIndex=newAtomicInteger(0);@OverridepublicThreadnewThread(Runnabler){ returnnewThread(r,String.format("NettyClient_%d",this.threadIndex.incrementAndGet()));}});}else{ this.workerGroup=newNioEventLoopGroup(clientConfig.getWorkerThreads(),newThreadFactory(){ privatefinalAtomicIntegerthreadIndex=newAtomicInteger(0);@OverridepublicThreadnewThread(Runnabler){ returnnewThread(r,String.format("NettyClient_%d",this.threadIndex.incrementAndGet()));}});}this.callbackExecutor=newThreadPoolExecutor(5,10,1,TimeUnit.MINUTES,newLinkedBlockingQueue<>(1000),newNamedThreadFactory("CallbackExecutor",10),newCallerThreadExecutePolicy());this.clientHandler=newNettyClientHandler(this,callbackExecutor);this.responseFutureExecutor=Executors.newSingleThreadScheduledExecutor(newNamedThreadFactory("ResponseFutureExecutor"));this.start();}/***start*/privatevoidstart(){ this.bootstrap.group(this.workerGroup).channel(NettyUtils.getSocketChannelClass()).option(ChannelOption.SO_KEEPALIVE,clientConfig.isSoKeepalive()).option(ChannelOption.TCP_NODELAY,clientConfig.isTcpNoDelay()).option(ChannelOption.SO_SNDBUF,clientConfig.getSendBufferSize()).option(ChannelOption.SO_RCVBUF,clientConfig.getReceiveBufferSize()).option(ChannelOption.CONNECT_TIMEOUT_MILLIS,clientConfig.getConnectTimeoutMillis()).handler(newChannelInitializer<SocketChannel>(){ @OverridepublicvoidinitChannel(SocketChannelch){ ch.pipeline().addLast("client-idle-handler",newIdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME,0,0,TimeUnit.MILLISECONDS)).addLast(newNettyDecoder(),clientHandler,encoder);}});this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable,5000,1000,TimeUnit.MILLISECONDS);isStarted.compareAndSet(false,true);}

任务分发代码如下:

/***taskdispatch**@paramcontextcontext*@returnresult*@throwsExecuteExceptioniferrorthrowsExecuteException*/publicBooleandispatch(finalExecutionContextcontext)throwsExecuteException{ /***getexecutormanager*/ExecutorManager<Boolean>executorManager=this.executorManagers.get(context.getExecutorType());if(executorManager==null){ thrownewExecuteException("noExecutorManagerfortype:"+context.getExecutorType());}/***hostselect*/Hosthost=hostManager.select(context);if(StringUtils.isEmpty(host.getAddress())){ thrownewExecuteException(String.format("failtoexecute:%sduetonosuitableworker,"+"currenttaskneedsworkergroup%stoexecute",context.getCommand(),context.getWorkerGroup()));}context.setHost(host);executorManager.beforeExecute(context);try{ /***taskexecute*/returnexecutorManager.execute(context);}finally{ executorManager.afterExecute(context);}}/***executelogic**@paramcontextcontext*@returnresult*@throwsExecuteExceptioniferrorthrowsExecuteException*/@OverridepublicBooleanexecute(ExecutionContextcontext)throwsExecuteException{ /***allnodes*/Set<String>allNodes=getAllNodes(context);/***failnodes*/Set<String>failNodeSet=newHashSet<>();/***buildcommandaccordexecuteContext*/Commandcommand=context.getCommand();/***executetaskhost*/Hosthost=context.getHost();booleansuccess=false;while(!success){ try{ doExecute(host,command);success=true;context.setHost(host);}catch(ExecuteExceptionex){ logger.error(String.format("executecommand:%serror",command),ex);try{ failNodeSet.add(host.getAddress());Set<String>tmpAllIps=newHashSet<>(allNodes);Collection<String>remained=CollectionUtils.subtract(tmpAllIps,failNodeSet);if(remained!=null&&remained.size()>0){ host=Host.of(remained.iterator().next());logger.error("retryexecutecommand:{ }host:{ }",command,host);}else{ thrownewExecuteException("failaftertryallnodes");}}catch(Throwablet){ thrownewExecuteException("failaftertryallnodes");}}}returnsuccess;}/***executelogic**@paramhosthost*@paramcommandcommand*@throwsExecuteExceptioniferrorthrowsExecuteException*/publicvoiddoExecute(finalHosthost,finalCommandcommand)throwsExecuteException{ /***retrycount ,defaultretry3*/intretryCount=3;booleansuccess=false;do{ try{ nettyRemotingClient.send(host,command);success=true;}catch(Exceptionex){ logger.error(String.format("sendcommand:%sto%serror",command,host),ex);retryCount--;ThreadUtils.sleep(100);}}while(retryCount>=0&&!success);if(!success){ thrownewExecuteException(String.format("sendcommand:%sto%serror",command,host));}}/***sendtask**@paramhosthost*@paramcommandcommand*/publicvoidsend(finalHosthost,finalCommandcommand)throwsRemotingException{ Channelchannel=getChannel(host);if(channel==null){ thrownewRemotingException(String.format("connectto:%sfail",host));}try{ ChannelFuturefuture=channel.writeAndFlush(command).await();if(future.isSuccess()){ logger.debug("sendcommand:{ },to:{ }successfully.",command,host.getAddress());}else{ Stringmsg=String.format("sendcommand:%s,to:%sfailed",command,host.getAddress());logger.error(msg,future.cause());thrownewRemotingException(msg);}}catch(Exceptione){ logger.error("Sendcommand{ }toaddress{ }encountererror.",command,host.getAddress());thrownewRemotingException(String.format("Sendcommand:%s,to:%sencountererror",command,host.getAddress()),e);}}

Worker部分具体代码如下 :

同理,Worker在启动的时候会初始化NettyServer,注册对应处理器并启动:

/***workerserverrun*/@PostConstructpublicvoidrun(){ //initremotingserverNettyServerConfigserverConfig=newNettyServerConfig();serverConfig.setListenPort(workerConfig.getListenPort());this.nettyRemotingServer=newNettyRemotingServer(serverConfig);this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST,taskExecuteProcessor);this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST,taskKillProcessor);this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK,taskExecuteRunningAckProcessor);this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK,taskExecuteResponseAckProcessor);this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST,hostUpdateProcessor);//loggerserverthis.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST,loggerRequestProcessor);this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST,loggerRequestProcessor);this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST,loggerRequestProcessor);this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST,loggerRequestProcessor);this.nettyRemotingServer.start();//installtaskpluginthis.taskPluginManager.installPlugin();//workerregistrytry{ this.workerRegistryClient.registry();this.workerRegistryClient.setRegistryStoppable(this);Set<String>workerZkPaths=this.workerRegistryClient.getWorkerZkPaths();this.workerRegistryClient.handleDeadServer(workerZkPaths,NodeType.WORKER,Constants.DELETE_OP);}catch(Exceptione){ logger.error(e.getMessage(),e);thrownewRuntimeException(e);}//taskexecutemanagerthis.workerManagerThread.start();//retryreporttaskstatusthis.retryReportTaskStatusThread.start();/**registryhooks,whicharecalledbeforetheprocessexits*/Runtime.getRuntime().addShutdownHook(newThread(()->{ if(Stopper.isRunning()){ close("shutdownHook");}}));}

回调线程对象初始化的时候,会将其包含的NettyRemotingClient一起初始化,并注册好对应的业务处理器:

public TaskCallbackService() {
    final NettyClientConfig clientConfig = new NettyClientConfig();
    this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
    this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
}

回调线程会通过执行器中缓存下来的Channel与Master进行通信:

/**
 * send result
 *
 * @param taskInstanceId taskInstanceId
 * @param command command
 */
public void send(int taskInstanceId, Command command) {
    NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
    if (nettyRemoteChannel != null) {
        nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // remove(taskInstanceId);
                    return;
                }
            }
        });
    }
}

2.6.2 其他服务与Master交互

以日志服务为例,前端触发请求日志的接口,通过参数与数据库交互获取到Master的NettyServer信息,然后构建Netty客户端与Master进行通信获取日志并返回。具体代码如下:

@RequestParam(value = "skipLineNum") int skipNum, @RequestParam(value = "limit") int limit) { return loggerService.queryLog(taskInstanceId, skipNum, limit); }*viewlog**@paramtaskInstIdtaskinstanceid*@paramskipLineNumskiplinenumber*@paramlimitlimit*@returnlogstringdata*/@Override@SuppressWarnings("unchecked")publicResult<String>queryLog(inttaskInstId,intskipLineNum,intlimit){ TaskInstancetaskInstance=processService.findTaskInstanceById(taskInstId);if(taskInstance==null){ returnResult.error(Status.TASK_INSTANCE_NOT_FOUND);}if(StringUtils.isBlank(taskInstance.getHost())){ returnResult.error(Status.TASK_INSTANCE_HOST_IS_NULL);}Result<String>result=newResult<>(Status.SUCCESS.getCode(),Status.SUCCESS.getMsg());Stringlog=queryLog(taskInstance,skipLineNum,limit);result.setData(log);returnresult;}*querylog**@paramtaskInstancetaskinstance*@paramskipLineNumskiplinenumber*@paramlimitlimit*@returnlogstringdata*/privateStringqueryLog(TaskInstancetaskInstance,intskipLineNum,intlimit){ Hosthost=Host.of(taskInstance.getHost());logger.info("loghost:{ },logPath:{ },port:{ }",host.getIp(),taskInstance.getLogPath(),host.getPort());StringBuilderlog=newStringBuilder();if(skipLineNum==0){ Stringhead=String.format(LOG_HEAD_FORMAT,taskInstance.getLogPath(),host,Constants.SYSTEM_LINE_SEPARATOR);log.append(head);}log.append(logClient.rollViewLog(host.getIp(),host.getPort(),taskInstance.getLogPath(),skipLineNum,limit));returnlog.toString();}*rollviewlog**@paramhosthost*@paramportport*@parampathpath*@paramskipLineNumskiplinenumber*@paramlimitlimit*@returnlogcontent*/publicStringrollViewLog(Stringhost,intport,Stringpath,intskipLineNum,intlimit){ logger.info("rollviewlog,host:{ },port:{ },path{ },skipLineNum{ },limit{ }",host,port,path,skipLineNum,limit);RollViewLogRequestCommandrequest=newRollViewLogRequestCommand(path,skipLineNum,limit);Stringresult="";finalHostaddress=newHost(host,port);try{ Commandcommand=request.convert2Command();Commandresponse=this.client.sendSync(address,command,LOG_REQUEST_TIMEOUT);if(response!=null){ RollViewLogResponseCommandrollReviewLog=JSONUtils.parseObject(response.getBody(),RollViewLogResponseCommand.class);returnrollReviewLog.getMsg();}}catch(Exceptione){ logger.error("rollviewlogerror",e);}finally{ this.client.closeChannel(address);}returnresult;}*syncsend**@paramhosthost*@paramcommandcommand*@paramtimeoutMillistimeoutMillis*@returncommand*/publicCommandsendSync(finalHosthost,finalCommandcommand,finallongtimeoutMillis)throwsInterruptedException,RemotingException{ finalChannelchannel=getChannel(host);if(channel==null){ thrownewRemotingException(String.format("connectto:%sfail",host));}finallongopaque=command.getOpaque();finalResponseFutureresponseFuture=newResponseFuture(opaque,timeoutMillis,null,null);channel.writeAndFlush(command).addListener(future->{ if(future.isSuccess()){ responseFuture.setSendOk(true);return;}else{ responseFuture.setSendOk(false);}responseFuture.setCause(future.cause());responseFuture.putResponse(null);logger.error("sendcommand{ }tohost{ }failed",command,host);});/**syncwaitforresult*/Commandresult=responseFuture.waitResponse();if(result==null){ if(responseFuture.isSendOK()){ thrownewRemotingTimeoutException(host.toString(),timeoutMillis,responseFuture.getCause());}else{ thrownewRemotingException(host.toString(),responseFuture.getCause());}}returnresult;}

Netty client随着日志业务对象的初始化而初始化:

/**
 * construct client
 */
public LogClientService() {
    this.clientConfig = new NettyClientConfig();
    this.clientConfig.setWorkerThreads(4);
    this.client = new NettyRemotingClient(clientConfig);
    this.isRunning = true;
}

2.7 负载均衡算法

Master在选择执行器的时候,DolphinScheduler提供了三种负载均衡算法,且所有算法都用到了节点权重:加权随机(random)、平滑轮询(round robin)、线性负载(lower weight)。通过配置文件来控制使用哪一种负载均衡策略,默认配置是线性负载(权重)策略:host-selector: lower_weight。

public HostManager hostManager() {
    HostSelector selector = masterConfig.getHostSelector();
    HostManager hostManager;
    switch (selector) {
        case RANDOM:
            hostManager = new RandomHostManager();
            break;
        case ROUND_ROBIN:
            hostManager = new RoundRobinHostManager();
            break;
        case LOWER_WEIGHT:
            hostManager = new LowerWeightHostManager();
            break;
        default:
            throw new IllegalArgumentException("unSupport selector " + selector);
    }
    beanFactory.autowireBean(hostManager);
    return hostManager;
}

2.7.1 加权随机

看代码更好理解:先对全部权重值求和,再在[0, 总权重)内取一个随机整数,用该随机数依次减去各个host的权重,当结果第一次小于零时返回对应的host;如果总权重不大于零,则随机返回一个。

public HostWorker doSelect(final Collection<HostWorker> source) {
    List<HostWorker> hosts = new ArrayList<>(source);
    int size = hosts.size();
    int[] weights = new int[size];
    int totalWeight = 0;
    int index = 0;
    for (HostWorker host : hosts) {
        totalWeight += host.getHostWeight();
        weights[index] = host.getHostWeight();
        index++;
    }
    if (totalWeight > 0) {
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);
        for (int i = 0; i < size; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return hosts.get(i);
            }
        }
    }
    return hosts.get(ThreadLocalRandom.current().nextInt(size));
}

2.7.2 线性负载

权重计算逻辑:利用节点上报的CPU占用、内存占用、系统负载(load average)分别乘以各自的因子后求和,再结合启动后的预热时间做修正。

double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
long uptime = System.currentTimeMillis() - startTime;
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
    // If the warm-up is not over, add the weight
    return calculatedWeight * Constants.WARM_UP_TIME / uptime;
}
return calculatedWeight;

获取当前权重最小的节点返回,并把该节点的当前权重加上总权重,相当于临时调大,避免它被连续选中。

/**
 * select
 *
 * @param sources sources
 * @return HostWeight
 */
@Override
public HostWeight doSelect(Collection<HostWeight> sources) {
    double totalWeight = 0;
    double lowWeight = 0;
    HostWeight lowerNode = null;
    for (HostWeight hostWeight : sources) {
        totalWeight += hostWeight.getWeight();
        hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
        if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
            lowerNode = hostWeight;
            lowWeight = hostWeight.getCurrentWeight();
        }
    }
    lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
    return lowerNode;
}

2.7.3 平滑轮询

这个算法不是特别直观,我的理解是:每个节点都维护一个当前值,每轮选择时先给所有节点的当前值累加各自的权重,选出当前值最大的节点,并把它的当前值减去总权重;长期看请求按权重分配,短期内又不会让高权重节点被连续选中。

public HostWorker doSelect(Collection<HostWorker> source) {
    List<HostWorker> hosts = new ArrayList<>(source);
    String key = hosts.get(0).getWorkerGroup();
    ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
    if (map == null) {
        workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
        map = workGroupWeightMap.get(key);
    }
    int totalWeight = 0;
    long maxCurrent = Long.MIN_VALUE;
    long now = System.currentTimeMillis();
    HostWorker selectedHost = null;
    WeightedRoundRobin selectWeightRoundRobin = null;
    for (HostWorker host : hosts) {
        String workGroupHost = host.getWorkerGroup() + host.getAddress();
        WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
        int weight = host.getHostWeight();
        if (weight < 0) {
            weight = 0;
        }
        if (weightedRoundRobin == null) {
            weightedRoundRobin = new WeightedRoundRobin();
            // set weight
            weightedRoundRobin.setWeight(weight);
            map.putIfAbsent(workGroupHost, weightedRoundRobin);
            weightedRoundRobin = map.get(workGroupHost);
        }
        if (weight != weightedRoundRobin.getWeight()) {
            weightedRoundRobin.setWeight(weight);
        }
        long cur = weightedRoundRobin.increaseCurrent();
        weightedRoundRobin.setLastUpdate(now);
        if (cur > maxCurrent) {
            maxCurrent = cur;
            selectedHost = host;
            selectWeightRoundRobin = weightedRoundRobin;
        }
        totalWeight += weight;
    }
    if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
        try {
            ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
            newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
            workGroupWeightMap.put(key, newMap);
        } finally {
            updateLock.set(false);
        }
    }
    if (selectedHost != null) {
        selectWeightRoundRobin.sel(totalWeight);
        return selectedHost;
    }
    return hosts.get(0);
}
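可以用一个小例子帮助理解平滑轮询(数值仅为示意):假设三台Worker权重为A=5、B=1、C=1,每一轮先给每个节点的当前值加上自身权重(对应代码里的increaseCurrent()),选出当前值最大的节点,再把它的当前值减去总权重7(对应sel(totalWeight))。这样得到的分发序列大致是A A B A C A A:A最终拿到5/7的请求,但不会一口气连续占满,这就是"平滑"的含义。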

2.8 日志服务

日志服务的交互流程在2.6.2中已经介绍过,这里不再做过多说明。

2.9 报警

报警模块暂未深入研究,目测基本就是根据规则筛选数据,然后调用指定类型的报警服务接口执行报警操作,比如邮件、微信、短信通知等。

如果这个文章对你有帮助  ,不要忘记「在看」「点赞」「收藏」三连啊喂 !
