Um usuário relata que a taxa de transferência da consulta de intervalo é muito maior do que o esperado ao definir spark.cassandra.input.readsPerSec no conector spark-cassandra.
Dependências de trabalho. A versão do driver Java está definida como 4.13.0.
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.12</artifactId>
<version>3.2.0</version>
<exclusions>
<exclusion>
<groupId> com.datastax.oss</groupId>
<artifactId>java-driver-core-shaded</artifactId>
</exclusion>
</exclusions>
</dependency>
...
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.13.0</version>
</dependency>
Existem duas etapas no trabalho (ambas do STF):
Dataset<Row> dataset = sparkSession.sqlContext().read()
.format("org.apache.spark.sql.cassandra")
.option("table", "inbox_user_msg_dummy")
.option("keyspace", "ssmp_inbox2").load();
-e-
Dataset<Row> olderDataset = sparkSession.sql("SELECT * FROM inbox_user_msg_dummy where app_uuid = 'cb663e07-7bcc-4039-ae97-8fb8e8a9ff77' AND " +
"create_hour < '" + minus180DaysInstant + "'");
Configuração do trabalho:
SparkConf sparkConf = new SparkConf()
.setMaster("local[*]") //uncomment while running in local
.setAppName("inbox-gateway-spark-job")
.set("spark.scheduler.mode", "FAIR")
.set("spark.cassandra.connection.port", "9042")
.set("keyspace", "ssmp_inbox2")
.set("spark.cassandra.connection.host", "cass-556799284-1-1276056270.stg.ssmp-inbox2-stg.ms-df-cassandra.stg-az-southcentralus-6.prod.us.walmart.net,
cass-556799284-2-1276056276.stg.ssmp-inbox2-stg.ms-df-cassandra.stg-az-southcentralus-6.prod.us.walmart.net,
cass-556799284-3-1276056282.stg.ssmp-inbox2-stg.ms-df-cassandra.stg-az-southcentralus-6.prod.us.walmart.net")
.set("spark.cassandra.auth.username", "ssmp-inbox-app-v2")
.set("spark.cassandra.auth.password", "*")
.set("spark.cassandra.input.consistency.level", "LOCAL_ONE")
.set("spark.cassandra.concurrent.reads", "1")
.set("spark.cassandra.input.readsPerSec", "10")
.set("spark.cassandra.input.fetch.sizeInRows", "10")
.set("spark.cassandra.input.split.sizeInMB", "10")
.set("spark.cores.max", "20")
.set("spark.executor.memory", "20G")
.set("spark.yarn.executor.memoryOverhead", "12000")
.set("spark.cassandra.read.timeoutMS", "200000")
.set("spark.task.maxFailures", "10")
.set("spark.cassandra.connection.localDC", "southcentral");
Observe que o Spark está limitando os núcleos reais a 16 porque os trabalhadores têm 8 núcleos. Existe 1 executor.
Quando o trabalho é executado, observa-se que há aproximadamente 22 mil consultas/s de intervalo para o primeiro FTS quase saturando a CPU no cluster, e para o segundo FTS, há aproximadamente 725 consultas/s de intervalo na tabela.
A expectativa é que, com um total de 16 núcleos Spark, o rendimento da consulta de intervalo seja limitado a 160/s (spark.cassandra.input.readsPerSec * spark cores).
Este raciocínio está correto? Qual é a recomendação para controlar a taxa de transferência de leitura do conector spark-cassandra?
Eu sei que outros usuários já configuraram esse acelerador com sucesso antes, mas nunca analisamos atentamente qual é o rendimento resultante. Esta parece ser uma grande discrepância porque as duas etapas estão essencialmente executando a mesma operação - uma varredura completa da tabela. As consultas que o conector executa são as mesmas.
O esquema:
CREATE TABLE ssmp_inbox2.inbox_user_msg_dummy (
user_id text,
create_hour timestamp,
app_uuid text,
message_id text,
app_name text,
create_ts bigint,
is_actiontaken boolean,
is_compensable boolean,
is_deleted boolean,
is_read boolean,
message_payload text,
mini_app_name text,
notification text,
PRIMARY KEY ((user_id, create_hour, app_uuid), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC)
AND additional_write_policy = '99p'
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND cdc = false
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND default_time_to_live = 0
AND extensions = {}
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair = 'BLOCKING'
AND speculative_retry = '99p';
A pergunta:
SELECT * FROM ssmp_inbox2.inbox_user_msg_dummy WHERE token(user_id, create_hour, app_uuid) >= token(G9e7Y4Y, 2023-08-10T04:17:27.234Z, cb663e07-7bcc-4039-ae97-8fb8e8a9ff77) AND token(user_id, create_hour, app_uuid) <= 9121832956220923771 LIMIT 10
FWIW, o tamanho médio da partição é 649 bytes, o máximo é 2,7kb.
A melhor maneira de começar é olhar aqui: Conector Spark Cassandra | Leia os parâmetros de ajuste (Github).
A tabela não é muito clara, mas
spark.cassandra.concurrent.reads
éspark.cassandra.input.readsPerSec
usada para juntar.Para limitar a verificação completa, você precisa usar o
spark.cassandra.input.throughputMBPerSec
.