-MapReduce连接课件.ppt

上传人:创****公 文档编号:2943218 上传时间:2020-05-31 格式:PPT 页数:45 大小:1.67MB
返回 下载 相关 举报
-MapReduce连接课件.ppt_第1页
第1页 / 共45页
-MapReduce连接课件.ppt_第2页
第2页 / 共45页
点击查看更多>>
资源描述

《-MapReduce连接课件.ppt》由会员分享,可在线阅读,更多相关《-MapReduce连接课件.ppt(45页珍藏版)》请在得力文库 - 分享文档赚钱的网站上搜索。

1、厦门大学数据库实验室MapReduce连接,报告人:李雨倩导师:林子雨2014.07.12,连接简介,MapReduce连接策略,连接,连接是关系运算,可以用于合并关系。,在数据库中,一般是表连接操作;在MapReduce中,连接可以用于合并两个或多个数据集。,例如,用户基本信息和用户活动详情。用户基本信息来自于OLTP数据库。用户活动详情来自于日志文件。,连接的类型,最常用的两个连接类型是内连接和外连接。,内连接比较两个关系中所有的数组,然后生成一个满足连接条件的结果集。,外连接,外连接并不需要两个关系的数组都满足连接条件。在连接条件不满足的时候,外连接可以将一方的数据保留在结果集中。,内连

2、接,左外连接右外连接全连接,连接关系图,连接实例,连接简介,MapReduce连接策略,连接,连接是关系运算,可以用于合并关系。,在数据库中,一般是表连接操作;在MapReduce中,连接可以用于合并两个或多个数据集。,例如,用户基本信息和用户活动详情。用户基本信息来自于OLTP数据库。用户活动详情来自于日志文件。,MapReduce的连接,welcometousethesePowerPointtemplates,NewContentdesign,10yearsexperience,用户的人口统计信息的聚合操作(例如:青少年和中年人的习惯差异)当用户超过一定时间没有使用网站后,发邮件提醒他们。

3、分析用户的浏览习惯,让系统可以提示用户有哪些网站特性还没有使用到,形成一个反馈循环。,MapReduce中的连接策略,重分区连接,复制连接,半连接,reduce端连接。使用场景:连接两个或多个大型数据集。,map端连接。使用场景:待连接的数据集中有一个数据集小到可以完全放在缓存中。,map端连接。使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在缓存中。,重分区连接,重分区连接利用MapReduce的排序-合并机制来分组数据。它被实现为使用一个单独的MapReduce任务,并支持多路连接(这里的多路指的是多个数据集)。,Map阶段负责从多个数据集中读取数据,

4、决定每个数据的连接值,将连接值作为输出键。输出值则包含将在reduce阶段被合并的值。,Reduce阶段,一个reducer接收map函数传来的一个输出键的所有输出值,并将数据分为多个分区。在此之后,reducer对所有的分区进行笛卡尔积连接运算,并生成全部的结果集。,在如下示例中,用户数据中有用户姓名,年龄和所在州$cattest-data/ch4/users.txtanne22NYjoe39COalison35NYmike69VAmarie27ORjim21ORbob71CAmary53NYdave36VAdude50CA用户活动日志中有用户姓名,进行的动作,来源IP。这个文件一般都要比用

5、户数据要大得多。$cattest-data/ch4/user-logs.txtjimlogout93.24.237.12mikenew_tweet87.124.79.252bobnew_tweet58.133.120.100mikelogout55.237.104.36jimnew_tweet93.24.237.12marieview_user122.158.130.90,$hadoopfs-puttest-data/ch4/user-logs.txtuser-logs.txt$bin/run.shcom.manning.hip.ch4.joins.improved.SampleMainuse

6、rs.txt,user-logs.txtoutput$hadoopfs-catoutput/part*bob71CAnew_tweet58.133.120.100jim21ORlogout93.24.237.12jim21ORnew_tweet93.24.237.12jim21ORlogin198.184.237.49marie27ORlogin58.133.120.100marie27ORview_user122.158.130.90mike69VAnew_tweet87.124.79.252mike69VAlogout55.237.104.36,重分区连接,过滤(filter)指的是将ma

7、p极端的输入数据中不需要的部分丢弃。投影(project)是关系代数的概念。投影用于减少发送给reducer的字段。,优化重分区连接,传统重分区方法的实现空间效率低下。它需要将连接的所有的输出值都读取到内存中,然后进行多路连接。事实上,如果仅仅将小数据集读取到内存中,然后用小数据集来遍历大数据集,进行连接,这样将更加高效。下图是优化后的重分区连接的流程图。,Map输出的组合键和组合值,上图说明了map输出的组合键和组合值。二次排序将会根据连接键(joinkey)进行分区,但会用整个组合键来进行排序。组合键包括一个标识源数据集(较大或较小)的整形值,因此可以根据这个整形值来保证较小源数据集的值先

8、于较大源数据的值被reducer接收。,优化重分区连接,上图是实现的类图。类图中包含两个部分,一个通用框架和一些类的实现样例。使用这个连接框架需要实现抽象类OptimizedDataJoinMapperBase和OptimizedDataJoinReducerBase。,OptimizedDataJoinMapperBase,protectedabstractTextgenerateInputTag(StringinputFile);protectedabstractbooleanisInputSmaller(StringinputFile);publicvoidconfigure(JobCo

9、nfjob)this.inputFile=job.get(map.input.file);this.inputTag=generateInputTag(this.inputFile);if(isInputSmaller(this.inputFile)smaller=newBooleanWritable(true);outputKey.setOrder(0);elsesmaller=newBooleanWritable(false);outputKey.setOrder(1);,这个类的作用是辨认出较小的数据集,并生成输出键和输出值。Configure方法在mapper创建期调用。Configu

10、re方法的作用之一是标识每一个数据集,让reducer可以区分数据的源数据集。另一个作用是辨认当前的输入数据是否是较小的数据集。,OptimizedDataJoinMapperBase(续),protectedabstractOptimizedTaggedMapOutputgenerateTaggedMapOutput(Objectvalue);protectedabstractStringgenerateGroupKey(Objectkey,OptimizedTaggedMapOutputaRecord);publicvoidmap(Objectkey,Objectvalue,OutputC

11、ollectoroutput,Reporterreporter)throwsIOExceptionOptimizedTaggedMapOutputaRecord=generateTaggedMapOutput(value);if(aRecord=null)return;aRecord.setSmaller(smaller);StringgroupKey=generateGroupKey(aRecord);if(groupKey=null)return;outputKey.setKey(groupKey);output.collect(outputKey,aRecord);,Map方法首先调用自

12、定义的方法(generateTaggedMapOutput)来生成OutputValue对象。这个对象包含了在连接中需要使用的值,和一个标识较大或较小数据集的布尔值。如果map方法可以调用自定义的方法(generateGroupKey)来得到可以在连接中使用的键,那么这个键就作为map的输出键。,OptimizedDataJoinReducerBase,publicvoidreduce(Objectkey,Iteratorvalues,OutputCollectoroutput,Reporterreporter)throwsIOExceptionCompositeKeyk=(Composite

13、Key)key;Listsmaller=newArrayList();while(values.hasNext()Objectvalue=values.next();OptimizedTaggedMapOutputcloned=(OptimizedTaggedMapOutput)value).clone(job);if(cloned.isSmaller().get()smaller.add(cloned);elsejoinAndCollect(k,smaller,cloned,output,reporter);,Map端处理后已经可以保证较小源数据集的值将会先于较大源数据集的值被接收。这里就可

14、以将所有的较小源数据集的值放到缓存中。在开始接收较大源数据集的值的时候,就开始和缓存中的值做连接操作。,OptimizedDataJoinRuducerBase(续),protectedabstractOptimizedTaggedMapOutputcombine(Stringkey,OptimizedTaggedMapOutputvalue1,OptimizedTaggedMapOutputvalue2);privatevoidjoinAndCollect(CompositeKeykey,Listsmaller,OptimizedTaggedMapOutputvalue,OutputColl

15、ectoroutput,Reporterreporter)throwsIOExceptionif(smaller.size()1)OptimizedTaggedMapOutputcombined=combine(key.getKey(),null,value);collect(key,combined,output,reporter);elsefor(OptimizedTaggedMapOutputsmall:smaller)OptimizedTaggedMapOutputcombined=combine(key.getKey(),small,value);collect(key,combin

16、ed,output,reporter);,方法joinAndCollect包含了两个数据集的值,并输出它们。,优化重分区连接实例,例如,需要连接用户详情数据和用户活动日志。,第一步,判断两个数据集中哪一个比较小。对于一般的网站来说,用户详情数据会比较小,用户活动日志会比较大。首先,实现抽象类OptimizedDataJoinMapperBase。这个将在map端被调用。这个类将创建map的输出键和输出值。同时,它还将提示整个框架,当前处理的文件是不是比较小的那个。,Map端实现代码,publicclassSampleMapextendsOptimizedDataJoinMapperBasepr

17、ivatebooleansmaller;OverrideprotectedTextgenerateInputTag(StringinputFile)/tagtherowwithinputfilename(datasource)smaller=inputFile.contains(users.txt);returnnewText(inputFile);OverrideprotectedStringgenGroupKey(Objectkey,OutputValueoutput)returnkey.toString();OverrideprotectedbooleanisInputSmaller(S

18、tringinputFile)returnsmaller;OverrideprotectedOutputValuegenMapOutputValue(Objecto)returnnewTextTaggedOutputValue(Text)o);,Reduce端实现代码,第二步,你需要实现抽象类OptimizedDataJoinReducerBase。它将在reduce端被调用。在这个类中,将从map端传入不同数据集的输出键和输出值,然后返回reduce的输出数组。,publicclassSampleReduceextendsOptimizedDataJoinReducerBaseprivate

19、TextTaggedOutputValueoutput=newTextTaggedOutputValue();privateTexttextOutput=newText();OverrideprotectedOutputValuecombine(Stringkey,OutputValuesmallValue,OutputValuelargeValue)if(smallValue=null|largeValue=null)returnnull;Objectvalues=smallValue.getData(),largeValue.getData();textOutput.set(StringU

20、tils.join(values,t);output.setData(textOutput);returnoutput;,任务的主代码,最后,任务的主代码需要指明InputFormat类,并设置二次排序。,job.setInputFormat(KeyValueTextInputFormat.class);job.setMapOutputKeyClass(CompositeKey.class);job.setMapOutputValueClass(TextTaggedOutputValue.class);job.setOutputKeyClass(Text.class);job.setOutpu

21、tValueClass(Text.class);job.setPartitionerClass(CompositeKeyPartitioner.class);job.setOutputKeyComparatorClass(CompositeKeyComparator.class);job.setOutputValueGroupingComparator(CompositeKeyOnlyComparator.class);,MapReduce中的连接策略,重分区连接,复制连接,半连接,reduce端连接。使用场景:连接两个或多个大型数据集。,map端连接。使用场景:待连接的数据集中有一个数据集小

22、到可以完全放在缓存中。,map端连接。使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在缓存中。,复制连接,复制连接得名于它的具体实现:连接中最小的数据集将会被复制到所有的map主机节点。复制连接有一个假设前提:在被连接的数据集中,有一个数据集足够小到可以缓存在内存中。,MapReduce的复制连接的工作原理如下:使用分布式缓存将这个小数据集复制到所有运行map任务的节点。用各个map任务初始化方法将这个小数据集装载到一个哈希表中。逐条用大数据集中的记录遍历这个哈希表,逐个判断是否符合连接条件输出符合连接条件的结果。,复制连接,一个复制连接通用框架,该复制连

23、接框架可以支持任意类型的数据集。这个框架中同样提供了一个优化的小功能:动态监测分布式缓存内容和输入块的大小,并判断哪个更大。如果输入块较小,那么就需要将map的输入块放到内存缓冲中,然后在mapper的cleanup方法中执行连接操作。下图为该框架的类图。并且提供了连接类(GenericReplicatedJoin)的具体实现,假设前提:每个数据文件的第一个标记是连接键。,连接框架的算法,Mapper的setup方法判断在map的输入块和分布式缓存的内容中哪个大。如果分布式缓存的内容比较小,那么它将被装载到内存缓存中。Map函数开始连接操作。如果输入块比较小,map函数将输入块的键值对装载到内

24、存缓存中。Map的cleanup方法将从分布式缓存中读取记录,逐条记录和在内存缓存中的键值对进行连接操作。,GenericReplicatedJoin,以下代码为GenericReplicatedJoin中的setup方法,它是在map的初始化阶段调用的。这个方法判断分布式缓存中的文件和输入块哪个大。如果文件比较小,则将文件装载到HashMap中。,protectedvoidsetup(Contextcontext)throwsIOException,InterruptedExceptiondistributedCacheFiles=DistributedCache.getLocalCache

25、Files(context.getConfiguration();intdistCacheSizes=0;for(PathdistFile:distributedCacheFiles)FiledistributedCacheFile=newFile(distFile.toString();distCacheSizes+=distributedCacheFile.length();if(context.getInputSplit()instanceofFileSplit)FileSplitsplit=(FileSplit)context.getInputSplit();longinputSpli

26、tSize=split.getLength();distributedCacheIsSmaller=(distCacheSizes)reader)addToCache(p);reader.close();,GenericReplicatedJoin(续),以下代码为GenericReplicatedJoin中的Map方法。它将会根据setup方法是否将了分布式缓存的内容装载到内存的缓存中来选择行为。如果分布式缓存的内容被装载到内存中,那么map方法就将输入块的记录和内存中的缓存做连接操作。如果分布式缓存的内容没有被装载到内存中,那么map方法就将输入块的记录装载到内存中,然后在cleanup方

27、法中使用。,protectedvoidmap(Objectkey,Objectvalue,Contextcontext)throwsIOException,InterruptedExceptionPairpair=readFromInputFormat(key,value);if(distributedCacheIsSmaller)joinAndCollect(pair,context);elseaddToCache(pair);,publicvoidjoinAndCollect(Pairp,Contextcontext)throwsIOException,InterruptedExcepti

28、onListcached=cachedRecords.get(p.getKey();if(cached!=null)for(Paircp:cached)Pairresult;if(distributedCacheIsSmaller)result=join(p,cp);elseresult=join(cp,p);if(result!=null)context.write(result.getKey(),result.getData();,publicPairjoin(PairinputSplitPair,PairdistCachePair)StringBuildersb=newStringBui

29、lder();if(inputSplitPair.getData()!=null)sb.append(inputSplitPair.getData();sb.append(t);if(distCachePair.getData()!=null)sb.append(distCachePair.getData();returnnewPair(newText(inputSplitPair.getKey().toString(),newText(sb.toString();,GenericReplicatedJoin(续),当所有的记录都被传输给map方法,MapReduce将会调用cleanup方法

30、。如果分布式缓存的内容比输入块大,连接将会在cleanup中进行。连接的对象是map函数的缓存中的输入块的记录和分布式缓存中的记录。,protectedvoidcleanup(Contextcontext)throwsIOException,InterruptedExceptionif(!distributedCacheIsSmaller)for(PathdistFile:distributedCacheFiles)FiledistributedCacheFile=newFile(distFile.toString();DistributedCacheFileReaderreader=getD

31、istributedCacheReader();reader.init(distributedCacheFile);for(Pairp:(Iterable)reader)joinAndCollect(p,context);reader.close();,GenericReplicatedJoin(续),最后,任务的驱动代码必须指定需要装载到分布式缓存中的文件。以下的代码可以处理一个文件,也可以处理MapReduce输入结果的一个目录。,Configurationconf=newConfiguration();FileSystemfs=smallFilePath.getFileSystem(co

32、nf);FileStatussmallFilePathStatus=fs.getFileStatus(smallFilePath);if(smallFilePathStatus.isDir()for(FileStatusf:fs.listStatus(smallFilePath)if(f.getPath().getName().startsWith(part)DistributedCache.addCacheFile(f.getPath().toUri(),conf);elseDistributedCache.addCacheFile(smallFilePath.toUri(),conf);,

33、MapReduce中的连接策略,重分区连接,复制连接,半连接,reduce端连接。使用场景:连接两个或多个大型数据集。,map端连接。使用场景:待连接的数据集中有一个数据集小到可以完全放在缓存中。,map端连接。使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在缓存中。,半连接,假设一个场景,需要连接两个很大的数据集,例如用户日记和OLTP的用户数据。任何一个数据集都不是足够小到可以缓存在mapjob的内存中。如此看来,似乎就不得不应用reduce端的连接了。这时候,可以看一下题目本身:若是一个数据集中有的记录因为无法连接到另一个数据集的记录,将会被移除,还

34、需要将全部数据集放到内存中吗?在这个例子中,在用户日记中的用户仅仅是OLTP用户数据中的用户中的很小的一个项目组。那么就可以从OLTP用户数据中取出存在于用户日记中的那项目组用户的用户数据。如许就可以获得足够小到可以放在内存中的数据集。如许的解决规划就叫做半连接。,例子,¥bin/run.shcom.manning.hip.ch4.joins.semijoin.Mainusers.txtuser-logs.txtoutput¥hadoopfs-lsoutput/user/aholmes/output/filtered/user/aholmes/output/result/user/aholme

35、s/output/unique¥hadoopfs-catoutput/unique/part*bobjimmariemike¥hadoopfs-catoutput/filtered/part*mike69VAmarie27ORjim21ORbob71CA,¥hadoopfs-catoutput/result/part*jimlogout93.24.237.1221ORmikenew_tweet87.124.79.25269VAbobnew_tweet58.133.120.10071CAmikelogout55.237.104.3669VAjimnew_tweet93.24.237.1221OR

36、marieview_user122.158.130.9027ORjimlogin198.184.237.4921ORmarielogin58.133.120.10027OR,半连接的实现3个MapReducejob,半连接的实现job1,第一个MapReducejob的功能是从日记文件中提取出用户名,用这些用户名生成一个用户名唯一的凑集(Set)。这经由过程让map函数履行了用户名的投影操作。然后用reducer输出用户名。为了削减在map阶段和reduce简短之间传输的数据量,就在map任务中采取哈希集来保存用户名,在cleanup办法中输出哈希集的值。,Job1的实现代码,publicst

37、aticclassMapextendsMapperprivateSetkeys=newHashSet();Overrideprotectedvoidmap(Textkey,Textvalue,Contextcontext)throwsIOException,InterruptedExceptionkeys.add(key.toString();Overrideprotectedvoidcleanup(Contextcontext)throwsIOException,InterruptedExceptionTextoutputKey=newText();for(Stringkey:keys)ou

38、tputKey.set(key);context.write(outputKey,NullWritable.get();publicstaticclassReduceextendsReducerOverrideprotectedvoidreduce(Textkey,Iterablevalues,Contextcontext)throwsIOException,InterruptedExceptioncontext.write(key,NullWritable.get();,半连接的实现job2,第二步是一个进行过滤的MapReducejob。目标是从全部用户的用户数据集中移除不存在于日记文件中

39、的用户。这是一个只包含map的功课。它用到了复制连接来缓存存在于日记文件中的用户名,并把它们和全部用户的数据集连接。来自于job1的唯一的数据集要远远小于全部用户的数据集。很自然的就把小的那个数据集放到缓存中了。,复制连接通用框架,GenericReplicatedJoin类是履行连接的类。在该类列表中前三个类是可扩展的,相对应的复制连接的行动也是可定制的。readFromInputFormat方法可以用于任意的输入类型。getDistributedCacheReader办法可以被重载来支持来自于分布式缓存的任意的文件类型。在这一步中的核心是join方法。Join方法将会生成job的输出键和输

40、出值。在默认的实现中,两个数据集的值将会被归并以生成最后的输出值。这里可以把值输出变成来自于用户表的数据。,Job2的实现代码,重载join方法;OverridepublicPairjoin(PairinputSplitPair,PairdistCachePair)returninputSplitPair;把来自于job1的文件放到分布式缓存中;for(FileStatusf:fs.listStatus(uniqueUserStatus)if(f.getPath().getName().startsWith(part)DistributedCache.addCacheFile(f.getPat

41、h().toUri(),conf);在驱动代码中,调用GenericReplicatedJoin类;Jobjob=newJob(conf);job.setJarByClass(ReplicatedFilterJob.class);job.setMapperClass(ReplicatedFilterJob.class);job.setNumReduceTasks(0);job.setInputFormatClass(KeyValueTextInputFormat.class);outputPath.getFileSystem(conf).(outputPath,true);FileInputF

42、ormat.setInputPaths(job,usersPath);FileOutputFormat.setOutputPath(job,outputPath);,半连接的实现job3,在最后一步中,须要将job2生成的已过滤的用户集和原始的用户日记合并了。已过滤的用户集是足够小到可以放到内存中,同样也可以放到分布式缓存中。,这里要再次用到复制连接框架来执行连接。但这次不需调节join方法的行为,因为两个数据集中的数据都要呈现在最后的输出中。,最佳连接策略选择,如果数据集中有一个足够小到可以放到mapper的内存中,那么map端的复制连接就足够了。如果每个数据集都很大,同时其中一个数据集可以在经过一定条件过滤后大幅度地减小,那么半连接将会很有效。如果你无法预处理你的数据,并且数据集大到不能够被缓存,那么你就需要在reducer中使用重分区连接了。,Thankyou!,

展开阅读全文
相关资源
相关搜索

当前位置:首页 > 教育专区 > 大学资料

本站为文档C TO C交易模式,本站只提供存储空间、用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。本站仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知得利文库网,我们立即给予删除!客服QQ:136780468 微信:18945177775 电话:18904686070

工信部备案号:黑ICP备15003705号-8 |  经营许可证:黑B2-20190332号 |   黑公网安备:91230400333293403D

© 2020-2023 www.deliwenku.com 得利文库. All Rights Reserved 黑龙江转换宝科技有限公司 

黑龙江省互联网违法和不良信息举报
举报电话:0468-3380021 邮箱:hgswwxb@163.com