合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
使用流的方式实现文件上传到HDFS系统或下载HDFS系统的文件到本地。 <br/> *`com/exa/hdfs001/HdfsClientIo.java`* ```java package com.exa.hdfs001; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.junit.Test; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class HdfsClientIo { /** * 本地文件上传到HDFS文件系统, 文件已存在则覆盖 */ @Test public void putFileToHDFS() throws URISyntaxException, IOException, InterruptedException { // 获取文件系统 Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root"); // 输入流 FileInputStream fis = new FileInputStream(new File("d:/hello.txt")); // 输出流 FSDataOutputStream fos = fs.create(new Path("/user/hadoop/input/hello.txt")); // 拷贝 IOUtils.copyBytes(fis, fos, configuration); // 资源关闭 IOUtils.closeStream(fis); IOUtils.closeStream(fos); fs.close(); } /** * 下载HDFS文件到本地,文件已存在则覆盖 */ @Test public void getFileFromHDFS() throws URISyntaxException, IOException, InterruptedException { // 获取文件系统 Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root"); // 输入流 FSDataInputStream fis = fs.open(new Path("/user/hadoop/input/hello.txt")); // 输出流 FileOutputStream fos = new FileOutputStream(new File("d:/hello.txt")); // 拷贝 IOUtils.copyBytes(fis, fos, configuration); // 资源关闭 IOUtils.closeStream(fis); IOUtils.closeStream(fos); fs.close(); } /** * 分块读取HDFS文件 * 下载第一块 */ @Test public void readFileSeek1() throws URISyntaxException, IOException, InterruptedException { // 获取文件系统 Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root"); // 输入流 FSDataInputStream fis = fs.open(new Path("/user/hadoop/input/hadoop-2.6.0-cdh5.14.2.tar.gz")); // 输出流 FileOutputStream fos = new FileOutputStream(new File("d:/hadoop-2.6.0-cdh5.14.2.tar.gz.part1")); // 缓冲区,每次都1024行数据,写1024行数据 byte[] buf = new byte[1024]; // 读取 // hadoop-2.6.0-cdh5.14.2.tar.gz 文件的总大小为413MB,第一次下载128MB for (int i = 0; i < 1024 * 128; i++) { fis.read(buf); // 先写到buf缓冲区 fos.write(buf); // 再从缓冲区写入fos } IOUtils.closeStream(fis); IOUtils.closeStream(fos); fs.close(); } /** * 分块读取HDFS文件 * 下载第二块 */ @Test public void readFileSeek2() throws URISyntaxException, IOException, InterruptedException { // 获取文件系统 Configuration configuration = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), configuration, "root"); // 输入流 FSDataInputStream fis = fs.open(new Path("/user/hadoop/input/hadoop-2.6.0-cdh5.14.2.tar.gz")); // 定位输入数据位置 // 第一次下载了128MB, ,需要定位到128这个位置。 // 文件单位为byte,1024byte=1kb,1024kb=1MB // 1024 * 1024 = 1024kb,1024kb * 128 = 128MB fis.seek(1024 * 1024 * 128); // 输出流 FileOutputStream fos = new FileOutputStream(new File("d:/hadoop-2.6.0-cdh5.14.2.tar.gz.part2")); // 拷贝 IOUtils.copyBytes(fis, fos, configuration); IOUtils.closeStream(fis); IOUtils.closeStream(fos); fs.close(); } } ``` 程序中分块读取HDFS文件后,将两个文件合并后就得到了原文件,如下将hadoop-2.6.0-cdh5.14.2.tar.gz.part2合并到hadoop-2.6.0-cdh5.14.2.tar.gz.part1中 ![](https://img.kancloud.cn/79/9d/799da7fa082ff6f10c1c1659a03734ca_877x88.png) 从上面的案例中可以看出,HDFS 提供的流读取数据的方式,可以从任意位置开始读取数据。这与后面 MapReduce 获取数据分片相关。