AskOverflow.Dev

AskOverflow.Dev Logo AskOverflow.Dev Logo

AskOverflow.Dev Navigation

  • 主页
  • 系统&网络
  • Ubuntu
  • Unix
  • DBA
  • Computer
  • Coding
  • LangChain

Mobile menu

Close
  • 主页
  • 系统&网络
    • 最新
    • 热门
    • 标签
  • Ubuntu
    • 最新
    • 热门
    • 标签
  • Unix
    • 最新
    • 标签
  • DBA
    • 最新
    • 标签
  • Computer
    • 最新
    • 标签
  • Coding
    • 最新
    • 标签
主页 / server / 问题 / 772023
Accepted
guettli
guettli
Asked: 2016-04-22 07:35:27 +0800 CST2016-04-22 07:35:27 +0800 CST 2016-04-22 07:35:27 +0800 CST

Collectd --> Elasticsearch 如果远程主机无法连接到中央弹性搜索

  • 772

目标

中央存储和分析性能数字的方法:

  • CPU负载
  • 内存使用
  • ...

当前策略

我想实现这样的设置:

  1. 收集
  2. 日志存储
  3. 弹性搜索
  4. 木花

就像这里解释的那样: https ://mtalavera.wordpress.com/2015/02/16/monitoring-with-collectd-and-kibana/

问题:远程主机无法推送数据

约束:

  • 我们只有ssh从中央服务器到远程主机。
  • ssh由于网络设置,从远程主机到中央服务器不起作用(遗憾的是我无法更改)。
  • 网络流量跨越多个非公共网络。每月两次无法访问主机,因为管理员使用防火墙规则。我不想松开一条线。这就是为什么我想将日志存储在远程主机上并获取(压缩)数据。

解决方案?

如何每小时获取数据?

networking elasticsearch collectd logstash
  • 2 2 个回答
  • 2093 Views

2 个回答

  • Voted
  1. Best Answer
    GregL
    2016-04-29T04:42:12+08:002016-04-29T04:42:12+08:00

    对于上面列出的问题,您需要在远程端缓冲统计信息,以免丢失任何内容。

    有很多方法可以做到这一点,但没有一种方法过于简单,并且需要进行大量测试以确保它们是可行的。它们都涉及在本地写入collectd's 输出,然后使用某种方法在中央服务器上获取该输出。

    我没有测试以下任何内容,所以有些可能根本不起作用。

    没有特别容易或复杂的顺序:

    1. Socket/Network Output to Script
      WriteCollectd的输出到套接字或 IP/端口,其中 PHP/Perl/Python/Bash 脚本正在侦听将命令写入文件。

      然后,这些文件可以被中央服务器推送/拉取,并由 Logstash 摄取。

      优点:捕获输出的简单脚本;使用的标准 Linux 命令
      缺点:如果您要获取大量统计信息,则无法扩展;需要维护脚本;不确定 LS 是否会处理普通协议

    2. Redis/AMQP/Kafka/MongoDB将输出 写入Collectd可能的“缓冲区”之一。它们各自的工作方式略有不同,并且具有不同的部署选项,因此我将留给您找出最好的方法,因为这超出了这个问题的范围。也就是说,它们中的任何一个都应该工作。

      然后,您需要一种方法将数据从缓冲解决方案返回到中央服务器。应用程序本机复制/镜像/集群或每隔 X 间隔运行以传送数据(在任一端运行)的脚本是两种可能性。

      优点:非常灵活的部署选项;应该可以很好地扩展;使用众所周知的工具/程序
      缺点:缓冲区程序可能需要大量资源,或安装许多软件包

    3. 套接字/网络输出到 Logstash 这与选项 1 几乎相同,但不是将collectd输出输出到脚本/程序,而是将其写入每个远程主机上的本地 Logstash 实例。

      Logstash 然后会在本地写入 CSV/JSON,您可以使用任何方法将这些文件返回到中央服务器,包括 LS 本身。

      优点:用于整个解决方案的单一工具集;提供一种在边缘转换数据的方法,然后集中摄取;移动部件很少 缺点:所有远程主机上都需要 Java/LS

    除了每个选项的优点/缺点之外,所有选项的一个共同缺点是您需要找到一种方法来维护所有服务器上的一致配置。如果您有很多远程节点(或者通常只有很多节点),您可能已经有一个配置管理系统,这将是微不足道的。

    • 1
  2. tacos_tacos_tacos
    2016-04-25T00:32:50+08:002016-04-25T00:32:50+08:00

    编辑:Achtung!警告!

    请使用它docker-compose而不是我链接的那个(它确实需要docker并且compose也许machine但它为你做了更多,你将不得不更少地挣扎。

    CELK:https ://github.com/codenamekt/celk-docker-compose/blob/master/logstash/logstash.conf

    还

    因此,从这里开始对工作系统有一个很好的概述。他们已经为您完成了一些工作,因此您只需担心与配置和部署有关的问题。

    即使你最终没有使用 Docker,这仍然会让你走上成功的道路,并有额外的好处向你展示它是如何组合在一起的。

    首先获取 Vagrant 并构建带有 vagrant 的 Vagrant 映像

    如果你不知道 Vagrant 是什么,那就太好了。这是一个允许人们共享整套虚拟机和配置器的程序,这样您就可以只定义一个虚拟机及其配置,而不是共享整个虚拟机,而且它“正常工作”。感觉很神奇,但它实际上只是可靠的系统工作。

    • (使用上面的 CELK 链接):https ://github.com/pblittle/docker-logstash

    你需要安装 Vagrant 才能使用它。去做就对了!然后,您不必安装docker,因为它将在 Vagrant VM 上运行。

    您有四种使用方式的选择,但首先,使用粗体命令准备 Vagrant....

    vagrant up


    确定您需要运行哪些程序

    您的选择是:

    • 全套或 ELK(elasticsearch、logstash、kibana)
    • 仅代理(Logstash 收集器)
    • 仅限 Kibana

    还有其他配置可用,但仅用于测试。


    开演时间

    现在是配置 Logstash 的时候了,这确实是唯一具有复杂行为的部分。

    Logstash 配置文件是以纯文本文件结尾,conf并且可以选择使用 atar或 gunzip组合在一起gz。

    您可以通过以下两种方式之一获取配置文件:

    • 您从 Internet 下载它们,使用环境变量LOGSTASH_CONFIG_URL指向您的配置的 url,并且 **如果您的 url 错误或有问题并且无法从 url 获取配置,它会退回到 knonw网址或其他
    • 从磁盘读取它们,有点——因为这是 docker,你实际上将创建一个卷(现在),每次运行容器时都会挂载该卷。

    以下是使用 Internet 上的配置运行时的样子:

    $ docker run -d \
      -e LOGSTASH_CONFIG_URL=https://secretlogstashesstash.com/myconfig.tar.gz \
      -p 9292:9292 \
      -p 9200:9200 \
      pblittle/docker-logstash
    

    作者docker 警告你:

    默认的 logstash.conf 只监听标准输入和文件输入。如果您希望配置 tcp 和/或 udp 输入,请使用您自己的 logstash 配置文件并自己公开端口。有关配置语法和更多信息,请参阅 logstash 文档。

    注意:默认的 logstash conf是什么?

    回想一下,当您没有为所需的环境变量输入正确的 URL 时,您会得到该文件LOGSTASH_CONFIG_URL

    这是输入部分:

    // As the author warned, this is all you get. StdIn and Syslog file inputs.
    
    input {
      stdin {
        type => "stdin-type"
      }
    
      file {
        type => "syslog"
        path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
      }
    
      file {
        type => "logstash"
        path => [ "/var/log/logstash/logstash.log" ]
        start_position => "beginning"
      }
    }
    

    超越默认

    logstash在网站上阅读更多信息。

    现在logstash有将数据推送到input. 插件完全按照您的预期变化;这里有一些:

    • s3来自亚马逊(文件系统事件)
    • stdin从logstash(默认,读取stdin缓冲区)
    • http从logstash(你的猜测)
    • ...ETC...

    示例:UDP 套接字

    UDP是一种无连接的快速协议,运行在L4(传输)底层,支持多路复用,处理故障,一般是记录数据传输的不错选择。

    你选择你想要的端口;其他选项取决于您在做什么。

    TCP 的工作方式相同。

    udp { 端口 => 9999 编解码器 => json buffer_size => 1452 }

    collectd示例 2:来自过滤和输出的 UDP 套接字

    This is stolen from https://github.com/codenamekt/celk-docker-compose/blob/master/logstash/logstash.conf
    
    input {
      udp {
        port => 25826         # 25826 matches port specified in collectd.conf
        buffer_size => 1452   # 1452 is the default buffer size for Collectd
        codec => collectd { } # specific Collectd codec to invoke
        type => collectd
      }
    }
    output {
      elasticsearch {
        host => elasticsearch
        cluster  => logstash
        protocol => http
      }
    }
    

    过滤就是一个很好的例子: 也就是说,它真的很长,我认为它确实有用

    filter {
      # TEST implementation of parse for collectd
      if [type] == "collectd" {
        if [plugin] {
          mutate {
            rename => { "plugin" => "collectd_plugin" }
          }
        }
        if [plugin_instance] {
          mutate {
            rename => { "plugin_instance" => "collectd_plugin_instance" }
          }
        }
        if [type_instance] {
          mutate {
            rename => { "type_instance" => "collectd_type_instance" }
          }
        }
        if [value] {
          mutate {
            rename => { "value" => "collectd_value" }
          }
          mutate {
            convert => { "collectd_value" => "float" }
          }
        }
        if [collectd_plugin] == "interface" {
          mutate {
            add_field => {
              "collectd_value_instance" => "rx"
              "collectd_value" => "%{rx}"
            }
          }
          mutate {
            convert => {
              "tx" => "float"
              "collectd_value" => "float"
            }
          }
          # force clone for kibana3
          clone {
            clones => [ "tx" ]
          }
          ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working #####
          ruby {
            code => "
              if event['type'] == 'tx'
                event['collectd_value_instance'] = 'tx'
                event['collectd_value'] = event['tx']
              end
            "
          }
          mutate {
            replace => { "_type" => "collectd" }
            replace => { "type" => "collectd" }
            remove_field => [ "rx", "tx" ]
          }
        }
        if [collectd_plugin] == "disk" {
          mutate {
            add_field => {
              "collectd_value_instance" => "read"
              "collectd_value" => "%{read}"
            }
          }
          mutate {
            convert => {
              "write" => "float"
              "collectd_value" => "float"
            }
          }
          # force clone for kibana3
          clone {
            clones => [ "write" ]
          }
          ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working #####
          ruby {
            code => "
              if event['type'] == 'write'
                 event['collectd_value_instance'] = 'write'
                 event['collectd_value'] = event['write']
              end
            "
          }
          mutate {
            replace => { "_type" => "collectd" }
            replace => { "type" => "collectd" }
            remove_field => [ "read", "write" ]
          }
        }
        if [collectd_plugin] == "df" {
          mutate {
            add_field => {
              "collectd_value_instance" => "free"
              "collectd_value" => "%{free}"
            }
          }
          mutate {
            convert => {
              "used" => "float"
              "collectd_value" => "float"
            }
          }
          # force clone for kibana3
          clone {
            clones => [ "used" ]
          }
          ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working  #####
          ruby {
            code => "
              if event['type'] == 'used'
                event['collectd_value_instance'] = 'used'
                event['collectd_value'] = event['used']
              end
            "
          }
          mutate {
            replace => { "_type" => "collectd" }
            replace => { "type" => "collectd" }
            remove_field => [ "used", "free" ]
          }
        }
        if [collectd_plugin] == "load" {
          mutate {
            add_field => {
              "collectd_value_instance" => "shortterm"
              "collectd_value" => "%{shortterm}"
            }
          }
          mutate {
            convert => {
              "longterm" => "float"
              "midterm" => "float"
              "collectd_value" => "float"
            }
          }
          # force clone for kibana3
          clone {
            clones => [ "longterm", "midterm" ]
          }
          ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working #####
          ruby {
            code => "
              if event['type'] != 'collectd'
                event['collectd_value_instance'] = event['type']
                event['collectd_value'] = event[event['type']]
              end
            "
          }
          mutate {
            replace => { "_type" => "collectd" }
            replace => { "type" => "collectd" }
            remove_field => [ "longterm", "midterm", "shortterm" ]
          }
        }
      }
    }
    

    编辑3:我可能不应该为你做你的工作,但没关系。

    collectd像任何优秀的软件一样,封装了某些丑陋或难以让用户处理的方面,并试图让你更容易,因为它看起来就像你正在发送数据(在这种情况下是一个元组)而不是用序列化来愚弄。

    你的例子:

    (date_time, current_cpu_load), for example ('2016-0-04-24 11:09:12', 12.3)

    我不会花时间弄清楚你是如何形成它的。如果您能够使用 CPU 插件获取该数据,那就太好了。我将复制并粘贴我在网上找到的一个,以方便我使用。

    就是说,想一想……只要一点点,就不会痛了。

    您会看到下面加载了 CPU 插件。

    您看到文件中的接口collectd太小conf而无法指定字段。

    因此,如果您只是这样做,它会起作用,但是您将获得比 CPU 负载更多的数据。

    这就是您可以使用过滤器的地方。但我认为你也可以在 Kibana 中做到这一点。所以我宁愿不要浪费时间写一个你a)不需要的过滤器,b)如果你花一些时间就可以轻松编写。

    ## In `collectd`:
    # For each instance where collectd is running, we define 
    # hostname proper to that instance. When metrics from
    # multiple instances are aggregated, hostname will tell 
    # us were they came from.
    Hostname "**YOUR_HOSTNAME**"
    
    
        # Fully qualified domain name, false for our little lab
        FQDNLookup false
    
        # Plugins we are going to use with their configurations,
        # if needed
        LoadPlugin cpu
    
        LoadPlugin df
        <Plugin df>
                Device "/dev/sda1"
                MountPoint "/"
                FSType "ext4"
                ReportReserved "true"
        </Plugin>
    
        LoadPlugin interface
        <Plugin interface>
                Interface "eth0"
                IgnoreSelected false
        </Plugin>
    
        LoadPlugin network
        <Plugin network>
                Server "**YOUR.HOST.IP.ADDR**" "**PORTNUMBER**"
        </Plugin>
    
        LoadPlugin memory
    
        LoadPlugin syslog
        <Plugin syslog>
                LogLevel info
        </Plugin>
    
        LoadPlugin swap
    
        <Include "/etc/collectd/collectd.conf.d">
                Filter ".conf"
        </Include>
    

    你的logstash配置

        input {
          udp {
            port => **PORTNUMBER**         # 25826 matches port specified in collectd.conf
            buffer_size => **1452**   **# 1452 is the default buffer size for Collectd**
            codec => collectd { } # specific Collectd codec to invoke
            type => collectd 
          }
        }
        output {
          elasticsearch {
            cluster  => **ELASTICSEARCH_CLUSTER_NAME** # this matches out elasticsearch cluster.name
            protocol => http
          }
        }
    

    来自 collectd 的 aaron 的更新

    在 Logstash 1.3.x 中,我们引入了 collectd 输入插件。太棒了!我们可以在 Logstash 中处理指标,将它们存储在 Elasticsearch 中并使用 Kibana 查看它们。唯一的缺点是您每秒只能通过插件获得大约 3100 个事件。在 Logstash 1.4.0 中,我们引入了一个新改进的 UDP 输入插件,它是多线程的并且有一个队列。我将 collectd 输入插件重构为一个编解码器(在我的同事和社区的帮助下),以利用这一巨大的性能提升。现在我的双核 Macbook Air 上只有 3 个线程,我可以通过 collectd 编解码器每秒处理超过 45,000 个事件!

    因此,我想提供一些快速示例,您可以使用这些示例来更改插件配置以使用编解码器。

    旧方式:input { collectd {} } 新方式:

    input {   udp {
    port => 25826         # Must be specified. 25826 is the default for collectd
    buffer_size => 1452   # Should be specified. 1452 is the default for recent versions of collectd
    codec => collectd { } # This will invoke the default options for the codec
    type => "collectd"   } } This new configuration will use 2 threads and a queue size of 2000 by default for the UDP input plugin. With
    this you should easily be able to break 30,000 events per second!
    

    我提供了一些其他配置示例的要点。有关更多信息,请查看 collectd 编解码器的 Logstash 文档。

    快乐登录!

    • 0

相关问题

  • 谁能指出我的 802.11n 范围扩展器?

  • 我怎样才能得到一个网站的IP地址?

  • 在一个 LAN 中使用两台 DHCP 服务器

  • 如何在 Linux 下监控每个进程的网络 I/O 使用情况?

  • 为本地网络中的名称解析添加自定义 dns 条目

Sidebar

Stats

  • 问题 205573
  • 回答 270741
  • 最佳答案 135370
  • 用户 68524
  • 热门
  • 回答
  • Marko Smith

    新安装后 postgres 的默认超级用户用户名/密码是什么?

    • 5 个回答
  • Marko Smith

    SFTP 使用什么端口?

    • 6 个回答
  • Marko Smith

    命令行列出 Windows Active Directory 组中的用户?

    • 9 个回答
  • Marko Smith

    什么是 Pem 文件,它与其他 OpenSSL 生成的密钥文件格式有何不同?

    • 3 个回答
  • Marko Smith

    如何确定bash变量是否为空?

    • 15 个回答
  • Martin Hope
    Tom Feiner 如何按大小对 du -h 输出进行排序 2009-02-26 05:42:42 +0800 CST
  • Martin Hope
    Noah Goodrich 什么是 Pem 文件,它与其他 OpenSSL 生成的密钥文件格式有何不同? 2009-05-19 18:24:42 +0800 CST
  • Martin Hope
    Brent 如何确定bash变量是否为空? 2009-05-13 09:54:48 +0800 CST
  • Martin Hope
    cletus 您如何找到在 Windows 中打开文件的进程? 2009-05-01 16:47:16 +0800 CST

热门标签

linux nginx windows networking ubuntu domain-name-system amazon-web-services active-directory apache-2.4 ssh

Explore

  • 主页
  • 问题
    • 最新
    • 热门
  • 标签
  • 帮助

Footer

AskOverflow.Dev

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

Language

  • Pt
  • Server
  • Unix

© 2023 AskOverflow.DEV All Rights Reserve