前言
本文Flink使用版本1.12.7
代码提交任务
准备文件夹和文件
代码语言:javascript复制hadoop fs -mkdir -p /jar/userTask
hadoop fs -mkdir -p /jar/flink12/libdist
hadoop fs -mkdir -p /jar/flink12/lib
拷贝需要的文件
代码语言:javascript复制hadoop fs -put $FLINK_HOME/examples/batch/WordCount.jar /jar/userTask/WordCount.jar
hadoop fs -put $FLINK_HOME/lib/flink-dist_2.12-1.12.7.jar /jar/flink12/libdist/flink-dist_2.12-1.12.7.jar
hadoop fs -put $FLINK_HOME/lib/* /jar/flink12/lib/
查看文件可以访问这个地址
http://hadoop01:50070/explorer.html#/
http://hadoop02:50070/explorer.html#/
在服务器上测试一下
代码语言:javascript复制flink run-application -t yarn-application hdfs://hacluster/jar/userTask/WordCount.jar --output hdfs://hacluster/bigdata_study/output03
添加依赖
代码语言:javascript复制<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
代码
代码语言:javascript复制package cn.psvmc;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.util.Collections;
import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
public class RunFlinkJob {
public static void main(String[] args) {
//flink的本地配置目录,为了得到flink的配置
// 如果出现org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.错误
// 则在flink-config.yaml加入
// classloader.resolve-order: parent-first
String configurationDirectory = "/data/tools/bigdata/flink-1.12.7/conf";
//存放flink集群相关的jar包目录
String flinkLibs = "hdfs://hacluster/jar/flink12/lib";
//用户jar
String userJarPath = "hdfs://hacluster/jar/userTask/WordCount.jar";
String flinkDistJar = "hdfs://hacluster/jar/flink12/libdist/flink-dist_2.12-1.12.7.jar";
YarnClient yarnClient = YarnClient.createYarnClient();
org.apache.hadoop.conf.Configuration entries = new org.apache.hadoop.conf.Configuration();
entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/yarn-site.xml"));
entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/hdfs-site.xml"));
entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/core-site.xml"));
YarnConfiguration yarnConfiguration = new YarnConfiguration(entries);
yarnClient.init(yarnConfiguration);
yarnClient.start();
// 设置日志的,没有的话看不到日志
YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever
.create(yarnClient);
//获取flink的配置
Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
configurationDirectory
);
flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
flinkConfiguration.set(
PipelineOptions.JARS,
Collections.singletonList(userJarPath)
);
Path remoteLib = new Path(flinkLibs);
flinkConfiguration.set(
YarnConfigOptions.PROVIDED_LIB_DIRS,
Collections.singletonList(remoteLib.toString())
);
// flinkConfiguration.set(
// YarnConfigOptions.FLINK_DIST_JAR,
// flinkDistJar
// );
// 设置为APPLICATION模式
flinkConfiguration.set(
DeploymentOptions.TARGET,
YarnDeploymentTarget.APPLICATION.getName()
);
// yarn application name
flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "zApplication");
// flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));
// flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));
YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
ClusterSpecification clusterSpecification = new ClusterSpecification
.ClusterSpecificationBuilder()
.createClusterSpecification();
// 设置用户jar的参数和主类
// ApplicationConfiguration appConfig = new ApplicationConfiguration(args, "org.apache.flink.examples.java.wordcount.WordCount");
ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
flinkConfiguration,
yarnConfiguration,
yarnClient,
clusterInformationRetriever,
true
);
try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
clusterSpecification,
appConfig
);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
ApplicationId applicationId = clusterClient.getClusterId();
String webInterfaceURL = clusterClient.getWebInterfaceURL();
System.out.println("applicationId is {}" applicationId);
System.out.println("webInterfaceURL is {}" webInterfaceURL);
// 退出
// yarnClusterDescriptor.killCluster(applicationId);
} catch (Exception e) {
e.printStackTrace();
}
}
}
查看yarn
http://hadoop02:8088/cluster
调用脚本执行
代码语言:javascript复制package cn.psvmc;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.sun.istack.logging.Logger;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class ConnectionSSH {
private static final Logger logger = Logger.getLogger(ConnectionSSH.class);
public static void main(String[] args) throws JSchException, IOException {
JSch jsch = new JSch();
String pubKeyPath = "C:\Users\Administrator\.ssh\id_rsa";
jsch.addIdentity(pubKeyPath);
String username = "root";
String host = "192.168.7.101";
Session session =jsch.getSession(username, host, 22);//为了连接做准备
session.setConfig("StrictHostKeyChecking", "no");
session.connect();
String command = "flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
ChannelExec channel=(ChannelExec)session.openChannel("exec");
channel.setCommand(command);
BufferedReader in = new BufferedReader(new InputStreamReader(channel.getInputStream()));
channel.connect();
String msg;
while((msg = in.readLine()) != null){
System.out.println(msg);
}
channel.disconnect();
session.disconnect();
}
}
使用密码
代码语言:javascript复制JSch jsch = new JSch();
String username = "root";
String host = "192.168.7.101";
Session session =jsch.getSession(username, host, 22);//为了连接做准备
session.setConfig("StrictHostKeyChecking", "no");
session.setPassword("zhangjian");
session.connect();
使用密匙
代码语言:javascript复制JSch jsch = new JSch();
String pubKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
jsch.addIdentity(pubKeyPath);
String username = "root";
String host = "192.168.7.101";
Session session =jsch.getSession(username, host, 22);//为了连接做准备
session.setConfig("StrictHostKeyChecking", "no");
session.connect();
调用脚本执行2
这个类除了可以运行脚本,还可以复制文件。
依赖:
代码语言:javascript复制<dependency>
<groupId>ch.ethz.ganymed</groupId>
<artifactId>ganymed-ssh2</artifactId>
<version>build210</version>
</dependency>
工具类
代码语言:javascript复制package cn.psvmc;
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.SCPClient;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import java.io.*;
/**
* 描述:连接linux服务器并执行相关的shell命令
*/
public class ConnectLinuxCommand {
private static final Logger logger = Logger.getLogger(ConnectLinuxCommand.class);
private static final String DEFAULTCHARTSET = "UTF-8";
private static Connection conn;
/**
* @Title: login
* @Description: 用户名密码方式 远程登录linux服务器
* @return: Boolean
*/
public static Boolean login(RemoteConnect remoteConnect) {
boolean flag = false;
try {
conn = new Connection(remoteConnect.getIp());
conn.connect();// 连接
flag = conn.authenticateWithPassword(remoteConnect.getUserName(), remoteConnect.getPassword());// 认证
if (flag) {
logger.info("认证成功!");
} else {
logger.error("认证失败!");
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
return flag;
}
public static Boolean loginWithoutPwd(RemoteConnect remoteConnect) {
boolean flag = true;
try {
conn = new Connection(remoteConnect.getIp());
conn.connect();// 连接
boolean authenticationPartialSuccess = conn.isAuthenticationPartialSuccess();
System.out.println("authenticationPartialSuccess = " authenticationPartialSuccess);
logger.info("认证成功!");
} catch (IOException e) {
e.printStackTrace();
}
return flag;
}
/**
* @param remoteConnect 连接信息对象
* @param keyFile 一个文件对象指向一个文件,该文件包含OpenSSH**格式的用户的DSA或RSA私钥(PEM,不能丢失"-----BEGIN DSA PRIVATE KEY-----" or "-----BEGIN RSA PRIVATE KEY-----"标签
* @param keyfilePass 如果秘钥文件加密 需要用该参数解密,如果没有加密可以为null
* @return Boolean
* @Title: loginByKey
* @Description: 秘钥方式 远程登录linux服务器
*/
public static Boolean loginByFileKey(RemoteConnect remoteConnect, File keyFile, String keyfilePass) {
boolean flag = false;
try {
conn = new Connection(remoteConnect.getIp());
conn.connect();
// 登录认证
flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keyFile, keyfilePass);
if (flag) {
logger.info("认证成功!");
} else {
logger.error("认证失败!");
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
/**
* @param remoteConnect 连接信息对象
* @param keys 一个字符[],其中包含用户的DSA或RSA私钥(OpenSSH密匙格式,您不能丢失“----- begin DSA私钥-----”或“-----BEGIN RSA PRIVATE KEY-----“标签。char数组可以包含换行符/换行符。
* @param keyPass 如果秘钥字符数组加密 需要用该字段解密 否则不需要可以为null
* @return Boolean
* @Title: loginByCharsKey
* @Description: 秘钥方式 远程登录linux服务器
*/
public static Boolean loginByCharsKey(RemoteConnect remoteConnect, char[] keys, String keyPass) {
boolean flag = false;
try {
conn = new Connection(remoteConnect.getIp());
conn.connect();
// 登录认证
flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keys, keyPass);
if (flag) {
logger.info("认证成功!");
} else {
logger.error("认证失败!");
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
/**
* @param cmd 脚本命令
* @Title: execute
* @Description: 远程执行shll脚本或者命令
* @return: result 命令执行完毕返回结果
*/
public static String runCmd(String cmd) {
String result = "";
try {
Session session = conn.openSession();// 打开一个会话
session.execCommand(cmd);// 执行命令
result = processStdout(session.getStdout(), DEFAULTCHARTSET);
// 如果为得到标准输出为空,说明脚本执行出错了
if (StringUtils.isBlank(result)) {
result = processStdout(session.getStderr(), DEFAULTCHARTSET);
}
conn.close();
session.close();
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
/**
* @return String 命令执行成功后返回的结果值,如果命令执行失败,返回空字符串,不是null
* @Title: executeSuccess
* @Description: 远程执行shell脚本或者命令
*/
public static String runCmdSuccess(String cmd) {
String result = "";
try {
Session session = conn.openSession();// 打开一个会话
session.execCommand(cmd);// 执行命令
result = processStdout(session.getStdout(), DEFAULTCHARTSET);
conn.close();
session.close();
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
/**
* @param in 输入流对象
* @param charset 编码
* @return String 以纯文本的格式返回
* @Title: processStdout
* @Description: 解析脚本执行的返回结果
*/
public static String processStdout(InputStream in, String charset) {
InputStream stdout = new StreamGobbler(in);
StringBuilder buffer = new StringBuilder();
try {
BufferedReader br = new BufferedReader(new InputStreamReader(stdout, charset));
String line = null;
while ((line = br.readLine()) != null) {
buffer.append(line).append("n");
}
} catch (IOException e) {
e.printStackTrace();
}
return buffer.toString();
}
/**
* @return String
* @Description: 通过用户名和密码关联linux服务器
*/
public static String runCmd(String ip, String userName, String password, String commandStr) {
logger.info(
"ConnectLinuxCommand scpGet==="
"ip:" ip
" userName:" userName
" commandStr:" commandStr
);
String returnStr = "";
RemoteConnect remoteConnect = new RemoteConnect();
remoteConnect.setIp(ip);
remoteConnect.setUserName(userName);
remoteConnect.setPassword(password);
try {
if (login(remoteConnect)) {
returnStr = runCmd(commandStr);
System.out.println(returnStr);
}
} catch (Exception e) {
e.printStackTrace();
}
return returnStr;
}
public static boolean connectLinuxWithoutPwd(String ip, String userName, String commandStr) {
logger.info("ConnectLinuxCommand scpGet===" "ip:" ip " userName:" userName " commandStr:"
commandStr);
String returnStr = "";
boolean result = true;
RemoteConnect remoteConnect = new RemoteConnect();
remoteConnect.setIp(ip);
remoteConnect.setUserName(userName);
try {
if (loginWithoutPwd(remoteConnect)) {
returnStr = runCmd(commandStr);
System.out.println(result);
}
} catch (Exception e) {
e.printStackTrace();
}
if (StringUtils.isBlank(returnStr)) {
result = false;
}
return result;
}
/**
* @param password 密码(其他服务器)
* @param remoteFile 文件位置(其他服务器)
* @param localDir 本服务器目录
* @Title: scpGet
* @Description: 从其他服务器获取文件到本服务器指定目录
*/
public static void scpPull(String ip, String userName, String password, String remoteFile, String localDir)
throws IOException {
logger.info("ConnectLinuxCommand scpGet===" "ip:" ip " userName:" userName " remoteFile:"
remoteFile " localDir:" localDir);
RemoteConnect remoteConnect = new RemoteConnect();
remoteConnect.setIp(ip);
remoteConnect.setUserName(userName);
remoteConnect.setPassword(password);
if (login(remoteConnect)) {
SCPClient client = new SCPClient(conn);
client.get(remoteFile, localDir);
conn.close();
}
}
/**
* 将文件复制到其他计算机中
* @param ip 远程IP
* @param userName 远程用户名
* @param password 远程密码
* @param localFile 本地文件
* @param remoteDir 远程目录
* @throws IOException 异常
*/
public static void scpPush(String ip, String userName, String password, String localFile, String remoteDir)
throws IOException {
logger.info("ConnectLinuxCommand scpPut===" "ip:" ip " userName:" userName " localFile:"
localFile " remoteDir:" remoteDir);
RemoteConnect remoteConnect = new RemoteConnect();
remoteConnect.setIp(ip);
remoteConnect.setUserName(userName);
remoteConnect.setPassword(password);
if (login(remoteConnect)) {
SCPClient client = new SCPClient(conn);
client.put(localFile, remoteDir);
conn.close();
}
}
}
RemoteConnect
/**
 * Value holder for the credentials of a remote SSH host:
 * address, user name and password.
 */
public class RemoteConnect {
    String ip;
    String userName;
    String password;

    /** @return the remote host address */
    public String getIp() {
        return this.ip;
    }

    /** @param ip the remote host address */
    public void setIp(String ip) {
        this.ip = ip;
    }

    /** @return the login user name */
    public String getUserName() {
        return this.userName;
    }

    /** @param userName the login user name */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the login password */
    public String getPassword() {
        return this.password;
    }

    /** @param password the login password */
    public void setPassword(String password) {
        this.password = password;
    }
}
测试
代码语言:javascript复制package cn.psvmc;
public class CLCTest {
public static void main(String[] args) {
mTest1();
}
public static void mTest1() {
System.out.println("--------------------------------------");
String commandStr="flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
String result=ConnectLinuxCommand.runCmd("192.168.7.101","root","zhangjian",commandStr);
System.out.println("结果:" result);
System.out.println("--------------------------------------");
}
public static void mTest2() {
try {
ConnectLinuxCommand.scpPull("192.168.7.101","root","zhangjian", "/root/test.txt", "d:/aa");
} catch (Exception e) {
e.printStackTrace();
}
}
public static void mTest3() {
try {
ConnectLinuxCommand.scpPush("192.168.7.101","root","zhangjian", "d:/aa/test2.txt", "/root/");
} catch (Exception e) {
e.printStackTrace();
}
}
}