When setting spark.cassandra.input.readsPerSec in spark-cassandra-connector, range-query throughput is far higher than expected.
Job dependencies (the Java driver version is pinned to 4.13.0):
<dependency>
  <groupId>com.datastax.spark</groupId>
  <artifactId>spark-cassandra-connector_2.12</artifactId>
  <version>3.2.0</version>
  <exclusions>
    <exclusion>
      <groupId>com.datastax.oss</groupId>
      <artifactId>java-driver-core-shaded</artifactId>
    </exclusion>
  </exclusions>
</dependency>
...
<dependency>
  <groupId>com.datastax.oss</groupId>
  <artifactId>java-driver-core</artifactId>
  <version>4.13.0</version>
</dependency>
The job has two steps (both are full table scans, FTS):
Dataset<Row> dataset = sparkSession.sqlContext().read()
        .format("org.apache.spark.sql.cassandra")
        .option("table", "inbox_user_msg_dummy")
        .option("keyspace", "ssmp_inbox2")
        .load();
-and-
Dataset<Row> olderDataset = sparkSession.sql("SELECT * FROM inbox_user_msg_dummy where app_uuid = 'cb663e07-7bcc-4039-ae97-8fb8e8a9ff77' AND " +
"create_hour < '" + minus180DaysInstant + "'");
Job configuration:
SparkConf sparkConf = new SparkConf()
.setMaster("local[*]") // comment out when not running locally
.setAppName("inbox-gateway-spark-job")
.set("spark.scheduler.mode", "FAIR")
.set("spark.cassandra.connection.port", "9042")
.set("keyspace", "ssmp_inbox2")
.set("spark.cassandra.connection.host", "cass-556799284-1-1276056270.stg.ssmp-inbox2-stg.ms-df-cassandra.stg-az-southcentralus-6.prod.us.walmart.net,
cass-556799284-2-1276056276.stg.ssmp-inbox2-stg.ms-df-cassandra.stg-az-southcentralus-6.prod.us.walmart.net,
cass-556799284-3-1276056282.stg.ssmp-inbox2-stg.ms-df-cassandra.stg-az-southcentralus-6.prod.us.walmart.net")
.set("spark.cassandra.auth.username", "ssmp-inbox-app-v2")
.set("spark.cassandra.auth.password", "*")
.set("spark.cassandra.input.consistency.level", "LOCAL_ONE")
.set("spark.cassandra.concurrent.reads", "1")
.set("spark.cassandra.input.readsPerSec", "10")
.set("spark.cassandra.input.fetch.sizeInRows", "10")
.set("spark.cassandra.input.split.sizeInMB", "10")
.set("spark.cores.max", "20")
.set("spark.executor.memory", "20G")
.set("spark.yarn.executor.memoryOverhead", "12000")
.set("spark.cassandra.read.timeoutMS", "200000")
.set("spark.task.maxFailures", "10")
.set("spark.cassandra.connection.localDC", "southcentral");
Note that Spark caps the actual cores at 16, since each worker has 8 cores. There is 1 executor.
When the job runs, the first FTS is observed at ~22k range queries per second, with CPU on the cluster nearly saturated, while the second FTS shows ~725 range queries per second against the table.
With 16 Spark cores in total, the expectation was that range-query throughput would be capped at 160/s (spark.cassandra.input.readsPerSec * Spark cores).
Is this reasoning correct? Any recommendations for controlling read throughput in spark-cassandra-connector?
I know other users have successfully configured this limit before, but we never looked closely at what the resulting throughput actually was. Still, this looks like a large discrepancy, given that the two steps essentially run the same operation - a full table scan. The queries the connector ends up running are identical.
Schema:
CREATE TABLE ssmp_inbox2.inbox_user_msg_dummy (
user_id text,
create_hour timestamp,
app_uuid text,
message_id text,
app_name text,
create_ts bigint,
is_actiontaken boolean,
is_compensable boolean,
is_deleted boolean,
is_read boolean,
message_payload text,
mini_app_name text,
notification text,
PRIMARY KEY ((user_id, create_hour, app_uuid), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC)
AND additional_write_policy = '99p'
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND cdc = false
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND default_time_to_live = 0
AND extensions = {}
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair = 'BLOCKING'
AND speculative_retry = '99p';
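Note that create_hour is part of the composite partition key, not a clustering column, so my understanding is that the `create_hour <` predicate in the second step cannot be pushed down as a partition restriction; every partition is still read and the rows are filtered afterwards. A trivial model of that post-read filtering (the cutoff value is hypothetical):

```java
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

public class PostReadFilter {
    public static void main(String[] args) {
        // Hypothetical "now minus 180 days" cutoff, standing in for minus180DaysInstant.
        Instant cutoff = Instant.parse("2023-02-11T00:00:00Z");
        List<Instant> fetched = List.of(
                Instant.parse("2022-12-01T05:00:00Z"),  // older than cutoff: kept
                Instant.parse("2023-08-10T04:00:00Z")); // newer than cutoff: dropped
        // Both rows were already read from Cassandra; the predicate only drops rows afterwards.
        List<Instant> kept = fetched.stream()
                .filter(h -> h.isBefore(cutoff))
                .collect(Collectors.toList());
        System.out.println(kept.size()); // 1
    }
}
```

If that is right, the second step should cost roughly as many range queries as the first, which is part of why the 22k/s vs 725/s gap is confusing.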
Query:
SELECT * FROM ssmp_inbox2.inbox_user_msg_dummy WHERE token(user_id, create_hour, app_uuid) >= token(G9e7Y4Y, 2023-08-10T04:17:27.234Z, cb663e07-7bcc-4039-ae97-8fb8e8a9ff77) AND token(user_id, create_hour, app_uuid) <= 9121832956220923771 LIMIT 10
FWIW, the average partition size is 649 bytes and the max is 2.7 KB.
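Back-of-the-envelope, with split.sizeInMB = 10 and ~649-byte partitions, each Spark split would cover on the order of 16k Cassandra partitions, so a single split can generate a very large number of small queries/pages (especially with fetch.sizeInRows = 10). The arithmetic:

```java
public class SplitMath {
    public static void main(String[] args) {
        long splitBytes = 10L * 1024 * 1024; // spark.cassandra.input.split.sizeInMB = 10
        long avgPartitionBytes = 649;        // observed average partition size
        long partitionsPerSplit = splitBytes / avgPartitionBytes;
        System.out.println(partitionsPerSplit); // 16156
    }
}
```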