CRUD operations with the HDFS Java API
When working through this exercise, pay special attention to the following three issues:
1. HDFS safe mode must be turned off. Command: ./hadoop dfsadmin -safemode leave
2. The Hadoop dependency version used in the project must match the cluster's version, otherwise you will get a version mismatch error.
3. Hadoop cluster user permissions, and the purpose of the various directories (a workaround sketch follows below).
The root causes of these three issues are still to be investigated!!!
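For issue 3, a common (if blunt) workaround on a test cluster is to relax the permissions on the HDFS directory you are writing to. This is only a sketch, not a fix for the underlying cause, and the path below is simply the one used in the examples later in this post:
./hadoop fs -chmod -R 777 /user/root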
I have not verified whether the Hadoop version used here (release-0.20.0) supports WebHDFS; in any case, I could not connect to it no matter what I tried!!!
Judging from the following, 0.20.0 is probably not supported:
https://jira.springsource.org/browse/IMPALA-15?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
Serengeti Distro:
Apache Hadoop: 1.0.1
GreenPlum HD: 1.1 (Apache Hadoop 1.0.0)
Cloudera: CDH3 (Apache Hadoop 0.20.2, WebHDFS is not supported in this version)
Hortonworks: 1.0.7 (Apache Hadoop 1.0.2)
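For reference, on a distro that does support WebHDFS (Hadoop 1.0.x and later, with dfs.webhdfs.enabled set to true), a quick way to check whether the REST endpoint answers is a request like this against the NameNode's HTTP port (hadoopm:50070, as used elsewhere in this post, is an assumption about your setup):
curl -i "http://hadoopm:50070/webhdfs/v1/user/root?op=LISTSTATUS"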
The steps are as follows:
Project structure, as shown in the figure:
On to the code O(∩_∩)O haha~
The pom.xml configuration is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.yun.hdfs</groupId>
  <artifactId>hdfs</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <appendAssemblyId>false</appendAssemblyId>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.yun.hdfs.WangPan</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>assembly</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.2</version>
      <type>jar</type>
      <scope>compile</scope>
    </dependency>
  </dependencies>
</project>
WangPan.java, the main entry point that dispatches to the operations:
package com.yun.hdfs;
import java.io.IOException;
public class WangPan {

    /** Description of the run result. */
    private static String result = "";

    public static void main(String[] args) {
        try {
            // Validate the command-line input: check the length before reading args[0]
            if (args.length > 0 && args[0] != null && !"".equals(args[0])) {
                if ("upload".equals(args[0])) {
                    result = "upload:" + WangPanUtils.uploadFile(args);
                } else if ("delete".equals(args[0])) {
                    result = "delete:" + WangPanUtils.deleteFile(args);
                } else if ("query".equals(args[0])) {
                    if (WangPanUtils.listFile(args) == null) {
                        result = "query:fail!";
                    } else {
                        result = "query:success";
                    }
                } else if ("read".equals(args[0])) {
                    result = "read:" + WangPanUtils.readFile(args);
                } else {
                    System.out.println("sorry, we have no such service!");
                }
                System.out.println(result);
            } else {
                System.out.println("fail!");
                System.exit(1);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
WangPanUtils.java, the create/read/delete/list operations:
package com.yun.hdfs;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class WangPanUtils {

    /**
     * Upload a file. java -jar /root/hdfs-0.0.1-SNAPSHOT.jar upload /root/test-hdfs.txt
     * hdfs://hadoopm:9000/user/root/upload/12390po.txt
     *
     * @param args
     * @return
     * @throws IOException
     */
    public static String uploadFile(String[] args) throws IOException {
        // Check the argument count before reading args[1] and args[2]
        if (args.length < 3) {
            return "fail";
        }
        String localSrc = args[1];
        String dst = args[2];
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStream out = fs.create(new Path(dst));
        IOUtils.copyBytes(in, out, 4096, true);
        return "success";
    }
    /**
     * List files. java -jar /root/hdfs-0.0.1-SNAPSHOT.jar query
     * hdfs://hadoopm:9000/user/root/
     *
     * @param args
     * @return
     * @throws IOException
     */
    public static Path[] listFile(String[] args) throws IOException {
        if (args.length < 2) {
            return null;
        }
        String dst = args[1];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FileStatus[] status = fs.listStatus(new Path(dst));
        Path[] listPaths = FileUtil.stat2Paths(status);
        return listPaths;
    }
    /**
     * Delete a file.
     * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar delete hdfs://hadoopm:9000/user/root/upload/12390po.txt
     *
     * @param args
     * @return
     * @throws IOException
     */
    public static String deleteFile(String[] args) throws IOException {
        if (args.length < 2) {
            return "fail";
        }
        String fileName = args[1];
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(URI.create(fileName), config);
        Path path = new Path(fileName);
        if (!hdfs.exists(path)) {
            return "fail";
        }
        boolean isDeleted = hdfs.delete(path, false);
        if (isDeleted) {
            return "success";
        } else {
            return "fail";
        }
    }
    /**
     * Read a file (copy it from HDFS to the local filesystem).
     * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar read hdfs://hadoopm:9000/user/root/upload/123.txt /root/test-readfile898.txt
     *
     * @param args
     * @return
     * @throws IOException
     */
    public static String readFile(String[] args) throws IOException {
        if (args.length < 3) {
            return "fail";
        }
        String dst = args[1];
        String newPath = args[2];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        FSDataInputStream hdfsInStream = fs.open(new Path(dst));
        OutputStream out = new FileOutputStream(newPath);
        byte[] ioBuffer = new byte[1024];
        int readLen = hdfsInStream.read(ioBuffer);
        while (-1 != readLen) {
            out.write(ioBuffer, 0, readLen);
            readLen = hdfsInStream.read(ioBuffer);
        }
        out.close();
        hdfsInStream.close();
        fs.close();
        return "success";
    }
    /**
     * Create a directory.
     * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar mkdir hdfs://hadoopm:9000/user/root/upload/test909
     *
     * @return
     * @throws IOException
     */
    public static String mkdir(String[] args) throws IOException {
        if (args.length < 2) {
            return "fail";
        }
        String dst = args[1];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        Path path = new Path(dst);
        if (fs.exists(path)) {
            return "fail";
        }
        fs.mkdirs(path);
        return "success";
    }
}
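Note that the dispatcher in WangPan.java above does not route a mkdir command to this method. If you want to invoke it the same way as the other operations, a branch roughly like the following (a sketch, not part of the original code) would go before the final else in main():
else if ("mkdir".equals(args[0])) {
    result = "mkdir:" + WangPanUtils.mkdir(args);
}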
PS: Note that this project needs to be packaged into a runnable jar with Maven, using the command below:
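The original post does not show the exact command. Given that the assembly plugin above is bound to the package phase, something like this should produce hdfs-0.0.1-SNAPSHOT.jar with all dependencies bundled:
mvn clean package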
The command for running each operation is given in that method's Javadoc; the results look like this:
You should also visit http://hadoopm:50070/ -> Browse the filesystem to check whether the HDFS file operations really succeeded.
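Alternatively, a quick sanity check from the command line on the cluster should also work (the path below matches the upload example used earlier):
./hadoop fs -ls /user/root/upload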


