SpringBoot开发azkaban Java类型任务

2021-02-03 15:15:53 浏览数 (1)

在上篇文章手把手教你安装 azkaban 运行环境中,详细介绍了 azkaban服务的安装,现在就用SpringBoot来开发一个azkaban的调度任务,上传到web界面运行。

azkaban调度类型

azkaban 可以支持非常多的任务类型,这一点在官方网站有说明,它支持的任务类型有如下几种:

  • Command:使用Linux shell命令行任务。
  • HadoopShell:这和Command一样也是命令类型,只不过可以和Hadoop集群通信
  • Java:Java任务
  • hadoopJava:也是一种Java类型,可以和hadoop集群通信,可以通过运行hadoopJava作业来创建大多数Hadoop作业类型,例如Pig,Hive等
  • Pig:pig脚本任务
  • Hive:支持 执行hiveSQL 任务

在官方网站中,每种任务类型都有相应的例子,任务的配置也进行了详细的说明,官方网站地址:http://xiaoshuai.github.io/azkaban-gh-pages/#new-hive-type

开发Java类型任务

接下来就来开发一个Java类型的任务,上传azkaban进行运行。

「需求:简单的从数据库查询用户信息打印出来。」

数据准备

首先来准备一下要查询的数据。

建表语句:

代码语言:javascript复制
create table if not exists ts_userinfo
(
 name varchar(255) charset utf8 null,
 age int null,
 job varchar(255) charset utf8 null,
 address varchar(255) charset utf8 null,
 `desc` varchar(255) charset utf8 null
);

表结构:

插入要查询的数据:

任务开发

项目采用SpringBoot进行开发,目录结构如下:

结构非常简单,相信各位看官都可以看懂。

application.properties

application.properties配置文件,用来配置 「mysql」 数据源和「mybatis」相关配置,如下所示:

代码语言:javascript复制
# 端口
server.port=8080

# mybatis配置文件路径
mybatis.mapper-locations=classpath*:mapper/*.xml

# mysql 配置
spring.datasource.url=jdbc:mysql://121.196.166.1xx:3306/azkaban
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

由于我的mybatis-spring引用的是高版本的,所以还需要手动配置 「SqlSessionFactory」「SqlSessionTemplate」 ,不然启动的时候会提示如下错误:Property sqlSessionFactory or sqlSessionTemplate are required

MybatisConfig

配置数据源,如下所示

代码语言:javascript复制

@Configuration
@MapperScan(basePackages = {"com.tsmyk.azkaban.azkaban.dao"}, sqlSessionTemplateRef = "sqlSessionTemplate")
public class MybatisConfig {

    @Value("${mybatis.mapper-locations}")
    private String mybatisMapperLocations;
    @Value("${spring.datasource.url}")
    private String url;
    @Value("${spring.datasource.username}")
    private String username;
    @Value("${spring.datasource.password}")
    private String password;
    @Value("${spring.datasource.driver-class-name}")
    private String driverClassName;

    @Bean(name = "dataSource")
    public DataSource getDataSource(){
        DruidDataSource dataSource =  new DruidDataSource();
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setDriverClassName(driverClassName);
        return dataSource;
    }

    @Bean(name = "sqlSessionFactory")
    @Primary
    public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mybatisMapperLocations));
        return bean.getObject();
    }

    @Bean(name = "transactionManager")
    public DataSourceTransactionManager transactionManager(@Qualifier("dataSource") DataSource dataSource){
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "sqlSessionTemplate")
    public SqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
TsUserinfoDao

定义mybatis操作接口,如下所示:

代码语言:javascript复制
@Mapper
public interface TsUserinfoDao {
   List<TsUserinfo> queryAllUserInfo();
}
TsUserinfoDao.xml

编写mybatis sql 文件,如下所示:

代码语言:javascript复制
<mapper namespace="com.tsmyk.azkaban.azkaban.dao.TsUserinfoDao">
    <resultMap id="BaseResultMap" type="com.tsmyk.azkaban.azkaban.pojo.TsUserinfo">
        <result column="name" jdbcType="VARCHAR" property="name"/>
        <result column="age" jdbcType="INTEGER" property="age"/>
        <result column="job" jdbcType="VARCHAR" property="job"/>
        <result column="address" jdbcType="VARCHAR" property="address"/>
        <result column="desc" jdbcType="VARCHAR" property="desc"/>
    </resultMap>
    <select id="queryAllUserInfo" resultType="com.tsmyk.azkaban.azkaban.pojo.TsUserinfo">
        select * from ts_userinfo
    </select>
</mapper>
AzkabanServiceImpl

主要实现查询逻辑,如下所示:

代码语言:javascript复制
@Service
public class AzkabanServiceImpl implements  IAzkabanService  {
    
    private static final Logger  LOGGER = LoggerFactory.getLogger(AzkabanServiceImpl.class);
    @Autowired
    private TsUserinfoDao tsUserinfoDao;

    @Override
    public List<TsUserinfo> queryAllUserInfo() {
        LOGGER.info("开始查询用户信息........");
        List<TsUserinfo> userinfos = tsUserinfoDao.queryAllUserInfo();
        LOGGER.info("结束查询用户信息........");
        userinfos.forEach(System.out::println);
        return userinfos;
    }
}
AzkabanApplication

修改启动类,如下所示:

代码语言:javascript复制
@PropertySource("classpath:application.properties")
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class AzkabanApplication {
    private static final Logger LOGGER = LoggerFactory.getLogger(AzkabanApplication.class);

    public static void main(String[] args) {
        LOGGER.info("开始执行任务......");
        ApplicationContext context = new AnnotationConfigApplicationContext(AzkabanApplication.class);
        IAzkabanService azkabanService = context.getBean(IAzkabanService.class);
        List<TsUserinfo> userinfos = azkabanService.queryAllUserInfo();
        LOGGER.info("任务执行结束,结果为:rn");
        userinfos.forEach(System.out::println);
    }
}
运行测试:

之后,启动AzkabanApplication,日志打印如下:

到这里,azkaban java 类型的任务就算开发完了,接下来,需要打包部署到 azkaban上进行运行。

打包部署

第一步,首先使用 maven 把项目打成一个 jar包,使用 IDEA 自带的工具就可以:

第二步,把项目所依赖的所有jar包导出来,也可以使用 IDEA 自带的工具进行导出,如下:

  • 点击 maven 导航栏中的 Execute Maven Goal
  • 输入 dependency:copy-dependencies -DoutputDirectory=D:\ 命令,点击确定即可

第三步,编写 job 任务文件,后缀名为 .job,内容如下:

代码语言:javascript复制
# 指定任务类型为java任务
type=javaprocess
# 指定启动类
java.class=com.tsmyk.azkaban.AzkabanApplication
# 指定配置文件和依赖的jar包
classpath=resources,lib/*

当然,还可以指定更多的参数,比如jvm参数,Xms,Xmx 等参数。

最后,把项目依赖的jar包和项目本身的jar包放入到 lib文件夹中,把 lib 文件夹,resources 文件夹和 job文件,打包成压缩包:

之后登陆azkaban web管理端,项目项目,把压缩包上传即可。

上传成功后,运行即可。

到这里,一个azkaban简单的java类型的任务就开发完了,当然还可以在界面设置调度周期等。

0 人点赞