Logstash dynamically split events
I'm updating a very old answer because newer versions of Logstash offer a better way to do this without resorting to a custom filter.
You can do this using a ruby filter and a split filter:
filter {
  ruby {
    code => '
      # Build one small hash per entry in "parts"
      arrayOfEvents = Array.new()
      parts = event.get("parts")
      timestamp = event.get("timestamp")
      parts.each { |part|
        arrayOfEvents.push({
          "key" => event.get("#{part}.key"),
          "value" => event.get("#{part}.value"),
          "timestamp" => timestamp
        })
        # Drop the per-part fields from the original event
        event.remove("#{part}.key")
        event.remove("#{part}.value")
      }
      # Debug output only; safe to remove
      puts arrayOfEvents
      event.remove("parts")
      # Store the array so the split filter can fan it out
      event.set("event",arrayOfEvents)
    '
  }
  split {
    field => 'event'
  }
  mutate {
    rename => {
      "[event][key]" => "key"
      "[event][value]" => "value"
      "[event][timestamp]" => "timestamp"
    }
    remove_field => ["event"]
  }
}
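To make the data flow easier to follow, here is a rough plain-Ruby sketch of what the three filters do together. It does not use Logstash at all: a Hash stands in for the event, and the `split_parts` helper and the sample field names (`a.key`, `b.value`, `host`) are made up for illustration.

```ruby
# Plain-Ruby stand-in for the ruby + split + mutate chain above.
# A Hash plays the role of the Logstash event; split_parts is a hypothetical helper.
def split_parts(event)
  parts = event["parts"] || []

  # ruby filter: collect one small hash per entry in "parts"
  items = parts.map do |part|
    {
      "key"       => event["#{part}.key"],
      "value"     => event["#{part}.value"],
      "timestamp" => event["timestamp"]
    }
  end

  # ruby filter: the consumed fields are removed from the original event
  consumed = ["parts"] + parts.flat_map { |part| ["#{part}.key", "#{part}.value"] }
  base = event.reject { |name, _| consumed.include?(name) }

  # split clones what is left once per item; mutate lifts the item fields to the top level
  items.map { |item| base.merge(item) }
end

# Assumed sample input, not taken from a real pipeline
sample = {
  "timestamp" => "2015-01-01T00:00:00Z",
  "host"      => "web1",
  "parts"     => ["a", "b"],
  "a.key" => "k1", "a.value" => "v1",
  "b.key" => "k2", "b.value" => "v2"
}

split_parts(sample).each { |e| p e }
```

Each output hash keeps the fields that were not consumed (here `host`) and gains its own `key`, `value` and `timestamp`, which is the shape the real pipeline should produce per part.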
My original answer was:
You need to resort to a custom filter for this (you can't call yield
from a ruby code filter which is what's needed to generate new events).
Something like this (dropped into lib/logstash/filters/custom_split.rb):
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"

# custom code to break up an event into multiple
class LogStash::Filters::CustomSplit < LogStash::Filters::Base
  config_name "custom_split"
  milestone 1

  public
  def register
    # Nothing
  end # def register

  public
  def filter(event)
    return unless filter?(event)
    if event["parts"].is_a?(Array)
      event["parts"].each do |key|
        e = LogStash::Event.new("timestamp" => event["timestamp"],
                                "key" => event["#{key}.key"],
                                "value" => event["#{key}.value"])
        yield e
      end
      event.cancel
    end
  end
end
And then just put filter { custom_split {} } into your config file.
For future reference, and based on @alcanzar's answer, it is now possible to do things like this:
ruby {
  code => "
    # somefield is an array
    array = event.get('somefield')

    # drop the current event (this was my use case, I didn't need the feeding event)
    event.cancel

    # iterate over to construct new events
    array.each { |a|
      # creates a new logstash event
      generated = LogStash::Event.new({ 'foo' => 'something' })
      # puts the event in the pipeline queue
      new_event_block.call(generated)
    }
  "
}
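If the control flow is unclear, this small plain-Ruby sketch mimics it outside Logstash. The `queue` array and the lambda assigned to `new_event_block` are stand-ins I made up (in a real pipeline the ruby filter supplies `new_event_block`), a Hash replaces the event, and a boolean replaces `event.cancel`. Unlike the snippet above, it copies each array element into the generated event so the fan-out is visible.

```ruby
# Stand-in for the pipeline queue; in Logstash the ruby filter provides new_event_block.
queue = []
new_event_block = ->(generated) { queue << generated }

# Hypothetical feeding event, modelled as a Hash
event = { "somefield" => ["x", "y", "z"] }

array = event["somefield"]

# corresponds to event.cancel: the feeding event itself is dropped
cancelled = true

# one generated event per array element, each handed to the block
array.each do |a|
  new_event_block.call({ "foo" => a })
end

queue.each { |e| p e }
```

The point is only that every call to the block enqueues one additional event, independently of what happens to the event that triggered the filter.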