AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / server / 问题

问题[kafka](server)

Martin Hope
ElToro1966
Asked: 2022-01-29 09:52:01 +0800 CST

浮士德客户端没有连接到 Kafka

  • 1

我很难从运行浮士德脚本的客户端连接到运行 Kafka 的机器。脚本如下所示:

import faust
import logging
from asyncio import sleep


class Test(faust.Record):
    msg: str


app = faust.App('myapp', broker='kafka://10.0.0.20:9092')
topic = app.topic('test', value_type=Test)


@app.agent(topic)
async def hello(messages):
    async for message in messages:
        print(f'Received {message.msg}')


@app.timer(interval=5.0)
async def example_sender():
    await hello.send(
        value=Test(msg='Hello World!'),
    )


if __name__ == '__main__':
    app.main()

当我运行脚本时:

# faust -A myapp worker -l info
┌ƒaµS† v0.8.1─┬─────────────────────────────────────────────────┐
│ id          │ myapp                                           │
│ transport   │ [URL('kafka://10.0.0.20:9092')]                 │
│ store       │ memory:                                         │
│ web         │ http://hubbabubba:6066                   │
│ log         │ -stderr- (info)                                 │
│ pid         │ 260765                                          │
│ hostname    │ hubbabubba                               │
│ platform    │ CPython 3.8.10 (Linux x86_64)                   │
│ drivers     │                                                 │
│   transport │ aiokafka=0.7.2                                  │
│   web       │ aiohttp=3.8.1                                   │
│ datadir     │ /Git/faust-kafka/myapp-data    │
│ appdir      │ /Git/faust-kafka/myapp-data/v1 │
└─────────────┴─────────────────────────────────────────────────┘
[2022-01-28 13:09:57,018] [260765] [INFO] [^Worker]: Starting... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^-App]: Starting... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Monitor]: Starting... 
[2022-01-28 13:09:57,021] [260765] [INFO] [^--Producer]: Starting... 
[2022-01-28 13:09:57,022] [260765] [INFO] [^---ProducerBuffer]: Starting... 
[2022-01-28 13:09:57,024] [260765] [ERROR] Unable connect to "10.0.0.20:9092": [Errno 113] Connect call failed ('10.0.0.20', 9092) 
[2022-01-28 13:09:57,025] [260765] [ERROR] [^Worker]: Error: KafkaConnectionError("Unable to bootstrap from [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>)]") 
Traceback (most recent call last):
  File "/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/worker.py", line 276, in execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py", line 759, in start
    await self._default_start()
  File "/media/eric/DISK3/Git/faust-kafka/venv/lib/python3.8/site-packages/mode/services.py", line 766, in _default_start
    await self._actually_start()...
  File "/Git/faust-kafka/venv/lib/python3.8/site-packages/aiokafka/client.py", line 249, in bootstrap
    raise KafkaConnectionError(
kafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('10.0.0.20', 9092, <AddressFamily.AF_INET: 2>)]
[2022-01-28 13:09:57,027] [260765] [INFO] [^Worker]: Stopping... 
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: Stopping... 
[2022-01-28 13:09:57,027] [260765] [INFO] [^-App]: Flush producer buffer... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^--TableManager]: Stopping... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^---Fetcher]: Stopping... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^---Conductor]: Stopping... 
[2022-01-28 13:09:57,028] [260765] [INFO] [^--AgentManager]: Stopping... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^Agent: myapp.hello]: Stopping... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--ReplyConsumer]: Stopping... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--LeaderAssignor]: Stopping... 
[2022-01-28 13:09:57,029] [260765] [INFO] [^--Consumer]: Stopping... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--Web]: Stopping... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--CacheBackend]: Stopping... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^--Producer]: Stopping... 
[2022-01-28 13:09:57,030] [260765] [INFO] [^---ProducerBuffer]: Stopping... 
[2022-01-28 13:09:57,031] [260765] [INFO] [^--Monitor]: Stopping... 
[2022-01-28 13:09:57,032] [260765] [INFO] [^Worker]: Gathering service tasks... 
[2022-01-28 13:09:57,032] [260765] [INFO] [^Worker]: Gathering all futures... 
[2022-01-28 13:09:58,033] [260765] [INFO] [^Worker]: Closing event loop

Kafka (v.2.8.1) 在 10.0.0.20 端口 9092 上运行。Kafka 配置如下所示:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://localhost:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

卡夫卡经纪人一开始就很顺利:

$ sudo bin/kafka-server-start.sh -daemon config/server.properties 

我得到了这个话题:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replication-factor 1 --partitions 1 --topic test

然后我检查:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
test

所以我想知道我在哪里搞砸了。顺便说一句:可以从客户端机器访问服务器:

$ ping -c 5 10.0.0.20 -p 9092
PATTERN: 0x9092
PING 10.0.0.20 (10.0.0.20) 56(84) bytes of data.
64 bytes from 10.0.0.20: icmp_seq=1 ttl=64 time=0.468 ms
64 bytes from 10.0.0.20: icmp_seq=2 ttl=64 time=0.790 ms
64 bytes from 10.0.0.20: icmp_seq=3 ttl=64 time=0.918 ms
64 bytes from 10.0.0.20: icmp_seq=4 ttl=64 time=0.453 ms
64 bytes from 10.0.0.20: icmp_seq=5 ttl=64 time=0.827 ms

--- 10.0.0.20 ping statistics ---
5 packets transmitted, 5 received, 0% packet loss, time 4095ms
rtt min/avg/max/mdev = 0.453/0.691/0.918/0.192 ms
networking python opensuse firewalld kafka
  • 1 个回答
  • 550 Views
Martin Hope
Cabbage Parachute
Asked: 2021-12-24 06:04:54 +0800 CST

Kafka 将 log4j 日志存储在字面上称为 ${kafka.logs.dir} 的目录中

  • 3

我在 RHEL 8 服务器上安装了支持 log4j2 ( http://home.apache.org/~dongjin/post/apache-kafka-log4j2-support/ ) 的 Kafka 3.0 预览版。Kafka 和 Zookeeper 作为 systemd 用户服务成功运行。我在 systemd 单元文件中设置了环境变量KAFKA_LOG4J_OPTS="-Dlog4j.configurationFile=file:/home/username/kafka/bin/../config/log4j2.properties",以便使用 log4j 2.17。

但是,有一件奇怪的事情:所有 log4j 日志都存储在一个目录中,该目录实际上称为${kafka.logs.dir}主目录。垃圾收集器成功地将日志存储在正确的日志位置~/kafka/logs。所以这个目录~/kafka/logs包含了类似的文件kafkaServer-gc.log.0.current,而奇怪的目录~/${kafka.logs.dir}包含server.log,controller.log等等。

在查看 kafka 和 zookeeper 的进程时,我可以看到它们都有参数-Dkafka.logs.dir=/home/username/kafka/bin/../logs。这不应该定义kafka.logs.dir文件中使用的环境变量config/log4j2.properties吗?为什么 Zookeeper 和 Kafka 显然无法访问这个$kafka.logs.dir环境变量?

java systemd log4j kafka rhel8
  • 2 个回答
  • 604 Views
Martin Hope
John Mark
Asked: 2021-08-15 13:15:12 +0800 CST

zookeeper ssl 警报“警报错误证书”

  • 1

我正在使用 Kafka(2.3.0 版)和 Zookeeper(3.5.5-3 版)——稳定版是 3.6.3。

当我使用以下命令测试 Zookeeper 的 SSL 时:

openssl s_client -showcerts -connect 127.0.0.1:2280 -CAfile /certs/ca-chain.cert.pem

我收到了这个错误:

140371409225024:error:14094412:SSL routines:ssl3_read_bytes:sslv3 alert bad certificate:../ssl/record/rec_layer_s3.c:1543:SSL alert number 42

但是如果我要安装 Zookeeper 3.5.7 及更高版本,我可以在我的 zoo.cnf 或 zookeeper.properties 中添加它:

ssl.clientAuth=want而且我不再看到任何 SSL 错误。

有关如何在不升级的情况下修复此 SSL 错误的任何提示/建议(我目前不想更新以避免其他冲突,例如 Kafka Cruise Control 和其他)。

提前致谢!

ssl kafka zookeeper
  • 1 个回答
  • 423 Views
Martin Hope
mjahr
Asked: 2020-08-21 02:58:23 +0800 CST

如何使用公共接口实现安全的 kafka 环境?

  • 0

场景:使用 SSL 加密和 SASL/PLAIN 身份验证保护的 Kafka 集群位于专用 VPC 内的 AWS 私有子网中。在私有子网内,一切都很好。我使用自行生成的 CA 和密钥来保护通信,主机身份基于 AWS 内部 DNS。

我想要实现的目标:有可能从外部访问生产者 API(而不是REST API)。

我正在为密钥、DNS、kafka 侦听器的组合以及从生产者到代理的永久连接可能不是我用来启动连接的那个事实而苦苦挣扎。

多次尝试使用反向代理失败 - 由于无法解析密钥,即使 ssh 隧道也无法工作。

对于这种情况,有人有一种参考架构吗?我在这里省略了配置细节,因为它在不同的配置、键等中分布得太多,但如果需要,我可以提供我的设置。

ssl ssl-certificate kafka listener
  • 1 个回答
  • 168 Views
Martin Hope
ItsaMeTuni
Asked: 2020-04-14 11:43:59 +0800 CST

ECS容器如何设置固定私有IP?

  • 0

我有一个名为 的任务定义kafka-zookeeper,它有一个 Apache Kafka 代理和一个 Zookeeper 节点。这些容器需要相互通信(这很容易)以及与其他 kafka-zookeeper 任务中的 kafka 和 zookeeper 容器通信,这是我不知道该怎么做的。

基本上,我需要这个:

                     ------------------------
___________________  | ___________________  | ___________________
|      Task1      |  | |      Task2      |  | |      Task2      |
|(kafka-zookeeper)|  | |(kafka-zookeeper)|  | |(kafka-zookeeper)|
| _____________   |  | | _____________   |  | | _____________   |
| |           |<------ | |           |   |  --->|           |   |
| | Kafka     |<-------->| Kafka     |<-------->| Kafka     |   |
| |___________|   |    | |___________|   |    | |___________|   |
|      ^          |    |       ^         |    |       ^         |
|      |          |    |       |         |    |       |         |
| _____v_______   |    | ______v______   |    | ______v______   |
| |           |   |    | |           |   |    | |           |   |
| | Zookeeper |<-------->| Zookeeper |<-------->| Zookeeper |   |
| |___________|<------ | |___________|   |  --->|___________|   |
|_________________|  | |_________________|  | |_________________|
                     |                      |
                     ------------------------

但我不知道如何设置任务之间的通信,因为任务的私有 IP 是在启动后确定的。此外,如果一项任务由于某种原因失败,则在其位置创建另一个任务时,IP 将有所不同。我该如何处理?

我还认为值得一提的是,zookeeper 和 kafka 通过包含其他实例 IP 的环境变量了解其他实例。

我也无法为容器提供主机名,因为我使用的是awsvpc网络模式。

提前致谢!

amazon-ecs aws-fargate kafka zookeeper
  • 1 个回答
  • 233 Views
Martin Hope
vane
Asked: 2020-01-31 12:38:28 +0800 CST

Filebeat kafka输入与SASL?

  • 0

我正在尝试让 filebeat 使用 kafka 输入来使用来自 kafka 的消息。由于某种原因,我无法使用 SASL 进行身份验证,我不确定这是为什么。尝试将 Kafka 和 Filebeat 与 SASL 一起使用时,它的文档有点缺乏。

我的filebeat配置如下:

filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

filebeat.inputs:
- type: kafka
  hosts: 'the.kafka.server.com:9092'
  topics: 'my_topic'
  group_id: 'my_group'
  ssl.enabled: yes
  username: "$ConnectionString"
  password: "org.apache.kafka.common.security.plain.PlainLoginModule required username='my_username' password='my_password';"

processors:
- add_cloud_metadata: ~
- add_docker_metadata: ~

output.console:
  pretty: true

输出显示

INFO    input/input.go:114      Starting input of type: kafka; ID: 14409252276502564738
INFO    kafka/log.go:53 kafka message: Initializing new client
INFO    kafka/log.go:53 client/metadata fetching metadata for all topics from broker the.kafka.server.com:9092

INFO    crawler/crawler.go:106  Loading and starting Inputs completed. Enabled inputs: 1
INFO    cfgfile/reload.go:171   Config reloader started
INFO    cfgfile/reload.go:226   Loading of config files completed.
INFO    kafka/log.go:53 kafka message: Successful SASL handshake. Available mechanisms: %!(EXTRA []string=[PLAIN OAUTHBEARER])
INFO    kafka/log.go:53 Failed to read response while authenticating with SASL to broker the.kafka.server.com:9092: EOF

INFO    kafka/log.go:53 Closed connection to broker the.kafka.server.com:9092

INFO    kafka/log.go:53 client/metadata got error from broker -1 while fetching metadata: EOF

我不确定这里发生了什么。我还尝试添加compression: none没有帮助的内容,并使用 openssl 验证服务器证书可以被验证。我在这里做错了什么?有问题的 kafka 服务器是云托管的 kafka 服务器,我看不到服务器配置,我从 kafka 的云 UI 中获得了“连接字符串”。

kafka filebeat
  • 1 个回答
  • 1232 Views
Martin Hope
shalom
Asked: 2019-02-08 07:09:42 +0800 CST

集群中的kafka机器和kafka通信

  • 0

我们有带有 3 个 kafka 代理节点和 3 个 zookperes 服务器的 kafka 集群

kafka 版本 - 10.1 ( hortonworks )

据我了解,因为所有元数据都位于 zookeeper 服务器上,并且 kafka 代理正在使用这些数据(kafka 通过端口 2181 与 zookeeper 服务器交谈)

我只是想知道每台 kafka 机器是否与集群中的其他 kafka 通信,或者 kafka 是否仅在 Zookeepers 服务器上/从 Zookeepers 服务器上获取/放置数据?

那么 kafka 服务需要与集群中的其他 kafka 通信吗?, 或者也许 kafka 机器只需要从 zookeepers 服务器获取所有内容?

kafka
  • 1 个回答
  • 62 Views
Martin Hope
Matheus Portela
Asked: 2017-01-25 09:31:57 +0800 CST

JMX 报告错误的测量值并为 Apache Kafka 收集

  • 2

我正在使用 JMX 从 Apache Kafka 收集指标并通过 collectd 发送到可视化和监控服务 Librato。问题是某些指标似乎报告错误。例如,在没有任何人使用 Kafka 集群的情况下,一些节点报告每分钟有大量传入消息(如 15,000 条),而其他节点报告为 0,正如预期的那样。

这是 collectd 中的一项指标配置:

<MBean "kafka-all-messages">
  ObjectName "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"
  InstancePrefix "all"
  <Value>
    InstancePrefix "kafka-messages-in"
    Type "counter"
    Table false
    Attribute "MeanRate"
  </Value>
</MBean>

这是 Librato 中的图表:

Librato 中的每秒消息数可视化

有谁知道出了什么问题?是我收集的配置,例如Type或类似的东西吗?

monitoring jmx collectd kafka
  • 1 个回答
  • 353 Views
Martin Hope
Matheus Portela
Asked: 2017-01-24 14:14:59 +0800 CST

无法将主管与 Apache Kafka 一起使用

  • 1

我有一台安装了 Apache Kafka 的 Ubuntu 16.04 机器。start_kafka.sh目前,我可以通过使用具有以下内容的脚本使其完美运行:

JMX_PORT=17264 KAFKA_HEAP_OPTS="-Xms1024M -Xmx3072M" /home/kafka/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh -daemon /home/kafka/kafka_2.11-0.10.1.0/config/server.properties

现在,我想使用supervisor它来自动重启进程,如果它失败并在重启机器后立即启动。问题是我无法supervisor启动 Kafka。

我supervisor使用安装pip并将此配置文件放置在/etc/supervisord.conf:

; Supervisor config file.
;
; For more information on the config file, please see:
; http://supervisord.org/configuration.html

[unix_http_server]
file=/tmp/supervisor.sock   ; (the path to the socket file)

[supervisord]
logfile=/tmp/supervisord.log ; (main log file;default $CWD/supervisord.log)
logfile_maxbytes=50MB        ; (max main logfile bytes b4 rotation;default 50MB)
logfile_backups=10           ; (num of main logfile rotation backups;default 10)
loglevel=info                ; (log level;default info; others: debug,warn,trace)
pidfile=/tmp/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
nodaemon=false               ; (start in foreground if true;default false)
minfds=1024                  ; (min. avail startup file descriptors;default 1024)
minprocs=200                 ; (min. avail process descriptors;default 200)

; the below section must remain in the config file for RPC
; (supervisorctl/web interface) to work, additional interfaces may be
; added by defining them in separate rpcinterface: sections
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL  for a unix socket

[program:kafka]
command=/home/kafka/kafka_2.11-0.10.1.0/start_kafka.sh ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
startsecs=10                   ; # of secs prog must stay up to be running (def. 1)
startretries=3                ; max # of serial start failures when starting (default 3)
;autorestart=unexpected        ; when to restart if exited after running (def: unexpected)
;exitcodes=0,2                 ; 'expected' exit codes used with autorestart (default 0,2)
stopsignal=TERM               ; signal used to kill process (default TERM)
stopwaitsecs=180               ; max num secs to wait b4 SIGKILL (default 10)
stdout_logfile=NONE        ; stdout log path, NONE for none; default AUTO
;environment=A="1",B="2"       ; process environment additions (def no adds)

当我尝试启动 Kafka 时,出现以下错误:

# supervisorctl start kafka
kafka: ERROR (spawn error)

主管日志 (at /tmp/supervisord.log) 包含以下内容:

2017-01-23 22:10:24,532 INFO spawned: 'kafka' with pid 21311
2017-01-23 22:10:24,536 INFO exited: kafka (exit status 127; not expected)
2017-01-23 22:10:25,542 INFO spawned: 'kafka' with pid 21312
2017-01-23 22:10:25,559 INFO exited: kafka (exit status 127; not expected)
2017-01-23 22:10:27,562 INFO spawned: 'kafka' with pid 21313
2017-01-23 22:10:27,567 INFO exited: kafka (exit status 127; not expected)
2017-01-23 22:10:30,571 INFO spawned: 'kafka' with pid 21314
2017-01-23 22:10:30,576 INFO exited: kafka (exit status 127; not expected)
2017-01-23 22:10:31,578 INFO gave up: kafka entered FATAL state, too many start retries too quickly

必须说我已经尝试删除-daemon标志start_kafka.sh以使用supervisor但没有成功。

有人知道发生了什么吗?

ubuntu ubuntu-16.04 kafka supervisord
  • 1 个回答
  • 1261 Views
Martin Hope
Samriang
Asked: 2016-10-05 04:00:44 +0800 CST

系统日志。omkafka 的磁盘辅助队列

  • 1

我有以下管道:

nginx -> unix_socket -> rsyslog -> omkafka module -> kafka

对于 omkafka,我使用以下配置:

module(
  load="impstats"
  interval="10"             # how often to generate stats
  resetCounters="on"        # to get deltas (e.g. # of messages submitted in the last 10 seconds)
  log.file="/var/log/impstats"     # file to write those stats to
  log.syslog="off"          # don't send stats through the normal processing pipeline. More on that in a bit
)

#### LOAD MODULES ####
module(load="omkafka")

#### DEFINE GLOBALS ####
$MaxMessageSize 64k
$EscapeControlCharactersOnReceive off

#### TEMPLATES ####
$template ngFormat, "%msg:4:$%"

input(type="imuxsock" Socket="/spool/syslog" Ruleset="outwriter")

ruleset(name="outwriter"){
    action(
      type="omkafka"
      broker=["kafka666:9092"]
      topic="nginx_logs"
      partitions.auto="on"
      template="cerberFormat"
      queue.type="linkedlist"
      queue.dequeueBatchSize="10000"   # numbers of messages to be parsed from queue
      queue.highWatermark="450000"    # max no. of events to hold in memory
      queue.lowWatermark="250000"     # use memory queue again, when it's back to this level
      queue.spoolDirectory="/spool/logs"  # where to write on disk
      queue.fileName="rsyslog_queue"
      queue.maxDiskSpace="100g"        # it will stop at this much disk space
      queue.size="500000"           # or this many messages
      queue.saveOnShutdown="on"      # save memory queue contents to disk when rsyslog is exiting
    )
}

main_queue(
  queue.type="linkedlist"
  queue.dequeueBatchSize="10000"   # numbers of messages to be parsed from queue
  queue.highWatermark="450000"    # max no. of events to hold in memory
  queue.lowWatermark="250000"     # use memory queue again, when it's back to this level
  queue.spoolDirectory="/spool/logs"  # where to write on disk
  queue.fileName="rsyslog_main_queue"
  queue.maxDiskSpace="100g"        # it will stop at this much disk space
  queue.size="500000"           # or this many messages
  queue.saveOnShutdown="on"      # save memory queue contents to disk when rsyslog is exiting
)

我想如果 kafka 代理无法访问,所有 omkafka 消息都应该放入指定的 DA 队列中。但是,当我使用 impstats 观察计数器时,DA 队列始终为空,并且 omkafka 使用自己的输出队列。

如下所示:

Tue Oct  4 13:02:09 2016: global: origin=dynstats 
Tue Oct  4 13:02:09 2016: imuxsock: origin=imuxsock submitted=13060 ratelimit.discarded=0 ratelimit.numratelimiters=0 
Tue Oct  4 13:02:09 2016: **omkafka**: submitted=0 **maxoutqsize=100000** failures=0 topicdynacache.skipped=0 topicdynacache.miss=0 topicdynacache.evicted=0 
Tue Oct  4 13:02:09 2016: action 0: origin=core.action processed=13060 failed=13060 suspended=0 suspended.duration=300 resumed=0 
Tue Oct  4 13:02:09 2016: action 1: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 3: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 4: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 5: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 6: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 7: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 8: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 9: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 10: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: action 11: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0 
Tue Oct  4 13:02:09 2016: resource-usage: origin=impstats utime=24242276 stime=15882703 maxrss=125316 minflt=95642 majflt=0 inblock=0 oublock=632 nvcsw=1067580 nivcsw=513 
Tue Oct  4 13:02:09 2016: **main Q[DA]:** origin=core.queue size=0 enqueued=0 full=0 discarded.full=0 discarded.nf=0 **maxqsize=0** 
Tue Oct  4 13:02:09 2016: main Q: origin=core.queue size=0 enqueued=13060 full=0 discarded.full=0 discarded.nf=0 maxqsize=18 

我的配置有问题还是 omkafka 没有可靠的队列?

谢谢!

centos nginx rsyslog kafka
  • 2 个回答
  • 1066 Views

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    新安装后 postgres 的默认超级用户用户名/密码是什么?

    • 5 个回答
  • Marko Smith

    SFTP 使用什么端口?

    • 6 个回答
  • Marko Smith

    命令行列出 Windows Active Directory 组中的用户?

    • 9 个回答
  • Marko Smith

    什么是 Pem 文件,它与其他 OpenSSL 生成的密钥文件格式有何不同?

    • 3 个回答
  • Marko Smith

    如何确定bash变量是否为空?

    • 15 个回答
  • Martin Hope
    Tom Feiner 如何按大小对 du -h 输出进行排序 2009-02-26 05:42:42 +0800 CST
  • Martin Hope
    Noah Goodrich 什么是 Pem 文件,它与其他 OpenSSL 生成的密钥文件格式有何不同? 2009-05-19 18:24:42 +0800 CST
  • Martin Hope
    Brent 如何确定bash变量是否为空? 2009-05-13 09:54:48 +0800 CST
  • Martin Hope
    cletus 您如何找到在 Windows 中打开文件的进程? 2009-05-01 16:47:16 +0800 CST

热门标签

linux nginx windows networking ubuntu domain-name-system amazon-web-services active-directory apache-2.4 ssh

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve