问题
- Spark的分区数量是如何决定的?
- 我是否需要在某处明确指定可用 CPU 内核的数量,以便分区数量相同(例如并行化方法的 numPartition arg,但每当内核数量发生变化时需要更新程序)?
背景
在环境中安装了 Spark 集群,没有更改 spark-env.sh、spark-defaults.conf 文件和程序中的 SparkConf 对象。
对于 N Queen 程序,分区数为 2,仅分配一个节点任务。对于字数统计程序,分区数为 22,任务分配给所有节点。对这两个程序都使用了 spark-submit。
程式
N皇后
val sparkConf = new SparkConf().setAppName("NQueen").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
def isSafe(column: Int, placement: List[Int]): Boolean = { ... }
def placeQueensAt(row: Int, placements: Set[List[Int]]): Set[List[Int]] = { ... }
val initial = sc.parallelize(queensAtFirst)
//val initial = sc.parallelize(queensAtFirst, 12)
println("Partitions = %d".format(initial.partitions.size))
val result = initial.flatMap(x => placeQueensAt(1, Set(x))).collect()
字数
val sparkConf = new SparkConf().setAppName("WordCount").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val lines = sc.textFile("hdfs:/user/wynadmin/sfpd.csv")
println("Patitions = %d".format(lines.partitions.size))
val words = for (line <- lines; word <- line.split(",") if word.toLowerCase.matches("[a-z]+")) yield (word, 1)
val counts = words.reduceByKey(_ + _)
环境
Ubuntu 14.04 上的 Spark 2.0.1(3 个节点,每个节点有 4 个 CPU)。
独立部署(不是 YARN 也不是 Mesos)
在How-to: Tune Your Apache Spark Jobs (Part 2)中找到信息。
spark.default.parallelism 选项修复了症状。
设置为 12(与核心数相同)会导致节点使用不均。