我有一个 sqlserver2016 数据库,我想使用 logstash 在 elasticsearch 中建立索引。
这是我的 logstash 配置文件,它有点工作:
input {
jdbc {
jdbc_driver_library => "C:\elastic\Microsoft-JDBC-Driver-6.0-for-SQL-Server\sqljdbc_6.0\enu\jre8\sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://10.11.12.13:1433;databaseName=testdb1;integratedSecurity=false;user=ElasticExtractor;password=flyingweisels;"
jdbc_user => "ElasticExtractor"
jdbc_password => "flyingweisels"
statement => "select top 150000 Item_ID itemid,merchant_id merchantid,modelnumber,language_id from items order by Item_Id desc"
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "testdata"
document_type => "testtype"
document_id => "%{itemid}"
}
}
那么这个文件应该做什么,正如配置的那样,在 elasticSearch 中插入 150k 项。在某种程度上,它只进口了其中的三分之一,例如本例中的 62 382。如果我尝试插入 50k,它只会插入大约 20k。这样做有明显的原因吗?
这是当前的执行日志:
[2017-09-01T08:16:31,923][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-09-01T08:16:31,927][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-09-01T08:16:32,006][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2017-09-01T08:16:32,007][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-09-01T08:16:32,042][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-09-01T08:16:32,050][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2017-09-01T08:16:32,053][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-09-01T08:16:32,219][INFO ][logstash.pipeline ] Pipeline main started
[2017-09-01T08:16:32,313][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-09-01T08:16:32,643][INFO ][logstash.inputs.jdbc ] (0.050000s) select top 150000 Item_ID itemid,merchant_id merchantid,modelnumber,language from items order by Item_Id desc
[2017-09-01T08:16:49,805][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
第二件事是,假设我想从来自此输入的 SQL 服务器插入一行,我可以使用什么插件,以便如果该行具有特定的“merchant_id”,它会进入一个以该 ID 命名的弹性类型。此外,如果它有一种特定的“语言”,它会进入一个以该语言为名称的弹性索引。那可以吗?我是否应该简单地创建多个 Logstash 配置文件,每个任务一个?
所以我弄清楚我做错了什么,有两个不同的问题。
1.插入行数错误
这是由我使用的 SQL 查询引起的。Elastic 需要唯一
document_id
值,而left join
我使用的导致多行返回相同的值。在这种情况下,elastic 所做的是用新值覆盖现有行。2.将行分配给不同的索引和类型
document_id
我曾尝试使用与最终看起来像这样的语法相同的语法:这将导致行被插入到名为 的索引
%{Language}
中,这显然不是预期的结果。问题很简单:ELASTIC 不喜欢大写字母。
所以我的最终配置文件最终看起来像这样:
效果很好!