Logstash dynamically split events

Updating a very old answer because there is a better way to do this in newer versions of logstash without resorting to a custom filter.

You can do this using a ruby filter and a split filter:

filter {
    ruby {
        code => '
            arrayOfEvents = Array.new()
            parts = event.get("parts")
            timestamp = event.get("timestamp")
            parts.each { |part|
                arrayOfEvents.push({ 
                    "key" => event.get("#{part}.key"), 
                    "value" => event.get("#{part}.value"),
                    "timestamp" => timestamp
                })
                event.remove("#{part}.key")
                event.remove("#{part}.value")
            }
            puts arrayOfEvents

            event.remove("parts")
            event.set("event",arrayOfEvents)
        '
    }
    split {
        field => 'event'
    }
    mutate {
        rename => {
            "[event][key]" => "key"
            "[event][value]" => "value"
            "[event][timestamp]" => "timestamp"
        }
        remove_field => ["event"]
    }
}

My original answer was:

You need to resort to a custom filter for this (you can't call yield from a ruby code filter which is what's needed to generate new events).

Something like this (dropped into lib/logstash/filters/custom_split.rb):

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"

# custom code to break up an event into multiple
class LogStash::Filters::CustomSplit < LogStash::Filters::Base
  config_name "custom_split"
  milestone 1

  public
  def register
    # Nothing
  end # def register

  public
  def filter(event)
    return unless filter?(event)
    if event["parts"].is_a?(Array)
      event["parts"].each do |key|
        e =  LogStash::Event.new("timestamp" => event["timestamp"],
          "key" => event["#{key}.key"],
          "value" => event["#{key}.value"])
        yield e
      end
      event.cancel
    end
  end
end

And then just put filter { custom_split {} } into your config file.


For future reference and based on @alcanzar answer, it is now possible to do things like this:

  ruby {
    code => "
      # somefield is an array
      array = event.get('somefield')

      # drop the current event (this was my use case, I didn't need the feeding event)
      event.cancel

      # iterate over to construct new events
      array.each { |a|

        # creates a new logstash event
        generated = LogStash::Event.new({ 'foo' => 'something' })

        # puts the event in the pipeline queue
        new_event_block.call(generated)
      }
    "
  }