Eu tenho um banco de dados sqlserver2016 que desejo indexar no elasticsearch usando o logstash.
Aqui está o meu arquivo de configuração do logstash que funciona um pouco:
input {
jdbc {
jdbc_driver_library => "C:\elastic\Microsoft-JDBC-Driver-6.0-for-SQL-Server\sqljdbc_6.0\enu\jre8\sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://10.11.12.13:1433;databaseName=testdb1;integratedSecurity=false;user=ElasticExtractor;password=flyingweisels;"
jdbc_user => "ElasticExtractor"
jdbc_password => "flyingweisels"
statement => "select top 150000 Item_ID itemid,merchant_id merchantid,modelnumber,language_id from items order by Item_Id desc"
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "testdata"
document_type => "testtype"
document_id => "%{itemid}"
}
}
Então, o que esse arquivo deve fazer, conforme configurado, é inserir 150 mil itens no elasticSearch. Um pouco importa apenas cerca de um terço disso, como 62 382 neste caso. Se eu tentar inserir 50k ele só insere cerca de 20k. Existe uma razão óbvia para fazer isso?
Aqui está o log de execução atual:
[2017-09-01T08:16:31,923][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-09-01T08:16:31,927][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-09-01T08:16:32,006][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2017-09-01T08:16:32,007][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-09-01T08:16:32,042][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-09-01T08:16:32,050][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2017-09-01T08:16:32,053][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-09-01T08:16:32,219][INFO ][logstash.pipeline ] Pipeline main started
[2017-09-01T08:16:32,313][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-09-01T08:16:32,643][INFO ][logstash.inputs.jdbc ] (0.050000s) select top 150000 Item_ID itemid,merchant_id merchantid,modelnumber,language from items order by Item_Id desc
[2017-09-01T08:16:49,805][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
A segunda coisa é, digamos que eu queira inserir uma linha do servidor SQL que vem dessa entrada, qual plugin posso usar para que se a linha tiver um “merchant_id” específico ela vá em um TYPE elástico nomeado com esse ID. Além disso, se tiver um “idioma” específico, ele vai em um ÍNDICE elástico com esse idioma como nome. Isso pode ser feito? Devo simplesmente criar vários arquivos de configuração do Logstash, um para cada uma dessas tarefas?
Então eu descobri o que estava fazendo de errado, havia 2 problemas diferentes.
1. Contagem incorreta de linhas inseridas
Isso foi causado pela consulta SQL que eu estava usando. Elastic quer
document_id
valores únicos e oleft join
que eu estava usando fez com que várias linhas fossem retornadas com os mesmos valores. O que o elastic faz nesse caso é substituir a linha existente pelos novos valores.2. Atribua linhas a diferentes índices e tipos
Eu tentei usar a mesma sintaxe que no
document_id
que acabou ficando assim:Isso resultaria em linhas sendo inseridas em um índice chamado
%{Language}
que obviamente não era o resultado desejado.O problema era simples: ELÁSTICO NÃO GOSTA DE LETRAS MAIÚSCULAS.
Então meu arquivo de configuração final ficou assim:
Funciona muito bem!