AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / user-263329

Paul's questions

Martin Hope
Paul
Asked: 2023-11-08 03:56:00 +0800 CST

Spark-cassandra-connector 读取吞吐量不可预测

  • 5

用户报告在spark-cassandra-connector中设置spark.cassandra.input.readsPerSec时,范围查询吞吐量远远高于预期。

工作依赖性。Java 驱动程序版本设置为 4.13.0。

    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.12</artifactId>
        <version>3.2.0</version>
        <exclusions>
            <exclusion>
                <groupId> com.datastax.oss</groupId>
                <artifactId>java-driver-core-shaded</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

...

    <dependency>
        <groupId>com.datastax.oss</groupId>
        <artifactId>java-driver-core</artifactId>
        <version>4.13.0</version>
    </dependency>

该作业有两个步骤(都是 FTS):

Dataset<Row> dataset = sparkSession.sqlContext().read()
.format("org.apache.spark.sql.cassandra")
.option("table", "inbox_user_msg_dummy")
.option("keyspace", "ssmp_inbox2").load();

-和-

Dataset<Row> olderDataset = sparkSession.sql("SELECT * FROM inbox_user_msg_dummy where app_uuid = 'cb663e07-7bcc-4039-ae97-8fb8e8a9ff77' AND " +
"create_hour < '" + minus180DaysInstant + "'");

作业配置:

SparkConf sparkConf = new SparkConf()
        .setMaster("local[*]") //uncomment while running in local
        .setAppName("inbox-gateway-spark-job")
       .set("spark.scheduler.mode", "FAIR")
        .set("spark.cassandra.connection.port", "9042")
        .set("keyspace", "ssmp_inbox2")
        .set("spark.cassandra.connection.host", "cass-556799284-1-1276056270.stg.ssmp-inbox2-stg.ms-df-cassandra.stg-az-southcentralus-6.prod.us.walmart.net,
        cass-556799284-2-1276056276.stg.ssmp-inbox2-stg.ms-df-cassandra.stg-az-southcentralus-6.prod.us.walmart.net,
        cass-556799284-3-1276056282.stg.ssmp-inbox2-stg.ms-df-cassandra.stg-az-southcentralus-6.prod.us.walmart.net")
        .set("spark.cassandra.auth.username", "ssmp-inbox-app-v2")
        .set("spark.cassandra.auth.password", "*")
        .set("spark.cassandra.input.consistency.level", "LOCAL_ONE")
        .set("spark.cassandra.concurrent.reads", "1")
        .set("spark.cassandra.input.readsPerSec", "10")
        .set("spark.cassandra.input.fetch.sizeInRows", "10")
        .set("spark.cassandra.input.split.sizeInMB", "10")
        .set("spark.cores.max", "20")
        .set("spark.executor.memory", "20G")
        .set("spark.yarn.executor.memoryOverhead", "12000")
        .set("spark.cassandra.read.timeoutMS", "200000")
        .set("spark.task.maxFailures", "10")
        .set("spark.cassandra.connection.localDC", "southcentral");

请注意,Spark 将实际核心限制为 16 个,因为工作线程有 8 个核心。执行人1人。

当作业运行时,可以观察到第一个 FTS 每秒约有 22k 范围查询,集群上的 CPU 几乎饱和,而对于第二个 FTS,表上每秒约有 725 个范围查询。

预期总共有 16 个 Spark 核心,范围查询吞吐量将限制为 160/s(spark.cassandra.input.readsPerSec * Spark 核心)。

这个推理正确吗?对于控制 Spark-cassandra-connector 的读取吞吐量有什么建议?

我知道我们之前已经有其他用户成功配置了此限制,但我们从未仔细研究过最终的吞吐量是多少。不过,这似乎确实是一个很大的差异,因为这两个步骤本质上运行相同的操作 - 全表扫描。连接器最终运行的查询是相同的。

架构:

CREATE TABLE ssmp_inbox2.inbox_user_msg_dummy (    
  user_id text,    
  create_hour timestamp,    
  app_uuid text,    
  message_id text,    
  app_name text,    
  create_ts bigint,    
  is_actiontaken boolean,    
  is_compensable boolean,    
  is_deleted boolean,    
  is_read boolean,    
  message_payload text,    
  mini_app_name text,    
  notification text,    
  PRIMARY KEY ((user_id, create_hour, app_uuid), message_id)    
) WITH CLUSTERING ORDER BY (message_id DESC)    
  AND additional_write_policy = '99p'    
  AND bloom_filter_fp_chance = 0.01    
  AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}    
  AND cdc = false    
  AND comment = ''    
  AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}    
  AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}    
  AND crc_check_chance = 1.0    
  AND default_time_to_live = 0    
  AND extensions = {}    
  AND gc_grace_seconds = 864000    
  AND max_index_interval = 2048    
  AND memtable_flush_period_in_ms = 0    
  AND min_index_interval = 128
  AND read_repair = 'BLOCKING'
  AND speculative_retry = '99p';

查询:

SELECT * FROM ssmp_inbox2.inbox_user_msg_dummy WHERE token(user_id, create_hour, app_uuid) >= token(G9e7Y4Y, 2023-08-10T04:17:27.234Z, cb663e07-7bcc-4039-ae97-8fb8e8a9ff77) AND token(user_id, create_hour, app_uuid) <= 9121832956220923771 LIMIT 10

FWIW,平均分区大小为 649 字节,最大为 2.7kb。

cassandra
  • 1 个回答
  • 23 Views

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    连接到 PostgreSQL 服务器:致命:主机没有 pg_hba.conf 条目

    • 12 个回答
  • Marko Smith

    如何让sqlplus的输出出现在一行中?

    • 3 个回答
  • Marko Smith

    选择具有最大日期或最晚日期的日期

    • 3 个回答
  • Marko Smith

    如何列出 PostgreSQL 中的所有模式?

    • 4 个回答
  • Marko Smith

    列出指定表的所有列

    • 5 个回答
  • Marko Smith

    如何在不修改我自己的 tnsnames.ora 的情况下使用 sqlplus 连接到位于另一台主机上的 Oracle 数据库

    • 4 个回答
  • Marko Smith

    你如何mysqldump特定的表?

    • 4 个回答
  • Marko Smith

    使用 psql 列出数据库权限

    • 10 个回答
  • Marko Smith

    如何从 PostgreSQL 中的选择查询中将值插入表中?

    • 4 个回答
  • Marko Smith

    如何使用 psql 列出所有数据库和表?

    • 7 个回答
  • Martin Hope
    Jin 连接到 PostgreSQL 服务器:致命:主机没有 pg_hba.conf 条目 2014-12-02 02:54:58 +0800 CST
  • Martin Hope
    Stéphane 如何列出 PostgreSQL 中的所有模式? 2013-04-16 11:19:16 +0800 CST
  • Martin Hope
    Mike Walsh 为什么事务日志不断增长或空间不足? 2012-12-05 18:11:22 +0800 CST
  • Martin Hope
    Stephane Rolland 列出指定表的所有列 2012-08-14 04:44:44 +0800 CST
  • Martin Hope
    haxney MySQL 能否合理地对数十亿行执行查询? 2012-07-03 11:36:13 +0800 CST
  • Martin Hope
    qazwsx 如何监控大型 .sql 文件的导入进度? 2012-05-03 08:54:41 +0800 CST
  • Martin Hope
    markdorison 你如何mysqldump特定的表? 2011-12-17 12:39:37 +0800 CST
  • Martin Hope
    Jonas 如何使用 psql 对 SQL 查询进行计时? 2011-06-04 02:22:54 +0800 CST
  • Martin Hope
    Jonas 如何从 PostgreSQL 中的选择查询中将值插入表中? 2011-05-28 00:33:05 +0800 CST
  • Martin Hope
    Jonas 如何使用 psql 列出所有数据库和表? 2011-02-18 00:45:49 +0800 CST

热门标签

sql-server mysql postgresql sql-server-2014 sql-server-2016 oracle sql-server-2008 database-design query-performance sql-server-2017

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve