Deep.BI provides multitenant Druid infrastructure in the cloud, complemented by Kafka and Flink, for scalable and reliable data collection, enrichment, storage, analytics, real-time usage, and robotic process automation (RPA). Recently, we upgraded our infrastructure to the latest version of Druid (0.10.0) and to the newest way of ingesting data, the Kafka Indexing Service (KIS). The biggest problem with this setup is the enormous number of sharded segments, which KIS is supposed to merge but doesn't. This is planned to be fixed in future Druid releases, but until then we have had to fix it ourselves.
Our Druid architecture
We decided to run our Druid infrastructure in a hybrid cloud:
- KIS (Middlemanagers + Overlord) run in the private cloud (dedicated servers from a datacenter)
- Brokers + Historicals run in the private cloud too
- Coordinator runs on AWS EC2
- for deep storage we use Amazon S3
- as a metadata store we chose AWS RDS
(we will publish our reasons for these decisions in a separate article; in the meantime you can ask us via email: email@example.com)
To solve the problem described above, we had to set up a Hadoop cluster to periodically run Hadoop index jobs.
Because the jobs run only periodically, we decided to start an AWS Elastic MapReduce (EMR) cluster on demand, with the option to terminate it after the job completes. The unexpected problems we faced were related to support for different S3 versions and to the compatibility of Druid extensions, which resulted in non-trivial tasks such as:
- Druid recompilation with proper extensions
- VPN between AWS and our cloud
Creating the job task description
First, we assembled a job task description according to http://druid.io/docs/0.10.0/ingestion/batch-ingestion.htm and http://druid.io/docs/latest/operations/other-hadoop.html.
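As an illustration of what such a task description can look like, here is a minimal Python sketch that builds a Hadoop index task reading existing segments through the `dataSource` input spec (the `DatasourceInputFormat` class in the logs further down belongs to that code path). The function name, the datasource name `events`, the interval, and the dependency coordinate are hypothetical placeholders; the `dataSchema` is cluster-specific and has to be supplied by the caller.

```python
import json


def build_reindex_task(data_source, interval, data_schema, hadoop_coordinates):
    """Build a Hadoop index task that re-reads existing segments of one datasource."""
    return {
        "type": "index_hadoop",
        "spec": {
            # dataSchema (parser, metricsSpec, granularitySpec) is cluster-specific
            # and must be supplied by the caller.
            "dataSchema": data_schema,
            "ioConfig": {
                "type": "hadoop",
                "inputSpec": {
                    # "dataSource" reads existing Druid segments instead of raw files.
                    "type": "dataSource",
                    "ingestionSpec": {
                        "dataSource": data_source,
                        "intervals": [interval],
                    },
                },
            },
            "tuningConfig": {"type": "hadoop"},
        },
        # Hadoop client artifacts the task should load (placeholder values).
        "hadoopDependencyCoordinates": hadoop_coordinates,
    }


if __name__ == "__main__":
    task = build_reindex_task(
        "events",                                   # hypothetical datasource
        "2017-05-24/2017-05-25",                    # hypothetical interval
        {"dataSource": "events"},                   # incomplete placeholder schema
        ["org.apache.hadoop:hadoop-client:2.7.3"],  # assumed Hadoop version
    )
    print(json.dumps(task, indent=2))
```

The resulting JSON is what gets submitted to the indexer; a real task needs a complete `dataSchema` and usually a tuned `tuningConfig`.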
Adding Hadoop EMR jars and config files to Druid
Next, we added Hadoop client jars compatible with our EMR version to
Additionally, we copied a bunch of the
*-site.xml files from EMR's
Setting up a VPN between your Druid and EMR
After submitting the job to the indexer we got our first error:
2017-05-25T15:41:01,887 INFO [Thread-56] org.apache.hadoop.hdfs.DFSClient - Exception in createBlockOutputStream java.net.ConnectException: Connection refused
The problem was that the EMR data nodes were only accessible via local IP addresses, while our middle managers were running on our premises.
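A quick way to confirm this kind of problem from a middle manager host is to probe the data node port directly. The sketch below is a generic TCP check, not part of Druid or Hadoop; the helper name is hypothetical, and any host or port passed to it has to come from your own cluster.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable network, and timeouts.
        return False
```

For example, `can_connect("10.0.0.12", 50010)` would test a hypothetical private data node address on 50010, the default DataNode data-transfer port in Hadoop 2.x. A `False` result from the middle manager combined with `True` from inside the EMR network points at routing rather than at Hadoop itself.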
Thus, we had to set up a VPN connection between our private cloud and AWS and configure our Druid cluster to work with Amazon EMR. Now we thought we were finally ready to successfully submit an indexing task, but...
Tuning configs and adding more libraries to Druid
After successfully solving all the networking problems, we hit another error:
2017-05-25T22:53:16,654 ERROR [task-runner-0-priority-0] io.druid.indexer.hadoop.DatasourceInputFormat - Exception thrown finding location of splits java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
A quick look into core-site.xml revealed this:
<property>
  <name>fs.s3.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>
<property>
  <name>fs.s3n.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>
So we thought of two possible solutions:
- somehow get the EmrFileSystem jars from somewhere
- replace the implementation with org.apache.hadoop.fs.s3native.NativeS3FileSystem, as suggested by this article.
At first, we went with the first solution and downloaded a bunch of jar files from EMR, but we immediately faced class dependency conflicts, so we decided not to continue with it.
Instead, we replaced the above core-site.xml properties with:
<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
<property>
  <name>fs.s3n.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
followed by the S3 credentials:
<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value>YOUR_S3_ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value>YOUR_S3_SECRET_KEY</value>
</property>
<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>YOUR_S3_ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>YOUR_S3_SECRET_KEY</value>
</property>
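If the same change has to be applied to several hosts, it can be scripted instead of edited by hand. The following is a minimal sketch using only Python's standard library; the helper names are hypothetical, and it assumes the file has the usual `<configuration>` root with `<property>` children.

```python
import xml.etree.ElementTree as ET

NATIVE_S3 = "org.apache.hadoop.fs.s3native.NativeS3FileSystem"


def set_property(root, name, value):
    """Set a Hadoop <property> under <configuration>, adding it if it is missing."""
    for prop in root.findall("property"):
        if prop.findtext("name") == name:
            value_el = prop.find("value")
            if value_el is None:
                value_el = ET.SubElement(prop, "value")
            value_el.text = value
            return
    prop = ET.SubElement(root, "property")
    ET.SubElement(prop, "name").text = name
    ET.SubElement(prop, "value").text = value


def switch_to_native_s3(xml_text, access_key, secret_key):
    """Return core-site.xml content with s3/s3n pointed at NativeS3FileSystem."""
    root = ET.fromstring(xml_text)
    for scheme in ("s3", "s3n"):
        set_property(root, "fs.%s.impl" % scheme, NATIVE_S3)
        set_property(root, "fs.%s.awsAccessKeyId" % scheme, access_key)
        set_property(root, "fs.%s.awsSecretAccessKey" % scheme, secret_key)
    return ET.tostring(root, encoding="unicode")
```

Note that ElementTree drops comments and processing instructions when it parses a file, so keep a backup of the original core-site.xml before overwriting it.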
Well, the next error was obvious:
2017-05-26T00:17:09,753 ERROR [task-runner-0-priority-0] io.druid.indexer.hadoop.DatasourceInputFormat - Exception thrown finding location of splits java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
as we didn't include
hadoop-dependencies. We decided to use EMR's patched version
hadoop-aws-2.7.3-amzn-2.jar. You could probably use the vanilla version from Maven as well, but we didn't try it.
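When chasing a ClassNotFoundException like this one, it helps to check which jars, if any, actually contain the missing class. The sketch below is a generic helper built on Python's standard library; the function name is hypothetical and the directory to scan is whatever directory your Druid installation loads Hadoop jars from.

```python
import os
import zipfile


def jars_containing(class_name, directory):
    """Return sorted paths of jars under directory that contain the given class."""
    # A class a.b.C is stored inside a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for dirpath, _dirs, files in os.walk(directory):
        for filename in files:
            if not filename.endswith(".jar"):
                continue
            path = os.path.join(dirpath, filename)
            try:
                with zipfile.ZipFile(path) as jar:
                    if entry in jar.namelist():
                        matches.append(path)
            except zipfile.BadZipFile:
                continue  # skip corrupt files that only look like jars
    return sorted(matches)
```

Calling it with `org.apache.hadoop.fs.s3native.NativeS3FileSystem` and the jar directory returns an empty list when the class is missing, and more than one path when duplicate copies could cause conflicts.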
Adjusting configs to S3 region - solving "Signature Version 4 only" issue
The EMR MapReduce jobs finally started, but soon failed with the following error:
2017-05-26T10:33:18,346 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1495712572601_0013_m_000003_0, Status : FAILED Error: java.lang.RuntimeException: java.io.IOException: s3n://SOME_BUCKET_NAME : 400 : Bad Request
If you google it, you might stumble upon HADOOP-13325 and other relevant issues. Long story short: our S3 region was
Signature Version 4 Only and this isn't compatible with the
NativeS3FileSystem implemented in
Following some advice on Google Groups, we tried to enforce the new s3a filesystem implementation.
We eventually changed the relevant properties in
<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
  <name>fs.s3n.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
following S3 credentials for
<property>
  <name>fs.s3a.access.key</name>
  <value>YOUR_S3_ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>YOUR_S3_SECRET_KEY</value>
</property>
and some additional S3a magic:
<property>
  <name>fs.s3a.endpoint</name>
  <value>s3.eu-central-1.amazonaws.com</value>
</property>
<property>
  <name>com.amazonaws.services.s3.enableV4</name>
  <value>true</value>
</property>
Patching and recompiling Druid
After the last update, our task didn't even manage to start the MapReduce job and failed with:
Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V
If you want more details, go to HADOOP-12420 and http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/
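The trailing `(I)V` in the error is a JVM method descriptor: the caller was compiled against a method that takes an `int` and returns `void`, and no method with exactly that signature exists in the SDK jar on the classpath. Descriptors are easy to misread, so here is a small Python decoder as an aid for reading such errors; it is a hypothetical helper, not part of any Java tooling.

```python
PRIMITIVES = {
    "B": "byte", "C": "char", "D": "double", "F": "float", "I": "int",
    "J": "long", "S": "short", "Z": "boolean", "V": "void",
}


def _parse_type(desc, pos):
    """Parse one type starting at desc[pos]; return (readable name, next position)."""
    dims = 0
    while desc[pos] == "[":  # each leading '[' adds one array dimension
        dims += 1
        pos += 1
    if desc[pos] == "L":  # object type: Lsome/package/Name;
        end = desc.index(";", pos)
        name = desc[pos + 1:end].replace("/", ".")
        pos = end + 1
    else:
        name = PRIMITIVES[desc[pos]]
        pos += 1
    return name + "[]" * dims, pos


def describe(descriptor):
    """Turn a JVM method descriptor such as '(I)V' into 'void (int)'."""
    if not descriptor.startswith("("):
        raise ValueError("not a method descriptor: %r" % descriptor)
    pos = 1
    params = []
    while descriptor[pos] != ")":
        type_name, pos = _parse_type(descriptor, pos)
        params.append(type_name)
    return_type, _ = _parse_type(descriptor, pos + 1)
    return "%s (%s)" % (return_type, ", ".join(params))
```

Here `describe("(I)V")` gives `void (int)`, while a variant taking a `long` would appear as `(J)V`. A mismatch of this kind between the SDK version a library was compiled against and the one present at runtime is what the linked issue discusses.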
One of the proposed solutions was to replace Druid's aws-sdk-java-1.10.21 with aws-sdk-java-1.7.4 and "not a more recent version".
Fortunately, at the end of the HADOOP-12420 thread this pull request is mentioned, which led us to #643.
So we decided to just recompile the
aws-java-sdk-s3-1.10.21.jar with this backward compatibility enabling patch and replace the vanilla file in
This required all of the Druid middle managers to be restarted. The MapReduce job finally took off without errors: it successfully connected to S3 and accessed the source segments. But we weren't out of the woods yet! The last MapReduce job finished composing segments, but just as it was about to upload them, this happened:
2017-05-26T16:55:59,095 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1495712572601_0016_r_000000_0, Status : FAILED Error: io.druid.java.util.common.IAE: Unknown file system scheme [s3a] at io.druid.indexer.JobHelper.serializeOutIndex(JobHelper.java:449)
Looking at JobHelper.serializeOutIndex revealed how indexer is composing
loadSpec for new segments from
outputFS which, in this instance, was
Without further ado, we assembled this simple patch, recompiled Druid, reinstalled it, restarted the middle managers, and voilà.
After that, the job finally succeeded!
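To make the last failure easier to picture, here is a Python sketch of the kind of mapping involved: the scheme of the output filesystem URI decides which loadSpec is written for the new segment. This is an illustration only, not Druid's Java code and not a copy of the patch mentioned above. The loadSpec types `hdfs`, `s3_zip` and `local` are Druid's; treating `s3a` like `s3` and `s3n` is an assumption about what such a patch needs to do, and the function name and the example bucket are hypothetical.

```python
from urllib.parse import urlparse


def make_load_spec(index_out_uri):
    """Derive a segment loadSpec from the URI the reducer wrote the index to."""
    uri = urlparse(index_out_uri)
    scheme = uri.scheme
    if scheme == "hdfs":
        return {"type": "hdfs", "path": index_out_uri}
    # Assumption: "s3a" is handled exactly like "s3" and "s3n".
    if scheme in ("s3", "s3n", "s3a"):
        return {"type": "s3_zip", "bucket": uri.netloc, "key": uri.path[1:]}
    if scheme == "file":
        return {"type": "local", "path": uri.path}
    # Without the "s3a" entry above, an s3a:// URI would end up here.
    raise ValueError("Unknown file system scheme [%s]" % scheme)
```

With this mapping, `make_load_spec("s3a://some-bucket/segments/index.zip")` yields an `s3_zip` loadSpec with bucket `some-bucket` and key `segments/index.zip`, which is the form Historicals expect when pulling segments from S3.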
- Rafal Michalski, firstname.lastname@example.org
- Jan Kogut, email@example.com
If you'd like more details or support, just drop us a line.