Submitting Flink Jobs from Code


Preface

This article uses Flink 1.12.7.

Submitting a job from code

Prepare the directories and files

hadoop fs -mkdir -p /jar/userTask
hadoop fs -mkdir -p /jar/flink12/libdist
hadoop fs -mkdir -p /jar/flink12/lib

Copy the required files

hadoop fs -put $FLINK_HOME/examples/batch/WordCount.jar /jar/userTask/WordCount.jar
hadoop fs -put $FLINK_HOME/lib/flink-dist_2.12-1.12.7.jar /jar/flink12/libdist/flink-dist_2.12-1.12.7.jar
hadoop fs -put $FLINK_HOME/lib/* /jar/flink12/lib/

To browse the uploaded files, open either NameNode UI:

http://hadoop01:50070/explorer.html#/

http://hadoop02:50070/explorer.html#/
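
If you prefer to check from code rather than from the NameNode UI, the following is a minimal sketch that lists the uploaded jars with the Hadoop FileSystem API (the class name is mine; the core-site.xml/hdfs-site.xml paths are the ones used later in RunFlinkJob, so adjust them to your installation):

package cn.psvmc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListUploadedJars {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed local Hadoop config locations (same as used later in RunFlinkJob).
        conf.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/core-site.xml"));
        conf.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/hdfs-site.xml"));

        // The default file system comes from core-site.xml (hdfs://hacluster).
        try (FileSystem fs = FileSystem.get(conf)) {
            for (String dir : new String[]{"/jar/userTask", "/jar/flink12/lib", "/jar/flink12/libdist"}) {
                System.out.println(dir + ":");
                for (FileStatus status : fs.listStatus(new Path(dir))) {
                    System.out.println("  " + status.getPath().getName());
                }
            }
        }
    }
}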

Test it on the server first:

flink run-application -t yarn-application hdfs://hacluster/jar/userTask/WordCount.jar --output hdfs://hacluster/bigdata_study/output03

Add the dependency

The scala.binary.version and flink.version placeholders should match the build used above (Scala 2.12, Flink 1.12.7).

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-yarn_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

Code

package cn.psvmc;

import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;

import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.Collections;

import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;

public class RunFlinkJob {
    public static void main(String[] args) {
        // Local Flink configuration directory, used to load the Flink configuration.
        // If you see "org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.",
        // add the following line to flink-conf.yaml:
        // classloader.resolve-order: parent-first
        String configurationDirectory = "/data/tools/bigdata/flink-1.12.7/conf";

        // HDFS directory that holds the Flink cluster jars
        String flinkLibs = "hdfs://hacluster/jar/flink12/lib";
        // User jar to submit
        String userJarPath = "hdfs://hacluster/jar/userTask/WordCount.jar";
        String flinkDistJar = "hdfs://hacluster/jar/flink12/libdist/flink-dist_2.12-1.12.7.jar";

        YarnClient yarnClient = YarnClient.createYarnClient();
        org.apache.hadoop.conf.Configuration entries = new org.apache.hadoop.conf.Configuration();
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/yarn-site.xml"));
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/hdfs-site.xml"));
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/core-site.xml"));
        YarnConfiguration yarnConfiguration = new YarnConfiguration(entries);
        yarnClient.init(yarnConfiguration);
        yarnClient.start();

        // Needed for logging; without this you will not see the logs
        YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever
                .create(yarnClient);

        // Load the Flink configuration
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
                configurationDirectory
        );

        flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);

        flinkConfiguration.set(
                PipelineOptions.JARS,
                Collections.singletonList(userJarPath)
        );

        Path remoteLib = new Path(flinkLibs);
        flinkConfiguration.set(
                YarnConfigOptions.PROVIDED_LIB_DIRS,
                Collections.singletonList(remoteLib.toString())
        );

//        flinkConfiguration.set(
//                YarnConfigOptions.FLINK_DIST_JAR,
//                flinkDistJar
//        );

        // Deploy in APPLICATION mode
        flinkConfiguration.set(
                DeploymentOptions.TARGET,
                YarnDeploymentTarget.APPLICATION.getName()
        );

        // yarn application name
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "zApplication");

//        flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));
//        flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));


        YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

        ClusterSpecification clusterSpecification = new ClusterSpecification
                .ClusterSpecificationBuilder()
                .createClusterSpecification();

        // Program arguments and main class of the user jar
//        ApplicationConfiguration appConfig = new ApplicationConfiguration(args, "org.apache.flink.examples.java.wordcount.WordCount");
        ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);
        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                flinkConfiguration,
                yarnConfiguration,
                yarnClient,
                clusterInformationRetriever,
                true
        );

        try {
            ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
                    clusterSpecification,
                    appConfig
            );

            ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();

            ApplicationId applicationId = clusterClient.getClusterId();
            String webInterfaceURL = clusterClient.getWebInterfaceURL();
            System.out.println("applicationId is {}"   applicationId);
            System.out.println("webInterfaceURL is {}"   webInterfaceURL);

            // To shut the cluster down:
            // yarnClusterDescriptor.killCluster(applicationId);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Check YARN

http://hadoop02:8088/cluster
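
Besides the YARN web UI, the ClusterClient obtained in the code above can be queried directly. A minimal sketch (the helper class name is mine; it assumes the clusterClient from RunFlinkJob):

package cn.psvmc;

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.hadoop.yarn.api.records.ApplicationId;

import java.util.Collection;

public class JobStatusPrinter {
    // Pass in the clusterClient obtained from clusterClientProvider.getClusterClient().
    public static void printJobs(ClusterClient<ApplicationId> clusterClient) throws Exception {
        // listJobs() is asynchronous; block on the future for simplicity.
        Collection<JobStatusMessage> jobs = clusterClient.listJobs().get();
        for (JobStatusMessage job : jobs) {
            System.out.println(job.getJobId() + "  " + job.getJobName() + "  " + job.getJobState());
        }
    }
}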

Running the script over SSH

Another option is to SSH into a server where the Flink client is installed and run the flink CLI there. The following example uses JSch (com.jcraft:jsch).

package cn.psvmc;

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.sun.istack.logging.Logger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ConnectionSSH {
  private static final Logger logger = Logger.getLogger(ConnectionSSH.class);
  public static void main(String[] args) throws JSchException, IOException {
    JSch jsch = new JSch();
    String pubKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
    jsch.addIdentity(pubKeyPath);
    String username = "root";
    String host = "192.168.7.101";
    Session session = jsch.getSession(username, host, 22); // prepare the session
    session.setConfig("StrictHostKeyChecking", "no");
    session.connect();
    String command = "flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
    ChannelExec channel=(ChannelExec)session.openChannel("exec");
    channel.setCommand(command);
    BufferedReader in = new BufferedReader(new InputStreamReader(channel.getInputStream()));
    channel.connect();

    String msg;
    while((msg = in.readLine()) != null){
      System.out.println(msg);
    }
    channel.disconnect();
    session.disconnect();
  }
}

Using a password

JSch jsch = new JSch();
String username = "root";
String host = "192.168.7.101";
Session session = jsch.getSession(username, host, 22); // prepare the session
session.setConfig("StrictHostKeyChecking", "no");
session.setPassword("zhangjian");
session.connect();

Using a key

JSch jsch = new JSch();
String pubKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
jsch.addIdentity(pubKeyPath);
String username = "root";
String host = "192.168.7.101";
Session session = jsch.getSession(username, host, 22); // prepare the session
session.setConfig("StrictHostKeyChecking", "no");
session.connect();
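
The JSch example above only reads standard output. Below is a small sketch that also captures stderr and the remote exit status (using JSch's getErrStream() and getExitStatus(); the helper class name is mine):

package cn.psvmc;

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.Session;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class SshExecHelper {
    // Runs a command on an already-connected session, prints stdout and stderr,
    // and returns the remote exit status.
    public static int exec(Session session, String command) throws Exception {
        ChannelExec channel = (ChannelExec) session.openChannel("exec");
        channel.setCommand(command);
        BufferedReader out = new BufferedReader(new InputStreamReader(channel.getInputStream()));
        BufferedReader err = new BufferedReader(new InputStreamReader(channel.getErrStream()));
        channel.connect();

        String line;
        // Reading the two streams one after the other is fine for short outputs.
        while ((line = out.readLine()) != null) {
            System.out.println(line);
        }
        while ((line = err.readLine()) != null) {
            System.err.println(line);
        }
        channel.disconnect();
        // Only meaningful once the command has finished (-1 while the channel is still open).
        return channel.getExitStatus();
    }
}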

Running the script over SSH (alternative)

Besides running scripts, this class can also copy files.

Dependency:

<dependency>
    <groupId>ch.ethz.ganymed</groupId>
    <artifactId>ganymed-ssh2</artifactId>
    <version>build210</version>
</dependency>

Utility class

package cn.psvmc;

import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.SCPClient;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

import java.io.*;
/**
 * Description: connect to a Linux server and run shell commands on it
 */
public class ConnectLinuxCommand {
    private static final Logger logger = Logger.getLogger(ConnectLinuxCommand.class);

    private static final String DEFAULTCHARTSET = "UTF-8";
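    // Shared connection reused by the static methods below; call one of the login methods first.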
    private static Connection conn;

    /**
     * @Title: login
     * @Description: Log in to the remote Linux server with username and password
     * @return: Boolean
     */
    public static Boolean login(RemoteConnect remoteConnect) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect(); // connect
            flag = conn.authenticateWithPassword(remoteConnect.getUserName(), remoteConnect.getPassword()); // authenticate
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    public static Boolean loginWithoutPwd(RemoteConnect remoteConnect) {
        boolean flag = true;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect(); // connect
            boolean authenticationPartialSuccess = conn.isAuthenticationPartialSuccess();
            System.out.println("authenticationPartialSuccess = " + authenticationPartialSuccess);
            logger.info("Authentication succeeded!");
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param remoteConnect connection info object
     * @param keyFile       a file containing the user's DSA or RSA private key in OpenSSH (PEM) format;
     *                      the "-----BEGIN DSA PRIVATE KEY-----" or "-----BEGIN RSA PRIVATE KEY-----" header must not be missing
     * @param keyfilePass   passphrase used to decrypt the key file; may be null if the key is not encrypted
     * @return Boolean
     * @Title: loginByFileKey
     * @Description: Log in to the remote Linux server with a private key file
     */
    public static Boolean loginByFileKey(RemoteConnect remoteConnect, File keyFile, String keyfilePass) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // authenticate
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keyFile, keyfilePass);
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param remoteConnect connection info object
     * @param keys          a char[] containing the user's DSA or RSA private key in OpenSSH format;
     *                      the "-----BEGIN DSA PRIVATE KEY-----" or "-----BEGIN RSA PRIVATE KEY-----" header must not be missing;
     *                      the array may contain line breaks
     * @param keyPass       passphrase used to decrypt the key; may be null if the key is not encrypted
     * @return Boolean
     * @Title: loginByCharsKey
     * @Description: Log in to the remote Linux server with an in-memory private key
     */
    public static Boolean loginByCharsKey(RemoteConnect remoteConnect, char[] keys, String keyPass) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // authenticate
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keys, keyPass);
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param cmd shell command
     * @Title: runCmd
     * @Description: Run a shell script or command on the remote server
     * @return: result the command output once execution has finished
     */
    public static String runCmd(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession(); // open a session
            session.execCommand(cmd); // run the command
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            // An empty stdout usually means the command failed, so fall back to stderr.
            if (StringUtils.isBlank(result)) {
                result = processStdout(session.getStderr(), DEFAULTCHARTSET);
            }
            session.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * @return String the command output on success; an empty string (never null) on failure
     * @Title: runCmdSuccess
     * @Description: Run a shell script or command on the remote server
     */
    public static String runCmdSuccess(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession(); // open a session
            session.execCommand(cmd); // run the command
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            session.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * @param in      input stream
     * @param charset encoding
     * @return String the stream content as plain text
     * @Title: processStdout
     * @Description: Parse the output returned by the executed script
     */
    public static String processStdout(InputStream in, String charset) {
        InputStream stdout = new StreamGobbler(in);
        StringBuilder buffer = new StringBuilder();
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(stdout, charset));
            String line = null;
            while ((line = br.readLine()) != null) {
                buffer.append(line).append("\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return buffer.toString();
    }

    /**
     * @return String
     * @Description: Connect to the Linux server with username and password and run a command
     */
    public static String runCmd(String ip, String userName, String password, String commandStr) {
        logger.info(
                "ConnectLinuxCommand runCmd===" +
                        "ip:" + ip +
                        "  userName:" + userName +
                        "  commandStr:" + commandStr
        );

        String returnStr = "";
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        try {
            if (login(remoteConnect)) {
                returnStr = runCmd(commandStr);
                System.out.println(returnStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return returnStr;
    }

    public static boolean connectLinuxWithoutPwd(String ip, String userName, String commandStr) {
        logger.info("ConnectLinuxCommand  scpGet==="   "ip:"   ip   "  userName:"   userName   "  commandStr:"
                  commandStr);

        String returnStr = "";
        boolean result = true;
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        try {
            if (loginWithoutPwd(remoteConnect)) {
                returnStr = runCmd(commandStr);
                System.out.println(returnStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (StringUtils.isBlank(returnStr)) {
            result = false;
        }
        return result;
    }

    /**
     * @param ip         IP of the remote server
     * @param userName   user name on the remote server
     * @param password   password on the remote server
     * @param remoteFile file location on the remote server
     * @param localDir   target directory on this server
     * @Title: scpPull
     * @Description: Copy a file from the remote server into a local directory
     */
    public static void scpPull(String ip, String userName, String password, String remoteFile, String localDir)
            throws IOException {

        logger.info("ConnectLinuxCommand  scpGet==="   "ip:"   ip   "  userName:"   userName   "  remoteFile:"
                  remoteFile   "  localDir:"   localDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.get(remoteFile, localDir);
            conn.close();
        }
    }


    /**
     * Copy a local file to another machine.
     * @param ip        remote IP
     * @param userName  remote user name
     * @param password  remote password
     * @param localFile local file
     * @param remoteDir remote target directory
     * @throws IOException on failure
     */
    public static void scpPush(String ip, String userName, String password, String localFile, String remoteDir)
            throws IOException {
        logger.info("ConnectLinuxCommand  scpPut==="   "ip:"   ip   "  userName:"   userName   "  localFile:"
                  localFile   "  remoteDir:"   remoteDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.put(localFile, remoteDir);
            conn.close();
        }
    }
}

RemoteConnect

public class RemoteConnect {
    String ip;
    String userName;
    String password;

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

Test

package cn.psvmc;

public class CLCTest {
    public static void main(String[] args) {
        mTest1();
    }

    public static void mTest1() {
        System.out.println("--------------------------------------");
        String commandStr = "flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
        String result = ConnectLinuxCommand.runCmd("192.168.7.101", "root", "zhangjian", commandStr);
        System.out.println("Result: " + result);
        System.out.println("--------------------------------------");
    }

    public static void mTest2() {
        try {
            ConnectLinuxCommand.scpPull("192.168.7.101","root","zhangjian", "/root/test.txt", "d:/aa");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void mTest3() {
        try {
            ConnectLinuxCommand.scpPush("192.168.7.101","root","zhangjian", "d:/aa/test2.txt", "/root/");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
