Parsing CSV Files with Filebeat and Elasticsearch Ingest Pipelines


One of the coolest new features in Elasticsearch 5 is the ingest node, which adds some Logstash-style processing to the Elasticsearch cluster itself, so data can be transformed before being indexed without needing another service and/or infrastructure to do it. A while back, we posted a quick blog on how to parse CSV files with Logstash, so I'd like to provide the ingest pipeline version of that for comparison's sake.

What we’ll show here is an example using Filebeat to ship data to an ingest pipeline, index it, and visualize it with Kibana.

The Data

There are tons of great sources out there for free data, but since most of us at ObjectRocket are in Austin, TX, we're going to use some data from data.austintexas.gov. The restaurant inspection data set is a good-sized data set with enough relevant information to give us a real-world example.

Below are a few lines from this data set to give you an idea of the structure of the data:

Restaurant Name,Zip Code,Inspection Date,Score,Address,Facility ID,Process Description
Westminster Manor,78731,07/21/2015,96,"4100 JACKSON AVE
AUSTIN, TX 78731
(30.314499, -97.755166)",2800365,Routine Inspection
Wieland Elementary,78660,10/02/2014,100,"900 TUDOR HOUSE RD
AUSTIN, TX 78660
(30.422862, -97.640183)",10051637,Routine Inspection

DOH… This isn't going to be a nice, friendly, one-line-per-entry case, but that's fine. As you're about to see, Filebeat has some built-in ability to handle multiline entries and work around the newlines buried in the data.

Editorial Note: I was planning on a nice simple example with few “hitches”, but in the end, I thought it may be interesting to see some of the tools that the Elastic Stack gives you to work around these scenarios.

Setting up Filebeat

The first step is to get Filebeat ready to start shipping data to your Elasticsearch cluster. Once you’ve got Filebeat downloaded (try to use the same version as your ES cluster) and extracted, it’s extremely simple to set up via the included filebeat.yml configuration file. For our scenario, here’s the configuration that I’m using.

filebeat.prospectors:
- input_type: log
  paths:
    - /Path/To/logs/*.csv

  # Ignore the first line with column headings
  exclude_lines: ["^Restaurant Name,"]

  # Identifies the last two columns as the end of an entry and then prepends the previous lines to it
  multiline.pattern: ',\d+,[^\",]+$'
  multiline.negate: true
  multiline.match: before

#================================ Outputs =====================================

output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["https://dfw-xxxxx-0.es.objectrocket.com:xxxxx", "https://dfw-xxxxx-1.es.objectrocket.com:xxxxx", "https://dfw-xxxxx-2.es.objectrocket.com:xxxxx", "https://dfw-xxxxx-3.es.objectrocket.com:xxxxx"]
  pipeline: "inspectioncsvs"

  # Optional protocol and basic auth credentials.
  username: "esuser"
  password: "supersecretpassword"

Everything’s pretty straightforward here; you’ve got a section to specify where and how to grab the input files and a section to specify where to ship the data. The only parts I’ll call out specifically are the multiline bit and the Elasticsearch configuration piece.

Since the formatting of this data set is not super strict, with inconsistent use of double quotes and a number of newlines sprinkled in, the best option was to look for the end of an entry, which consists of a numeric ID followed by an inspection type with little variation and no double quotes or newlines. From there, Filebeat will just queue up any unmatched lines and prepend them to the final line that matches the pattern. If your data is cleaner and sticks to a simple one-line-per-entry format, you can pretty much ignore the multiline settings.
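
To make this concrete, here's roughly what the second sample entry from above looks like once Filebeat has stitched its three physical lines into a single event. The newlines survive inside the message (shown here as \n), and the trailing ,10051637,Routine Inspection is what matches the multiline pattern:

Wieland Elementary,78660,10/02/2014,100,"900 TUDOR HOUSE RD\nAUSTIN, TX 78660\n(30.422862, -97.640183)",10051637,Routine Inspection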

Looking at the Elasticsearch output section, it's the standard Elasticsearch settings plus the name of the pipeline you'd like to use, set with the pipeline: directive. If you're on the ObjectRocket service, you can grab the output snippet from the "Connect" tab in the UI, which comes prepopulated with all of the right hosts; just add the pipeline line and fill in your user and password. Also, make sure you've got your system's IP added to your cluster's ACLs if you haven't already done so.

Creating the Ingest Pipeline

Now that we have the input data and Filebeat ready to go, we can create and tweak our ingest pipeline. The main tasks the pipeline needs to perform are:

  • Split the csv content into the correct fields
  • Convert the inspection score to an integer
  • Set the @timestamp field
  • Clean up some other data formatting

Here’s a pipeline that can do all of that:

PUT _ingest/pipeline/inspectioncsvs
{
  "description" : "Convert Restaurant inspections csv data to indexed data",
  "processors" : [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{REST_NAME:RestaurantName},%{REST_ZIP:ZipCode},%{MONTHNUM2:InspectionMonth}/%{MONTHDAY:InspectionDay}/%{YEAR:InspectionYear},%{NUMBER:Score},\"%{DATA:StreetAddress}\n%{DATA:City},?\\s+%{WORD:State}\\s*%{NUMBER:ZipCode2}\\s*\n\\(?%{DATA:Location}\\)?\",%{NUMBER:FacilityID},%{DATA:InspectionType}$"],
        "pattern_definitions": {
          "REST_NAME": "%{DATA}|%{QUOTEDSTRING}",
          "REST_ZIP": "%{QUOTEDSTRING}|%{NUMBER}"
        }
      }
    },
    {
      "grok": {
        "field": "ZipCode",
        "patterns": [".*%{ZIP:ZipCode}\"?$"],
        "pattern_definitions": {
          "ZIP": "\\d{5}"
        }
      }
    },
    {
      "convert": {
        "field" : "Score",
        "type": "integer"
      }
    },
    {
      "set": {
        "field" : "@timestamp",
        "value" : "//"
      }
    },
    {
      "date" : {
        "field" : "@timestamp",
        "formats" : ["yyyy/MM/dd"]
      }
    }
  ],
  "on_failure" : [
    {
      "set" : {
        "field" : "error",
        "value" : " - Error processing message - "
      }
    }
  ]
}

Unlike Logstash, the ingest pipeline does not (at the time of this writing) have a csv processor/plugin, so you'll need to transform CSVs yourself. I used a grok processor to do the heavy lifting, since each row only has a few columns. For data with many more columns, the grok pattern can get pretty hairy, so another option is to use the split processor and some Painless scripting to process the line in a more iterative fashion. You may also notice the second grok processor, which is just there to deal with the two different ways that zip codes were entered in this data set.
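
For illustration, here's a minimal sketch of what that split-plus-script alternative could look like. The pipeline name and the field assignments are made up for this example, and note that a naive comma split would also break on the quoted address field in this particular data set, which is part of why grok was the better fit here:

PUT _ingest/pipeline/inspections_split_example
{
  "description": "Illustrative only: split a simple one-line-per-entry CSV row into fields",
  "processors": [
    {
      "split": {
        "field": "message",
        "separator": ","
      }
    },
    {
      "script": {
        "lang": "painless",
        "inline": "ctx.RestaurantName = ctx.message[0]; ctx.ZipCode = ctx.message[1]; ctx.Score = ctx.message[3];"
      }
    }
  ]
}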

For debugging purposes, I included a general on_failure section, which catches all errors and records which type of processor failed and what the message was that broke the pipeline. This makes debugging way easier: I can just query my index for any documents that have the error field set and then dig in with the simulate API. More on that now…

Testing the Pipeline

Now that we've got our ingest pipeline configured, let's test it with the simulate API. First you'll need a sample document, which you can get a couple of ways. You can either run Filebeat without the pipeline setting and grab an unprocessed document from Elasticsearch, or you can run Filebeat with the console output enabled by commenting out the Elasticsearch section and adding the following to the yml file:

output.console:
  pretty: true
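
If you go the first route instead (running Filebeat with the pipeline setting removed), a quick search like the following should hand back a raw document that you can copy into a simulate request:

GET /filebeat-*/_search
{
  "size": 1
}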

Here's a sample document I grabbed from my environment, wrapped in a simulate request:

POST _ingest/pipeline/inspectioncsvs/_simulate
{
  "docs" : [
    {
      "_index": "inspections",
      "_type": "log",
      "_id": "AVpsUYR_du9kwoEnKsSA",
      "_score": 1,
      "_source": {
        "@timestamp": "2017-03-31T18:22:25.981Z",
        "beat": {
          "hostname": "systemx",
          "name": "RestReviews",
          "version": "5.1.1"
        },
        "input_type": "log",
        "message": "Wieland Elementary,78660,10/02/2014,100,\"900 TUDOR HOUSE RD\nAUSTIN, TX 78660\n(30.422862, -97.640183)\",10051637,Routine Inspection",
        "offset": 2109798,
        "source": "/Path/to/my/logs/Restaurant_Inspection_Scores.csv",
        "tags": [
          "debug",
          "reviews"
        ],
        "type": "log"
      }
    }
  ]
}

And the response (I’ve trimmed it down to the fields we were trying to set):

{
  "docs": [
    {
      "doc": {
        "_id": "AVpsUYR_du9kwoEnKsSA",
        "_type": "log",
        "_index": "inspections",
        "_source": {
          "InspectionType": "Routine Inspection",
          "ZipCode": "78660",
          "InspectionMonth": "10",
          "City": "AUSTIN",
          "message": "Wieland Elementary,78660,10/02/2014,100,\"900 TUDOR HOUSE RD\nAUSTIN, TX 78660\n(30.422862, -97.640183)\",10051637,Routine Inspection",
          "RestaurantName": "Wieland Elementary",
          "FacilityID": "10051637",
          "Score": 100,
          "StreetAddress": "900 TUDOR HOUSE RD",
          "State": "TX",
          "InspectionDay": "02",
          "InspectionYear": "2014",
          "ZipCode2": "78660",
          "Location": "30.422862, -97.640183"
        },
        "_ingest": {
          "timestamp": "2017-03-31T20:36:59.574+0000"
        }
      }
    }
  ]
}

The pipeline definitely succeeded, and most importantly, all of the data appears to be in the right place.

Running Filebeat

Before we run Filebeat, we'll do one last thing. This part is completely optional if you just want to get comfortable with the ingest pipeline, but if you want to use the Location field that we set in the grok processor as a geo_point, you'll need to add the mapping to the filebeat.template.json file by adding the following to the properties section:

"Location": {
  "type": "geo_point"
},

Now that that's out of the way, we can fire up Filebeat by running ./filebeat -e -c filebeat.yml -d "elasticsearch".
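
Once the first documents start flowing in, you can sanity-check that the Location mapping took effect with the field mapping API. This is just a quick spot check and assumes the default filebeat-* index pattern:

GET /filebeat-*/_mapping/field/Location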

Using the data

Once Filebeat has worked through the file, we can check how many documents made it into the index:

GET /filebeat-*/_count
{}

{
  "count": 25081,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  }
}

That’s a good sign! Let’s see if we had any errors:

GET /filebeat-*/_search
{
    "query": {
        "exists" : { "field" : "error" }
    }
}

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 0,
    "max_score": null,
    "hits": []
  }
}

Another good sign!
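
While we're in the console, a quick aggregation gives a feel for the data even before building any dashboards. This is just a sketch; it assumes the Filebeat template mapped the new ZipCode string field as a keyword, which the default template should do for unknown string fields:

GET /filebeat-*/_search
{
  "size": 0,
  "aggs": {
    "by_zip": {
      "terms": { "field": "ZipCode", "size": 5 },
      "aggs": {
        "avg_score": { "avg": { "field": "Score" } }
      }
    }
  }
}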

Now we’re all set to visualize and show our data in Kibana. We can walk through creating the Kibana dashboards another time, but given that we’ve got dates, restaurant names, scores and locations, we’ve got plenty of freedom to go create some cool visualizations.

Final notes

Once again, the ingest pipeline is quite powerful and can handle transformations pretty easily. You can move all of your processing to Elasticsearch and only use lightweight Beats on your hosts, without requiring Logstash somewhere in the pipeline. However, there are still some gaps in the ingest node compared to Logstash. For example, the number of processors available in the ingest pipeline is still limited, so simple tasks like parsing a CSV are not as easy as they are in Logstash. The Elasticsearch team does seem to be regularly rolling out new processors, so here's hoping that the list of differences keeps getting smaller and smaller.