第十一周 Spark性能优化的道与术-Spark Streaming-6
SparkStreaming wordcount程序开发
scala
1 | 大家好,下面我们来学习一下Spark中的Spark streaming。针对Spark Streaming,我们主要讲一些基本的用法,因为目前在实时计算领域,flink的应用场景会更多。Spark streaming啊,它是Spark Core API的一种扩展。它可以用于进行大规模、高吞吐量、容错的实时数据流的处理。大家注意这个实时啊,属于近实时。最小可以支持秒级别的实时处理。 |
1 | package com.imooc.spark |


java
1 | package com.imooc.spark; |
SparkStreaming整合Kafka
1 | 下面呢,我们来看一下这个sparkstreaming和kafka的整合。我们的需求是这样的,使用sparkstreaming实时消费kafka中的数据。这种场景也是比较常见的。注意,在这你想使用kafka,我们需要引入对应的一个依赖。它那个依赖是什么呢?到官网来看一下,进到官网点那个。文档SPA人命。你这里面来搜一下啊。就梳里的卡夫卡往下边走。到这块。看到没有,把这个点开右键啊,打开一个新页面。看到没有?你这个SPA命想要和卡夫卡进行交互,想要从卡夫卡里面去消费数据,你需要添加对应的依赖。这个呢是针对0.8.2.1级以上的,这个呢是针对卡夫卡零点十几以上的,那我们用的话肯定要用这个了,要用新一点的啊。那就这个东西。你可以。把它复制过来,到这儿来搜一下。就这个。2.4.3。格拉,02:11。把这个拿回来就可以了。嗯。这样就可以了。那下面呢,我们就来写一下具体的一个代码。swim。卡不卡?bug消费。卡夫卡中的数据。嗯。首先呢,还是希望见。streaming。嗯。context。康复。然后呢,还有一个second。嗯。有没有?我们还使用这个五秒。具体这个时间间隔啊,需要根据你们的业务而定啊。等于你有一个Spark。said master。LOCAL2。set name。嗯。获取消费卡夫卡的数据流。那怎么获取呢?现在我们需要用的这个卡夫卡u。csa。direct。这里面需要传两个泛型参数,three。SH。这里面呢,首先把这个SSC传给他。嗯,所以接下来需要第二个参数。什么那个location。street。这个。它这边会有提示啊。对吧,这几个参数。好使,用它呢,来调一个P开头的这个。就第一个就行。接下来只听下面那个。street。用这个。这个发型呢,还是string?对,这里面你要指定一下topic,对他接触的是一个topic s啊是一个。然后后面呢是一个map map里面传的是卡夫卡的一些参数。这里面这些都是固定写法啊。那下面我们就要呈现这两个。指定。指定卡夫卡的配置信息。好不好?等于一个map。object。我们直接在里面给它初始化就行啊。显得不报错。还有这个topic,在这一个它指定性。嗯。只要topics。注意,在这我们需要传一个。里面呢,可以同时使用多套贝。也就是说,它可以同时从多个topic里面去读取数据,都是可以的。那我们在这一个,我们就写一个就行。那既然把这个参数给它完善一下啊,嗯,首先需要指定卡不卡的。broke地址信息。would strive。B01。9092。零二。9092。039092。嗯。接下来我们需要指定那个K的序列化类型。K点。s Li。展开一个反序变化啊D。Siri。a。对啊。你如果怕拼数的话,可以先把后面这个写。后面的话,我们使用这个class of。spring。size对吧。可以把这个给它复制过去,把这个D改成小写就行了啊。这样也可以。嗯。还有这个value的。序列化类型。嗯。那就把这个改一下就行,改成V。都是死顿类型啊对,这就是咱们前面指定的那个对吧,配合没有一个泛型啊。嗯。那下面来指定那个。消费者ID。就那个YD。胳膊的ID。好,下面呢,再指定一下消费策略。there there。这块呢,只能一个。最后我们来指定一个自动提交。在设置啊。enable。there also?好。这样就可以了。这是一些核心的参数。OK,这样的话就可以从它里面去读取数据啊,这样就可以获取到一个类似于卡夫卡。我在这里面,我们可以把它称为。嗯,这是他们的一个概念啊。这个数据流。那下面我们就可以处理数据了。后面你可以调map啊,map啊这些算子去处理就可以。嗯。这样每次获取到一条数据啊,每次几的一条数据。然后把里面数据迭代出来之后呢,把它封装成一个他报,因为它本身呢是一个record一行记录,那我们在这呢。对的点。先获取的,你们K。然后再获取value。嗯。在这我们就把这个数据打印出来。将数据。印到后来。因为其实你在这只要能获取到数据,你后期你想做map map reduce go是不是都可以呀,对吧,那个就没什么区别了。启动任务。start。等待任务停止。嗯。嗯,好。那下面呢,我们把这个运行起来。但是呢,你发现这块他报错了。看到没有?31行。遇到问题不要怕啊,这个问题我们要排查一下。他说这个类型啊,有点问题。他呢,发现了是一个布尔类型,结果他需要了是一个OB。所以这块的话,你需要在这这样来指定一下。强制执行类型Java点拉点布尔。嗯。这样就可以了啊来执行。好,这样就可以了。那接下来呢,我们来开启一个生产者,往里面写点儿数据。其中一个卡不卡,剩下的我里面写着数据啊。其控制台的生产者。嗯。hello Spark。嗯。看到没有打一回啊。我把它停一下啊。注意。它这个呢,we know,为什么呀。因为现在我们卡卡里面数据啊,其实只有value是没有那个K的啊,所以说你K答出来是no,我们G的那些数据啊,一般都放到value里面啊。就是放到外。这个K的话是为了判定你这个数据到底是放哪个分区里面啊,一般会传一个K。所以说我们那种说法一般是不传的,然后随机分啊。这是没有问题的。那这样的话,我们就可以把那个卡字卡里面数据给他消费出来。是吧,那后面就可以实现你的业务逻辑。OK。那接下来呢,我们使用这个加代码来实现一下。卡夫卡。加了。嗯。把这个注释拿过来。好,首先呢,获取这个streaming context。在指定读取数据的。时间间隔为五秒。嗯。有一个Java。streaming context。好点。这个大家看五秒。嗯。new。said master。logo。嗯。下载APP name。把这个拿过来,嗯。而且这个变量Co。嗯。那接下来我们来获取消费卡夫卡的数据流。还是那个卡不卡。great。direct,那首先SC。后面还是一样的。嗯。嗯。嗯。嗯。嗯。好,接下来是这个。consumer。这个。there。首先是一个topics,还有一个。搞不搞?注意这块啊。你需要指定泛型,你这个泛型写到哪了。我们在SKY面里面是放在这个位置,但是在这里面你写这还是不对的啊,你在写前面。three。W。好。接下来创建这两个啊,把这个topic,还有这个卡夫卡。在线行。指令要读取的。名称。先写这个。three。topics。嗯。挨着。七。那接下来是这个。有一个map。object。好,不搞。下面就往里面添加参数了啊。RI。service。我们俩复制一下吧。这个又是提花。嗯。嗯嗯。第二个呢,是这个K的这个虚化类型。所以这个你别导错包了啊,你要导这个。巴阿巴奇,看到没有,卡布卡点common这个body。name。嗯。嗯嗯。说不爱你。嗯。also。offset。there reet。at。ne。点点commit。好,这样就可以了。of Australia。好,那接下来数数去。嗯。嗯嗯。选一个map,因为一个function。我们最终返回是一个double two。里面是一个string。LW。好,这个就是一个record。我们可以在这直接。你了一个。two,嗯。你告你。six。嗯。我看点160。这样转换成淘宝之后,后期用起来也方便。SP对吧。将数据打印的。启动任务,嗯。start。等待任务停止。嗯。好一场。嗯。嗯。好,这样就可以了,来。把它执行一下。好看没有,这是之前那条数据啊。我们可以再往里面加一条。哈哈哈。可以吧,也是可以的啊。好,这就是Java代码的一个实现。好,那针对Spark命这一块呢,我们暂时就讲到这儿,因为后期大部分的实施计算需求,我们需要使用link去实现了。在这呢,我们是把这个SPA命最常见那个消费卡不卡数据这种案例呢给大家讲一下。 |
scala
1 | package com.imooc.spark |


java
1 | package com.imooc.spark; |