Using Logstash to load CSV data into Elasticsearch

Do you have a brand new Elasticsearch instance, but all the useful data you’d like to search lives in a CSV file? No problem: Logstash makes it easy to turn almost any data into something searchable in an Elasticsearch index.

To start with, you need some data (and a unix-like environment to follow these examples – Windows works fine with some minor adjustments). In my case, I wanted to take an export of the data from my Davis Vantage Pro2 weather station, in CSV format, and create a new index with it.

I started out with a few million lines that looked like this, stored in a local file:

$ head -3 /home/erik/weather.csv
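If you don’t have a few million lines of your own handy, a tiny stand-in file is enough to follow along with the rest of the post. Only the ‘time’ and ‘TempOut’ column names below come from my real data – the values and the third column are invented for illustration (and point the path in the Logstash config at wherever you put it):

```shell
# Create a small stand-in CSV. Only "time" and "TempOut" match the
# real data; "HumOut" and all the values are made up for illustration.
cat > /tmp/weather.csv <<'EOF'
time,TempOut,HumOut
2015-09-01T00:00:00Z,71.2,48
2015-09-01T00:05:00Z,76.5,47
EOF
head -3 /tmp/weather.csv
```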

Don’t have any interesting data already? There are plenty of fun public datasets available as CSV downloads.

Ok, so we have data; let’s get started. First, make sure you have a version of Java installed:

$ java -version
openjdk version "1.8.0_51"

Pretty much any JVM is fine for this – OpenJDK, Oracle, etc.

Next, download and unpack Logstash:

$ curl -O https://download.elastic.co/logstash/logstash/logstash-1.5.4.tar.gz
$ tar xfz logstash-1.5.4.tar.gz
$ cd logstash-1.5.4
$ mkdir conf

Now it’s time to build a configuration file.

First, we’ll define an ‘input’ section where we tell Logstash where to find the data.

input {
    file {
        path => "/home/erik/weather.csv"
        start_position => "beginning"
    }
}

This just tells it where to look, and that we want to load from the beginning of the file.
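One gotcha worth knowing about: the file input keeps a ‘sincedb’ file recording how far it has read, so re-running the same import can silently skip everything it already saw. For a one-off load like this, you can point sincedb at /dev/null so every run starts fresh:

```
input {
    file {
        path => "/home/erik/weather.csv"
        start_position => "beginning"
        # don't remember progress between runs - handy for re-imports
        sincedb_path => "/dev/null"
    }
}
```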

Next we need a filter – Logstash has loads of filter plugins available by default. We’re going to use a couple different ones to parse our data. So far, Logstash doesn’t really know anything about the data in the file – we need to tell it the format, and any other specifics on how to handle various fields.

filter {
    csv {
        columns => ["time", "TempOut"]  # plus the rest of your columns, in file order
        separator => ","
        remove_field => ["message"]
    }
    date {
        match => ["time", "ISO8601"]
    }
    mutate {
        convert => ["TempOut", "float"]
    }
}

The columns list names each field in the order it appears in the file; they should be pretty self-explanatory, but there are a couple of important pieces. First, I tell it to remove the ‘message’ field – which is an entry containing the entire row. I won’t need it, since I’m searching on specific attributes. Second, I tell it that the ‘time’ field contains an ISO8601-formatted date, so that Elasticsearch knows not to treat it as a plain string. Finally, I use the mutate filter to convert the ‘TempOut’ value into a floating point number.
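To make the effect of those filters concrete, here’s a rough shell sketch (not Logstash itself – just an illustration with a hypothetical row) of the transformation: split the line on the separator, keep ‘time’ as an ISO8601 string, and emit ‘TempOut’ as a bare number rather than a quoted string:

```shell
# One hypothetical CSV row: time, TempOut
line='2015-09-01T00:00:00Z,71.2'

# Split on the separator, as the csv filter does
time=${line%%,*}       # everything before the first comma
tempout=${line#*,}     # everything after it

# Roughly what the event looks like after the filters:
# "time" stays a string; "TempOut" is unquoted, i.e. numeric
printf '{"time":"%s","TempOut":%s}\n' "$time" "$tempout"
# prints {"time":"2015-09-01T00:00:00Z","TempOut":71.2}
```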

Ok, so now we’re set up to ingest and parse the data – but we still need to store it in Elasticsearch:

output {
    elasticsearch {
        protocol => "https"
        host => ["iad1-20999-0.es.objectrocket.com:20999"]
        user => "erik"
        password => "mysupersecretpassword"
        action => "index"
        index => "eriks_weather_index"
    }
    stdout { }
}

This should be pretty self-explanatory too – just configure for your host/port, authentication data, and the name of the index you’d like to store it in.
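Before kicking off a long import, it’s worth asking Logstash to validate the whole file first – this version ships a --configtest flag for exactly that:

```
$ bin/logstash -f conf/logstash.conf --configtest
```

If it reports a problem, fix the config before starting the real run; a syntax error a few hours into a multi-million-line import is no fun.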

Ok, let’s fire it up! If it’s working, it should look something like this:

$ bin/logstash -f conf/logstash.conf -v
Logstash startup completed

Did it work? Let’s ask Elasticsearch:

$ curl -u erik:mysupersecretpassword 'https://iad1-20999-0.es.objectrocket.com:20999/_cat/indices?v'
health status index               pri rep docs.count store.size pri.store.size
green  open   eriks_weather_index 5   1   294854     95.8mb     48.5mb

Looks like my documents are there! Let’s query for one:

$ curl -u erik:mysupersecretpassword 'https://iad1-20999-0.es.objectrocket.com:20999/eriks_weather_index/_search?q=TempOut:>75&pretty&terminate_after=1'

This tells Elasticsearch to find documents with TempOut greater than 75 (TempOut:>75), to format the output for human consumption (pretty), and to return no more than one result per shard (terminate_after=1). It should return something like this:

{
  "took" : 4,
  "timed_out" : false,
  "terminated_early" : true,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "eriks_weather_index",
      "_type" : "logs",
      "_id" : "AU-yXZJIJb3HnhKvpdNC",
      "_score" : 1.0
    } ]
  }
}
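The q= query-string syntax is handy for quick checks like this; once queries get more involved, the request-body form of the same range query holds up better. A sketch of the equivalent (not run against this index):

```
$ curl -u erik:mysupersecretpassword \
    'https://iad1-20999-0.es.objectrocket.com:20999/eriks_weather_index/_search?pretty' \
    -d '{"query": {"range": {"TempOut": {"gt": 75}}}}'
```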

Success. Logstash is a great Swiss army knife for turning any data you have lying around into something you can easily play with in Elasticsearch – have fun!
