Python API 操作Hadoop hdfs详解
http://pyhdfs.readthedocs.io/en/latest/
1:安装
由于是windows环境(linux其实也一样),只要有pip或者setup_install安装起来都是很方便的
>pipinstallhdfs
2:Client——创建集群连接
>fromhdfsimport*
>client=Client("http://s100:50070")
其他参数说明:
classhdfs.client.Client(url,root=None,proxy=None,timeout=None,session=None)
url:ip:端口
root:制定的hdfs根目录
proxy:制定登陆的用户身份
timeout:设置的超时时间
session:连接标识
client=Client("http://127.0.0.1:50070",root="/",timeout=100,session=False)
>>>client.list("/")
[u'home',u'input',u'output',u'tmp']
3:dir——查看支持的方法
>dir(client)
4:status——获取路径的具体信息
其他参数:
status(hdfs_path,strict=True)
hdfs_path:就是hdfs路径
strict:设置为True时,如果hdfs_path路径不存在就会抛出异常,如果设置为False,如果路径为不存在,则返回None
5:list——获取指定路径的子目录信息
>client.list("/")
[u'home',u'input',u'output',u'tmp']
其他参数:
list(hdfs_path,status=False)
status:为True时,也返回子目录的状态信息,默认为Flase
6:makedirs——创建目录
>client.makedirs("/123")
其他参数:makedirs(hdfs_path,permission=None)
permission:设置权限
>client.makedirs("/test",permission=777)
7:rename—重命名
>client.rename("/123","/test")
8:delete—删除
>client.delete("/test")
其他参数:
delete(hdfs_path,recursive=False)
recursive:删除文件和其子目录,设置为False如果不存在,则会抛出异常,默认为False
9:upload——上传数据
>client.upload("/test","F:\[PPT]GoogleProtocolBuffers.pdf");
其他参数:
upload(hdfs_path,local_path,overwrite=False,n_threads=1,temp_dir=None,
chunk_size=65536,progress=None,cleanup=True,**kwargs)
overwrite:是否是覆盖性上传文件
n_threads:启动的线程数目
temp_dir:当overwrite=true时,远程文件一旦存在,则会在上传完之后进行交换
chunk_size:文件上传的大小区间
progress:回调函数来跟踪进度,为每一chunk_size字节。它将传递两个参数,文件上传的路径和传输的字节数。一旦完成,-1将作为第二个参数
cleanup:如果在上传任何文件时发生错误,则删除该文件
10:download——下载
>client.download("/test/NOTICE.txt","/home")
11:read——读取文件
withclient.read("/test/[PPT]GoogleProtocolBuffers.pdf")asreader:
printreader.read()
其他参数:
read(*args,**kwds)
hdfs_path:hdfs路径
offset:设置开始的字节位置
length:读取的长度(字节为单位)
buffer_size:用于传输数据的字节的缓冲区的大小。默认值设置在HDFS配置。
encoding:制定编码
chunk_size:如果设置为正数,上下文管理器将返回一个发生器产生的每一chunk_size字节而不是一个类似文件的对象
delimiter:如果设置,上下文管理器将返回一个发生器产生每次遇到分隔符。此参数要求指定的编码。
progress:回调函数来跟踪进度,为每一chunk_size字节(不可用,如果块大小不是指定)。它将传递两个参数,文件上传的路径和传输的字节数。称为一次与-1作为第二个参数。
问题:
1.
hdfs.util.HdfsError:Permissiondenied:user=dr.who,access=WRITE,inode="/test":root:supergroup:drwxr-xr-x
解决办法是:在配置文件hdfs-site.xml中加入
dfs.permissions false
/usr/local/hadoop-2.6.4/bin/hadoopjar/usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar\-input<输入目录>\#可以指定多个输入路径,例如:-input'/user/foo/dir1'-input'/user/foo/dir2'
-inputformat<输入格式JavaClassName>\-output<输出目录>\-outputformat<输出格式JavaClassName>\-mapper
\-reducer \-combiner \-partitioner \-cmdenv \#可以传递环境变量,可以当作参数传入到任务中,可以配置多个
-file<依赖的文件>\#配置文件,字典等依赖
-D
Map.py:
#!/usr/local/bin/python importsys forlineinsys.stdin: ss=line.strip().split('') forsinss: ifs.strip()!="": print"%s\t%s"%(s,1)
Reduce.py:
#!/usr/local/bin/python importsys current_word=None count_pool=[] sum=0 forlineinsys.stdin: word,val=line.strip().split('\t') ifcurrent_word==None: current_word=word ifcurrent_word!=word: forcountincount_pool: sum+=count print"%s\t%s"%(current_word,sum) current_word=word count_pool=[] sum=0 count_pool.append(int(val)) forcountincount_pool: sum+=count print"%s\t%s"%(current_word,str(sum))
Run.sh: HADOOP_CMD="/data/hadoop-2.7.0/bin/hadoop" STREAM_JAR_PATH="/data/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar" INPUT_FILE_PATH_1="/The_Man_of_Property.txt" OUTPUT_PATH="/output" $HADOOP_CMDfs-rmr-skipTrash$OUTPUT_PATH #Step1. $HADOOP_CMDjar$STREAM_JAR_PATH\ -input$INPUT_FILE_PATH_1\ -output$OUTPUT_PATH\ -mapper"pythonmap.py"\ -reducer"pythonred.py"\ -file./map.py\ -file./red.py
目的:通过python模拟mr,计算每年的最高气温。
1.查看数据文件,需要截取年份和气温,生成key-value对。
[tianyc@TeletekHbasepython]$cattest.dat 0067011990999991950051507004...9999999N9+00001+99999999999... 0043011990999991950051512004...9999999N9+00221+99999999999... 0043011990999991950051518004...9999999N9-00111+99999999999... 0043012650999991949032412004...0500001N9+01111+99999999999... 0043012650999991949032418004...0500001N9+00781+99999999999...
2.编写map,打印key-value对
[tianyc@TeletekHbasepython]$catmap.py importre importsys forlineinsys.stdin: val=line.strip() (year,temp)=(val[15:19],val[40:45]) print"%s\t%s"%(year,temp) [tianyc@TeletekHbasepython]$cattest.dat|pythonmap.py 1950+0000 1950+0022 1950-0011 1949+0111 1949+0078
3.将结果排序
[tianyc@TeletekHbasepython]$cattest.dat|pythonmap.py|sort 1949+0078 1949+0111 1950+0000 1950-0011 1950+0022
4.编写redurce,对map中间结果进行处理,生成最终结果
[tianyc@TeletekHbasepython]$catred.py importsys (last_key,max_val)=(None,0) forlineinsys.stdin: (key,val)=line.strip().split('\t') iflast_keyandlast_key!=key: print'%s\t%s'%(last_key,max_val) (last_key,max_val)=(key,int(val)) else: (last_key,max_val)=(key,max(max_val,int(val))) iflast_key: print'%s\t%s'%(last_key,max_val)
5.执行。
[tianyc@TeletekHbasepython]$cattest.dat|pythonmap.py|sort|pythonred.py 1949111 195022
使用python语言进行MapReduce程序开发主要分为两个步骤,一是编写程序,二是用HadoopStreaming命令提交任务。
还是以词频统计为例
一、程序开发
1、Mapper
forlineinsys.stdin: filelds=line.strip.split('') foriteminfileds: printitem+''+'1'
2、Reducer
importsys result={} forlineinsys.stdin: kvs=line.strip().split('') k=kvs[0] v=kvs[1] ifkinresult: result[k]+=1 else: result[k]=1 fork,vinresult.items(): printk+''+v ....
写完发现其实只用map就可以处理了...reduce只用cat就好了
3、运行脚本
1)Streaming简介
Hadoop的MapReduce和HDFS均采用Java进行实现,默认提供Java编程接口,用户通过这些编程接口,可以定义map、reduce函数等等。
但是如果希望使用其他语言编写map、reduce函数怎么办呢?
Hadoop提供了一个框架Streaming,Streaming的原理是用Java实现一个包装用户程序的MapReduce程序,该程序负责调用hadoop提供的Java编程接口。
2)运行命令
/.../bin/hadoopstreaming -input/..../input -output/..../output -mapper"mapper.py" -reducer"reducer.py" -filemapper.py -filereducer.py -Dmapred.job.name="wordcount" -Dmapred.reduce.tasks="1"
3)Streaming常用命令
(1)-input
(2)-output
(3)-mapper:指定mapper可执行程序或Java类,必须指定且唯一。
(4)-reducer:指定reducer可执行程序或Java类,必须指定且唯一。
(5)-file,-cacheFile,-cacheArchive:分别用于向计算节点分发本地文件、HDFS文件和HDFS压缩文件,具体使用方法参考文件分发与打包。
(6)numReduceTasks:指定reducer的个数,如果设置-numReduceTasks0或者-reducerNONE则没有reducer程序,mapper的输出直接作为整个作业的输出。
(7)-jobconf|-DNAME=VALUE:指定作业参数,NAME是参数名,VALUE是参数值,可以指定的参数参考hadoop-default.xml。
-jobconfmapred.job.name='MyJobName'设置作业名
-jobconfmapred.job.priority=VERY_HIGH|HIGH|NORMAL|LOW|VERY_LOW设置作业优先级
-jobconfmapred.job.map.capacity=M设置同时最多运行M个map任务
-jobconfmapred.job.reduce.capacity=N设置同时最多运行N个reduce任务
-jobconfmapred.map.tasks设置map任务个数
-jobconfmapred.reduce.tasks设置reduce任务个数
-jobconfmapred.compress.map.output设置map的输出是否压缩
-jobconfmapred.map.output.compression.codec设置map的输出压缩方式
-jobconfmapred.output.compress设置reduce的输出是否压缩
-jobconfmapred.output.compression.codec设置reduce的输出压缩方式
-jobconfstream.map.output.field.separator设置map输出分隔符
例子:
-Dstream.map.output.field.separator=:\以冒号进行分隔
-Dstream.num.map.output.key.fields=2\指定在第二个冒号处进行分隔,也就是第二个冒号之前的作为key,之后的作为value
(8)-combiner:指定combinerJava类,对应的Java类文件打包成jar文件后用-file分发。
(9)-partitioner:指定partitionerJava类,Streaming提供了一些实用的partitioner实现,参考KeyBasedFiledPartitoner和IntHashPartitioner。
(10)-inputformat,-outputformat:指定inputformat和outputformatJava类,用于读取输入数据和写入输出数据,分别要实现InputFormat和OutputFormat接口。如果不指定,默认使用TextInputFormat和TextOutputFormat。
(11)cmdenvNAME=VALUE:给mapper和reducer程序传递额外的环境变量,NAME是变量名,VALUE是变量值。
(12)-mapdebug,-reducedebug:分别指定mapper和reducer程序失败时运行的debug程序。
(13)-verbose:指定输出详细信息,例如分发哪些文件,实际作业配置参数值等,可以用于调试。
以上这篇PythonAPI操作Hadoophdfs详解就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。