博客
关于我
重要 | Spark分区并行度决定机制
阅读量:410 次
发布时间:2019-03-06

本文共 1838 字,大约阅读时间需要 6 分钟。

最近,我注意到有小伙伴在公众号留言,反映一个普遍的问题:他们在使用Spark处理不同文件源时,打印的分区数不一致。此外,他们提到spark.default.parallelism参数似乎并不总是起作用。虽然之前的文章已经有所涉及,但为了更深入地理解这一现象,我决定从分区决定机制和并行度决定机制两个方面进行全面分析。

首先,理解Spark如何在加载不同数据源时决定分区数是关键。对于通过HDFS加载数据生成的RDD,其分区数主要由HDFS的块划分机制决定。通常情况下,每个HDFS块对应一个分区。如果文件不可划分,则一个文件对应一个分区。这种情况下,分区数是固定的。

其次,通过SparkContext的parallelize方法或makeRDD生成的RDD的分区数可以在方法调用时显式指定。如果未指定,默认会参考spark.default.parallelism参数来确定分区数。这一参数的默认值通常是math.max(totalCoreCount.get(), 2),其中totalCoreCount是SparkContext的核心数。这表明默认并行度的计算与核心数密切相关。

值得注意的是,Spark任务的最小执行单元是Task,Stage的并行度直接影响性能。合理设置每个Stage的Task数量是性能优化的重要环节之一。然而,Spark在确定最佳并行度时存在一定局限性,因此我们需要通过测试和计算来确定最优参数配置。

在Spark作业执行过程中,RDD会被划分为多个Stage。Stage内的Task数量与最后一个RDD的分区数保持一致。关键在于宽依赖关系(Wide Dependencies),这些通常伴随着Shuffle操作。对于Stage之间的数据传递,通常会使用numPartitions参数指定分区数。例如,groupByKey操作会根据指定的numPartitions来决定分区数,这需要多次调试以确定最优值。

对于没有父RDD的RDD(如直接从HDFS加载的数据),其分区数由InputFormat的切分机制决定。通常情况下,每个HDFS块对应一个分区,不可分割的文件则对应一个分区。

对于通过SparkContext的parallelize或makeRDD生成的RDD,分区数可以在方法调用时指定。如果未指定,默认会使用spark.default.parallelism参数配置的值。这一参数的默认实现如下:

override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

这表明,默认并行度的计算与核心数密切相关。

在分析Spark任务的并行度决定机制时,需要关注以下几个特殊算子:

  • coalesce和repartition算子

    • coalesce算子在RDD和DataSet中都默认不触发Shuffle操作,因此生成的RDD分区数不会超过父RDD的分区数。
    • repartition算子会触发Shuffle操作,可以增加分区数。这种情况下,分区数会根据配置进行调整。
    • 需要注意的是,减少分区数到1会降低上游Stage的并行度,影响性能。因此,在使用repartition时,需要权衡Shuffle的代价和并行度带来的效益。
  • union算子

    • 当多个父RDD具有相同的分区器且分区数已定义时,Union操作后生成的RDD的分区数不会增加。
    • 如果父RDD的分区器不一致,Union操作后生成的RDD的分区数为父RDD分区数之和。
  • cartesian算子

    • Cartesian算子的分区数是父RDD分区数的乘积。这种情况通常用于数据跨度较大的操作。
  • 在Spark SQL中,任务并行度参数则要参考spark.sql.shuffle.partitions。具体实现会在后续内容中详细阐述。

    最后,在Spark流式处理中,需要关注两种微批接收方式:

  • Receiver方式:生成的微批 RDD(BlockRDD)的分区数与块数一致。
  • Direct方式:生成的微批 RDD(kafkaRDD)的分区数与Kafka分区数一一对应。
  • 通过以上分析,可以更清晰地理解Spark在不同场景下分区数和并行度的决定机制。合理配置参数并通过多次测试,能够显著提升Spark应用的性能和效率。

    转载地址:http://nivkz.baihongyu.com/

    你可能感兴趣的文章
    no connection could be made because the target machine actively refused it.问题解决
    查看>>
    No Datastore Session bound to thread, and configuration does not allow creation of non-transactional
    查看>>
    No fallbackFactory instance of type class com.ruoyi---SpringCloud Alibaba_若依微服务框架改造---工作笔记005
    查看>>
    No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-loadbalanc
    查看>>
    No mapping found for HTTP request with URI [/...] in DispatcherServlet with name ...的解决方法
    查看>>
    No mapping found for HTTP request with URI [/logout.do] in DispatcherServlet with name 'springmvc'
    查看>>
    No module named 'crispy_forms'等使用pycharm开发
    查看>>
    No module named cv2
    查看>>
    No module named tensorboard.main在安装tensorboardX的时候遇到的问题
    查看>>
    No module named ‘MySQLdb‘错误解决No module named ‘MySQLdb‘错误解决
    查看>>
    No new migrations found. Your system is up-to-date.
    查看>>
    No qualifying bean of type XXX found for dependency XXX.
    查看>>
    No resource identifier found for attribute 'srcCompat' in package的解决办法
    查看>>
    no session found for current thread
    查看>>
    No toolchains found in the NDK toolchains folder for ABI with prefix: mips64el-linux-android
    查看>>
    NO.23 ZenTaoPHP目录结构
    查看>>
    NO32 网络层次及OSI7层模型--TCP三次握手四次断开--子网划分
    查看>>
    NoClassDefFoundError: org/springframework/boot/context/properties/ConfigurationBeanFactoryMetadata
    查看>>
    Node JS: < 一> 初识Node JS
    查看>>
    Node-RED中使用JSON数据建立web网站
    查看>>