温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。
Fayson的github: https://github.com/fayson/cdhproject
提示:代码块部分可以左右滑动查看噢
1
文章编写目的
前面Fayson介绍了《如何使用Java API访问HDFS为目录设置配额》,随着开发语言的多样性,也有基于Scala语言进行开发,本篇文章主要介绍如何使用Scala代码访问Kerberos环境的HDFS。
- 内容概述
1.环境准备
2.Kerberos环境连接示例
- 测试环境
1.CDH版本为5.15.0
2.OS为Redhat7.2
- 前置条件
1.CDH集群运行正常
2.集群已启用Kerberos
2
环境准备
使用IDE工具通过Maven创建一个Scala工程,这里就不详细介绍Scala的开发环境搭建了。
1.在工程的pom.xml文件中增加如下依赖
代码语言:javascript复制<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.15.0</version>
</dependency>
(可左右滑动)
2.为Kerberos集群,需要导出一个keytab文件用于访问HDFS,导出步骤如下
在CMD命令行执行如下命令导出AD中用户的keytab文件
代码语言:javascript复制ktpass -princ hdfs/admin@FAYSON.COM -mapuser hdfs/admin -pass 123!QAZ -out hdfsadmin.keytab -crypto RC4-HMAC-NT
(可左右滑动)
导出的keytab文件会在当前命令执行目录。
3.获取集群krb5.conf文件,内容如下
代码语言:javascript复制[root@cdh4 ~]# more /etc/krb5.conf
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/
includedir /var/lib/sss/pubconf/krb5.include.d/
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
dns_lookup_realm = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
rdns = true
default_realm = FAYSON.COM
#default_ccache_name = KEYRING:persistent:%{uid}
[realms]
FAYSON.COM = {
kdc = adserver.fayson.com
admin_server = adserver.fayson.com
}
[domain_realm]
.fayson.com = FAYSON.COM
fayson.com = FAYSON.COM
(可左右滑动)
4.配置hosts文件,确保本地开发环境与集群所有节点通且端口均放通(如8020等)
由于Fayson这里使用的是公网环境所以hostname与外网的ip对应,这里会导致一个问题在向集群put数据文件时会失败,如果开发环境和HDFS都属于内网环境则不会有这个问题。
5.通过Cloudera Manager下载HDFS客户端配置
6.将上述准备的配置文件及keytab等信息拷贝至本地目录或工程中,Fayson的工程目录结构如下:
3
客户端访问HDFS工具类
1.ClientUtils类主要提供客户端初始化方法,内容如下:
代码语言:javascript复制package com.cloudera.utils
import java.io.IOException
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
/**
* package: com.cloudera.utils
* describe: 客户端访问HDFS工具类
* creat_user: Fayson
* email: htechinfo@163.com
* creat_date: 2018/11/13
* creat_time: 下午9:16
* 公众号:Hadoop实操
*/
object ClientUtils {
/**
* 初始化HDFS的Configuration
* @return
*/
def initConfiguration(): Configuration = {
val configuration = new Configuration
configuration.addResource(this.getClass().getResourceAsStream("/hdfs-client-kb/core-site.xml"))
configuration.addResource(this.getClass().getResourceAsStream("/hdfs-client-kb/hdfs-site.xml"))
configuration
}
/**
* 初始化访问Kerberos访问
* @param configuration
* @param debug 是否启用Kerberos的Debug模式
* @param properties 客户端配置信息
*/
def initKerberosENV(configuration: Configuration, debug: Boolean, properties: Properties):Unit = {
System.setProperty("java.security.krb5.conf", properties.getProperty("krb5.conf.path"))
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false")
if (debug) System.setProperty("sun.security.krb5.debug", "true")
try {
UserGroupInformation.setConfiguration(configuration)
UserGroupInformation.loginUserFromKeytab(properties.getProperty("kerberos.user"), properties.getProperty("kerberos.keytab.path"))
System.out.println(UserGroupInformation.getCurrentUser)
} catch {
case e: IOException => {
e.printStackTrace()
}
}
}
}
(可左右滑动)
2.HDFSUtils用于操作HDFS的工具类
代码语言:javascript复制package com.cloudera.utils
import org.apache.hadoop.fs.permission._
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.JavaConversions._
/**
* package: com.cloudera.utils
* describe: 用户操作HDFS工具类
* creat_user: Fayson
* email: htechinfo@163.com
* creat_date: 2018/11/13
* creat_time: 下午10:05
* 公众号:Hadoop实操
*/
object HDFSUtils {
/**
* 使用HDFS API向HDFS创建目录
* 在创建目录指定目录权限为777时,该权限需要与HDFS默认的umask权限相减,最终得出目录权限为755
* umask默认为022,0表示对owner没有限制,2表示对group不允许有写权限,2表示对other不允许有写权限
* 因此在创建目录指定777,但创建出来的目录为755的原因
* @param fileSystem
* @param dirName
*/
def mkdir(fileSystem: FileSystem, dirName: String):Unit = {
val path = new Path(dirName)
if(fileSystem.exists(path)) {
System.out.println("目录已存在")
} else {
val isok = fileSystem.mkdirs(path)
if(isok) {
System.out.println("HDFS目录创建成功:" dirName)
} else {
System.out.println("HDFS目录创建失败:" dirName)
}
}
}
/**
* 设置HDFS指定目录及文件权限
* @param fileSystem
* @param path 文件或目录路径
* @param mode 权限模式,如:777、755、644,数字对应的R=4,W=2,X=1
*/
def setPermission(fileSystem: FileSystem, path: String, mode: String): Unit = {
val fspath = new Path(path)
fileSystem.setPermission(fspath, new FsPermission(mode))
}
/**
* 设置HDFS指定目录或文件的属主及属组
* @param fileSystem
* @param path
* @param username
* @param groupname
*/
def setowner(fileSystem: FileSystem, path: String, username: String, groupname: String): Unit = {
val fspath = new Path(path)
fileSystem.setOwner(fspath, username, groupname)
}
/**
* 设置HDFS指定目录的ACL权限
* 在指定ACL时AclEntryScope.ACCESS表示当前目录所拥有的访问权限
* AclEntryScope.DEFAULT,表示该目录下所有子目录及文件集成父目录的Default ACL权限
* @param fileSystem
* @param path
*/
def setAcl(fileSystem: FileSystem,path: String): Unit = {
val fspath = new Path(path)
val listAcl = List[AclEntry](
new AclEntry.Builder().setType(AclEntryType.GROUP).setScope(AclEntryScope.ACCESS).setName("testa").setPermission(FsAction.ALL).build()
)
fileSystem.modifyAclEntries(fspath, listAcl)
}
/**
* 递归指定路径下所有目录及文件
* @param path
* @param fileSystem
* @return
*/
def recursiveDir(path: String, fileSystem: FileSystem): List[Path] = {
var listPath = List[Path]()
val fspath = new Path(path)
val listfiles = fileSystem.listStatus(fspath)
listfiles.foreach(f => {
System.out.println(f.getPath.toString)
if(f.isDirectory) {
recursiveDir(f.getPath.toString, fileSystem)
}
})
listPath
}
}
(可左右滑动)
4
示例代码及运行
1.OperatorHDFSByAPI为测试类包含API的调用
代码语言:javascript复制package com.cloudera.hdfs
import java.util.Properties
import com.cloudera.utils.{ClientUtils, HDFSUtils}
import org.apache.hadoop.fs.FileSystem
/**
* package: com.cloudera.hdfs
* describe: Scala访问Kerberos环境下的HDFS示例
* creat_user: Fayson
* email: htechinfo@163.com
* creat_date: 2018/11/13
* creat_time: 下午9:02
* 公众号:Hadoop实操
*/
object OperatorHDFSByAPI {
def main(args: Array[String]): Unit = {
//加载客户端配置参数
val properties = new Properties()
properties.load(this.getClass.getResourceAsStream("/client.properties"))
//初始化HDFS Configuration 配置
val configuration = ClientUtils.initConfiguration()
//集群启用Kerberos,代码中加入Kerberos环境
ClientUtils.initKerberosENV(configuration, false, properties)
val fileSystem = FileSystem.get(configuration)
val testPath = "/fayson/test"
//创建HDFS目录
// HDFSUtils.mkdir(fileSystem, testPath)
//设置目录属主及组
HDFSUtils.setowner(fileSystem, testPath, "hive", "hive")
//设置指定HDFS路径的权限
HDFSUtils.setPermission(fileSystem, testPath, "771")
//设置指定HDFS目录的ACL
HDFSUtils.setAcl(fileSystem, testPath)
//递归指定路径下所有目录及文件
HDFSUtils.recursiveDir("/user/hive/warehouse/test.db/", fileSystem)
fileSystem.close()
}
}
(可左右滑动)
2.示例运行
3.查看HDFS上创建的目录、权限及ACL等
未设置ACL权限的userc用户无权限访问该目录
5
总结
1.在进行本地开发时,必须将集群的hostname及IP配置在本地的hosts文件中(如果使用DNS服务则可以不配置hosts文件),否则无法与集群互通,确保本地客户端与集群的端口是放通的。
2.在创建目录指定目录权限为777时,创建目录的权限只能到755,是由于HDFS的umask导致,默认的umask为022(0表示对owner没有限制,2表示对group不允许有写权限,2表示对other不允许有写权限),指定的777权限减去022即为创建目录的权限
3.设置HDFS目录或文件的ACL时,需要区分AclEntryScope.ACCESS和DEFAULT。ACCESS表示为当前目录或文件指定ACL访问权限,DEFAULT表示在该目录下创建子目录或文件会继承该ACL权限。
GitHub源码地址:
https://github.com/fayson/cdhproject/tree/master/scalademo
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。
推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。