From e0d40fa3e251e30730d52b4c9f445a9bbb1ac3f9 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 24 Nov 2015 16:52:06 -0600 Subject: [PATCH 1/2] Common configs --- .travis.yml | 11 + Gemfile | 2 +- lib/logstash/outputs/elasticsearch_java.rb | 425 ++--------------- .../elasticsearch-template.json | 93 ---- .../outputs/elasticsearch_java/protocol.rb | 429 ++++++++---------- logstash-output-elasticsearch_java.gemspec | 1 + .../outputs/elasticsearch/node_spec.rb | 25 - spec/integration/outputs/index_spec.rb | 6 +- spec/integration/outputs/retry_spec.rb | 38 +- spec/integration/outputs/routing_spec.rb | 3 +- spec/integration/outputs/secure_spec.rb | 8 +- spec/integration/outputs/templates_spec.rb | 24 +- .../outputs/transport_create_spec.rb | 11 +- spec/integration/outputs/update_spec.rb | 11 +- .../outputs/elasticsearch/protocol_spec.rb | 9 +- spec/unit/outputs/elasticsearch_spec.rb | 1 - 16 files changed, 311 insertions(+), 786 deletions(-) create mode 100644 .travis.yml delete mode 100644 lib/logstash/outputs/elasticsearch_java/elasticsearch-template.json diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..8deea25 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,11 @@ +sudo: false +before_install: + - curl -s https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.0.0/elasticsearch-2.0.0.tar.gz > elasticsearch.tar.gz + - tar -xzf elasticsearch.tar.gz + - cd elasticsearch*/ && bin/elasticsearch & + - sleep 10 && curl http://localhost:9200 +language: ruby +cache: bundler +rvm: + - jruby-19mode +script: bundle exec rspec spec && bundle exec rspec spec diff --git a/Gemfile b/Gemfile index fa75df1..cd8aa9e 100644 --- a/Gemfile +++ b/Gemfile @@ -1,3 +1,3 @@ source 'https://rubygems.org' -gemspec +gemspec \ No newline at end of file diff --git a/lib/logstash/outputs/elasticsearch_java.rb b/lib/logstash/outputs/elasticsearch_java.rb index 7d9bec7..04125f4 100644 --- a/lib/logstash/outputs/elasticsearch_java.rb +++ b/lib/logstash/outputs/elasticsearch_java.rb @@ -4,11 +4,11 @@ require "logstash/outputs/base" require "logstash/json" require "concurrent" -require "stud/buffer" require "socket" # for Socket.gethostname require "thread" # for safe queueing require "uri" # for escaping user input require "logstash/outputs/elasticsearch_java/protocol" +require "logstash/outputs/elasticsearch" # This output lets you store logs in Elasticsearch using the native 'node' and 'transport' # protocols. It is highly recommended to use the regular 'logstash-output-elasticsearch' output @@ -67,66 +67,25 @@ class LogStash::Outputs::ElasticSearchJava < LogStash::Outputs::Base attr_reader :client - include Stud::Buffer + include LogStash::Outputs::ElasticSearch::CommonConfigs + include LogStash::Outputs::ElasticSearch::Common + RETRYABLE_CODES = [409, 429, 503] SUCCESS_CODES = [200, 201] config_name "elasticsearch_java" - # The index to write events to. This can be dynamic using the `%{foo}` syntax. - # The default value will partition your indices by day so you can more easily - # delete old data or only search specific date ranges. - # Indexes may not contain uppercase characters. - # For weekly indexes ISO 8601 format is recommended, eg. logstash-%{+xxxx.ww} - config :index, :validate => :string, :default => "logstash-%{+YYYY.MM.dd}" - - # The index type to write events to. Generally you should try to write only - # similar events to the same 'type'. String expansion `%{foo}` works here. - # - # Deprecated in favor of `document_type` field. 
- config :index_type, :validate => :string, :deprecated => "Please use the 'document_type' setting instead. It has the same effect, but is more appropriately named." - - # The document type to write events to. Generally you should try to write only - # similar events to the same 'type'. String expansion `%{foo}` works here. - # Unless you set 'document_type', the event 'type' will be used if it exists - # otherwise the document type will be assigned the value of 'logs' - config :document_type, :validate => :string - - # Starting in Logstash 1.3 (unless you set option `manage_template` to false) - # a default mapping template for Elasticsearch will be applied, if you do not - # already have one set to match the index pattern defined (default of - # `logstash-%{+YYYY.MM.dd}`), minus any variables. For example, in this case - # the template will be applied to all indices starting with `logstash-*` + # The Elasticsearch action to perform. Valid actions are: # - # If you have dynamic templating (e.g. creating indices based on field names) - # then you should set `manage_template` to false and use the REST API to upload - # your templates manually. - config :manage_template, :validate => :boolean, :default => true - - # This configuration option defines how the template is named inside Elasticsearch. - # Note that if you have used the template management features and subsequently - # change this, you will need to prune the old template manually, e.g. - # - # `curl -XDELETE ` + # - index: indexes a document (an event from Logstash). + # - delete: deletes a document by id (An id is required for this action) + # - create: indexes a document, fails if a document by that id already exists in the index. + # - update: updates a document by id. Update has a special case where you can upsert -- update a + # document if not already present. See the `upsert` option + # - create_unless_exists: create the document unless it already exists, in which case do nothing. # - # where `OldTemplateName` is whatever the former setting was. - config :template_name, :validate => :string, :default => "logstash" - - # You can set the path to your own template here, if you so desire. - # If not set, the included template will be used. - config :template, :validate => :path - - # Overwrite the current template with whatever is configured - # in the `template` and `template_name` directives. - config :template_overwrite, :validate => :boolean, :default => false - - # The document ID for the index. Useful for overwriting existing entries in - # Elasticsearch with the same ID. - config :document_id, :validate => :string - - # A routing override to be applied to all processed events. - # This can be dynamic using the `%{foo}` syntax. - config :routing, :validate => :string + # For more details on actions, check out the http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation] + config :action, :validate => %w(index delete create update create_unless_exists), :default => "index" # The name of your cluster if you set it on the Elasticsearch side. Useful # for discovery when using `node` or `transport` protocols. @@ -134,53 +93,6 @@ class LogStash::Outputs::ElasticSearchJava < LogStash::Outputs::Base # Equivalent to the Elasticsearch option 'cluster.name' config :cluster, :validate => :string - # For the `node` protocol, if you do not specify `host`, it will attempt to use - # multicast discovery to connect to Elasticsearch. 
If http://www.elastic.co/guide/en/elasticsearch/guide/current/_important_configuration_changes.html#_prefer_unicast_over_multicast[multicast is disabled] in Elasticsearch, - # you must include the hostname or IP address of the host(s) to use for Elasticsearch unicast discovery. - # Remember the `node` protocol uses the http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-transport.html#modules-transport[transport] address (eg. 9300, not 9200). - # `"127.0.0.1"` - # `["127.0.0.1:9300","127.0.0.2:9300"]` - # When setting hosts for `node` protocol, it is important to confirm that at least one non-client - # node is listed in the `host` list. Also keep in mind that the `host` parameter when used with - # the `node` protocol is for *discovery purposes only* (not for load balancing). When multiple hosts - # are specified, it will contact the first host to see if it can use it to discover the cluster. If not, - # then it will contact the second host in the list and so forth. With the `node` protocol, - # Logstash will join the Elasticsearch cluster as a node client (which has a copy of the cluster - # state) and this node client is the one that will automatically handle the load balancing of requests - # across data nodes in the cluster. - # If you are looking for a high availability setup, our recommendation is to use the `transport` protocol (below), - # set up multiple http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html[client nodes] and list the client nodes in the `host` parameter. - # - # For the `transport` protocol, it will load balance requests across the hosts specified in the `host` parameter. - # Remember the `transport` protocol uses the http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-transport.html#modules-transport[transport] address (eg. 9300, not 9200). - # `"127.0.0.1"` - # `["127.0.0.1:9300","127.0.0.2:9300"]` - # There is also a `sniffing` option (see below) that can be used with the transport protocol to instruct it to use the host to sniff for - # "alive" nodes in the cluster and automatically use it as the hosts list (but will skip the dedicated master nodes). - # If you do not use the sniffing option, it is important to exclude http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html[dedicated master nodes] from the `host` list - # to prevent Logstash from sending bulk requests to the master nodes. So this parameter should only reference either data or client nodes. - # - # For the `http` protocol, it will load balance requests across the hosts specified in the `host` parameter. - # Remember the `http` protocol uses the http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http[http] address (eg. 9200, not 9300). - # `"127.0.0.1"` - # `["127.0.0.1:9200","127.0.0.2:9200"]` - # It is important to exclude http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html[dedicated master nodes] from the `host` list - # to prevent LS from sending bulk requests to the master nodes. So this parameter should only reference either data or client nodes. - config :hosts, :validate => :array, :default => ["127.0.0.1"] - - # The port for Elasticsearch transport to use. 
-  #
-  # If you do not set this, the following defaults are used:
-  # * `protocol => transport` - port 9300-9305
-  # * `protocol => node` - port 9300-9305
-  config :port, :validate => :string, :default => "9300-9305"
-
-  # The name/address of the host to bind to for Elasticsearch clustering. Equivalent to the Elasticsearch option 'network.host'
-  # option.
-  # This MUST be set for either protocol to work (node or transport)! The internal Elasticsearch node
-  # will bind to this ip. This ip MUST be reachable by all nodes in the Elasticsearch cluster
-  config :network_host, :validate => :string, :required => true
-
   # This sets the local port to bind to. Equivalent to the Elasticsearch option 'transport.tcp.port'
   config :transport_tcp_port, :validate => :number
 
@@ -193,24 +105,6 @@ class LogStash::Outputs::ElasticSearchJava < LogStash::Outputs::Base
   # By default, this is generated internally by the ES client.
   config :node_name, :validate => :string
 
-  # This plugin uses the bulk index api for improved indexing performance.
-  # To make efficient bulk api calls, we will buffer a certain number of
-  # events before flushing that out to Elasticsearch. This setting
-  # controls how many events will be buffered before sending a batch
-  # of events.
-  config :flush_size, :validate => :number, :default => 500
-
-  # The amount of time since last flush before a flush is forced.
-  #
-  # This setting helps ensure slow event rates don't get stuck in Logstash.
-  # For example, if your `flush_size` is 100, and you have received 10 events,
-  # and it has been more than `idle_flush_time` seconds since the last flush,
-  # Logstash will flush those 10 events automatically.
-  #
-  # This helps keep both fast and slow log streams moving along in
-  # near-real-time.
-  config :idle_flush_time, :validate => :number, :default => 1
-
   # Choose the protocol used to talk to Elasticsearch.
   #
   # The 'node' protocol (default) will connect to the cluster as a normal Elasticsearch
   # node (but will not store data).
   #
   # The 'transport' protocol will connect to the host you specify and will not show up as a 'node'
   # in the Elasticsearch cluster. This is useful in situations where you cannot permit connections outbound
   # from the Elasticsearch cluster to this Logstash server.
   #
   # All protocols will use bulk requests when talking to Elasticsearch.
   config :protocol, :validate => [ "node", "transport"], :default => "transport"
 
@@ -231,134 +125,18 @@ class LogStash::Outputs::ElasticSearchJava < LogStash::Outputs::Base
-  # The Elasticsearch action to perform. Valid actions are: `index`, `delete`.
-  #
-  # Use of this setting *REQUIRES* you also configure the `document_id` setting
-  # because `delete` actions all require a document id.
-  #
-  # What does each action do?
-  #
-  # - index: indexes a document (an event from Logstash).
-  # - delete: deletes a document by id
-  # - create: indexes a document, fails if a document by that id already exists in the index.
- # - update: updates a document by id - # following action is not supported by HTTP protocol - # - create_unless_exists: creates a document, fails if no id is provided - # - # For more details on actions, check out the http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation] - config :action, :validate => :string, :default => "index" - - # Validate the server's certificate - # Disabling this severely compromises security - # For more information read https://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf - config :ssl_certificate_verification, :validate => :boolean, :default => true - - # The .cer or .pem file to validate the server's certificate - config :cacert, :validate => :path - - # The JKS truststore to validate the server's certificate - # Use either `:truststore` or `:cacert` - config :truststore, :validate => :path - - # Set the truststore password - config :truststore_password, :validate => :password - - # The keystore used to present a certificate to the server - # It can be either .jks or .p12 - config :keystore, :validate => :path - - # Set the truststore password - config :keystore_password, :validate => :password - # Enable cluster sniffing (transport only). # Asks host for the list of all cluster nodes and adds them to the hosts list # Equivalent to the Elasticsearch option 'client.transport.sniff' config :sniffing, :validate => :boolean, :default => false - # Set max retry for each event - config :max_retries, :validate => :number, :default => 3 - - # Set retry policy for events that failed to send - config :retry_max_items, :validate => :number, :default => 5000 - - # Set max interval between bulk retries - config :retry_max_interval, :validate => :number, :default => 5 - - # Enable doc_as_upsert for update mode - # create a new document with source if document_id doesn't exists - config :doc_as_upsert, :validate => :boolean, :default => false - - # Set upsert content for update mode - # create a new document with this parameter as json string if document_id doesn't exists - config :upsert, :validate => :string, :default => "" - - public - def register - @submit_mutex = Mutex.new - # retry-specific variables - @retry_flush_mutex = Mutex.new - @retry_teardown_requested = Concurrent::AtomicBoolean.new(false) - # needs flushing when interval - @retry_queue_needs_flushing = ConditionVariable.new - @retry_queue_not_full = ConditionVariable.new - @retry_queue = Queue.new - - - if @protocol =='node' && !@network_host - raise LogStash::ConfigurationError, "network_host MUST be set if the 'node' protocol is in use! If this is set incorrectly Logstash will hang attempting to connect!" - end - - if (@hosts.nil? || @hosts.empty?) && @protocol != "node" # node can use zen discovery - @logger.info("No 'hosts' set in elasticsearch output. 
Defaulting to localhost") - @hosts = ["localhost"] - end - - client_class = case @protocol - when "transport" - LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::TransportClient - when "node" - LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient - end - - @client = client_class.new(client_options) - - if @manage_template - begin - @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s) - client.template_install(@template_name, get_template, @template_overwrite) - rescue => e - @logger.error("Failed to install template", - :message => e.message, - :error_class => e.class.name, - ) - end - end - - @logger.info("New Elasticsearch output", :cluster => @cluster, - :hosts => @host, :port => @port, :protocol => @protocol) - - buffer_initialize( - :max_items => @flush_size, - :max_interval => @idle_flush_time, - :logger => @logger - ) - - @retry_timer_thread = Thread.new do - loop do - sleep(@retry_max_interval) - @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal } - end - end - - @retry_thread = Thread.new do - while @retry_teardown_requested.false? - @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.wait(@retry_flush_mutex) } - retry_flush - end - end - end # def register + # The name/address of the host to bind to for Elasticsearch clustering. Equivalent to the Elasticsearch option 'network.host' + # option. + # This MUST be set for either protocol to work (node or transport)! The internal Elasticsearch node + # will bind to this ip. This ip MUST be reachable by all nodes in the Elasticsearch cluster + config :network_host, :validate => :string, :required => true - def client_options + def build_client client_settings = {} client_settings["cluster.name"] = @cluster if @cluster client_settings["network.host"] = @network_host if @network_host @@ -371,16 +149,10 @@ def client_options client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}" end - @@plugins.each do |plugin| - name = plugin.name.split('_')[-1] - client_settings.merge!(LogStash::Outputs::ElasticSearchJava.const_get(name.capitalize).create_client_config(self)) - end - - common_options = { + options = { :protocol => @protocol, :client_settings => client_settings, - :hosts => @hosts, - :port => @port + :hosts => @hosts } # Update API setup @@ -388,155 +160,32 @@ def client_options :upsert => @upsert, :doc_as_upsert => @doc_as_upsert } - common_options.merge! update_options if @action == 'update' - - common_options - end - - - public - def get_template - if @template.nil? - @template = ::File.expand_path('elasticsearch_java/elasticsearch-template.json', ::File.dirname(__FILE__)) - if !File.exists?(@template) - raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')" - end - end - template_json = IO.read(@template).gsub(/\n/,'') - template = LogStash::Json.load(template_json) - @logger.info("Using mapping template", :template => template) - return template - end # def get_template + options.merge! update_options if @action == 'update' - public - def receive(event) - + options - # block until we have not maxed out our - # retry queue. This is applying back-pressure - # to slow down the receive-rate - @retry_flush_mutex.synchronize { - @retry_queue_not_full.wait(@retry_flush_mutex) while @retry_queue.size > @retry_max_items - } - - event['@metadata']['retry_count'] = 0 - - # Set the 'type' value for the index. 
- type = if @document_type - event.sprintf(@document_type) - elsif @index_type # deprecated - event.sprintf(@index_type) - else - event["type"] || "logs" - end - - params = { - :_id => @document_id ? event.sprintf(@document_id) : nil, - :_index => event.sprintf(@index), - :_type => type, - :_routing => @routing ? event.sprintf(@routing) : nil - } - - params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @action == 'update' && @upsert != "" - - buffer_receive([event.sprintf(@action), params, event]) - end # def receive - - public - # The submit method can be called from both the - # Stud::Buffer flush thread and from our own retry thread. - def submit(actions) - es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] } - @submit_mutex.lock - begin - bulk_response = @client.bulk(es_actions) - ensure - @submit_mutex.unlock - end - if bulk_response["errors"] - actions_with_responses = actions.zip(bulk_response['statuses']) - actions_to_retry = [] - actions_with_responses.each do |action, resp_code| - if RETRYABLE_CODES.include?(resp_code) - @logger.warn "retrying failed action with response code: #{resp_code}" - actions_to_retry << action - elsif not SUCCESS_CODES.include?(resp_code) - @logger.warn "failed action with response of #{resp_code}, dropping action: #{action}" - end - end - retry_push(actions_to_retry) unless actions_to_retry.empty? - end + @client = client_class.new(options) end - # When there are exceptions raised upon submission, we raise an exception so that - # Stud::Buffer will retry to flush - public - def flush(actions, teardown = false) - begin - submit(actions) - rescue => e - @logger.error "Got error to send bulk of actions: #{e.message}" - raise e - end - end # def flush - - public def close - @retry_teardown_requested.make_true - # First, make sure retry_timer_thread is stopped - # to ensure we do not signal a retry based on - # the retry interval. - Thread.kill(@retry_timer_thread) - @retry_timer_thread.join - # Signal flushing in the case that #retry_flush is in - # the process of waiting for a signal. - @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal } - # Now, #retry_flush is ensured to not be in a state of - # waiting and can be safely joined into the main thread - # for further final execution of an in-process remaining call. - @retry_thread.join - - # execute any final actions along with a proceeding retry for any - # final actions that did not succeed. - buffer_flush(:final => true) - retry_flush + @stopping.make_true + @buffer.stop end - private - # in charge of submitting any actions in @retry_queue that need to be - # retried - # - # This method is not called concurrently. It is only called by @retry_thread - # and once that thread is ended during the teardown process, a final call - # to this method is done upon teardown in the main thread. - def retry_flush() - unless @retry_queue.empty? - buffer = @retry_queue.size.times.map do - next_action, next_doc, next_event = @retry_queue.pop - next_event['@metadata']['retry_count'] += 1 - - if next_event['@metadata']['retry_count'] > @max_retries - @logger.error "too many attempts at sending event. dropping: #{next_event}" - nil - else - [next_action, next_doc, next_event] - end - end.compact - - submit(buffer) unless buffer.empty? 
+ def get_plugin_options + @@plugins.each do |plugin| + name = plugin.name.split('-')[-1] + client_settings.merge!(LogStash::Outputs::ElasticSearchJava.const_get(name.capitalize).create_client_config(self)) end - - @retry_flush_mutex.synchronize { - @retry_queue_not_full.signal if @retry_queue.size < @retry_max_items - } end - private - def retry_push(actions) - Array(actions).each{|action| @retry_queue << action} - @retry_flush_mutex.synchronize { - @retry_queue_needs_flushing.signal if @retry_queue.size >= @retry_max_items - } + def client_class + case @protocol + when "transport" + LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::TransportClient + when "node" + LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient + end end @@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch_java_/ } diff --git a/lib/logstash/outputs/elasticsearch_java/elasticsearch-template.json b/lib/logstash/outputs/elasticsearch_java/elasticsearch-template.json deleted file mode 100644 index 1be0dcc..0000000 --- a/lib/logstash/outputs/elasticsearch_java/elasticsearch-template.json +++ /dev/null @@ -1,93 +0,0 @@ -{ - "template" : "logstash-*", - "settings" : { - "index.refresh_interval" : "5s" - }, - "mappings" : { - "_default_" : { - "_all" : {"enabled" : true, "omit_norms" : true}, - "dynamic_templates" : [ { - "message_field" : { - "match" : "message", - "match_mapping_type" : "string", - "mapping" : { - "type" : "string", "index" : "analyzed", "omit_norms" : true - } - } - }, { - "string_fields" : { - "match" : "*", - "match_mapping_type" : "string", - "mapping" : { - "type" : "string", "index" : "analyzed", "omit_norms" : true, - "fields" : { - "raw" : {"type": "string", "index" : "not_analyzed", "doc_values" : true, "ignore_above" : 256} - } - } - } - }, { - "float_fields" : { - "match" : "*", - "match_mapping_type" : "float", - "mapping" : { "type" : "float", "doc_values" : true } - } - }, { - "double_fields" : { - "match" : "*", - "match_mapping_type" : "double", - "mapping" : { "type" : "double", "doc_values" : true } - } - }, { - "byte_fields" : { - "match" : "*", - "match_mapping_type" : "byte", - "mapping" : { "type" : "byte", "doc_values" : true } - } - }, { - "short_fields" : { - "match" : "*", - "match_mapping_type" : "short", - "mapping" : { "type" : "short", "doc_values" : true } - } - }, { - "integer_fields" : { - "match" : "*", - "match_mapping_type" : "integer", - "mapping" : { "type" : "integer", "doc_values" : true } - } - }, { - "long_fields" : { - "match" : "*", - "match_mapping_type" : "long", - "mapping" : { "type" : "long", "doc_values" : true } - } - }, { - "date_fields" : { - "match" : "*", - "match_mapping_type" : "date", - "mapping" : { "type" : "date", "doc_values" : true } - } - }, { - "geo_point_fields" : { - "match" : "*", - "match_mapping_type" : "geo_point", - "mapping" : { "type" : "geo_point", "doc_values" : true } - } - } ], - "properties" : { - "@timestamp": { "type": "date", "doc_values" : true }, - "@version": { "type": "string", "index": "not_analyzed", "doc_values" : true }, - "geoip" : { - "type" : "object", - "dynamic": true, - "properties" : { - "ip": { "type": "ip", "doc_values" : true }, - "location" : { "type" : "geo_point", "doc_values" : true }, - "latitude" : { "type" : "float", "doc_values" : true }, - "longitude" : { "type" : "float", "doc_values" : true } - } - } - } - } - } -} diff --git a/lib/logstash/outputs/elasticsearch_java/protocol.rb b/lib/logstash/outputs/elasticsearch_java/protocol.rb index 
--- a/lib/logstash/outputs/elasticsearch_java/protocol.rb
+++ b/lib/logstash/outputs/elasticsearch_java/protocol.rb
@@ -3,263 +3,236 @@
 require 'logstash-output-elasticsearch_java_jars.rb'
 require 'logstash/outputs/elasticsearch_java'
 
-module LogStash
-  module Outputs
-    module ElasticSearchJavaPlugins
-      module Protocols
-        class Base
-          private
-          def initialize(options={})
-            # host(s), port, cluster
-            @logger = Cabin::Channel.get
-          end
+module LogStash module Outputs module ElasticSearchJavaPlugins module Protocols
+  DEFAULT_OPTIONS = {
+    :port => 9300,
+  }
 
-          def template_install(name, template, force=false)
-            if template_exists?(name) && !force
-              @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name)
-              return
-            end
-            template_put(name, template)
-          end
+  class NodeClient
+    attr_reader :settings, :client_options
 
-          # Do a bulk request with the given actions.
-          #
-          # 'actions' is expected to be an array of bulk requests as string json
-          # values.
-          #
-          # Each 'action' becomes a single line in the bulk api call. For more
-          # details on the format of each.
-          def bulk(actions)
-            raise NotImplemented, "You must implement this yourself"
-            # bulk([
-            # '{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }',
-            # '{ "field1" : "value1" }'
-            #])
-          end
+    CLIENT_MUTEX = Mutex.new
 
-          public(:initialize, :template_install)
-        end
+    def initialize(options={})
+      @logger = Cabin::Channel.get
+      @client_options = DEFAULT_OPTIONS.merge(options)
+      create_settings
+    end
+
+    def client_mutex_synchronize
+      CLIENT_MUTEX.synchronize { yield }
+    end
 
-        class NodeClient < Base
-          CLIENT_MUTEX = Mutex.new
+    def client
+      client_mutex_synchronize { @@client ||= make_client }
+    end
 
-          def self.get_client(settings)
-            CLIENT_MUTEX.synchronize {
-              if @client
-                @client
-              else
-                nodebuilder = org.elasticsearch.node.NodeBuilder.nodeBuilder
-                @client = nodebuilder.settings(settings.build).node().client()
-              end
-            }
-          end
+    # For use in test helpers. This is a class method, so it locks
+    # CLIENT_MUTEX directly rather than via the instance-level helper.
+    def self.clear_node_client
+      CLIENT_MUTEX.synchronize { @@client = nil }
+    end
 
-          def self.clear_client()
-            CLIENT_MUTEX.synchronize {
-              @client = null
-            }
-          end
+    def create_settings
+      @settings = org.elasticsearch.common.settings.Settings.settingsBuilder()
+      if @client_options[:hosts]
+        @settings.put("discovery.zen.ping.multicast.enabled", false)
+        @settings.put("discovery.zen.ping.unicast.hosts", hosts(@client_options))
+      end
 
-          private
+      @settings.put("node.client", true)
+      @settings.put("http.enabled", false)
+      @settings.put("path.home", Dir.pwd)
 
-          DEFAULT_OPTIONS = {
-            :port => 9300,
-          }
+      if @client_options[:client_settings]
+        @client_options[:client_settings].each do |key, value|
+          @settings.put(key, value)
+        end
+      end
 
-          def initialize(options={})
-            super
-            require "java"
-            @options = DEFAULT_OPTIONS.merge(options)
-            setup(@options)
-          end # def initialize
+      @settings
+    end
 
-          def settings
-            return @settings
+    def hosts(options)
+      # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
+      result = Array.new
+      if options[:hosts].class == Array
+        options[:hosts].each do |host|
+          if host.to_s =~ /^.+:.+$/
+            # For host in format: host:port, ignore options[:port]
+            result << host
+          else
+            if options[:port].to_s =~ /^\d+-\d+$/
+              # port ranges are 'host[port1-port2]'
+              result << Range.new(*options[:port].split("-")).collect { |p| "#{host}:#{p}" }
+            else
+              result << "#{host}:#{options[:port]}"
+            end
           end
-
-          def client
-            self.class.get_client(settings)
+        end
+      else
+        if options[:hosts].to_s =~ /^.+:.+$/
+          # For host in format: host:port, ignore options[:port]
+          result << options[:hosts]
+        else
+          if options[:port].to_s =~ /^\d+-\d+$/
+            # port ranges are 'host[port1-port2]' according to
+            # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
+            # However, it seems to only query the first port.
+            # So generate our own list of unicast hosts to scan.
+            range = Range.new(*options[:port].split("-"))
+            result << range.collect { |p| "#{options[:hosts]}:#{p}" }
+          else
+            result << "#{options[:hosts]}:#{options[:port]}"
+          end
+        end
+      end
+      result.flatten.join(",")
+    end
 
-        def setup(options={})
-          @settings = org.elasticsearch.common.settings.Settings.settingsBuilder()
-          if options[:hosts]
-            @settings.put("discovery.zen.ping.multicast.enabled", false)
-            @settings.put("discovery.zen.ping.unicast.hosts", NodeClient.hosts(options))
-          end
+    # Normalizes the Java response to a reasonable approximation of the HTTP datastructure for interop
+    # with the HTTP code
+    def normalize_bulk_response(bulk_response)
+      # TODO(talevy): parse item response objects to retrieve correct 200 (OK) or 201(created) status codes
+      items = bulk_response.map { |i|
+        if i.is_failed
+          [[i.get_op_type, {"status" => i.get_failure.get_status.get_status, "message" => i.failureMessage}]]
+        else
+          [[i.get_op_type, {"status" => 200, "message" => "OK"}]]
+        end
+      }
+      if bulk_response.has_failures()
+        {"errors" => true, "items" => items}
+      else
+        {"errors" => false}
+      end
+    end
 
-          @settings.put("node.client", true)
-          @settings.put("http.enabled", false)
-          @settings.put("path.home", Dir.pwd)
+    def make_client
+      nodebuilder = org.elasticsearch.node.NodeBuilder.nodeBuilder
+      nodebuilder.settings(settings.build).node().client()
+    end
 
-          if options[:client_settings]
-            options[:client_settings].each do |key, value|
-              @settings.put(key, value)
-            end
-          end
+    def bulk(actions)
+      # Actions is an array of [ action, action_metadata, source ] tuples
+      prep = client.prepareBulk
+      actions.each do |action, args, source|
+        prep.add(build_request(action, args, source))
+      end
+      response = prep.execute.actionGet()
 
-          return @settings
-        end
+      self.normalize_bulk_response(response)
+    end
 
-        def self.hosts(options)
-          # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
-          result = Array.new
-          if options[:hosts].class == Array
-            options[:hosts].each do |host|
-              if host.to_s =~ /^.+:.+$/
-                # For host in format: host:port, ignore options[:port]
-                result << host
-              else
-                if options[:port].to_s =~ /^\d+-\d+$/
-                  # port ranges are 'host[port1-port2]'
-                  result << Range.new(*options[:port].split("-")).collect { |p| "#{host}:#{p}" }
-                else
-                  result << "#{host}:#{options[:port]}"
-                end
-              end
-            end
-          else
-            if options[:hosts].to_s =~ /^.+:.+$/
-              # For host in format: host:port, ignore options[:port]
-              result << options[:hosts]
-            else
-              if options[:port].to_s =~ /^\d+-\d+$/
-                # port ranges are 'host[port1-port2]' according to
-                # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
-                # However, it seems to only query the first port.
-                # So generate our own list of unicast hosts to scan.
-                range = Range.new(*options[:port].split("-"))
-                result << range.collect { |p| "#{options[:hosts]}:#{p}" }
-              else
-                result << "#{options[:hosts]}:#{options[:port]}"
-              end
-            end
-          end
-          result.flatten.join(",")
+
+    def build_request(action, args, source)
+      case action
+      when "index"
+        request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
+        request.id(args[:_id]) if args[:_id]
+        request.routing(args[:_routing]) if args[:_routing]
+        request.source(source)
+      when "delete"
+        request = org.elasticsearch.action.delete.DeleteRequest.new(args[:_index])
+        request.id(args[:_id])
+        request.routing(args[:_routing]) if args[:_routing]
+      when "create"
+        request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
+        request.id(args[:_id]) if args[:_id]
+        request.routing(args[:_routing]) if args[:_routing]
+        request.source(source)
+        request.opType("create")
+      when "create_unless_exists"
+        unless args[:_id].nil?
+          request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
+          request.id(args[:_id])
+          request.routing(args[:_routing]) if args[:_routing]
+          request.source(source)
+          request.opType("create")
+        else
+          raise(LogStash::ConfigurationError, "Specifying action => 'create_unless_exists' without a document '_id' is not supported.")
        end
-
-        def self.normalize_bulk_response(bulk_response)
-          # TODO(talevy): parse item response objects to retrieve correct 200 (OK) or 201(created) status codes
-          if bulk_response.has_failures()
-            {"errors" => true,
-             "statuses" => bulk_response.map { |i| (i.is_failed && i.get_failure.get_status.get_status) || 200 }}
+      when "update"
+        unless args[:_id].nil?
+          request = org.elasticsearch.action.update.UpdateRequest.new(args[:_index], args[:_type], args[:_id])
+          request.routing(args[:_routing]) if args[:_routing]
+          request.doc(source)
+          if @client_options[:doc_as_upsert]
+            request.docAsUpsert(true)
          else
-            {"errors" => false}
+            request.upsert(args[:_upsert]) if args[:_upsert]
          end
+        else
+          raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.")
        end
+      else
+        raise(LogStash::ConfigurationError, "action => '#{action}' is not currently supported.")
+      end # case action
 
-        def bulk(actions)
-          # Actions an array of [ action, action_metadata, source ]
-          prep = client.prepareBulk
-          actions.each do |action, args, source|
-            prep.add(build_request(action, args, source))
-          end
-          response = prep.execute.actionGet()
-
-          self.class.normalize_bulk_response(response)
-        end # def bulk
-
-        def build_request(action, args, source)
-          case action
-          when "index"
-            request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
-            request.id(args[:_id]) if args[:_id]
-            request.routing(args[:_routing]) if args[:_routing]
-            request.source(source)
-          when "delete"
-            request = org.elasticsearch.action.delete.DeleteRequest.new(args[:_index])
-            request.id(args[:_id])
-            request.routing(args[:_routing]) if args[:_routing]
-          when "create"
-            request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
-            request.id(args[:_id]) if args[:_id]
-            request.routing(args[:_routing]) if args[:_routing]
-            request.source(source)
-            request.opType("create")
-          when "create_unless_exists"
-            unless args[:_id].nil?
-              request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
-              request.id(args[:_id])
-              request.routing(args[:_routing]) if args[:_routing]
-              request.source(source)
-              request.opType("create")
-            else
-              raise(LogStash::ConfigurationError, "Specifying action => 'create_unless_exists' without a document '_id' is not supported.")
-            end
-          when "update"
-            unless args[:_id].nil?
-              request = org.elasticsearch.action.update.UpdateRequest.new(args[:_index], args[:_type], args[:_id])
-              request.routing(args[:_routing]) if args[:_routing]
-              request.doc(source)
-              if @options[:doc_as_upsert]
-                request.docAsUpsert(true)
-              else
-                request.upsert(args[:_upsert]) if args[:_upsert]
-              end
-            else
-              raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.")
-            end
-          else
-            raise(LogStash::ConfigurationError, "action => '#{action_name}' is not currently supported.")
-          end # case action
-
-          request.type(args[:_type]) if args[:_type]
-          return request
-        end # def build_request
-
-        def template_exists?(name)
-          return !client.admin.indices.
-            prepareGetTemplates(name).
-            execute().
-            actionGet().
-            getIndexTemplates().
-            isEmpty
-        end # def template_exists?
-
-        def template_put(name, template)
-          response = client.admin.indices.
-            preparePutTemplate(name).
-            setSource(LogStash::Json.dump(template)).
-            execute().
-            actionGet()
+      request.type(args[:_type]) if args[:_type]
+      return request
+    end
 
-          raise "Could not index template!" unless response.isAcknowledged
-        end # template_put
 
-          public(:initialize, :bulk)
-        end # class NodeClient
+    def template_exists?(name)
+      return !client.admin.indices.
+        prepareGetTemplates(name).
+        execute().
+        actionGet().
+        getIndexTemplates().
+        isEmpty
+    end
 
-        class TransportClient < NodeClient
-          def client
-            return @client if @client
-            @client = build_client(@options)
-            return @client
-          end
+    def template_install(name, template, force=false)
+      if template_exists?(name) && !force
+        @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name)
+        return
+      end
+      template_put(name, template)
+    end
 
+    def template_put(name, template)
+      response = client.admin.indices.
+        preparePutTemplate(name).
+        setSource(LogStash::Json.dump(template)).
+        execute().
+        actionGet()
 
-          private
-          def build_client(options)
-            client = org.elasticsearch.client.transport.TransportClient.
-                builder().
-                settings((settings.build)).
+      raise "Could not index template!" unless response.isAcknowledged
+    end
+  end # class NodeClient
+
+  class TransportClient < NodeClient
+    private
+    def make_client
+      client = org.elasticsearch.client.transport.TransportClient.
+        builder().
+        settings((settings.build)).
+ build() + + client_options[:hosts].each do |host| + matches = host.match /([^:+]+):(\d+)/ + + inet_addr = java.net.InetAddress.getByName(matches[1]) + port = (matches[2] || 9300).to_i + client.addTransportAddress( + org.elasticsearch.common.transport.InetSocketTransportAddress.new( + inet_addr, port + ) + ) + end - options[:hosts].each do |host| - matches = host.match /(.+)(?:.*)/ + return client + end - inet_addr = java.net.InetAddress.getByName(matches[1]) - port = (matches[2] || options[:port]).to_i - client.addTransportAddress( - org.elasticsearch.common.transport.InetSocketTransportAddress.new( - inet_addr, port - ) - ) - end + # We want a separate client per instance for transport + def client + client_mutex_synchronize { @client ||= make_client } + end - return client - end - end - end + def clear_client() + client_mutex_synchronize { @client = nil } end end -end \ No newline at end of file +end end end end \ No newline at end of file diff --git a/logstash-output-elasticsearch_java.gemspec b/logstash-output-elasticsearch_java.gemspec index 7d72fc0..62bf0e2 100644 --- a/logstash-output-elasticsearch_java.gemspec +++ b/logstash-output-elasticsearch_java.gemspec @@ -24,6 +24,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'stud', ['>= 0.0.17', '~> 0.0'] s.add_runtime_dependency 'cabin', ['~> 0.6'] s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0" + s.add_runtime_dependency "logstash-output-elasticsearch", ">= 2.1.4" s.add_development_dependency 'ftw', '~> 0.0.42' s.add_development_dependency 'logstash-input-generator' diff --git a/spec/integration/outputs/elasticsearch/node_spec.rb b/spec/integration/outputs/elasticsearch/node_spec.rb index 2efd081..d9fce1a 100644 --- a/spec/integration/outputs/elasticsearch/node_spec.rb +++ b/spec/integration/outputs/elasticsearch/node_spec.rb @@ -7,30 +7,5 @@ subject { LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient } - it "should support hosts in both string and array" do - # Because we defined *hosts* method in NodeClient as private, - # we use *obj.send :method,[args...]* to call method *hosts* - # Node client should support host in string - # Case 1: default :hosts in string - insist { subject.send :hosts, :hosts => "host",:port => 9300 } == "host:9300" - # Case 2: :port =~ /^\d+_\d+$/ - insist { subject.send :hosts, :hosts => "host",:port => "9300-9302"} == "host:9300,host:9301,host:9302" - # Case 3: :hosts =~ /^.+:.+$/ - insist { subject.send :hosts, :hosts=> "host:9303",:port => 9300 } == "host:9303" - # Case 4: :hosts=~ /^.+:.+$/ and :port =~ /^\d+_\d+$/ - insist { subject.send :hosts, :hosts => "host:9303",:port => "9300-9302"} == "host:9303" - - # Node client should support host in array - # Case 5: :hosts in array with single item - insist { subject.send :hosts, :hosts => ["host"],:port => 9300 } == ("host:9300") - # Case 6: :hostsin array with more than one items - insist { subject.send :hosts, :hosts=> ["host1","host2"],:port => 9300 } == "host1:9300,host2:9300" - # Case 7: :hostsin array with more than one items and :port =~ /^\d+_\d+$/ - insist { subject.send :hosts, :hosts=> ["host1","host2"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9300,host2:9301,host2:9302" - # Case 8: :hostsin array with more than one items and some :hosts=~ /^.+:.+$/ - insist { subject.send :hosts, :hosts=> ["host1","host2:9303"],:port => 9300 } == "host1:9300,host2:9303" - # Case 9: :hostsin array with more than one items, :port =~ /^\d+_\d+$/ and some :hosts=~ /^.+:.+$/ - insist { 
subject.send :hosts, :hosts => ["host1","host2:9303"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9303" - end end diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb index 50b8928..a9f0cc5 100644 --- a/spec/integration/outputs/index_spec.rb +++ b/spec/integration/outputs/index_spec.rb @@ -52,9 +52,8 @@ } output { elasticsearch_java { - hosts => "#{get_host()}" + hosts => "#{get_host()}:#{get_port('transport')}" network_host => "#{get_local_host}" - port => "#{get_port('transport')}" protocol => "transport" index => "#{index}" flush_size => #{flush_size} @@ -78,9 +77,8 @@ } output { elasticsearch_java { - hosts => "#{get_host()}" + hosts => "#{get_host()}:#{get_port('transport')}" network_host => "#{get_local_host}" - port => "#{get_port('transport')}" protocol => "transport" index => "#{index}" flush_size => #{flush_size} diff --git a/spec/integration/outputs/retry_spec.rb b/spec/integration/outputs/retry_spec.rb index 6c2f88c..60a73b8 100644 --- a/spec/integration/outputs/retry_spec.rb +++ b/spec/integration/outputs/retry_spec.rb @@ -12,9 +12,19 @@ let(:max_retries) { 3 } def mock_actions_with_response(*resp) - allow_any_instance_of(LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient).to receive(:bulk).and_return(*resp) - end + expanded_responses = resp.map do |resp| + items = resp["statuses"] && resp["statuses"].map do |status| + {"create" => {"status" => status, "error" => "Error for #{status}"}} + end + + { + "errors" => resp["errors"], + "items" => items + } + end + allow_any_instance_of(LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient).to receive(:bulk).and_return(*expanded_responses) + end subject! do settings = { @@ -23,8 +33,7 @@ def mock_actions_with_response(*resp) "template_overwrite" => true, "network_host" => get_local_host, "protocol" => 'transport', - "hosts" => get_host(), - "port" => get_port('transport'), + "hosts" => "#{get_host()}:#{get_port('transport')}", "retry_max_items" => 10, "retry_max_interval" => 1, "max_retries" => max_retries @@ -46,22 +55,24 @@ def mock_actions_with_response(*resp) subject.register subject.receive(event1) subject.receive(event2) - subject.buffer_flush(:final => true) + subject.flush sleep(2) end - it "should raise exception and be retried by stud::buffer" do + it "retry exceptions within the submit body" do call_count = 0 - expect(subject).to receive(:submit).with([action1]).exactly(3).times do + subject.register + + expect(subject.client).to receive(:bulk).with(anything).exactly(3).times do if (call_count += 1) <= 2 raise "error first two times" else {"errors" => false} end end - subject.register + subject.receive(event1) - subject.close + subject.flush end it "should retry actions with response status of 503" do @@ -77,7 +88,7 @@ def mock_actions_with_response(*resp) subject.receive(event1) subject.receive(event1) subject.receive(event2) - subject.buffer_flush(:final => true) + subject.flush sleep(3) end @@ -87,7 +98,7 @@ def mock_actions_with_response(*resp) expect(subject).to receive(:submit).with([action1]).twice.and_call_original subject.register subject.receive(event1) - subject.buffer_flush(:final => true) + subject.flush sleep(3) end @@ -101,7 +112,7 @@ def mock_actions_with_response(*resp) expect(subject).to receive(:submit).with([action1]).exactly(max_retries+1).times.and_call_original subject.register subject.receive(event1) - subject.buffer_flush(:final => true) + subject.flush sleep(5) end @@ -122,9 +133,8 @@ def 
mock_actions_with_response(*resp) it "successful requests should not be appended to retry queue" do subject.register subject.receive(event1) - expect(subject).not_to receive(:retry_push) + expect(subject).to receive(:submit).once.and_call_original subject.close - @es.indices.refresh sleep(5) Stud::try(10.times) do diff --git a/spec/integration/outputs/routing_spec.rb b/spec/integration/outputs/routing_spec.rb index 0aa7563..d711223 100644 --- a/spec/integration/outputs/routing_spec.rb +++ b/spec/integration/outputs/routing_spec.rb @@ -46,8 +46,7 @@ } output { elasticsearch_java { - hosts => "#{get_host()}" - port => "#{get_port('transport')}" + hosts => "#{get_host()}:#{get_port('transport')}" protocol => "transport" index => "#{index}" flush_size => #{flush_size} diff --git a/spec/integration/outputs/secure_spec.rb b/spec/integration/outputs/secure_spec.rb index 798146c..1b1fd35 100644 --- a/spec/integration/outputs/secure_spec.rb +++ b/spec/integration/outputs/secure_spec.rb @@ -26,7 +26,7 @@ it "sends events to ES" do expect { subject.receive(LogStash::Event.new("message" => "sample message here")) - subject.buffer_flush(:final => true) + subject.flush }.to_not raise_error end end @@ -51,7 +51,7 @@ it "sends events to ES" do expect { subject.receive(LogStash::Event.new("message" => "sample message here")) - subject.buffer_flush(:final => true) + subject.flush }.to_not raise_error end end @@ -82,7 +82,7 @@ it "sends events to ES" do expect { subject.receive(LogStash::Event.new("message" => "sample message here")) - subject.buffer_flush(:final => true) + subject.flush }.to_not raise_error end end @@ -107,7 +107,7 @@ it "sends events to ES" do expect { subject.receive(LogStash::Event.new("message" => "sample message here")) - subject.buffer_flush(:final => true) + subject.flush }.to_not raise_error end end diff --git a/spec/integration/outputs/templates_spec.rb b/spec/integration/outputs/templates_spec.rb index a8e43c3..b90f47d 100644 --- a/spec/integration/outputs/templates_spec.rb +++ b/spec/integration/outputs/templates_spec.rb @@ -10,8 +10,7 @@ "manage_template" => true, "template_overwrite" => true, "protocol" => protocol, - "hosts" => "#{get_host()}", - "port" => "#{get_port('transport')}", + "hosts" => "#{get_host()}:#{get_port('transport')}", "network_host" => get_local_host } next LogStash::Outputs::ElasticSearchJava.new(settings) @@ -30,14 +29,17 @@ subject.register - subject.receive(LogStash::Event.new("message" => "sample message here")) - subject.receive(LogStash::Event.new("somevalue" => 100)) - subject.receive(LogStash::Event.new("somevalue" => 10)) - subject.receive(LogStash::Event.new("somevalue" => 1)) - subject.receive(LogStash::Event.new("country" => "us")) - subject.receive(LogStash::Event.new("country" => "at")) - subject.receive(LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0 ] })) - subject.buffer_flush(:final => true) + events = [ + LogStash::Event.new("message" => "sample message here"), + LogStash::Event.new("somevalue" => 100), + LogStash::Event.new("somevalue" => 10), + LogStash::Event.new("somevalue" => 1), + LogStash::Event.new("country" => "us"), + LogStash::Event.new("country" => "at"), + LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0 ] }) + ] + subject.multi_receive(events) + @es.indices.refresh # Wait or fail until everything's indexed. 
@@ -85,7 +87,7 @@ end it "should index stopwords like 'at' " do - results = @es.search(:body => { "aggregations" => { "my_agg" => { "terms" => { "field" => "country" } } } })["aggregations"]["my_agg"] + results = @es.search(:body => { "aggregations" => { "my_agg" => { "terms" => { "field" => "country.raw" } } } })["aggregations"]["my_agg"] terms = results["buckets"].collect { |b| b["key"] } insist { terms }.include?("us") diff --git a/spec/integration/outputs/transport_create_spec.rb b/spec/integration/outputs/transport_create_spec.rb index ef4096d..ff73f46 100644 --- a/spec/integration/outputs/transport_create_spec.rb +++ b/spec/integration/outputs/transport_create_spec.rb @@ -10,8 +10,7 @@ def get_es_output(action, id = nil) "index" => "logstash-create", "template_overwrite" => true, "protocol" => "transport", - "hosts" => get_host(), - "port" => get_port('transport'), + "hosts" => "#{get_host()}:#{get_port('transport')}", "action" => action, "network_host" => get_local_host } @@ -39,7 +38,7 @@ def get_es_output(action, id = nil) subject = get_es_output("create", "id123") subject.register subject.receive(LogStash::Event.new("message" => "sample message here")) - subject.buffer_flush(:final => true) + subject.flush @es.indices.refresh # Wait or fail until everything's indexed. Stud::try(3.times) do @@ -52,7 +51,7 @@ def get_es_output(action, id = nil) subject = get_es_output("create") subject.register subject.receive(LogStash::Event.new("message" => "sample message here")) - subject.buffer_flush(:final => true) + subject.flush @es.indices.refresh # Wait or fail until everything's indexed. Stud::try(3.times) do @@ -67,7 +66,7 @@ def get_es_output(action, id = nil) subject = get_es_output("create_unless_exists", "id123") subject.register subject.receive(LogStash::Event.new("message" => "sample message here")) - subject.buffer_flush(:final => true) + subject.flush @es.indices.refresh # Wait or fail until everything's indexed. Stud::try(3.times) do @@ -89,7 +88,7 @@ def get_es_output(action, id = nil) subject.register subject.receive(LogStash::Event.new("message" => "sample message here")) subject.receive(LogStash::Event.new("message" => "sample message here")) # 400 status failure (same id) - subject.buffer_flush(:final => true) + subject.flush @es.indices.refresh # Wait or fail until everything's indexed. 
Stud::try(3.times) do diff --git a/spec/integration/outputs/update_spec.rb b/spec/integration/outputs/update_spec.rb index 16bfea5..3ac1889 100644 --- a/spec/integration/outputs/update_spec.rb +++ b/spec/integration/outputs/update_spec.rb @@ -10,8 +10,7 @@ def get_es_output( protocol, id = nil, upsert = nil, doc_as_upsert=nil) "index" => "logstash-update", "template_overwrite" => true, "protocol" => protocol, - "hosts" => get_host(), - "port" => get_port(protocol), + "hosts" => "#{get_host()}:#{get_port(protocol)}", "network_host" => get_local_host, "action" => "update" } @@ -52,7 +51,7 @@ def get_es_output( protocol, id = nil, upsert = nil, doc_as_upsert=nil) subject = get_es_output(protocol, "456") subject.register subject.receive(LogStash::Event.new("message" => "sample message here")) - subject.buffer_flush(:final => true) + subject.flush expect {@es.get(:index => 'logstash-update', :type => 'logs', :id => "456", :refresh => true)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) end @@ -60,7 +59,7 @@ def get_es_output( protocol, id = nil, upsert = nil, doc_as_upsert=nil) subject = get_es_output(protocol, "123") subject.register subject.receive(LogStash::Event.new("message" => "updated message here")) - subject.buffer_flush(:final => true) + subject.flush r = @es.get(:index => 'logstash-update', :type => 'logs', :id => "123", :refresh => true) insist { r["_source"]["message"] } == 'updated message here' end @@ -71,7 +70,7 @@ def get_es_output( protocol, id = nil, upsert = nil, doc_as_upsert=nil) subject = get_es_output(protocol, "456", '{"message": "upsert message"}') subject.register subject.receive(LogStash::Event.new("message" => "sample message here")) - subject.buffer_flush(:final => true) + subject.flush r = @es.get(:index => 'logstash-update', :type => 'logs', :id => "456", :refresh => true) insist { r["_source"]["message"] } == 'upsert message' end @@ -80,7 +79,7 @@ def get_es_output( protocol, id = nil, upsert = nil, doc_as_upsert=nil) subject = get_es_output(protocol, "456", nil, true) subject.register subject.receive(LogStash::Event.new("message" => "sample message here")) - subject.buffer_flush(:final => true) + subject.flush r = @es.get(:index => 'logstash-update', :type => 'logs', :id => "456", :refresh => true) insist { r["_source"]["message"] } == 'sample message here' end diff --git a/spec/unit/outputs/elasticsearch/protocol_spec.rb b/spec/unit/outputs/elasticsearch/protocol_spec.rb index c913e80..7089de3 100644 --- a/spec/unit/outputs/elasticsearch/protocol_spec.rb +++ b/spec/unit/outputs/elasticsearch/protocol_spec.rb @@ -3,6 +3,8 @@ require "java" describe LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient do + subject { LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient.new() } + context "successful" do it "should map correctly" do index_response = org.elasticsearch.action.index.IndexResponse.new("my_index", "my_type", "my_id", 123, true) @@ -12,7 +14,7 @@ bulk_item_response_update = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "update", update_response) bulk_item_response_delete = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "delete", delete_response) bulk_response = org.elasticsearch.action.bulk.BulkResponse.new([bulk_item_response_index, bulk_item_response_update, bulk_item_response_delete], 0) - ret = LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient.normalize_bulk_response(bulk_response) + ret = subject.normalize_bulk_response(bulk_response) insist { ret } == {"errors" => 
false} end end @@ -24,8 +26,9 @@ bulk_item_response_update = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "update", failure) bulk_item_response_delete = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "delete", failure) bulk_response = org.elasticsearch.action.bulk.BulkResponse.new([bulk_item_response_index, bulk_item_response_update, bulk_item_response_delete], 0) - actual = LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient.normalize_bulk_response(bulk_response) - insist { actual } == {"errors" => true, "statuses" => [400, 400, 400]} + actual = subject.normalize_bulk_response(bulk_response) + expect(actual["errors"]).to eql(true) + expect(actual["items"].map {|i| i.first[1]["status"]}).to eql([400,400,400]) end end end \ No newline at end of file diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 05356c4..dcfccb8 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -28,7 +28,6 @@ it "should set host to localhost" do expect(LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::TransportClient).to receive(:new).with({ :hosts => ["127.0.0.1"], - :port => "9300-9305", :protocol => "transport", :client_settings => { "client.transport.sniff" => false, From 9f3a984919ba3f5efdc7f9510d2df49e2b79a268 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 24 Nov 2015 16:55:05 -0600 Subject: [PATCH 2/2] Pull vendor jars when testing --- .travis.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8deea25..94409c8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,9 +3,10 @@ before_install: - curl -s https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.0.0/elasticsearch-2.0.0.tar.gz > elasticsearch.tar.gz - tar -xzf elasticsearch.tar.gz - cd elasticsearch*/ && bin/elasticsearch & - - sleep 10 && curl http://localhost:9200 + - sleep 10 + - curl http://localhost:9200 language: ruby cache: bundler rvm: - jruby-19mode -script: bundle exec rspec spec && bundle exec rspec spec +script: bundle exec rake vendor && bundle exec rspec spec && bundle exec rspec spec
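
Reviewer note: with the separate `port` option removed, host and port now travel together in
`hosts`, as the spec updates above show. A minimal pipeline sketch under the new settings
(values are illustrative; `protocol`, `hosts`, `network_host`, and `index` are the options
this patch touches):

    output {
      elasticsearch_java {
        protocol     => "transport"
        # previously: hosts => "127.0.0.1", port => "9300-9305"
        hosts        => "127.0.0.1:9300"
        network_host => "127.0.0.1"
        index        => "logstash-%{+YYYY.MM.dd}"
      }
    }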
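Likewise, NodeClient#normalize_bulk_response now mirrors the shape of the HTTP client's bulk
response rather than the old flat "statuses" array. A sketch of the two possible return
values (op types, statuses, and messages are illustrative):

    # every item succeeded
    {"errors" => false}

    # at least one failure: one [op_type, status_hash] pair per item
    {"errors" => true,
     "items"  => [[["index",  {"status" => 200, "message" => "OK"}]],
                  [["create", {"status" => 409, "message" => "document already exists"}]]]}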