在上篇文章手把手教你安装 azkaban 运行环境中,详细介绍了 azkaban
服务的安装,现在就用SpringBoot
来开发一个azkaban的调度任务,上传到web界面运行。
azkaban调度类型
azkaban
可以支持非常多的任务类型,这一点在官方网站有说明,它支持的任务类型有如下几种:
Command
:使用Linux shell命令行任务。HadoopShell
:这和Command一样也是命令类型,只不过可以和Hadoop集群通信Java
:Java任务hadoopJava
:也是一种Java类型,可以和hadoop集群通信,可以通过运行hadoopJava作业来创建大多数Hadoop作业类型,例如Pig,Hive等Pig
:pig脚本任务Hive
:支持 执行hiveSQL 任务
在官方网站中,每种任务类型都有相应的例子,任务的配置也进行了详细的说明,官方网站地址:http://xiaoshuai.github.io/azkaban-gh-pages/#new-hive-type
开发Java类型任务
接下来就来开发一个Java类型的任务,上传azkaban进行运行。
「需求:简单的从数据库查询用户信息打印出来。」
数据准备
首先来准备一下要查询的数据。
建表语句:
代码语言:javascript复制create table if not exists ts_userinfo
(
name varchar(255) charset utf8 null,
age int null,
job varchar(255) charset utf8 null,
address varchar(255) charset utf8 null,
`desc` varchar(255) charset utf8 null
);
表结构:
插入要查询的数据:
任务开发
项目采用SpringBoot
进行开发,目录结构如下:
结构非常简单,相信各位看官都可以看懂。
application.properties
application.properties
配置文件,用来配置 「mysql」 数据源和「mybatis」相关配置,如下所示:
# 端口
server.port=8080
# mybatis配置文件路径
mybatis.mapper-locations=classpath*:mapper/*.xml
# mysql 配置
spring.datasource.url=jdbc:mysql://121.196.166.1xx:3306/azkaban
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
由于我的mybatis-spring
引用的是高版本的,所以还需要手动配置 「SqlSessionFactory」 和 「SqlSessionTemplate」 ,不然启动的时候会提示如下错误:Property sqlSessionFactory or sqlSessionTemplate are required
MybatisConfig
配置数据源,如下所示
代码语言:javascript复制
@Configuration
@MapperScan(basePackages = {"com.tsmyk.azkaban.azkaban.dao"}, sqlSessionTemplateRef = "sqlSessionTemplate")
public class MybatisConfig {
@Value("${mybatis.mapper-locations}")
private String mybatisMapperLocations;
@Value("${spring.datasource.url}")
private String url;
@Value("${spring.datasource.username}")
private String username;
@Value("${spring.datasource.password}")
private String password;
@Value("${spring.datasource.driver-class-name}")
private String driverClassName;
@Bean(name = "dataSource")
public DataSource getDataSource(){
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl(url);
dataSource.setUsername(username);
dataSource.setPassword(password);
dataSource.setDriverClassName(driverClassName);
return dataSource;
}
@Bean(name = "sqlSessionFactory")
@Primary
public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource") DataSource dataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mybatisMapperLocations));
return bean.getObject();
}
@Bean(name = "transactionManager")
public DataSourceTransactionManager transactionManager(@Qualifier("dataSource") DataSource dataSource){
return new DataSourceTransactionManager(dataSource);
}
@Bean(name = "sqlSessionTemplate")
public SqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory){
return new SqlSessionTemplate(sqlSessionFactory);
}
}
TsUserinfoDao
定义mybatis操作接口,如下所示:
代码语言:javascript复制@Mapper
public interface TsUserinfoDao {
List<TsUserinfo> queryAllUserInfo();
}
TsUserinfoDao.xml
编写mybatis sql 文件,如下所示:
代码语言:javascript复制<mapper namespace="com.tsmyk.azkaban.azkaban.dao.TsUserinfoDao">
<resultMap id="BaseResultMap" type="com.tsmyk.azkaban.azkaban.pojo.TsUserinfo">
<result column="name" jdbcType="VARCHAR" property="name"/>
<result column="age" jdbcType="INTEGER" property="age"/>
<result column="job" jdbcType="VARCHAR" property="job"/>
<result column="address" jdbcType="VARCHAR" property="address"/>
<result column="desc" jdbcType="VARCHAR" property="desc"/>
</resultMap>
<select id="queryAllUserInfo" resultType="com.tsmyk.azkaban.azkaban.pojo.TsUserinfo">
select * from ts_userinfo
</select>
</mapper>
AzkabanServiceImpl
主要实现查询逻辑,如下所示:
代码语言:javascript复制@Service
public class AzkabanServiceImpl implements IAzkabanService {
private static final Logger LOGGER = LoggerFactory.getLogger(AzkabanServiceImpl.class);
@Autowired
private TsUserinfoDao tsUserinfoDao;
@Override
public List<TsUserinfo> queryAllUserInfo() {
LOGGER.info("开始查询用户信息........");
List<TsUserinfo> userinfos = tsUserinfoDao.queryAllUserInfo();
LOGGER.info("结束查询用户信息........");
userinfos.forEach(System.out::println);
return userinfos;
}
}
AzkabanApplication
修改启动类,如下所示:
代码语言:javascript复制@PropertySource("classpath:application.properties")
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class AzkabanApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(AzkabanApplication.class);
public static void main(String[] args) {
LOGGER.info("开始执行任务......");
ApplicationContext context = new AnnotationConfigApplicationContext(AzkabanApplication.class);
IAzkabanService azkabanService = context.getBean(IAzkabanService.class);
List<TsUserinfo> userinfos = azkabanService.queryAllUserInfo();
LOGGER.info("任务执行结束,结果为:rn");
userinfos.forEach(System.out::println);
}
}
运行测试:
之后,启动AzkabanApplication
,日志打印如下:
到这里,azkaban java 类型的任务就算开发完了,接下来,需要打包部署到 azkaban上进行运行。
打包部署
第一步,首先使用 maven 把项目打成一个 jar包,使用 IDEA 自带的工具就可以:
第二步,把项目所依赖的所有jar包导出来,也可以使用 IDEA 自带的工具进行导出,如下:
- 点击 maven 导航栏中的
Execute Maven Goal
- 输入
dependency:copy-dependencies -DoutputDirectory=D:\
命令,点击确定即可
第三步,编写 job 任务文件,后缀名为 .job
,内容如下:
# 指定任务类型为java任务
type=javaprocess
# 指定启动类
java.class=com.tsmyk.azkaban.AzkabanApplication
# 指定配置文件和依赖的jar包
classpath=resources,lib/*
当然,还可以指定更多的参数,比如jvm参数,Xms,Xmx 等参数。
最后,把项目依赖的jar包和项目本身的jar包放入到 lib文件夹中,把 lib 文件夹,resources 文件夹和 job文件,打包成压缩包:
之后登陆azkaban web管理端,项目项目,把压缩包上传即可。
上传成功后,运行即可。
到这里,一个azkaban简单的java类型的任务就开发完了,当然还可以在界面设置调度周期等。