HDFS Java API


Official API documentation: http://hadoop.apache.org/docs/r2.7.3/api/index.html

1. Reading an HDFS File

package test;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ReadFile {

    public static void main(String[] args) throws IOException {
        String uri = "hdfs://192.168.1.25:8020/user/root/wordcount/input/words.txt";
        Configuration cfg = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), cfg);
        InputStream in = null;
        try {
            // open the HDFS file and stream it to stdout in 4 KB chunks
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            IOUtils.closeStream(in);
        }
    }

}

Run it directly in Eclipse with Run As -> Java Application. Console output:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Mar 02, 2017 4:42:20 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Mar 02, 2017 4:42:20 PM org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory <init>
WARNING: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
hi
hello,world
hello,hadoop
hello,java
hi,baby
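
As an aside, fs.open() actually returns an FSDataInputStream, which supports random access. A minimal sketch (the class name ReadFileTwice is mine) that prints the file twice by seeking back to offset 0:

package test;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ReadFileTwice {
    public static void main(String[] args) throws Exception {
        String uri = "hdfs://192.168.1.25:8020/user/root/wordcount/input/words.txt";
        FileSystem fs = FileSystem.get(URI.create(uri), new Configuration());
        FSDataInputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false); // first pass
            in.seek(0);                                     // rewind to the start of the file
            IOUtils.copyBytes(in, System.out, 4096, false); // second pass
        } finally {
            IOUtils.closeStream(in);
        }
    }
}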

To run it on the server side, export the class as ReadFile.jar (a command-line alternative is sketched after these steps):

  • In Eclipse, select the class or package to export
  • Right-click and choose Export
  • In the dialog that opens, expand the Java category and choose JAR file
  • In the JAR Export dialog, enter the location and name of the jar to generate in the JAR file text box, e.g. /root/ReadFile.jar here
  • Make sure both "Export generated class files and resources" and "Export java source files and resources" are checked
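
If you prefer the command line to the Eclipse wizard, a rough equivalent is to compile against the Hadoop classpath and package the classes with jar (this sketch assumes hadoop is installed locally and ReadFile.java sits in the current directory):

[root@hadron ~]# mkdir -p classes
[root@hadron ~]# javac -cp "$(hadoop classpath)" -d classes ReadFile.java
[root@hadron ~]# jar cf ReadFile.jar -C classes .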

Upload the jar to one of the cluster nodes:

[root@hadron ~]# scp ReadFile.jar 192.168.1.25:/root
root@192.168.1.25's password: 
ReadFile.jar                                  100% 1248     1.2KB/s   00:00

Run the class on the anode1 (192.168.1.25) node:

[root@anode1 ~]# hadoop jar ReadFile.jar  test.ReadFile
hi
hello,world
hello,hadoop
hello,java
hi,baby

2. Uploading a File to HDFS

[root@hadron ~]# cat test.txt
test
Hi,HDFS!

The program:

package test;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class PutFile {

    public static void main(String[] args) throws IOException {
        // path of the local source file
        String local = "/root/test.txt";
        // destination path on HDFS
        String dest = "hdfs://192.168.1.25:8020/user/root/wordcount/input/test2.txt";
        InputStream in = new BufferedInputStream(new FileInputStream(local));
        Configuration cfg = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dest), cfg);
        OutputStream out = fs.create(new Path(dest));
        // the final 'true' closes both the input and output streams
        IOUtils.copyBytes(in, out, 4096, true);
        fs.close();
    }
}
Verify on the cluster:

[root@anode1 ~]# hadoop fs -ls /user/root/wordcount/input
Found 3 items
-rw-r--r--   3 root hdfs         14 2017-03-02 17:09 /user/root/wordcount/input/test2.txt
-rw-r--r--   3 root hdfs         47 2017-03-01 09:53 /user/root/wordcount/input/words.txt
-rw-r--r--   3 root hdfs         47 2017-03-01 10:16 /user/root/wordcount/input/words2.txt
[root@anode1 ~]# hadoop fs -cat /user/root/wordcount/input/test2.txt
test
Hi,HDFS!
[root@anode1 ~]# 
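
FileSystem.create also has an overload that takes a Progressable callback, which Hadoop invokes as data is written out to the cluster. A variant of PutFile using it (a sketch; the class name and the test3.txt destination are assumptions) prints a dot per progress call:

package test;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class PutFileWithProgress {
    public static void main(String[] args) throws Exception {
        String local = "/root/test.txt";
        String dest = "hdfs://192.168.1.25:8020/user/root/wordcount/input/test3.txt";
        InputStream in = new BufferedInputStream(new FileInputStream(local));
        Configuration cfg = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dest), cfg);
        // the callback fires as packets are flushed, so dots mark upload progress
        OutputStream out = fs.create(new Path(dest), new Progressable() {
            public void progress() {
                System.out.print(".");
            }
        });
        IOUtils.copyBytes(in, out, 4096, true); // 'true' closes both streams
        fs.close();
    }
}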

3. Downloading a File

package test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
public class GetFile {

    public static void main(String[] args) throws IOException {
        String hdfsPath="hdfs://192.168.1.25:8020/user/root/wordcount/input/words.txt";
        String localPath="/root/words";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        Path hdfs_path = new Path(hdfsPath);
        Path local_path = new Path(localPath);
        fs.copyToLocalFile(hdfs_path, local_path);
        fs.close();
    }
}
Verify on the local machine:

[root@hadron ~]# cat words
hi
hello,world
hello,hadoop
hello,java
hi,baby
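
copyToLocalFile by default goes through the checksummed local filesystem and leaves a hidden .words.crc file next to the download. A variant worth knowing (a sketch; GetFileRaw is my name for it) uses the four-argument overload to write through the raw local filesystem instead:

package test;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetFileRaw {
    public static void main(String[] args) throws Exception {
        String hdfsPath = "hdfs://192.168.1.25:8020/user/root/wordcount/input/words.txt";
        String localPath = "/root/words";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        // delSrc=false keeps the HDFS copy; useRawLocalFileSystem=true skips
        // the client-side checksum file (.words.crc) on the local side
        fs.copyToLocalFile(false, new Path(hdfsPath), new Path(localPath), true);
        fs.close();
    }
}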

4. Creating an HDFS Directory

package test;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateDir {
    public static void main(String[] args) throws IOException {
        // value of dfs.namenode.rpc-address, e.g. hdfs://172.16.5.174:8020 on another cluster
        String rootPath = "hdfs://192.168.1.25:8020";
        Path p = new Path(rootPath + "/tmp/");
        Configuration conf = new Configuration();
        FileSystem fs = p.getFileSystem(conf);
        boolean b = fs.mkdirs(p);
        System.out.println(b);
        fs.close();
    }
}

Execution output:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Mar 03, 2017 8:39:52 AM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Mar 03, 2017 8:39:53 AM org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory <init>
WARNING: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
true

Check the result on the cluster:

[root@anode1 ~]# hadoop fs -ls /
Found 9 items
drwxrwxrwx   - yarn   hadoop          0 2017-02-22 12:48 /app-logs
drwxr-xr-x   - hdfs   hdfs            0 2017-02-22 12:29 /apps
drwxr-xr-x   - yarn   hadoop          0 2017-02-22 12:16 /ats
drwxr-xr-x   - hdfs   hdfs            0 2017-02-22 12:17 /hdp
drwxr-xr-x   - mapred hdfs            0 2017-02-22 12:17 /mapred
drwxrwxrwx   - mapred hadoop          0 2017-02-22 12:17 /mr-history
drwxrwxrwx   - spark  hadoop          0 2017-02-24 09:39 /spark2-history
drwxrwxrwx   - hdfs   hdfs            0 2017-02-27 08:32 /tmp
drwxr-xr-x   - hdfs   hdfs            0 2017-02-22 18:22 /user
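
mkdirs also has an overload that takes an FsPermission, for requesting an explicit mode rather than the default (the client-side umask still applies). A sketch, with the class name and the /tmp/demo path as assumptions:

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class CreateDirWithPerm {
    public static void main(String[] args) throws Exception {
        Path p = new Path("hdfs://192.168.1.25:8020/tmp/demo");
        Configuration conf = new Configuration();
        FileSystem fs = p.getFileSystem(conf);
        // request mode rwxr-xr-x (0755) for the new directory
        boolean b = fs.mkdirs(p, new FsPermission((short) 0755));
        System.out.println(b);
        fs.close();
    }
}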

5. Deleting an HDFS File or Directory

Before deletion:

[root@anode1 ~]# hadoop fs -ls /user/root/wordcount
Found 2 items
drwxr-xr-x   - root hdfs          0 2017-03-02 17:12 /user/root/wordcount/input
drwxr-xr-x   - root hdfs          0 2017-03-01 10:22 /user/root/wordcount/output

The program:

package test;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteFile {
    public static void main(String[] args) {
        String uri = "hdfs://192.168.1.25:8020/user/root/wordcount/output";
        Path path = new Path(uri);
        Configuration conf = new Configuration();
        try {
            FileSystem fs = path.getFileSystem(conf);
            // recursively delete the directory and everything under it
            boolean b = fs.delete(path, true);
            System.out.println(b);
            fs.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

After deletion:

[root@anode1 ~]# hadoop fs -ls /user/root/wordcount
Found 1 items
drwxr-xr-x   - root hdfs          0 2017-03-02 17:12 /user/root/wordcount/input
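
delete(path, true) removes the data immediately. If the cluster has trash enabled (fs.trash.interval > 0), a gentler option is to move the path into the user's .Trash instead; a sketch (DeleteToTrash is my name for it):

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class DeleteToTrash {
    public static void main(String[] args) throws Exception {
        Path path = new Path("hdfs://192.168.1.25:8020/user/root/wordcount/output");
        Configuration conf = new Configuration();
        FileSystem fs = path.getFileSystem(conf);
        // moves the path under the user's .Trash rather than deleting it outright
        boolean moved = Trash.moveToAppropriateTrash(fs, path, conf);
        System.out.println(moved);
        fs.close();
    }
}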

6. Listing the Files and Subdirectories Under an HDFS Directory

package test;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListFiles {
    public static void main(String[] args) throws IOException {
        String uri = "hdfs://192.168.1.25:8020/user/root/";
        Configuration cfg = new Configuration();
        FileSystem fs=  FileSystem.get(URI.create(uri),cfg);
        Path path = new Path(uri);
        FileStatus[] fss = fs.listStatus(path);
        for (FileStatus f : fss) {
            if (f.isFile())
                System.out.println("File:" + f.getPath().toString());
            else
                System.out.println("Dir:" + f.getPath().toString());
        }
    }
}

Execution output:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Mar 03, 2017 9:21:02 AM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Mar 03, 2017 9:21:02 AM org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory <init>
WARNING: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Dir:hdfs://192.168.1.25:8020/user/root/.Trash
Dir:hdfs://192.168.1.25:8020/user/root/.hiveJars
Dir:hdfs://192.168.1.25:8020/user/root/.sparkStaging
Dir:hdfs://192.168.1.25:8020/user/root/.staging
Dir:hdfs://192.168.1.25:8020/user/root/wordcount
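
listStatus only returns the direct children. To walk the whole tree, FileSystem.listFiles(path, true) returns a streaming iterator over every file at any depth; a sketch (ListFilesRecursive is my name for it):

package test;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListFilesRecursive {
    public static void main(String[] args) throws Exception {
        String uri = "hdfs://192.168.1.25:8020/user/root/";
        FileSystem fs = FileSystem.get(URI.create(uri), new Configuration());
        // recursive=true descends into subdirectories; only files are returned
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(uri), true);
        while (it.hasNext()) {
            System.out.println("File:" + it.next().getPath());
        }
        fs.close();
    }
}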

7. Downloading an HDFS Directory

The directory as it exists on HDFS:

[root@anode1 ~]# hadoop fs -ls /user/root/wordcount
Found 1 items
drwxr-xr-x   - root hdfs          0 2017-03-02 17:12 /user/root/wordcount/input

The local directory before the download:

[root@hadron ~]# ls hdfs/
input

The program:

package test;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyToLocalFile {

    public static void main(String[] args) throws IOException {
        String hdfsPath="hdfs://192.168.1.25:8020/user/root/wordcount";
        String localPath="/root/hdfs";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        Path hdfs_path = new Path(hdfsPath);
        Path local_path = new Path(localPath);
        fs.copyToLocalFile(hdfs_path,local_path);
        fs.close();
    }
}

Execution output:

[root@hadron ~]# ls hdfs/
input  wordcount
[root@hadron ~]# ls hdfs/wordcount/
input
[root@hadron ~]# ls hdfs/wordcount/input/
test2.txt  test3.txt  words2.txt  words.txt
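
An alternative for directory downloads, to the best of my knowledge, is FileUtil.copy, which recurses on its own and writes straight to a java.io.File; a sketch (the class name is mine):

package test;

import java.io.File;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class CopyDirWithFileUtil {
    public static void main(String[] args) throws Exception {
        String hdfsPath = "hdfs://192.168.1.25:8020/user/root/wordcount";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        // deleteSource=false: copy rather than move; recurses through subdirectories
        FileUtil.copy(fs, new Path(hdfsPath), new File("/root/hdfs"), false, conf);
        fs.close();
    }
}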

8. Uploading a Local Directory

Rename the files under the local directory /root/hdfs/input:

[root@hadron ~]# ls hdfs/input
a.txt  b.txt  c.txt  d.txt

The program:

package test;

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyFromLocalFile {

    public static void main(String[] args) {
        String hdfsPath="hdfs://192.168.1.25:8020/user/root";
        String localPath="/root/hdfs/input";
        Configuration conf = new Configuration();
        try {
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            Path hdfs_path = new Path(hdfsPath);
            Path local_path = new Path(localPath);
            fs.copyFromLocalFile(local_path, hdfs_path);
            fs.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Execution output:

[root@anode1 ~]# hadoop fs -ls /user/root
Found 6 items
drwx------   - root hdfs          0 2017-03-01 20:00 /user/root/.Trash
drwxr-xr-x   - root hdfs          0 2017-02-22 12:48 /user/root/.hiveJars
drwxr-xr-x   - root hdfs          0 2017-02-24 09:39 /user/root/.sparkStaging
drwx------   - root hdfs          0 2017-03-01 10:22 /user/root/.staging
drwxr-xr-x   - root hdfs          0 2017-03-03 11:15 /user/root/input
drwxr-xr-x   - root hdfs          0 2017-03-03 08:49 /user/root/wordcount
[root@anode1 ~]# hadoop fs -ls /user/root/input
Found 4 items
-rw-r--r--   3 root hdfs         14 2017-03-03 11:15 /user/root/input/a.txt
-rw-r--r--   3 root hdfs         14 2017-03-03 11:15 /user/root/input/b.txt
-rw-r--r--   3 root hdfs         47 2017-03-03 11:15 /user/root/input/c.txt
-rw-r--r--   3 root hdfs         47 2017-03-03 11:15 /user/root/input/d.txt
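
copyFromLocalFile also has an overload that spells out the delete-source and overwrite flags explicitly; a sketch (the class name is mine) that uploads the same directory, replacing any existing files:

package test;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadOverwrite {
    public static void main(String[] args) throws Exception {
        String hdfsPath = "hdfs://192.168.1.25:8020/user/root";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        // delSrc=false keeps the local copy; overwrite=true replaces existing files
        fs.copyFromLocalFile(false, true, new Path("/root/hdfs/input"), new Path(hdfsPath));
        fs.close();
    }
}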

9. Downloading HDFS Files or Directories

On the local development machine, map the NameNode hostname and verify connectivity:

[root@hadron ~]# vi /etc/hosts
192.168.1.25    anode1
[root@hadron ~]# ping anode1
PING anode1 (192.168.1.25) 56(84) bytes of data.
64 bytes from anode1 (192.168.1.25): icmp_seq=1 ttl=64 time=0.266 ms
64 bytes from anode1 (192.168.1.25): icmp_seq=2 ttl=64 time=0.265 ms
^C
--- anode1 ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 999ms
rtt min/avg/max/mdev = 0.265/0.265/0.266/0.016 ms

Create a conf directory in the Eclipse project, right-click it -> Build Path -> Use as Source Folder, then copy the cluster configuration files (core-site.xml, hdfs-site.xml) into conf so that conf.get("fs.defaultFS") resolves.
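
If the conf source folder is missing from the classpath, conf.get("fs.defaultFS") falls back to the local-filesystem default (file:///) and the Downloader below cannot reach the cluster. A hedged alternative is to load the cluster files explicitly; the static block could be written as follows (the /root/conf paths are assumptions):

// sketch: load the cluster configuration explicitly instead of from the classpath
conf = new HdfsConfiguration();
conf.addResource(new Path("/root/conf/core-site.xml"));
conf.addResource(new Path("/root/conf/hdfs-site.xml"));
dfs.initialize(new URI(conf.get("fs.defaultFS")), conf);

The full Downloader program: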

package test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.IOUtils;

public class Downloader {
    private static Log log = LogFactory.getLog(Downloader.class);
    private String src;
    private static Configuration conf;
    public static DistributedFileSystem dfs;
    static {
        conf = new HdfsConfiguration();
        dfs = new DistributedFileSystem();
        try {
            dfs.initialize(new URI(conf.get("fs.defaultFS")), conf);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    public Downloader(String src) {
        this.src = src;
    }

    public void download(String dest) {
        Path path = new Path(src);
        File file = new File(dest);
        file.mkdirs();
        try {
            if (dfs.isFile(path)) { // source is a single file
                innerDownloadFile(src, dest);
            } else { // source is a directory
                innerDownloadDir(src, dest);
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    private void innerDownloadFile(String src, String dest) {
        Path path = new Path(src);
        try {
            if (dfs.exists(path)) { // the file exists on HDFS
                // create the local target file
                File file = new File(dest + File.separator + path.getName());
                file.createNewFile();
                // read from HDFS and write to the local file
                InputStream in = dfs.open(path);
                OutputStream out = new FileOutputStream(file);
                IOUtils.copyBytes(in, out, conf);
                in.close();
                out.close();
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    private void innerDownloadDir(String src, String dest) {
        Path path = new Path(src);
        // create the corresponding local directory
        File file = new File(dest + File.separator + path.getName());
        file.mkdirs();
        try {
            // the files and subdirectories directly under the HDFS path
            FileStatus[] fss = dfs.listStatus(path);
            for (int i = 0; i < fss.length; i++) {
                if (fss[i].isFile()) { // current entry is a file
                    innerDownloadFile(fss[i].getPath().toString(), dest
                            + File.separator + path.getName());
                } else { // current entry is a subdirectory
                    innerDownloadDir(fss[i].getPath().toString(),
                            file.getAbsolutePath());
                }
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(args[0]);
        System.out.println(args[1]);
        // args[0] is the source path on HDFS
        Downloader dl = new Downloader(args[0]);
        // args[1] is the local destination directory
        dl.download(args[1]);
    }
}
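
Exported as a jar, the downloader might be run like this (the jar name and argument values are assumptions; main() first echoes the two arguments):

[root@hadron ~]# hadoop jar Downloader.jar test.Downloader /user/root/wordcount/input /root/hdfs

After the run, the downloaded files appear on the local machine: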
[root@hadron ~]# ls hdfs/input/
test2.txt  test3.txt  words2.txt  words.txt
