java结合HADOOP集群文件上传下载
对HDFS上的文件进行上传和下载是对集群的基本操作,在《HADOOP权威指南》一书中,对文件的上传和下载都有代码的实例,但是对如何配置HADOOP客户端却是没有讲得很清楚,经过长时间的搜索和调试,总结了一下,如何配置使用集群的方法,以及自己测试可用的对集群上的文件进行操作的程序。首先,需要配置对应的环境变量:
hadoop_HOME="/home/work/tools/java/hadoop-client/hadoop"
forfin$hadoop_HOME/hadoop-*.jar;do
hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f
done
forfin$hadoop_HOME/lib/*.jar;do
hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f
done
hadoopvfs_HOME="/home/work/tools/java/hadoop-client/hadoop-vfs"
forfin$hadoopvfs_HOME/lib/*.jar;do
hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f
done
exportLD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/work/tools/java/hadoop-client/hadoop/lib/native/Linux-amd64-64/
其中LD_LIBRARY_PATH是在调用时需要用到的库的路径,hadoop_CLASSPATH则是我们hadoop客户端里各种jar包
有一点需要注意的是最好不要使用HADOOP_HOME这个变量,这个是一个系统使用的环境变量,最好不要和它冲突
编译类的方法:
javac-classpath$CLASSPATH:$hadoop_CLASSPATHHDFSUtil.java
运行的方法:
java-classpath$CLASSPATH:$hadoop_CLASSPATHHDFSUtil
但是在实际的使用过程中,会报NoPermission之类的错误,或者你能保证代码没有问题的情况下,在运行的时候也会报一些奇奇怪怪的错误
那么问题来了,这是什么鬼?
答案:这是因为没有配置对应集群的配置文件
因为在《HADOOP权威指南》一书中,弱化了配置的东西,所以在具体使用集群的时候就会出现问题,如何解决呢,这样子:
this.conf=newConfiguration(false);
conf.addResource("./hadoop-site.xml");
conf.addResource("./hadoop-default.xml");
conf.set("fs.hdfs.impl",org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName());
为什么会这样,书上只是很简单的:
this.conf=newConfiguration();
那是因为默认你的集群在本地,所以不需要做配置,但是在实际使用的过程中,各个集群的配置是不同的,所以我们要引入集群的配置
这是非常重要的一点,因为实际使用的过程中我们都是使用的HADOOP的客户端,而且是已经搭好环境的集群,所以我们需要做好本地的配置
hadoop-site.xml和hadoop-default.xml这两个文件在所使用的客户端的conf目录下,在addResource的时候指定好目录就行了
将以上所提到的配置,全部配完之后,这个程序才能真正运行起来,所以配置是非常重要的一环。
以下是对应的工具的代码,有兴趣的看一下吧,使用的是文件流的方式来搞的,这样子也可以打通FTP和HDFS之间文件的互传:
importjava.io.BufferedInputStream;
importjava.io.FileInputStream;
importjava.io.FileNotFoundException;
importjava.io.FileOutputStream;
importjava.io.IOException;
importjava.io.InputStream;
importjava.io.OutputStream;
importjava.net.URI;
importjava.net.URL;
importjava.io.*;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.fs.FSDataInputStream;
importorg.apache.hadoop.fs.FileSystem;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.IOUtils;
importorg.apache.hadoop.util.Progressable;
publicclassHDFSUtil{
privateStringhdfs_node="";
privateStringhdfs_path="";
privateStringfile_path="";
privateStringhadoop_site="";
privateStringhadoop_default="";
privateConfigurationconf=null;
publicHDFSUtil(Stringhdfs_node){
this.hdfs_node=hdfs_node;
}
publicStringgetHdfsNode(){
returnthis.hdfs_node;
}
publicvoidsetHdfsPath(Stringhdfs_path){
this.hdfs_path=hdfs_path;
}
publicStringgetHdfsPath(){
returnthis.hdfs_path;
}
publicvoidsetFilePath(Stringfile_path){
this.file_path=file_path;
}
publicStringgetFilePath(){
returnthis.file_path;
}
publicvoidsetHadoopSite(Stringhadoop_site){
this.hadoop_site=hadoop_site;
}
publicStringgetHadoopSite(){
returnthis.hadoop_site;
}
publicvoidsetHadoopDefault(Stringhadoop_default){
this.hadoop_default=hadoop_default;
}
publicStringgetHadoopDefault(){
returnthis.hadoop_default;
}
publicintsetConfigure(booleanflag){
if(flag==false){
if(this.getHadoopSite()==""||this.getHadoopDefault()==""){
return-1;
}
else{
this.conf=newConfiguration(false);
conf.addResource(this.getHadoopDefault());
conf.addResource(this.getHadoopSite());
conf.set("fs.hdfs.impl",org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl",org.apache.hadoop.fs.LocalFileSystem.class.getName());
return0;
}
}
this.conf=newConfiguration();
return0;
}
publicConfigurationgetConfigure(){
returnthis.conf;
}
publicintupLoad(StringlocalName,StringremoteName)throwsFileNotFoundException,IOException{
InputStreaminStream=null;
FileSystemfs=null;
try{
inStream=newBufferedInputStream(newFileInputStream(localName));
fs=FileSystem.get(URI.create(this.hdfs_node),this.conf);
OutputStreamoutStream=fs.create(newPath(remoteName),newProgressable(){
publicvoidprogress(){
System.out.print('.');
}
});
IOUtils.copyBytes(inStream,outStream,4096,true);
inStream.close();
return0;
}catch(IOExceptione){
inStream.close();
e.printStackTrace();
return-1;
}
}
publicintupLoad(InputStreaminStream,StringremoteName)throwsFileNotFoundException,IOException{
FileSystemfs=null;
try{
fs=FileSystem.get(URI.create(this.hdfs_node),this.conf);
OutputStreamoutStream=fs.create(newPath(remoteName),newProgressable(){
publicvoidprogress(){
System.out.print('.');
}
});
IOUtils.copyBytes(inStream,outStream,4096,true);
inStream.close();
return0;
}catch(IOExceptione){
inStream.close();
e.printStackTrace();
return-1;
}
}
publicintdonwLoad(StringremoteName,StringlocalName,intlines)throwsFileNotFoundException,IOException{
FileOutputStreamfos=null;
InputStreamReaderisr=null;
BufferedReaderbr=null;
Stringstr=null;
OutputStreamWriterosw=null;
BufferedWriterbuffw=null;
PrintWriterpw=null;
FileSystemfs=null;
InputStreaminStream=null;
try{
fs=FileSystem.get(URI.create(this.hdfs_node+remoteName),this.conf);
inStream=fs.open(newPath(this.hdfs_node+remoteName));
fos=newFileOutputStream(localName);
osw=newOutputStreamWriter(fos,"UTF-8");
buffw=newBufferedWriter(osw);
pw=newPrintWriter(buffw);
isr=newInputStreamReader(inStream,"UTF-8");
br=newBufferedReader(isr);
while((str=br.readLine())!=null&&lines>0){
lines--;
pw.println(str);
}
}catch(IOExceptione){
thrownewIOException("Couldn'twrite.",e);
}finally{
pw.close();
buffw.close();
osw.close();
fos.close();
inStream.close()
}
return0;
}
//maintotest
publicstaticvoidmain(String[]args){
Stringhdfspath=null;
Stringlocalname=null;
Stringhdfsnode=null;
intlines=0;
if(args.length==4){
hdfsnode=args[0];
hdfspath=args[1];
localname=args[2];
lines=Integer.parseInt(args[3]);
}
else{
hdfsnode="hdfs://nj01-nanling-hdfs.dmop.baidu.com:54310";
hdfspath="/app/ps/spider/wdmqa/wangweilong/test/HDFSUtil.java";
localname="/home/work/workspace/project/dhc2-0/dhc/base/ftp/papapa";
lines=5;
}
HDFSUtilhdfsutil=newHDFSUtil(hdfsnode);
hdfsutil.setFilePath(hdfsutil.getHdfsNode()+hdfspath);
hdfsutil.setHadoopSite("./hadoop-site.xml");
hdfsutil.setHadoopDefault("./hadoop-default.xml");
hdfsutil.setConfigure(false);
try{
hdfsutil.donwLoad(hdfspath,localname,lines);
}catch(IOExceptione){
e.printStackTrace();
}
}
如果想要了解FTP上文件的下载,请参考这篇文章:
ftp下载工具
如果想要打通FTP和HDFS文件互传,只要创建一个类,调用这两篇文章中的工具的接口就可以搞定,自己写的代码,实测有效。
以上就是本文的全部内容了,希望能够对大家熟练掌握java有所帮助。
请您花一点时间将文章分享给您的朋友或者留下评论。我们将会由衷感谢您的支持!