环境篇:DolphinScheduler-1.3.1安装部署及使用技巧

2023-05-08,,

环境篇:DolphinScheduler-1.3.1安装部署

1 配置jdk

JDK百度网盘:https://pan.baidu.com/s/1og3mfefJrwl1QGZGZDZ8Sw 提取码:t6l1

#查看命令
rpm -qa | grep java
#删除命令
rpm -e --nodeps xxx

将oracle-j2sdk1.8-1.8.0+update181-1.x86_64.rpm上传至每个节点安装

rpm -ivh oracle-j2sdk1.8-1.8.0+update181-1.x86_64.rpm

修改配置文件

vim /etc/profile
#添加
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

刷新源

source /etc/profile

检验

java
javac

2 pip、kazoo 安装

yum -y install epel-release
yum -y install python-pip
yum -y install pip
pip --version
pip install kazoo
#使用python
#import kazoo,不报错即可

3 下载二进制tar.gz包

请下载最新版本的后端安装包至服务器部署目录,比如创建 /opt/dolphinscheduler 做为安装部署目录

https://dolphinscheduler.apache.org/zh-cn/docs/release/download.html

# 创建部署目录,部署目录请不要创建在/root、/home等高权限目录
mkdir -p /opt/dolphinscheduler
cd /opt/dolphinscheduler
# 上传文件并解压缩
tar -zxvf apache-dolphinscheduler-incubating-1.3.1-dolphinscheduler-bin.tar.gz -C /opt/dolphinscheduler
#修改解压文件名字
mv apache-dolphinscheduler-incubating-1.3.1-dolphinscheduler-bin dolphinscheduler-bin

4 创建用户

# 创建部署用户并赋予密码
userdel -r dolphinscheduler
useradd dolphinscheduler && echo dolphinscheduler | passwd --stdin dolphinscheduler
# 配置sudo免密
sed -i '$adolphinscheduler ALL=(ALL) NOPASSWD: NOPASSWD: ALL' /etc/sudoers
sed -i 's/Defaults requirett/#Defaults requirett/g' /etc/sudoers
# 修改目录权限,使得部署用户对dolphinscheduler-bin目录有操作权限
chown -R dolphinscheduler:dolphinscheduler dolphinscheduler-bin

注意:

因为任务执行服务是以 sudo -u {linux-user} 切换不同linux用户的方式来实现多租户运行作业,所以部署用户需要有 sudo 权限,而且是免密的。初学习者不理解的话,完全可以暂时忽略这一点
如果发现/etc/sudoers文件中有"Default requiretty"这行,也请注释掉
如果用到资源上传的话,还需要给该部署用户分配操作本地文件系统或者HDFS或者MinIO的权限

5 对部署用户配置免密

所有节点

su dolphinscheduler
#生成密钥对(公钥和私钥)三次回车生成密钥
ssh-keygen -t rsa
#查看公钥
cat ~/.ssh/id_rsa.pub
#将密匙输出到/root/.ssh/authorized_keys
cat ~/.ssh/id_rsa.pub > ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
#注意:正常设置后,dolphinscheduler用户在执行命令ssh localhost 是不需要再输入密码的(请进行测试)

以下是集群配置需要进行多机器免密,如果是单机请忽略

主节点

#追加密钥到主节点(需要操作及密码验证,追加完后查看一下该文件)--在主节点上操作,拷取从节点密匙
ssh 从节点机器IP cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys cat ~/.ssh/authorized_keys
#从主节点复制密钥到从节点
scp ~/.ssh/authorized_keys dolphinscheduler@从节点机器IP:~/.ssh/authorized_keys

所有节点互相进行ssh连接

ssh dolphinscheduler@172.xx.xx.xxx
ssh dolphinscheduler@172.xx.xx.xxx

7 部署mysql用户

因为我们并没有选择默认数据库PostgreSQL,故进入mysql数据库

# 设置数据用户 dolphinscheduler 的访问密码为 dolphinscheduler,并且不对访问的 ip 做限制
# 测试环境将访问设置为所有,如果是生产,可以限制只能子网段的ip才能访问('192.168.1.%')
CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'%' IDENTIFIED BY 'dolphinscheduler';
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'localhost' IDENTIFIED BY 'dolphinscheduler';
flush privileges;

8 创建表和导入基础数据

修改datasource.properties中的下列属性

vim  conf/datasource.properties
#注意:注释postgre连接,打开mysql连接
>>>>
#注意:注释postgre连接,添加mysql连接
#spring.datasource.driver-class-name=org.postgresql.Driver
#spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler
#注意下面:{user}值 和 {password}值 还有 IP地址 需要自行修改
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://xxx:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true
spring.datasource.username=dolphinscheduler
spring.datasource.password=dolphinscheduler

还需要手动添加 mysql-connector-java 驱动jar包到lib目录下,这里下载的是mysql-connector-java-5.1.47.jar

https://downloads.mysql.com/archives/c-j/

#查看jar包是否放入
ll lib | grep mysql

执行 script 目录下的创建表及导入基础数据脚本,这样我们的数据库里面有了小海豚的数据表了。

sh script/create-dolphinscheduler.sh

9 修改运行参数

9.1 dolphinscheduler_env.sh

修改 conf/env 目录下的 dolphinscheduler_env.sh`环境变量

vim conf/env/dolphinscheduler_env.sh
>>>>
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CONF_DIR=/etc/hadoop/conf
export SPARK_HOME1=/opt/cloudera/parcels/CDH/lib/spark
#export SPARK_HOME2=/opt/soft/spark2
export PYTHON_HOME=/usr/bin/python
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
#export FLINK_HOME=/opt/soft/flink
#export DATAX_HOME=/opt/soft/datax/bin/datax.py
#export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH:$FLINK_HOME/bin:$DATAX_HOME:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME:$JAVA_HOME/bin:$HIVE_HOME/bin:$PATH
<<<

这里的环境屏蔽了FLINK和DATAX有需要的同学可以自行添加

9.2 修改一键部署配置文件

conf/config/install_config.conf中的各参数,特别注意以下参数的配置

vim conf/config/install_config.conf
>>>
# 这里填 mysql or postgresql
dbtype="mysql" # 数据库连接地址
dbhost="localhost:3306" # 数据库名
dbname="dolphinscheduler" # 数据库用户名,此处需要修改为上面设置的{user}具体值
username="dolphinscheduler" # 数据库密码, 如果有特殊字符,请使用\转义,需要修改为上面设置的{passowrd}具体值
passowrd="dolphinscheduler" #Zookeeper地址,单机本机是localhost:2181,记得把2181端口带上
zkQuorum="192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181" #将DS安装到哪个目录,如: /usr/local/src/dolphinscheduler,不同于现在的目录
installPath="/usr/local/src/dolphinscheduler" #使用哪个用户部署,使用之前创建的用户
deployUser="dolphinscheduler" # 邮件配置,以qq邮箱为例
# 邮件协议
mailProtocol="SMTP" # 邮件服务地址,这是是qq邮箱
mailServerHost="smtp.qq.com" # 邮件服务端口
mailServerPort="587" # mailSender和mailUser配置成一样即可
# 发送者
mailSender="31xxxxx@qq.com" # 发送用户
mailUser="31xxxxx@qq.com" # 邮箱密码,这里是开启协议后服务商提供的密码
mailPassword="ewbzasdcbhea" # TLS协议的邮箱设置为true,否则设置为false
starttlsEnable="true" # 开启SSL协议的邮箱配置为true,否则为false。注意: starttlsEnable和sslEnable不能同时为true
sslEnable="false" # 邮件服务地址值,参考上面 mailServerHost
sslTrust="smtp.qq.com" # 业务用到的比如sql等资源文件上传到哪里,可以设置:HDFS,S3,NONE,单机如果想使用本地文件系统,请配置为HDFS,因为HDFS支持本地文件系统;如果不需要资源上传功能请选择NONE。强调一点:使用本地文件系统不需要部署hadoop
resourceStorageType="HDFS" # 这里以保存到HDFS为例
#注:但是如果你想上传到HDFS的话,NameNode启用了HA,则需要将hadoop的配置文件core-site.xml和hdfs-site.xml放到conf目录下,并配置namenode cluster名称;如果NameNode不是HA,则修改为具体的ip或者主机名即可
# 本地文件系统:"file:///data/dolphinscheduler" 或 HDFS集群: hdfs://{具体的ip/主机名}:8020
defaultFS="hdfs://192.168.xx.xx:8020" # 如resourcemanager HA启用,则配置为ResourceManager节点的主备ip或者hostname,比如"192.168.xx.xx,192.168.xx.xx";如果是单ResourceManager请配置yarnHaIps=""即可
yarnHaIps="" # 如果ResourceManager是HA或者没有使用到Yarn保持默认值"yarnIp1"即可;如果是单ResourceManager,请配置真实的ResourceManager主机名或者ip
singleYarnIp="192.168.xx.xx" # 资源上传根路径,支持HDFS和S3,由于hdfs支持本地文件系统,需要确保本地文件夹存在且有读写权限
resourceUploadPath="/data/dolphinscheduler" # 具备权限创建resourceUploadPath的用户
hdfsRootUser="hdfs" #在哪些机器上部署DS服务,本机选localhost(如下配置为单机配置,需要集群配置,直接参考默认值即可)
ips="localhost" #ssh端口,默认22
sshPort="22" #master服务部署在哪台机器上
masters="localhost" #worker服务部署在哪台机器上,并指定此worker属于哪一个worker组,下面示例的default即为组名
workers="localhost:default" #报警服务部署在哪台机器上
alertServer="localhost" #后端api服务部署在在哪台机器上
apiServers="localhost"
<<<

9.3 配置资源中心功能

上传资源文件和udf函数,所有上传的文件和资源都会被存储到hdfs上,所以需要配置以下配置

vim conf/common.properties
>>>
#有权在HDFS根路径下创建目录的用户
hdfs.root.user=hdfs
#数据存储文件夹指定,资源文件将存储到此hadoop hdfs路径,自配置,请确保该目录存在于hdfs上并具有读/写权限
data.store2hdfs.basepath=/data/dolphinscheduler
#资源上传启动类型:HDFS,S3,NONE
res.upload.startup.type=HDFS
#是否启动kerberos
hadoop.security.authentication.startup.state=false
#java.security.krb5.conf路径
java.security.krb5.conf.path=/opt/krb5.conf
#LoginUserFromKeytab用户
login.user.keytab.username=hdfs-mycluster@ESZ.COM
#LoginUserFromKeytab路径
login.user.keytab.path=/opt/hdfs.headless.keytab
# 如果namenode ha需要复制core-site.xml和hdfs-site.xml,到conf目录,支持s3,例如:s3a://dolphinscheduler
fs.defaultFS=hdfs://mycluster:8020
#resourcemanager ha注意这需要ip,如果是单ip,则为空
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
#如果它是单个resourcemanager,您只需要配置一个主机名。如果它是resourcemanager HA,则默认配置是正确的
yarn.application.status.address=http://xxxx:8088/ws/v1/cluster/apps/%s
<<<
sudo mkdir /data/dolphinscheduler
sudo chown -R dolphinscheduler:dolphinscheduler /data/dolphinscheduler

yarn.resourcemanager.ha.rm.ids与yarn.application.status.address只需配置其中一个地址,另一个地址配置为空。

需要从Hadoop集群的conf目录下复制core-site.xml、hdfs-site.xml到dolphinscheduler项目的conf目录下,重启api-server服务。

9.4 一键部署

切换到部署用户,执行一键部署脚本

sh install.sh

注意: 第一次部署的话,在运行中第3步3,stop server出现5次以下信息,此信息可以忽略 sh: bin/dolphinscheduler-daemon.sh: No such file or directory

脚本完成后,会启动以下5个服务,使用 jps 命令查看服务是否启动

如果以上服务都正常启动,说明自动部署成功

部署成功后,可以进行日志查看,日志统一存放于logs文件夹内

logs/
├── dolphinscheduler-alert-server.log
├── dolphinscheduler-master-server.log
├── dolphinscheduler-worker-server.log
├── dolphinscheduler-api-server.log
├── dolphinscheduler-logger-server.log

10 登录系统

访问前端页面地址,接口ip(自行修改) http://192.168.xx.xx:12345/dolphinscheduler

账号:admin

密码:dolphinscheduler123

11 Hello DolphinScheduler

11.1 Hello 租户用户

进入小海豚,我们会被直接拉入安全中心页面,难道不应该是炫酷的首页么?其实这是告诉我们这个页面很重要,为什么呢?

因为我们需要做我们的租户用户管理,我们先建立,慢慢解释。

创建dolphinscheduler租户,这里的租户需要和我们的linux用户关联,也就是说liunx必须有。

然后是创建用户(下图例子为创建开发账号dev2020),这里我们的admin用户不做修改,只作为管理账号使用。

从图中我们看见租户的选择。其实意思也显而易见了,比如我们根据业务环境来区分用户,那么这个用户,在提交我们小海豚上编辑的流程时,小海豚会去liunx上执行,这时使用的就是我们的租户(也就是liunx用户)。

注意:租户编码:租户编码是Linux上的用户,唯一,不能重复

11.2 Hello 警告组

警告组的意思是,在我们出现错误或者执行完成的时候会给那些人发送消息,把这些人放在一起,方便组播,因为公司都是分部门,分小组的嘛。

这里我们添加警告组,将开发人dev2020添加进去

11.3 Hello Worker分组

因为这里部署的单台worker,所以不能进行编辑,解释一下,我们部署多台worker的时候,肯定会有想手工指定执行机器的需求,那么这个时候我们的worker就体现了价值,他将我们的多台worker进行分组,这样我们就能手工指定任务的执行节点。

点击“创建Worker分组”按钮,创建Worker分组。worker分组内有多个ip地址(不能写别名),以英文逗号分隔。

11.4 Hello 列队

队列是在执行spark、mapreduce等程序,需要用到“队列”参数时使用的。

11.5 Hello 令牌

由于后端接口有登录检查,令牌管理提供了一种可以通过调用接口的方式对系统进行各种操作。

管理员进入安全中心->令牌管理页面,点击“创建令牌”按钮,选择失效时间与用户,点击"生成令牌"按钮,点击"提交"按钮,则选择用户的token创建成功。

普通用户登录后,点击用户名下拉框中的用户信息,进入令牌管理页面,选择失效时间,点击"生成令牌"按钮,点击"提交"按钮,则该用户创建token成功。

调用示例:

   /**
* test token
*/
public void doPOSTParam()throws Exception{
// create HttpClient
CloseableHttpClient httpclient = HttpClients.createDefault(); // create http post request
HttpPost httpPost = new HttpPost("http://127.0.0.1:12345/escheduler/projects/create");
httpPost.setHeader("token", "123");
// set parameters
List<NameValuePair> parameters = new ArrayList<NameValuePair>();
parameters.add(new BasicNameValuePair("projectName", "qzw"));
parameters.add(new BasicNameValuePair("desc", "qzw"));
UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters);
httpPost.setEntity(formEntity);
CloseableHttpResponse response = null;
try {
// execute
response = httpclient.execute(httpPost);
// response status code 200
if (response.getStatusLine().getStatusCode() == 200) {
String content = EntityUtils.toString(response.getEntity(), "UTF-8");
System.out.println(content);
}
} finally {
if (response != null) {
response.close();
}
httpclient.close();
}
}

11.6 Hello 权限

授予权限包括项目权限,资源权限,数据源权限,UDF函数权限。

管理员可以对普通用户进行非其创建的项目、资源、数据源和UDF函数进行授权。因为项目、资源、数据源和UDF函数授权方式都是一样的,所以以项目授权为例介绍。

注意:对于用户自己创建的项目,该用户拥有所有的权限。则项目列表和已选项目列表中不会显示。

管理员进入安全中心->用户管理页面,点击需授权用户的“授权”按钮,如下图所示:

选择项目,进行项目授权。

资源、数据源、UDF函数授权同项目授权。

11.7 Hello 首页

首页包含用户所有项目的任务状态统计、流程状态统计、工作流定义统计。

11.8 Hello 监控

master监控

worker监控

Zookeeper监控

DB监控

统计管理

待执行命令数:统计t_ds_command表的数据
执行失败的命令数:统计t_ds_error_command表的数据
待运行任务数:统计Zookeeper中task_queue的数据
待杀死任务数:统计Zookeeper中task_kill的数据

11.9 Hello 资源中心

11.9.1 文件管理

是对各种资源文件的管理,包括创建基本的txt/log/sh/conf/py/java等文件、上传jar包等各种类型文件,可进行编辑、重命名、下载、删除等操作。

创建文件

文件格式支持以下几种类型:txt、log、sh、conf、cfg、py、java、sql、xml、hql、properties

查看文件

还可以对文件进行上传和下载哦!

删除

文件列表->点击"删除"按钮,删除指定文件

11.9.2 UDF管理

资源管理

资源管理和文件管理功能类似,不同之处是资源管理是上传的UDF函数,文件管理上传的是用户程序,脚本及配置文件 操作功能:重命名、下载、删除。

上传udf资源

和上传文件相同。

函数管理

创建udf函数

点击“创建UDF函数”,输入udf函数参数,选择udf资源,点击“提交”,创建udf函数。

目前只支持HIVE的临时UDF函数

UDF函数名称:输入UDF函数时的名称
包名类名:输入UDF函数的全路径
UDF资源:设置创建的UDF对应的资源文件

11.10 Hello 数据源中心

数据源中心支持MySQL、POSTGRESQL、HIVE/IMPALA、SPARK、CLICKHOUSE、ORACLE、SQLSERVER等数据源

11.10.1 创建/编辑Mysq数据源

    点击“数据源中心->创建数据源”,根据需求创建不同类型的数据源。
    数据源:选择MYSQL
    数据源名称:输入数据源的名称
    描述:输入数据源的描述
    IP主机名:输入连接MySQL的IP
    端口:输入连接MySQL的端口
    用户名:设置连接MySQL的用户名
    密码:设置连接MySQL的密码
    数据库名:输入连接MySQL的数据库名称
    Jdbc连接参数:用于MySQL连接的参数设置,以JSON形式填写

11.10.2 创建/编辑Hive数据源

    数据源:选择HIVE
    数据源名称:输入数据源的名称
    描述:输入数据源的描述
    IP/主机名:输入连接HIVE的IP
    端口:输入连接HIVE的端口
    用户名:设置连接HIVE的用户名
    密码:设置连接HIVE的密码
    数据库名:输入连接HIVE的数据库名称
    Jdbc连接参数:用于HIVE连接的参数设置,以JSON形式填写

11.10.3 创建/编辑POSTGRESQL数据源

    数据源:选择POSTGRESQL
    数据源名称:输入数据源的名称
    描述:输入数据源的描述
    IP/主机名:输入连接POSTGRESQL的IP
    端口:输入连接POSTGRESQL的端口
    用户名:设置连接POSTGRESQL的用户名
    密码:设置连接POSTGRESQL的密码
    数据库名:输入连接POSTGRESQL的数据库名称
    Jdbc连接参数:用于POSTGRESQL连接的参数设置,以JSON形式填写

11.10.4 创建/编辑Spark数据源

    数据源:选择Spark
    数据源名称:输入数据源的名称
    描述:输入数据源的描述
    IP/主机名:输入连接Spark的IP
    端口:输入连接Spark的端口
    用户名:设置连接Spark的用户名
    密码:设置连接Spark的密码
    数据库名:输入连接Spark的数据库名称
    Jdbc连接参数:用于Spark连接的参数设置,以JSON形式填写

注意:如果开启了kerberos,则需要填写 Principal

11.11 Hello 项目管理

要编辑我们的任务流,首先要有项目,使用我们的开发账号dev2020登录

点击"项目管理"进入项目管理页面,点击“创建项目”按钮,输入项目名称,项目描述,点击“提交”,创建新的项目。

项目首页

在项目管理页面点击项目名称链接,进入项目首页,如下图所示,项目首页包含该项目的任务状态统计、流程状态统计、工作流定义统计。

任务状态统计:在指定时间范围内,统计任务实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数
流程状态统计:在指定时间范围内,统计工作流实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数
工作流定义统计:统计用户创建的工作流定义及管理员授予该用户的工作流定义

11.11.1 创建工作流定义

点击项目管理->工作流->工作流定义,进入工作流定义页面,点击“创建工作流”按钮,进入工作流DAG编辑页面,如下图所示:

工具栏中拖拽到画板中,新增一个Shell任务,如下图所示:

添加shell任务的参数设置:

    填写“节点名称”,“描述”,“脚本”字段;
    “运行标志”勾选“正常”,若勾选“禁止执行”,运行工作流不会执行该任务;
    选择“任务优先级”:当worker线程数不足时,级别高的任务在执行队列中会优先执行,相同优先级的任务按照先进先出的顺序执行;
    超时告警(非必选):勾选超时告警、超时失败,填写“超时时长”,当任务执行时间超过超时时长,会发送告警邮件并且任务超时失败;
    资源(非必选)。资源文件是资源中心->文件管理页面创建或上传的文件,如文件名为test.sh,脚本中调用资源命令为sh test.sh
    自定义参数(非必填),参考https://dolphinscheduler.apache.org/zh-cn/docs/1.3.1/user_doc/system-manual.html#UserDefinedParameters
    点击"确认添加"按钮,保存任务设置。

增加任务执行的先后顺序: 点击右上角图标连接任务;如下图所示,任务2和任务3并行执行,当任务1执行完,任务2、3会同时执行。

删除依赖关系: 点击右上角"箭头"图标,选中连接线,点击右上角"删除"图标,删除任务间的依赖关系。

保存工作流定义: 点击”保存“按钮,弹出"设置DAG图名称"弹框,如下图所示,输入工作流定义名称,工作流定义描述,设置全局参数(选填,参考https://dolphinscheduler.apache.org/zh-cn/docs/1.3.1/user_doc/system-manual.html#UserDefinedParameters),点击"添加"按钮,工作流定义创建成功。

其他类型任务,请参考 https://dolphinscheduler.apache.org/zh-cn/docs/1.3.1/user_doc/system-manual.html#TaskParamers。

11.11.2 工作流定义操作功能

点击项目管理->工作流->工作流定义,进入工作流定义页面,如下图所示:

工作流定义列表的操作功能如下:

编辑: 只能编辑"下线"的工作流定义。工作流DAG编辑同创建工作流定义。
上线: 工作流状态为"下线"时,上线工作流,只有"上线"状态的工作流能运行,但不能编辑。
下线: 工作流状态为"上线"时,下线工作流,下线状态的工作流可以编辑,但不能运行。
运行: 只有上线的工作流能运行。
定时: 只有上线的工作流能设置定时,系统自动定时调度工作流运行。创建定时后的状态为"下线",需在定时管理页面上线定时才生效。
定时管理: 定时管理页面可编辑、上线/下线、删除定时。
删除: 删除工作流定义。
下载: 下载工作流定义到本地。
树形图: 以树形结构展示任务节点的类型及任务状态,如下图所示:

11.11.3 运行工作流

点击项目管理->工作流->工作流定义,进入工作流定义页面,如下图所示,点击"上线"按钮,上线工作流。

点击”运行“按钮,弹出启动参数设置弹框,如下图所示,设置启动参数,点击弹框中的"运行"按钮,工作流开始运行,工作流实例页面生成一条工作流实例。

运行参数说明

* 失败策略:当某一个任务节点执行失败时,其他并行的任务节点需要执行的策略。”继续“表示:某一任务失败后,其他任务节点正常执行;”结束“表示:终止所有正在执行的任务,并终止整个流程。
* 通知策略:当流程结束,根据流程状态发送流程执行信息通知邮件,包含任何状态都不发,成功发,失败发,成功或失败都发。
* 流程优先级:流程运行的优先级,分五个等级:最高(HIGHEST),高(HIGH),中(MEDIUM),低(LOW),最低(LOWEST)。当master线程数不足时,级别高的流程在执行队列中会优先执行,相同优先级的流程按照先进先出的顺序执行。
* worker分组:该流程只能在指定的worker机器组里执行。默认是Default,可以在任一worker上执行。
* 通知组:选择通知策略||超时报警||发生容错时,会发送流程信息或邮件到通知组里的所有成员。
* 收件人:选择通知策略||超时报警||发生容错时,会发送流程信息或告警邮件到收件人列表。
* 抄送人:选择通知策略||超时报警||发生容错时,会抄送流程信息或告警邮件到抄送人列表。
* 补数:包括串行补数、并行补数2种模式。串行补数:指定时间范围内,从开始日期至结束日期依次执行补数,只生成一条流程实例;并行补数:指定时间范围内,多天同时进行补数,生成N条流程实例。

补数: 执行指定日期的工作流定义,可以选择补数时间范围(目前只支持针对连续的天进行补数),比如需要补5月1号到5月10号的数据,如下图所示:

串行模式:补数从5月1号到5月10号依次执行,流程实例页面生成一条流程实例;

并行模式:同时执行5月1号到5月10号的任务,流程实例页面生成十条流程实例。

11.11.4 工作流定时

创建定时:点击项目管理->工作流->工作流定义,进入工作流定义页面,上线工作流,点击"定时"按钮,弹出定时参数设置弹框,如下图所示:

选择起止时间。在起止时间范围内,定时运行工作流;不在起止时间范围内,不再产生定时工作流实例。
添加一个每天凌晨5点执行一次的定时,如下图所示:

失败策略、通知策略、流程优先级、Worker分组、通知组、收件人、抄送人同https://dolphinscheduler.apache.org/zh-cn/docs/1.3.1/user_doc/system-manual.html#runParamers。
点击"创建"按钮,创建定时成功,此时定时状态为"下线",定时需上线才生效。
定时上线:点击"定时管理"按钮,进入定时管理页面,点击"上线"按钮,定时状态变为"上线",如下图所示,工作流定时生效。

11.11.5 导入工作流

点击项目管理->工作流->工作流定义,进入工作流定义页面,点击"导入工作流"按钮,导入本地工作流文件,工作流定义列表显示导入的工作流,状态为下线。

11.11.6 查看工作流实例

点击项目管理->工作流->工作流实例,进入工作流实例页面,如下图所示:

点击工作流名称,进入DAG查看页面,查看任务执行状态,如下图所示。

11.11.7 查看任务日志

进入工作流实例页面,点击工作流名称,进入DAG查看页面,双击任务节点,如下图所示:

点击"查看日志",弹出日志弹框,如下图所示,任务实例页面也可查看任务日志,参考https://dolphinscheduler.apache.org/zh-cn/docs/1.3.1/user_doc/system-manual.html#taskLog。

11.11.8 查看任务历史记录

点击项目管理->工作流->工作流实例,进入工作流实例页面,点击工作流名称,进入工作流DAG页面;
双击任务节点,如下图所示,点击"查看历史",跳转到任务实例页面,并展示该工作流实例运行的任务实例列表

11.11.9 查看运行参数

点击项目管理->工作流->工作流实例,进入工作流实例页面,点击工作流名称,进入工作流DAG页面;
点击左上角图标,查看工作流实例的启动参数;点击图标),查看工作流实例的全局参数和局部参数,如下图所示:

11.11.10 工作流实例操作功能

点击项目管理->工作流->工作流实例,进入工作流实例页面,如下图所示:

编辑: 只能编辑已终止的流程。点击"编辑"按钮或工作流实例名称进入DAG编辑页面,编辑后点击"保存"按钮,弹出保存DAG弹框,如下图所示,在弹框中勾选"是否更新到工作流定义",保存后则更新工作流定义;若不勾选,则不更新工作流定义。

重跑: 重新执行已经终止的流程。
恢复失败: 针对失败的流程,可以执行恢复失败操作,从失败的节点开始执行。
停止: 对正在运行的流程进行停止操作,后台会先killworker进程,再执行kill -9操作
暂停: 对正在运行的流程进行暂停操作,系统状态变为等待执行,会等待正在执行的任务结束,暂停下一个要执行的任务。
恢复暂停: 对暂停的流程恢复,直接从暂停的节点开始运行
删除: 删除工作流实例及工作流实例下的任务实例
甘特图: Gantt图纵轴是某个工作流实例下的任务实例的拓扑排序,横轴是任务实例的运行时间,如图示:

11.11.11 任务实例

点击项目管理->工作流->任务实例,进入任务实例页面,如下图所示,点击工作流实例名称,可跳转到工作流实例DAG图查看任务状态。

查看日志:点击操作列中的“查看日志”按钮,可以查看任务执行的日志情况。

11.12 Hello 任务节点类型和参数设置

11.12.1 Shell节点

shell节点,在worker执行的时候,会生成一个临时shell脚本,使用租户同名的linux用户执行这个脚本。

点击项目管理-项目名称-工作流定义,点击"创建工作流"按钮,进入DAG编辑页面。

工具栏中拖动到画板中,如下图所示:

节点名称:一个工作流定义中的节点名称是唯一的。

运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。

描述信息:描述该节点的功能。

任务优先级:worker线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。

Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。

失败重试次数:任务失败重新提交的次数,支持下拉和手填。

失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。

超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.

脚本:用户开发的SHELL程序。

资源:是指脚本中需要调用的资源文件列表,资源中心-文件管理上传或创建的文件。

自定义参数:是SHELL局部的用户自定义参数,会替换脚本中以${变量}的内容。

11.12.2 子流程节点

子流程节点,就是把外部的某个工作流定义当做一个任务节点去执行。

拖动工具栏中的任务节点到画板中,如下图所示:

节点名称:一个工作流定义中的节点名称是唯一的
运行标志:标识这个节点是否能正常调度
描述信息:描述该节点的功能
超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
子节点:是选择子流程的工作流定义,右上角进入该子节点可以跳转到所选子流程的工作流定义

11.12.3 依赖(DEPENDENT)节点

依赖节点,就是依赖检查节点。比如A流程依赖昨天的B流程执行成功,依赖节点会去检查B流程在昨天是否有执行成功的实例。

拖动工具栏中的任务节点到画板中,如下图所示:

依赖节点提供了逻辑判断功能,比如检查昨天的B流程是否成功,或者C流程是否执行成功。

例如,A流程为周报任务,B、C流程为天任务,A任务需要B、C任务在上周的每一天都执行成功,如图示:

假如,周报A同时还需要自身在上周二执行成功:

11.12.4 存储过程节点

根据选择的数据源,执行存储过程。

拖动工具栏中的任务节点到画板中,如下图所示:

数据源:存储过程的数据源类型支持MySQL和POSTGRESQL两种,选择对应的数据源
方法:是存储过程的方法名称
自定义参数:存储过程的自定义参数类型支持IN、OUT两种,数据类型支持VARCHAR、INTEGER、LONG、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP、BOOLEAN九种数据类型

11.12.5 SQL节点

拖动工具栏中的任务节点到画板中
非查询SQL功能:编辑非查询SQL任务信息,sql类型选择非查询,如下图所示:

查询SQL功能:编辑查询SQL任务信息,sql类型选择查询,选择表格或附件形式发送邮件到指定的收件人,如下图所示。

数据源:选择对应的数据源
sql类型:支持查询和非查询两种,查询是select类型的查询,是有结果集返回的,可以指定邮件通知为表格、附件或表格附件三种模板。非查询是没有结果集返回的,是针对update、delete、insert三种类型的操作。
sql参数:输入参数格式为key1=value1;key2=value2…
sql语句:SQL语句
UDF函数:对于HIVE类型的数据源,可以引用资源中心中创建的UDF函数,其他类型的数据源暂不支持UDF函数。
自定义参数:SQL任务类型,而存储过程是自定义参数顺序的给方法设置值自定义参数类型和数据类型同存储过程任务类型一样。区别在于SQL任务类型自定义参数会替换sql语句中${变量}。
前置sql:前置sql在sql语句之前执行。
后置sql:后置sql在sql语句之后执行。

11.12.6 SPARK节点

通过SPARK节点,可以直接直接执行SPARK程序,对于spark节点,worker会使用spark-submit方式提交任务

拖动工具栏中的任务节点到画板中,如下图所示:

程序类型:支持JAVA、Scala和Python三种语言
主函数的class:是Spark程序的入口Main Class的全路径
主jar包:是Spark的jar包
部署方式:支持yarn-cluster、yarn-client和local三种模式
Driver内核数:可以设置Driver内核数及内存数
Executor数量:可以设置Executor数量、Executor内存数和Executor内核数
命令行参数:是设置Spark程序的输入参数,支持自定义参数变量的替换。
其他参数:支持 --jars、--files、--archives、--conf格式
资源:如果其他参数中引用了资源文件,需要在资源中选择指定
自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容

注意:JAVA和Scala只是用来标识,没有区别,如果是Python开发的Spark则没有主函数的class,其他都是一样

11.12.7 MapReduce(MR)节点

使用MR节点,可以直接执行MR程序。对于mr节点,worker会使用hadoop jar方式提交任务

拖动工具栏中的任务节点到画板中,如下图所示:

    JAVA程序

主函数的class:是MR程序的入口Main Class的全路径
程序类型:选择JAVA语言
主jar包:是MR的jar包
命令行参数:是设置MR程序的输入参数,支持自定义参数变量的替换
其他参数:支持 –D、-files、-libjars、-archives格式
资源: 如果其他参数中引用了资源文件,需要在资源中选择指定
自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容

    Python程序

程序类型:选择Python语言
主jar包:是运行MR的Python jar包
其他参数:支持 –D、-mapper、-reducer、-input -output格式,这里可以设置用户自定义参数的输入,比如:
-mapper "mapper.py 1" -file mapper.py -reducer reducer.py -file reducer.py –input /journey/words.txt -output /journey/out/mr/${currentTimeMillis}
其中 -mapper 后的 mapper.py 1是两个参数,第一个参数是mapper.py,第二个参数是1
资源: 如果其他参数中引用了资源文件,需要在资源中选择指定
自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容

11.12.8 Python节点

使用python节点,可以直接执行python脚本,对于python节点,worker会使用python **方式提交任务。

拖动工具栏中的任务节点到画板中,如下图所示:

脚本:用户开发的Python程序
资源:是指脚本中需要调用的资源文件列表
自定义参数:是Python局部的用户自定义参数,会替换脚本中以${变量}的内容

11.12.9 Flink节点

拖动工具栏中的任务节点到画板中,如下图所示:

程序类型:支持JAVA、Scala和Python三种语言
主函数的class:是Flink程序的入口Main Class的全路径
主jar包:是Flink的jar包
部署方式:支持cluster、local三种模式
slot数量:可以设置slot数
taskManage数量:可以设置taskManage数
jobManager内存数:可以设置jobManager内存数
taskManager内存数:可以设置taskManager内存数
命令行参数:是设置Spark程序的输入参数,支持自定义参数变量的替换。
其他参数:支持 --jars、--files、--archives、--conf格式
资源:如果其他参数中引用了资源文件,需要在资源中选择指定
自定义参数:是Flink局部的用户自定义参数,会替换脚本中以${变量}的内容

注意:JAVA和Scala只是用来标识,没有区别,如果是Python开发的Flink则没有主函数的class,其他都是一样

11.12.10 http节点

拖动工具栏中的任务节点到画板中,如下图所示:

节点名称:一个工作流定义中的节点名称是唯一的。
运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
描述信息:描述该节点的功能。
任务优先级:worker线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。
失败重试次数:任务失败重新提交的次数,支持下拉和手填。
失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。
超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.
请求地址:http请求URL。
请求类型:支持GET、POSt、HEAD、PUT、DELETE。
请求参数:支持Parameter、Body、Headers。
校验条件:支持默认响应码、自定义响应码、内容包含、内容不包含。
校验内容:当校验条件选择自定义响应码、内容包含、内容不包含时,需填写校验内容。
自定义参数:是http局部的用户自定义参数,会替换脚本中以${变量}的内容。

11.12.11 DATAX节点

拖动工具栏中的任务节点到画板中

自定义模板:打开自定义模板开关时,可以自定义datax节点的json配置文件内容(适用于控件配置不满足需求时)
数据源:选择抽取数据的数据源
sql语句:目标库抽取数据的sql语句,节点执行时自动解析sql查询列名,映射为目标表同步列名,源表和目标表列名不一致时,可以通过列别名(as)转换
目标库:选择数据同步的目标库
目标表:数据同步的目标表名
前置sql:前置sql在sql语句之前执行(目标库执行)。
后置sql:后置sql在sql语句之后执行(目标库执行)。
json:datax同步的json配置文件
自定义参数:SQL任务类型,而存储过程是自定义参数顺序的给方法设置值自定义参数类型和数据类型同存储过程任务类型一样。区别在于SQL任务类型自定义参数会替换sql语句中${变量}。

11.13 Hello 参数

11.13.1 系统参数

变量 含义
${system.biz.date} 日常调度实例定时的定时时间前一天,格式为 yyyyMMdd,补数据时,该日期 +1
${system.biz.curdate} 日常调度实例定时的定时时间,格式为 yyyyMMdd,补数据时,该日期 +1
${system.datetime} 日常调度实例定时的定时时间,格式为 yyyyMMddHHmmss,补数据时,该日期 +1

11.13.2 时间自定义参数

支持代码中自定义变量名,声明方式:${变量名}。可以是引用 "系统参数" 或指定 "常量"。

我们定义这种基准变量为 [...] 格式的,[yyyyMMddHHmmss] 是可以任意分解组合的,比如:$[yyyyMMdd], $[HHmmss], $[yyyy-MM-dd] 等

也可以使用以下格式:

* 后 N 年:$[add_months(yyyyMMdd,12*N)]
* 前 N 年:$[add_months(yyyyMMdd,-12*N)]
* 后 N 月:$[add_months(yyyyMMdd,N)]
* 前 N 月:$[add_months(yyyyMMdd,-N)]
* 后 N 周:$[yyyyMMdd+7*N]
* 前 N 周:$[yyyyMMdd-7*N]
* 后 N 天:$[yyyyMMdd+N]
* 前 N 天:$[yyyyMMdd-N]
* 后 N 小时:$[HHmmss+N/24]
* 前 N 小时:$[HHmmss-N/24]
* 后 N 分钟:$[HHmmss+N/24/60]
* 前 N 分钟:$[HHmmss-N/24/60]

11.13.3 用户自定义参数

用户自定义参数分为全局参数和局部参数。全局参数是保存工作流定义和工作流实例的时候传递的全局参数,全局参数可以在整个流程中的任何一个任务节点的局部参数引用。 例如:

global_bizdate为全局参数,引用的是系统参数。

任务中local_param_bizdate通过${global_bizdate}来引用全局参数,对于脚本可以通过${local_param_bizdate}来引全局变量global_bizdate的值,或通过JDBC直接将local_param_bizdate的值set进去

14 元数据解释

Mysql核心表概览

表名 表信息
t_ds_access_token 访问ds后端的token
t_ds_alert 告警信息
t_ds_alertgroup 告警组
t_ds_command 执行命令
t_ds_datasource 数据源
t_ds_error_command 错误命令
t_ds_process_definition 流程定义
t_ds_process_instance 流程实例
t_ds_project 项目
t_ds_queue 队列
t_ds_relation_datasource_user 用户关联数据源
t_ds_relation_process_instance 子流程
t_ds_relation_project_user 用户关联项目
t_ds_relation_resources_user 用户关联资源
t_ds_relation_udfs_user 用户关联UDF函数
t_ds_relation_user_alertgroup 用户关联告警组
t_ds_resources 资源文件
t_ds_schedules 流程定时调度
t_ds_session 用户登录的session
t_ds_task_instance 任务实例
t_ds_tenant 租户
t_ds_udfs UDF资源
t_ds_user 用户
t_ds_version ##ds版本信息

15 架构设计

系统架构图

启动流程活动图

MasterServer

MasterServer采用分布式无中心设计理念,MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。 MasterServer基于netty提供监听服务。

该服务内主要包含:

Distributed Quartz分布式调度组件,主要负责定时任务的启停操作,当quartz调起任务后,Master内部会有线程池具体负责处理任务的后续操作

MasterSchedulerThread是一个扫描线程,定时扫描数据库中的 command 表,根据不同的命令类型进行不同的业务操作

MasterExecThread主要是负责DAG任务切分、任务提交监控、各种不同命令类型的逻辑处理

MasterTaskExecThread主要负责任务的持久化

WorkerServer

WorkerServer也采用分布式无中心设计理念,WorkerServer主要负责任务的执行和提供日志服务。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。 Server基于netty提供监听服务。Worker

该服务包含:

FetchTaskThread主要负责不断从Task Queue中领取任务,并根据不同任务类型调用TaskScheduleThread对应执行器。

LoggerServer是一个RPC服务,提供日志分片查看、刷新和下载等功能

ZooKeeper

ZooKeeper服务,系统中的MasterServer和WorkerServer节点都通过ZooKeeper来进行集群管理和容错。另外系统还基于ZooKeeper进行事件监听和分布式锁。 我们也曾经基于Redis实现过队列,不过我们希望DolphinScheduler依赖到的组件尽量地少,所以最后还是去掉了Redis实现。

Task Queue

提供任务队列的操作,目前队列也是基于Zookeeper来实现。由于队列中存的信息较少,不必担心队列里数据过多的情况,实际上我们压测过百万级数据存队列,对系统稳定性和性能没影响。

Alert

提供告警相关接口,接口主要包括告警两种类型的告警数据的存储、查询和通知功能。其中通知功能又有邮件通知SNMP(暂未实现)两种。

API

API接口层,主要负责处理前端UI层的请求。该服务统一提供RESTful api向外部提供请求服务。 接口包括工作流的创建、定义、查询、修改、发布、下线、手工启动、停止、暂停、恢复、从该节点开始执行等等。

UI

系统的前端页面,提供系统的各种可视化操作界面,详见https://dolphinscheduler.apache.org/zh-cn/docs/user_doc/system-manual.html部分。

16 参考网站

https://dolphinscheduler.apache.org/zh-cn/docs/1.3.1/user_doc/hardware-environment.html

环境篇:DolphinScheduler-1.3.1安装部署及使用技巧的相关教程结束。

《环境篇:DolphinScheduler-1.3.1安装部署及使用技巧.doc》

下载本文的Word格式文档,以方便收藏与打印。