Create, delete, query and read operations on HDFS using the HDFS Java API
Pay special attention to the following three issues when doing this experiment:
1. HDFS safe mode must be turned off. Command: ./hadoop dfsadmin -safemode leave
2. The Hadoop version the project depends on must match the cluster's version, otherwise a version-mismatch error is reported.
3. Watch out for user-permission issues on the Hadoop cluster, and be clear about what each directory is used for.
Why these three problems occur at all is still to be investigated!!! (A small connectivity smoke test is sketched below.)
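Before digging into the causes, a minimal connectivity smoke test can help separate network and permission problems from bugs in the CRUD code. This is only a sketch: it assumes the namenode address hdfs://hadoopm:9000 used in the examples below, and the class name is made up for illustration.

package com.yun.hdfs;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // connect to the namenode and print the home directory of the current user
        FileSystem fs = FileSystem.get(URI.create("hdfs://hadoopm:9000/"), conf);
        System.out.println("home dir: " + fs.getHomeDirectory());
        // permission/ownership problems usually show up when touching /user/<name>
        System.out.println("/user/root exists: " + fs.exists(new Path("/user/root")));
        fs.close();
    }
}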
I have not verified whether the Hadoop version currently in use (release-0.20.0) supports WebHDFS; in any case, I could not connect to it no matter what I tried!!!
Judging from the issue below, 0.20.0 probably does not support it:
https://jira.springsource.org/browse/IMPALA-15?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
Serengeti Distro:
Apache Hadoop:1.0.1
GreenPlum HD:1.1(Apache Hadoop 1.0.0)
Cloudera: CDH3 (Apache Hadoop 0.20.2, WebHDFS is not supported in this version)
Hortonworks: 1.0.7 (Apache Hadoop 1.0.2)
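For reference, on a Hadoop release that does support WebHDFS (1.0.x and later), the REST API is served on the namenode's web port, and a directory listing looks roughly like this (hadoopm:50070 as in this setup; the path is just an example):
curl -i "http://hadoopm:50070/webhdfs/v1/user/root?op=LISTSTATUS"
If that endpoint returns 404 or the connection is refused, the running version most likely does not ship WebHDFS.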
The steps are as follows:
Project structure, as shown in the figure:
Now for the code O(∩_∩)O haha~
The pom.xml configuration is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.yun.hdfs</groupId>
  <artifactId>hdfs</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.yun.hdfs.WangPan</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>assembly</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.2</version>
      <type>jar</type>
      <scope>compile</scope>
    </dependency>
  </dependencies>
</project>
WangPan.java, the main entry point used to dispatch the operations:
package com.yun.hdfs;

import java.io.IOException;

public class WangPan {

    /** Description of the run result. */
    private static String result = "";

    public static void main(String[] args) {
        try {
            // check the command arguments (the length must be tested before accessing args[0])
            if (args.length > 0 && args[0] != null && !"".equals(args[0])) {
                if ("upload".equals(args[0])) {
                    result = "upload:" + WangPanUtils.uploadFile(args);
                } else if ("delete".equals(args[0])) {
                    result = "delete:" + WangPanUtils.deleteFile(args);
                } else if ("query".equals(args[0])) {
                    if (WangPanUtils.listFile(args) == null) {
                        result = "query:fail!";
                    } else {
                        result = "query:success";
                    }
                } else if ("read".equals(args[0])) {
                    result = "read:" + WangPanUtils.readFile(args);
                } else {
                    System.out.println("sorry, we have no such service!");
                }
                System.out.println(result);
            } else {
                System.out.println("fail!");
                System.exit(1);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
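Note that the dispatch above does not expose the mkdir operation, even though WangPanUtils (below) provides one. If you want to call it from the command line, a branch along these lines could be added before the final else (a sketch only):

                } else if ("mkdir".equals(args[0])) {
                    // delegates to the mkdir method defined in WangPanUtils below
                    result = "mkdir:" + WangPanUtils.mkdir(args);
                }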
WangPanUtils.java, the upload/delete/query/read operations:
package com.yun.hdfs;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class WangPanUtils {

    /**
     * Upload a file.
     * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar upload /root/test-hdfs.txt
     * hdfs://hadoopm:9000/user/root/upload/12390po.txt
     */
    public static String uploadFile(String[] args) throws IOException {
        // check the argument count before accessing args[1] and args[2]
        if (args.length < 3) {
            return "fail";
        }
        String localSrc = args[1];
        String dst = args[2];
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst));
        // copyBytes with close=true closes both streams when the copy finishes
        IOUtils.copyBytes(in, out, 4096, true);
        return "success";
    }

    /**
     * List files.
     * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar query hdfs://hadoopm:9000/user/root/
     */
    public static Path[] listFile(String[] args) throws IOException {
        if (args.length < 2) {
            return null;
        }
        String dst = args[1];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FileStatus[] status = fs.listStatus(new Path(dst));
        Path[] listPaths = FileUtil.stat2Paths(status);
        return listPaths;
    }

    /**
     * Delete a file.
     * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar delete hdfs://hadoopm:9000/user/root/upload/12390po.txt
     */
    public static String deleteFile(String[] args) throws IOException {
        if (args.length < 2) {
            return "fail";
        }
        String fileName = args[1];
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(URI.create(fileName), config);
        Path path = new Path(fileName);
        if (!hdfs.exists(path)) {
            return "fail";
        }
        // recursive=false: only delete a single file, not a directory tree
        boolean isDeleted = hdfs.delete(path, false);
        if (isDeleted) {
            return "success";
        } else {
            return "fail";
        }
    }

    /**
     * Read a file from HDFS and copy it to the local file system.
     * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar read hdfs://hadoopm:9000/user/root/upload/123.txt /root/test-readfile898.txt
     */
    public static String readFile(String[] args) throws IOException {
        if (args.length < 3) {
            return "fail";
        }
        String dst = args[1];
        String newPath = args[2];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FSDataInputStream hdfsInStream = fs.open(new Path(dst));
        OutputStream out = new FileOutputStream(newPath);
        byte[] ioBuffer = new byte[1024];
        int readLen = hdfsInStream.read(ioBuffer);
        while (-1 != readLen) {
            out.write(ioBuffer, 0, readLen);
            readLen = hdfsInStream.read(ioBuffer);
        }
        out.close();
        hdfsInStream.close();
        fs.close();
        return "success";
    }

    /**
     * Create a directory.
     * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar mkdir hdfs://hadoopm:9000/user/root/upload/test909
     */
    public static String mkdir(String[] args) throws IOException {
        if (args.length < 2) {
            return "fail";
        }
        String dst = args[1];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        Path path = new Path(dst);
        if (fs.exists(path)) {
            return "fail";
        }
        fs.mkdirs(path);
        return "success";
    }
}
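As written, the query branch in WangPan.main only prints query:success and never shows the returned paths. If you want to see the listing, that branch could be extended roughly like this (a sketch reusing the listFile method above):

                Path[] paths = WangPanUtils.listFile(args);
                if (paths == null) {
                    result = "query:fail!";
                } else {
                    // print every path returned by listStatus
                    for (Path p : paths) {
                        System.out.println(p);
                    }
                    result = "query:success";
                }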
PS: Note that this project needs to be packaged with Maven into a runnable jar, using the following command:
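With maven-assembly-plugin bound to the package phase as configured in the pom.xml above, packaging is typically just:
mvn clean package
Since appendAssemblyId is set to false, the jar with all dependencies bundled should end up as target/hdfs-0.0.1-SNAPSHOT.jar, matching the /root/hdfs-0.0.1-SNAPSHOT.jar name used in the method comments (after copying it to the cluster node).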
The command to run each operation is documented in the comment above each method; the execution results look like this:
You also need to visit http://hadoopm:50070/ -> Browse the filesystem to verify that the HDFS file operations really succeeded.
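Alternatively, the result can be checked from the command line on the cluster; the path below is just the upload directory used in the examples above:
./hadoop fs -ls /user/root/upload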