Druid & Kafka Indexing Service: How to fix the segment merging issue

This article describes the bumpy road we went through to successfully run a Hadoop Index Job on AWS EMR for the latest Druid (0.10.0), with deep storage in an S3 region that supports Signature Version 4 only.

Background

Deep.BI provides multitenant Druid infrastructure in the cloud, with complementary Kafka and Flink for scalable and reliable data collection, enrichment, storage, analytics, real-time usage and robotic process automation (RPA). Recently we upgraded our infrastructure to the latest Druid (0.10.0) and to a new way of ingesting data through the Kafka Indexing Service (KIS). The big problem with this setup is the enormous number of sharded segments, which are supposed to be merged by KIS but aren't. This is planned to be fixed in a future Druid release, but until then we had to fix it ourselves.

Our Druid architecture

We decided to run our Druid infrastructure in a hybrid cloud:

  • KIS (MiddleManagers + Overlord) runs in the private cloud (dedicated servers in a datacenter)
  • Brokers + Historicals run in the private cloud too
  • Coordinator runs on AWS EC2
  • for deep storage we use Amazon S3
  • as a metadata store we chose AWS RDS

(We'll publish our reasons for these decisions in a separate article; in the meantime you can ask us via email: tech@deep.bi)

Our approach

To solve the problem mentioned above, we had to set up a Hadoop cluster to periodically run Hadoop Index Jobs.

Due to the periodic nature of these jobs, we decided to dynamically start an AWS Elastic MapReduce (EMR) cluster and have it terminate automatically once the job completes (a launch sketch follows the list below). The unexpected problems we faced were differing S3 signature version support and Druid extension compatibility, which resulted in non-trivial tasks like:

  • recompiling Druid with the proper extensions
  • setting up a VPN between AWS and our cloud
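
For illustration, launching such a transient cluster looks roughly like this (a sketch only; the release label, instance type and count, key name, subnet and region are placeholders, not our actual setup):

  # Start a short-lived EMR cluster for the index job; with --auto-terminate it
  # shuts down once its submitted steps have finished. All values are placeholders.
  aws emr create-cluster \
    --name "druid-hadoop-index" \
    --release-label emr-5.5.0 \
    --applications Name=Hadoop \
    --instance-type m4.xlarge \
    --instance-count 3 \
    --ec2-attributes KeyName=YOUR_KEY,SubnetId=YOUR_SUBNET \
    --use-default-roles \
    --region eu-central-1 \
    --auto-terminate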

Details

Creating job description task

First, we assembled a job task description according to http://druid.io/docs/0.10.0/ingestion/batch-ingestion.htm and http://druid.io/docs/latest/operations/other-hadoop.html.
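
A minimal sketch of such a task description is shown below; the data source name, interval and schema are placeholders rather than our production spec. The "dataSource" inputSpec tells the Hadoop job to re-index the existing segments of the given interval, which is what effectively merges their shards:

  {
    "type": "index_hadoop",
    "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.7.3"],
    "spec": {
      "dataSchema": {
        "dataSource": "YOUR_DATASOURCE",
        "parser": {
          "type": "hadoopyString",
          "parseSpec": {
            "format": "json",
            "timestampSpec": { "column": "timestamp", "format": "auto" },
            "dimensionsSpec": { "dimensions": [] }
          }
        },
        "metricsSpec": [ { "type": "count", "name": "count" } ],
        "granularitySpec": {
          "type": "uniform",
          "segmentGranularity": "DAY",
          "queryGranularity": "NONE",
          "intervals": [ "2017-05-01/2017-05-02" ]
        }
      },
      "ioConfig": {
        "type": "hadoop",
        "inputSpec": {
          "type": "dataSource",
          "ingestionSpec": {
            "dataSource": "YOUR_DATASOURCE",
            "intervals": [ "2017-05-01/2017-05-02" ]
          }
        }
      },
      "tuningConfig": { "type": "hadoop" }
    }
  }

The task is then submitted to the indexing service like any other (host and port are placeholders):

  curl -X POST -H 'Content-Type: application/json' \
    -d @merge-task.json \
    http://OVERLORD_HOST:8090/druid/indexer/v1/task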

Adding Hadoop EMR jars and config files to Druid

Next, we added Hadoop client jars compatible with our EMR version to DRUID_DIR/hadoop-dependencies/hadoop-client/2.7.3.
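
One way to populate that directory is Druid's pull-deps tool; a sketch (we ultimately used jars matching our EMR build, so copying them from the EMR master works as well):

  # Fetch the vanilla Hadoop 2.7.3 client jars from Maven into hadoop-dependencies/:
  cd $DRUID_DIR
  java -classpath "lib/*" io.druid.cli.Main tools pull-deps \
    -h "org.apache.hadoop:hadoop-client:2.7.3"
  # The jars end up under $DRUID_DIR/hadoop-dependencies/hadoop-client/2.7.3/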

Another step was copying the *-site.xml files from EMR's /etc/hadoop/conf to the DRUID_DIR/conf/druid/_common directory.
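
In shell terms, something like this (the host name is a placeholder):

  # Copy the Hadoop client configuration from the EMR master into Druid's
  # common config directory, which is on every Druid process's classpath:
  scp "hadoop@EMR_MASTER:/etc/hadoop/conf/*-site.xml" \
    $DRUID_DIR/conf/druid/_common/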

Setting up a VPN between your Druid and EMR

After submitting the job to the indexer we got our first error:

2017-05-25T15:41:01,887 INFO [Thread-56] org.apache.hadoop.hdfs.DFSClient - Exception in createBlockOutputStream  
java.net.ConnectException: Connection refused  

The problem was that the EMR data nodes were only accessible via local IP addresses, while our middle managers were running on our own premises.

Thus, we had to set up a VPN connection between our private cloud and AWS and configure the Druid cluster to work with Amazon EMR. At that point we thought we were finally ready to successfully submit an indexing task, but...

Tuning configs and adding more libraries to Druid

After successfully solving all the networking problems, we hit another error:

2017-05-25T22:53:16,654 ERROR [task-runner-0-priority-0] io.druid.indexer.hadoop.DatasourceInputFormat - Exception thrown finding location of splits  
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found  

A quick look into core-site.xml revealed this:

  <property>
    <name>fs.s3.impl</name>
    <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
  </property>

  <property>
    <name>fs.s3n.impl</name>
    <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
  </property>

So we were thinking of two possibilities:

  • copying the EMR jars that provide EmrFileSystem (and their dependencies) into Druid, or
  • switching these core-site.xml properties to the standard Apache Hadoop S3 filesystem implementation instead.

At the beginning we went for the first option and downloaded a bunch of jar files from EMR, but we immediately ran into class dependency conflicts, so we decided not to follow that path any further.

Instead, we replaced the above core-site.xml properties with:

  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  </property>

  <property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  </property>

followed by the S3 credentials:

  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>YOUR_S3_ACCESS_KEY</value>
  </property>

  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>YOUR_S3_SECRET_KEY</value>
  </property>

  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>YOUR_S3_ACCESS_KEY</value>
  </property>

  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>YOUR_S3_SECRET_KEY</value>
  </property>

Well, the next error was obvious:

2017-05-26T00:17:09,753 ERROR [task-runner-0-priority-0] io.druid.indexer.hadoop.DatasourceInputFormat - Exception thrown finding location of splits  
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found  

as we hadn't included hadoop-aws-2.7.3.jar in hadoop-dependencies. We decided to use EMR's patched version, hadoop-aws-2.7.3-amzn-2.jar. You could probably use the vanilla version from Maven as well, but we didn't try it.
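
Getting the jar in place is just a copy; roughly like this (the location of the jar on the EMR master is an assumption and differs between EMR releases):

  # Find the patched jar on the EMR master ...
  ssh hadoop@EMR_MASTER "find /usr/lib /usr/share -name 'hadoop-aws-*amzn*.jar' 2>/dev/null"
  # ... and drop it next to the other Hadoop client jars:
  scp hadoop@EMR_MASTER:/usr/lib/hadoop/hadoop-aws-2.7.3-amzn-2.jar \
    $DRUID_DIR/hadoop-dependencies/hadoop-client/2.7.3/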

Adjusting configs to the S3 region - solving the "Signature Version 4 only" issue

The EMR map reduce jobs finally started, but soon failed with the following error:

2017-05-26T10:33:18,346 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1495712572601_0013_m_000003_0, Status : FAILED  
Error: java.lang.RuntimeException: java.io.IOException: s3n://SOME_BUCKET_NAME : 400 : Bad Request  

If you google it, you stumble upon HADOOP-13325 and other related issues. Long story short: our S3 region supports Signature Version 4 only, and that isn't compatible with the NativeS3FileSystem implemented in hadoop-aws-2.7.3.

Following some advice on Google Groups, we tried to force the newer s3a filesystem implementation.

We've changed the relevant properties in core-site.xml to:

  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>

  <property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>

followed by the S3 credentials for s3a:

  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_S3_ACCESS_KEY</value>
  </property>

  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_S3_SECRET_KEY</value>
  </property>

and some additional S3a magic:

  <property>
    <name>fs.s3a.endpoint</name>
    <value>s3.eu-central-1.amazonaws.com</value>
  </property>

  <property>
    <name>com.amazonaws.services.s3.enableV4</name>
    <value>true</value>
  </property>

Patching and recompiling Druid

After this last update our task didn't even manage to start the map reduce job and failed with:

Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V  

If you want more details, see HADOOP-12420 and http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/.
One of the proposed solutions was to replace Druid's aws-sdk-java-1.10.21 with aws-sdk-java-1.7.4 and "not a more recent version".
But fortunately, at the end of the HADOOP-12420 thread this pull request is mentioned, which led us to #643.
So we decided to simply recompile aws-java-sdk-s3-1.10.21.jar with this backward-compatibility patch and replace the vanilla file in DRUID_DIR/lib/aws-java-sdk-s3-1.10.21.jar.
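
The rebuild itself is an ordinary Maven module build; a sketch (the tag name and module layout are assumptions based on the upstream aws-sdk-java repository, and the patch step is whatever backward-compatibility change you apply):

  # Check out the matching SDK version, apply the patch, rebuild only the S3 module
  # and swap the jar in Druid's lib/ directory:
  git clone https://github.com/aws/aws-sdk-java.git
  cd aws-sdk-java
  git checkout 1.10.21
  # ... apply the patch restoring the old setMultipartUploadThreshold(int) setter ...
  mvn -pl aws-java-sdk-s3 -am -DskipTests package
  cp aws-java-sdk-s3/target/aws-java-sdk-s3-1.10.21.jar $DRUID_DIR/lib/aws-java-sdk-s3-1.10.21.jar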

This required all of the Druid middle managers to be restarted. The map reduce job finally took off without errors: it successfully connected to S3 and accessed the source segments. But we weren't out of the woods yet. The last map reduce job finished composing segments, but just as it was about to upload them, this happened:

2017-05-26T16:55:59,095 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1495712572601_0016_r_000000_0, Status : FAILED  
Error: io.druid.java.util.common.IAE: Unknown file system scheme [s3a]  
  at io.druid.indexer.JobHelper.serializeOutIndex(JobHelper.java:449)

Looking at JobHelper.serializeOutIndex revealed how the indexer composes the loadSpec for new segments from the outputFS, which in this instance was S3AFileSystem.

Without further ado we assembled this simple patch, recompiled Druid, reinstalled it, restarted the middle managers and voilà.
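
For completeness, the rebuild and redeploy boils down to the usual Maven build of the Druid sources (a sketch; where the resulting jars go depends on how you install Druid):

  # Rebuild Druid 0.10.0 from source with the patch applied;
  # the distribution tarball is produced under distribution/target/:
  mvn clean package -DskipTests
  # Then replace the jars under $DRUID_DIR/lib/ on the middle managers and restart them.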

Whoa, after that the job finally succeeded!

Contributors:

  • Rafal Michalski, royal@deep.bi
  • Jan Kogut, jan@deep.bi

If you'd like even more details or support, just drop us a line.

Rafał Michalski

Lead developer at Deep BI, Inc.

