大数据开发工程师-第十一周 Spark性能优化的道与术-Spark Streaming-6


第十一周 Spark性能优化的道与术-Spark Streaming-6

SparkStreaming wordcount程序开发

scala

1
2
3
4
5
大家好,下面我们来学习一下Spark中的Spark streaming。针对Spark Streaming,我们主要讲一些基本的用法,因为目前在实时计算领域,flink的应用场景会更多。Spark streaming啊,它是Spark Core API的一种扩展。它可以用于进行大规模、高吞吐量、容错的实时数据流的处理。大家注意这个实时啊,属于近实时。最小可以支持秒级别的实时处理。

那spark streaming的工作原理呢?是这样的。它呢会接收实时输入的数据流,然后呢,将数据啊拆分成多个Batch。比如呢,每收集一秒数据给它封装为一个batch,然后将每个batch呢交给这个spark计算引擎进行处理。最后呢,会产生出一个结果数据流。这个结果数据流里面数据呢,也是由一个一个的batch所组成的,所以说呢,spark streaming的实时处理,其实呢,就是一小批一小批的处理。那下面呢,我们就来开发一个spark streaming的实时wordcount程序来感受一下。

现在我们来创建一个项目。it。点击这个auto。把它那个基本环境再配置一下,因为我们在这呢,也要写了一个SC,所以说在这点右键。到这儿。在这下面创建一个GALA。对吧,然后注意右键把它视为S对吧。接下来到这个depends里面,注意添加那个scholar的SDK,注意针对这个我们需要添加02:11的。因为我们使用的那个卡夫卡集群,它那个是02:11编译的啊。那个SC的版本,所以说呢,在这儿使用02:11。好,这样就行了。那下面呢,我们需要找一下它对应的一个依赖,SPA,人命的依赖。现在我们来说一下。就那个。用02:11的,注意我们之前用的是2.4.3这个版本。S。把这个除掉是吧。好。那这样基本环境就OK了。下面呢,我们来开发这个word的程序,在这呢,先建一个package。I1克点。八。the stream count。需求呢,是这样的。通过socket。模拟产生数据。实时计算数据中。单词出现的次数。这个没方法。好,那在这注意,我们需要先创建一个streaming。context。然后呢,指定数据处理。间隔。所以五秒吧。因为我们前面说了,你SPA死命,他这个实时处理,其实还是一小批一小批的处理,所以说你需要指定它这个一小批这个间隔是多少秒。那现在我们直接利用一个streaming。这边呢,首先传一个。配一样康复。注意第二个呢,才是这个距离的时间叫。五秒。SSC吧。注意,那我们在上面来创建这个。mark。康复配置对象。嗯。嗯。了,我们现在本意来执行。注意咱们之前啊,开发这个发个离线代码的时候,我们呢,穿的都是logo对吧,注意这时候呢。你需要这样来写LOGO2。什么意思呢?所以。止住了。LOCAL2。表示启动两个进程。一个进程。否则读取。数据源的数据一个进程。负责处理数据。ABB name。好,这样就行了。嗯。好,接下来我们来通过socket。获取实时产生的数据。B04端口9001。这个可以叫SRD。是吧,这里面也是RDD啊。下面我们就对接收到的数据使用。空格进行切割。转换成单个单词。改个e lines RD。第二,find map。加你的SP。按空格切就行啊。这样返回的就是wasd是里面呢包含了每个单词。这样把每个单词。转换成。淘宝兔的形式啊。what map对吧?这个RD。下面来执行reduce。BYK操作啊,所以基于K进行求和。嗯。it is by k。嗯。有我。啊。下面呢,将这个结果数据打印到控制台。嗯。认识。对吧,你看这个代码是不是也和咱们前面写那个link代码很像呀。启动任务。嗯。注意它下面这种写法不太一样,和那个SPA离线写法都不一样啊。等待任务形式啊。嗯。啊。好,这样的话你就可以开启一个发达人命实时流处理程序了。那我们在这呢,把这个socket给它打开。来执行。这个没事啊,还是那个when you choose啊,这个不用管了。把这个日志清一下,好,下面注意在这我来输入点数据。右。the me。回来看到没有, hello2161。只要说你输入的那两项数据在它的一个时间段之内,对吧,在五秒之内,它其实就把它切到一块儿了。这就可以啊。所以说呢,你可以这样理解,它相当于是每隔五秒把前五秒的数据给你封装成一个batch。然后后面呢,其实执行的就类似于Spark核心的那个代码Spark I的对吧。其实就是离线的那一套。它前面的话是按照时间去切这个小批,后面的话把你一小批一小批去处理,这样的话可以达到一个进食时的一个效果啊,所以说这个就是方向十命它的一个执行的原理。好,这是实现,接下来呢,我们使用加来实现一下。键package。SPA。world can't。加。把这个需求拿过来。那下面呢,是一个密方法,好,那接下注意首先还是要获取这套什么使命contact这些东西。先获取。创建。sten。啊。所以这里面你去Java这面你要获取这个。Java。streaming。context。嗯。后面呢,传的还是一个。时间。R。u减。second。嗯嗯。SIC。那上面还是要创建这个SPA配对项啊。嗯。but。嗯。master。LOCAL2。name。好,这样也可以。注意这块报错。对,报错了,一般是你那个包引错了。你可以看一下,把鼠标放到这个上面,你看。说什么这是什么Java FX里面什么?这是有问题的。对吧,我们用的话肯定是用Spark里面。对,其实啊,你这后面是少了一个S啊。嗯。这样也可以。你看这个时候用的是发使命里面的。好。下面是通过。获取实时产生的数据。soirit。这个点零四。等零一。来阿。对接收到的数据使用空格进行切割。转换成。三个单词。嗯。哪一点find map?对,那这里面的话,我们就需要写一个函数了啊。你有一个map function。然后要返回一个swim。我们就不把那个map的代码也写进去啊。这样的话,它是一个。我可以这样来直接来写ari。there as list,它里面呢,直接line there。后面做。直接。这样就可以啊,因为它最终返回一个联系啊。这种写法啊,它返回的速度,这样把这个速度转成list,再把它转成这个就可以。我咋?那接下来是把每个。单词转换。喂。double two的形式。我1MAP。所以呢,这里面也是需要写一个函数的啊。你有一个T。嗯。这个呢是in。嗯。因为你最终要法是一个P2列,这就淘宝里面第一列,淘宝里面第二列。new。double two。这呢,其实就是一个over了,把名字改一下啊,看起来清晰点。war。一。这个呢,叫派。安你。接下来实行。YK。嗯。reduce。function。I1 I2。这个呢,就叫word count。最后将结果数据啊引到。台。我们直接使用那个不好1D啊。这里面呢,我们给它传一个你一个VID方。这里面其实也好办,这个呢,就是一个。higher。不好意思。然后这里面的话,再给它传一个VD方式。对,这个就是具体那个他。嗯。嗯。相量二。这样的话就可以把里面这些数据啊,给它迭代出来。然后呢,就剩下最后这个。行任务。start。还有一个,等待任务停止。好一场,好一起。嗯。嗯。好,这就可以了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.imooc.spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* 需求:通过socket模拟产生数据,实时计算数据中单词出现的次数
* Created by xuwei
*/
object StreamWordCountScala {
def main(args: Array[String]): Unit = {
//创建SparkConf配置对象
val conf = new SparkConf()
//注意:此处的local[2]表示启动2个进程,一个进程负责读取数据源的数据,一个进程负责处理数据
.setMaster("local[2]")
.setAppName("StreamWordCountScala")

//创建StreamingContext,指定数据处理间隔为5秒
val ssc = new StreamingContext(conf, Seconds(5))

//通过socket获取实时产生的数据
val linesRDD = ssc.socketTextStream("bigdata04", 9001)

//对接收到的数据使用空格进行切割,转换成单个单词
val wordsRDD = linesRDD.flatMap(_.split(" "))

//把每个单词转换成tuple2的形式
val tupRDD = wordsRDD.map((_, 1))

//执行reduceByKey操作
val wordcountRDD = tupRDD.reduceByKey(_ + _)

//将结果数据打印到控制台
wordcountRDD.print()

//启动任务
ssc.start()
//等待任务停止
ssc.awaitTermination()
}
}

image-20230423224530561

image-20230423224547229

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.imooc.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
* 需求:通过socket模拟产生数据,实时计算数据中单词出现的次数
* Created by xuwei
*/
public class StreamWordCountJava {
public static void main(String[] args) throws Exception{
//创建SparkConf配置对象
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("StreamWordCountJava");

//创建StreamingContext
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

//通过socket获取实时产生的数据
JavaReceiverInputDStream<String> linesRDD = ssc.socketTextStream("bigdata04", 9001);

//对接收到的数据使用空格进行切割,转换成单个单词
JavaDStream<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
});

//把每个单词转换为tuple2的形式
JavaPairDStream<String, Integer> pairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});

//执行reduceByKey操作
JavaPairDStream<String, Integer> wordCountRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
//将结果数据打印到控制台
wordCountRDD.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
public void call(JavaPairRDD<String, Integer> pair) throws Exception {
pair.foreach(new VoidFunction<Tuple2<String, Integer>>() {
public void call(Tuple2<String, Integer> tup) throws Exception {
System.out.println(tup._1+"---"+tup._2);
}
});
}
});

//启动任务
ssc.start();
//等待任务停止
ssc.awaitTermination();
}
}

SparkStreaming整合Kafka

1
下面呢,我们来看一下这个sparkstreaming和kafka的整合。我们的需求是这样的,使用sparkstreaming实时消费kafka中的数据。这种场景也是比较常见的。注意,在这你想使用kafka,我们需要引入对应的一个依赖。它那个依赖是什么呢?到官网来看一下,进到官网点那个。文档SPA人命。你这里面来搜一下啊。就梳里的卡夫卡往下边走。到这块。看到没有,把这个点开右键啊,打开一个新页面。看到没有?你这个SPA命想要和卡夫卡进行交互,想要从卡夫卡里面去消费数据,你需要添加对应的依赖。这个呢是针对0.8.2.1级以上的,这个呢是针对卡夫卡零点十几以上的,那我们用的话肯定要用这个了,要用新一点的啊。那就这个东西。你可以。把它复制过来,到这儿来搜一下。就这个。2.4.3。格拉,02:11。把这个拿回来就可以了。嗯。这样就可以了。那下面呢,我们就来写一下具体的一个代码。swim。卡不卡?bug消费。卡夫卡中的数据。嗯。首先呢,还是希望见。streaming。嗯。context。康复。然后呢,还有一个second。嗯。有没有?我们还使用这个五秒。具体这个时间间隔啊,需要根据你们的业务而定啊。等于你有一个Spark。said master。LOCAL2。set name。嗯。获取消费卡夫卡的数据流。那怎么获取呢?现在我们需要用的这个卡夫卡u。csa。direct。这里面需要传两个泛型参数,three。SH。这里面呢,首先把这个SSC传给他。嗯,所以接下来需要第二个参数。什么那个location。street。这个。它这边会有提示啊。对吧,这几个参数。好使,用它呢,来调一个P开头的这个。就第一个就行。接下来只听下面那个。street。用这个。这个发型呢,还是string?对,这里面你要指定一下topic,对他接触的是一个topic s啊是一个。然后后面呢是一个map map里面传的是卡夫卡的一些参数。这里面这些都是固定写法啊。那下面我们就要呈现这两个。指定。指定卡夫卡的配置信息。好不好?等于一个map。object。我们直接在里面给它初始化就行啊。显得不报错。还有这个topic,在这一个它指定性。嗯。只要topics。注意,在这我们需要传一个。里面呢,可以同时使用多套贝。也就是说,它可以同时从多个topic里面去读取数据,都是可以的。那我们在这一个,我们就写一个就行。那既然把这个参数给它完善一下啊,嗯,首先需要指定卡不卡的。broke地址信息。would strive。B01。9092。零二。9092。039092。嗯。接下来我们需要指定那个K的序列化类型。K点。s Li。展开一个反序变化啊D。Siri。a。对啊。你如果怕拼数的话,可以先把后面这个写。后面的话,我们使用这个class of。spring。size对吧。可以把这个给它复制过去,把这个D改成小写就行了啊。这样也可以。嗯。还有这个value的。序列化类型。嗯。那就把这个改一下就行,改成V。都是死顿类型啊对,这就是咱们前面指定的那个对吧,配合没有一个泛型啊。嗯。那下面来指定那个。消费者ID。就那个YD。胳膊的ID。好,下面呢,再指定一下消费策略。there there。这块呢,只能一个。最后我们来指定一个自动提交。在设置啊。enable。there also?好。这样就可以了。这是一些核心的参数。OK,这样的话就可以从它里面去读取数据啊,这样就可以获取到一个类似于卡夫卡。我在这里面,我们可以把它称为。嗯,这是他们的一个概念啊。这个数据流。那下面我们就可以处理数据了。后面你可以调map啊,map啊这些算子去处理就可以。嗯。这样每次获取到一条数据啊,每次几的一条数据。然后把里面数据迭代出来之后呢,把它封装成一个他报,因为它本身呢是一个record一行记录,那我们在这呢。对的点。先获取的,你们K。然后再获取value。嗯。在这我们就把这个数据打印出来。将数据。印到后来。因为其实你在这只要能获取到数据,你后期你想做map map reduce go是不是都可以呀,对吧,那个就没什么区别了。启动任务。start。等待任务停止。嗯。嗯,好。那下面呢,我们把这个运行起来。但是呢,你发现这块他报错了。看到没有?31行。遇到问题不要怕啊,这个问题我们要排查一下。他说这个类型啊,有点问题。他呢,发现了是一个布尔类型,结果他需要了是一个OB。所以这块的话,你需要在这这样来指定一下。强制执行类型Java点拉点布尔。嗯。这样就可以了啊来执行。好,这样就可以了。那接下来呢,我们来开启一个生产者,往里面写点儿数据。其中一个卡不卡,剩下的我里面写着数据啊。其控制台的生产者。嗯。hello Spark。嗯。看到没有打一回啊。我把它停一下啊。注意。它这个呢,we know,为什么呀。因为现在我们卡卡里面数据啊,其实只有value是没有那个K的啊,所以说你K答出来是no,我们G的那些数据啊,一般都放到value里面啊。就是放到外。这个K的话是为了判定你这个数据到底是放哪个分区里面啊,一般会传一个K。所以说我们那种说法一般是不传的,然后随机分啊。这是没有问题的。那这样的话,我们就可以把那个卡字卡里面数据给他消费出来。是吧,那后面就可以实现你的业务逻辑。OK。那接下来呢,我们使用这个加代码来实现一下。卡夫卡。加了。嗯。把这个注释拿过来。好,首先呢,获取这个streaming context。在指定读取数据的。时间间隔为五秒。嗯。有一个Java。streaming context。好点。这个大家看五秒。嗯。new。said master。logo。嗯。下载APP name。把这个拿过来,嗯。而且这个变量Co。嗯。那接下来我们来获取消费卡夫卡的数据流。还是那个卡不卡。great。direct,那首先SC。后面还是一样的。嗯。嗯。嗯。嗯。嗯。好,接下来是这个。consumer。这个。there。首先是一个topics,还有一个。搞不搞?注意这块啊。你需要指定泛型,你这个泛型写到哪了。我们在SKY面里面是放在这个位置,但是在这里面你写这还是不对的啊,你在写前面。three。W。好。接下来创建这两个啊,把这个topic,还有这个卡夫卡。在线行。指令要读取的。名称。先写这个。three。topics。嗯。挨着。七。那接下来是这个。有一个map。object。好,不搞。下面就往里面添加参数了啊。RI。service。我们俩复制一下吧。这个又是提花。嗯。嗯嗯。第二个呢,是这个K的这个虚化类型。所以这个你别导错包了啊,你要导这个。巴阿巴奇,看到没有,卡布卡点common这个body。name。嗯。嗯嗯。说不爱你。嗯。also。offset。there reet。at。ne。点点commit。好,这样就可以了。of Australia。好,那接下来数数去。嗯。嗯嗯。选一个map,因为一个function。我们最终返回是一个double two。里面是一个string。LW。好,这个就是一个record。我们可以在这直接。你了一个。two,嗯。你告你。six。嗯。我看点160。这样转换成淘宝之后,后期用起来也方便。SP对吧。将数据打印的。启动任务,嗯。start。等待任务停止。嗯。好一场。嗯。嗯。好,这样就可以了,来。把它执行一下。好看没有,这是之前那条数据啊。我们可以再往里面加一条。哈哈哈。可以吧,也是可以的啊。好,这就是Java代码的一个实现。好,那针对Spark命这一块呢,我们暂时就讲到这儿,因为后期大部分的实施计算需求,我们需要使用link去实现了。在这呢,我们是把这个SPA命最常见那个消费卡不卡数据这种案例呢给大家讲一下。

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.imooc.spark

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Spark 消费Kafka中的数据
* Created by xuwei
*/
object StreamKafkaScala {
def main(args: Array[String]): Unit = {
//创建StreamingContext
val conf = new SparkConf().setMaster("local[2]").setAppName("StreamKafkaScala")
val ssc = new StreamingContext(conf, Seconds(5))

//指定Kafka的配置信息
val kafkaParams = Map[String,Object](
//kafka的broker地址信息
"bootstrap.servers"->"bigdata01:9092,bigdata02:9092,bigdata03:9092",
//key的序列化类型
"key.deserializer"->classOf[StringDeserializer],
//value的序列化类型
"value.deserializer"->classOf[StringDeserializer],
//消费者组id
"group.id"->"con_2",
//消费策略
"auto.offset.reset"->"latest",
//自动提交offset
"enable.auto.commit"->(true: java.lang.Boolean)
)
//指定要读取的topic的名称
val topics = Array("t1")

//获取消费kafka的数据流
val kafkaDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

//处理数据
kafkaDStream.map(record=>(record.key(),record.value()))
//将数据打印到控制台
.print()

//启动任务
ssc.start()
//等待任务停止
ssc.awaitTermination()
}
}

image-20230423225935601

image-20230423225949397

java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.imooc.spark;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.HashMap;

/**
* Spark 消费Kafka中的数据
* Created by xuwei
*/
public class StreamKafkaJava {
public static void main(String[] args) throws Exception{
//创建StreamingContext,指定读取数据的时间间隔为5秒
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("StreamKafkaJava");
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

//指定kafka的配置信息
HashMap<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers","bigdata01:9092,bigdata02:9092,bigdata03:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class.getName());
kafkaParams.put("value.deserializer", StringDeserializer.class.getName());
kafkaParams.put("group.id","con_2");
kafkaParams.put("auto.offset.reset","latest");
kafkaParams.put("enable.auto.commit",true);


//指定要读取的topic名称
ArrayList<String> topics = new ArrayList<String>();
topics.add("t1");

//获取消费kafka的数据流
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);

//处理数据
kafkaStream.map(new Function<ConsumerRecord<String, String>, Tuple2<String,String>>() {
public Tuple2<String, String> call(ConsumerRecord<String, String> record)
throws Exception {
return new Tuple2<String, String>(record.key(),record.value());
}
}).print();//将数据打印到控制台

//启动任务
ssc.start();
//等待任务停止
ssc.awaitTermination();
}
}

本文标题:大数据开发工程师-第十一周 Spark性能优化的道与术-Spark Streaming-6

文章作者:TTYONG

发布时间:2023年04月23日 - 22:04

最后更新:2023年04月26日 - 18:04

原始链接:http://tianyong.fun/%E5%A4%A7%E6%95%B0%E6%8D%AE%E5%BC%80%E5%8F%91%E5%B7%A5%E7%A8%8B%E5%B8%88-%E7%AC%AC%E5%8D%81%E4%B8%80%E5%91%A8-Spark%E6%80%A7%E8%83%BD%E4%BC%98%E5%8C%96%E7%9A%84%E9%81%93%E4%B8%8E%E6%9C%AF-6.html

许可协议: 转载请保留原文链接及作者。

多少都是爱
0%