Deep.BI provides multitenant Druid infrastructure in the cloud with complementary Kafka and Flink for scalable and reliable data collection, enrichment, storage, analytics, real-time usage and robotic process automation (RPA). Recently we upgraded our infrastructure to the latest Druid (0.10.0) and to a new way of ingesting data, the Kafka Indexing Service (KIS). The big problem with this setup is the enormous number of sharded segments, which are supposed to be merged by KIS but are not. A fix is planned for a future Druid release, but until then we had to solve it ourselves.
Our Druid architecture
We decided to run our Druid infrastructure in a hybrid cloud:
- KIS (Middlemanagers + Overlord) run in the private cloud (dedicated servers from a datacenter)
- Brokers + Historicals run in the private cloud too
- Coordinator runs on AWS EC2
- for deep storage we use Amazon S3
- as a metadata store we chose AWS RDS
(we will publish our reasons for these decisions in a separate article; in the meantime you can ask us via email: firstname.lastname@example.org)
To solve this problem we had to set up a Hadoop cluster to periodically run Hadoop index jobs.
Because these jobs are periodic, we decided to start an AWS Elastic MapReduce (EMR) cluster on demand and let it terminate once the job completes. The unexpected problems we faced were support for different S3 versions and the compatibility of Druid extensions, which resulted in non-trivial tasks such as:
- recompiling Druid with the proper extensions
- setting up a VPN between AWS and our cloud
Creating the job task description
First we assembled a job task description according to http://druid.io/docs/0.10.0/ingestion/batch-ingestion.html and http://druid.io/docs/latest/operations/other-hadoop.html.
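For orientation, a re-indexing task of this kind might look roughly like the sketch below. It builds the JSON for an `index_hadoop` task that reads existing segments through a `dataSource` input spec, following the batch-ingestion documentation. The helper name `build_reindex_task`, the datasource name, the interval, the single `count` metric, the granularities and the partition size are all placeholders for illustration, not the values from our job.

```python
import json


def build_reindex_task(datasource, interval, segment_granularity="DAY"):
    """Return an index_hadoop task dict that re-indexes existing segments.

    Every concrete value below (metric, granularities, partition size)
    is an illustrative assumption; adapt it to your own data schema.
    """
    return {
        "type": "index_hadoop",
        "spec": {
            "dataSchema": {
                "dataSource": datasource,
                "parser": {
                    "type": "hadoopyString",
                    "parseSpec": {
                        "format": "json",
                        "timestampSpec": {"column": "timestamp", "format": "auto"},
                        "dimensionsSpec": {"dimensions": []},
                    },
                },
                "metricsSpec": [
                    {"type": "longSum", "name": "count", "fieldName": "count"}
                ],
                "granularitySpec": {
                    "type": "uniform",
                    "segmentGranularity": segment_granularity,
                    "queryGranularity": "NONE",
                    "intervals": [interval],
                },
            },
            "ioConfig": {
                "type": "hadoop",
                # Read the sharded segments of the same datasource back in.
                "inputSpec": {
                    "type": "dataSource",
                    "ingestionSpec": {
                        "dataSource": datasource,
                        "intervals": [interval],
                    },
                },
            },
            "tuningConfig": {
                "type": "hadoop",
                "partitionsSpec": {"type": "hashed", "targetPartitionSize": 5000000},
            },
        },
    }


task = build_reindex_task("my_datasource", "2017-05-01/2017-05-02")
payload = json.dumps(task, indent=2)  # request body for the task submission
```

The resulting `payload` is what would be POSTed to the Overlord's `/druid/indexer/v1/task` endpoint.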
Adding Hadoop EMR jars and config files to Druid
Next we've added Hadoop client jars compatible with our EMR version to
Another step was copying a bunch of the
*-site.xml files from EMR's
Setting up a VPN between your Druid and EMR
After submitting the job to the indexer we got our first error:
2017-05-25T15:41:01,887 INFO [Thread-56] org.apache.hadoop.hdfs.DFSClient - Exception in createBlockOutputStream java.net.ConnectException: Connection refused
The problem was that the EMR data nodes were only accessible via local IP addresses, while our middle managers were running on our premises.
Thus, we had to set up a VPN connection between our private cloud and AWS and configure the Druid cluster to work with Amazon EMR. At this point we thought we were finally ready to submit an indexing task, but...
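Before resubmitting a task it is worth checking from a middle manager host that the EMR nodes are actually reachable over the VPN. The snippet below is a minimal sketch of such a check using plain TCP connects; `unreachable_endpoints` is a hypothetical helper, and the addresses and ports in the comment are assumptions (8020 and 50010 are common Hadoop 2.x defaults for the NameNode and DataNode, but verify them on your cluster).

```python
import socket


def unreachable_endpoints(endpoints, timeout=3.0):
    """Return the (host, port) pairs that refuse or time out a TCP connect."""
    failed = []
    for host, port in endpoints:
        try:
            # Open and immediately close a connection; success means the
            # port is routable and something is listening on it.
            with socket.create_connection((host, port), timeout=timeout):
                pass
        except OSError:
            failed.append((host, port))
    return failed


# Example call with made-up private addresses of EMR nodes:
#   unreachable_endpoints([("10.0.0.11", 8020), ("10.0.0.12", 50010)])
```

An empty result only shows that the ports accept connections; it does not prove that the Hadoop client configuration is correct.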
Tuning configs and adding more libraries to Druid
After solving all the networking problems we hit another error:
2017-05-25T22:53:16,654 ERROR [task-runner-0-priority-0] io.druid.indexer.hadoop.DatasourceInputFormat - Exception thrown finding location of splits java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
A quick look into core-site.xml revealed this:
<property>
  <name>fs.s3.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>
<property>
  <name>fs.s3n.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>
So we were thinking of two possibilities:
- get the EmrFileSystem jars from somewhere
- replace the implementation with org.apache.hadoop.fs.s3native.NativeS3FileSystem, as suggested by http://druid.io/docs/0.10.0/ingestion/batch-ingestion.html#loading-from-s3-with-emr
At first we went for the first option and downloaded a bunch of jar files from EMR, but we immediately ran into class dependency conflicts, so we abandoned this path.
Instead we replaced the above core-site.xml properties with:
<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
<property>
  <name>fs.s3n.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
followed by the S3 credentials:
<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value>YOUR_S3_ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value>YOUR_S3_SECRET_KEY</value>
</property>
<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>YOUR_S3_ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>YOUR_S3_SECRET_KEY</value>
</property>
Well, the next error was obvious:
2017-05-26T00:17:09,753 ERROR [task-runner-0-priority-0] io.druid.indexer.hadoop.DatasourceInputFormat - Exception thrown finding location of splits java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
as we didn't include
hadoop-dependencies. We've decided to use EMR's patched version
hadoop-aws-2.7.3-amzn-2.jar. You could probably use the vanilla version from Maven as well, but we didn't try it.
Adjusting configs to S3 region - solving "Signature Version 4 only" issue
The EMR MapReduce jobs finally started but soon failed with the following error:
2017-05-26T10:33:18,346 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1495712572601_0013_m_000003_0, Status : FAILED Error: java.lang.RuntimeException: java.io.IOException: s3n://SOME_BUCKET_NAME : 400 : Bad Request
If you google it, you stumble upon HADOOP-13325 and other relevant issues. Long story short: our S3 region was
Signature Version 4 Only and this isn't compatible with
NativeS3FileSystem implemented in
Following some advice on Google Groups, we tried to force the newer s3a filesystem implementation.
We've changed the relevant properties in
<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
  <name>fs.s3n.impl</name>
  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
following S3 credentials for
<property>
  <name>fs.s3a.access.key</name>
  <value>YOUR_S3_ACCESS_KEY</value>
</property>
<property>
  <name>fs.s3a.secret.key</name>
  <value>YOUR_S3_SECRET_KEY</value>
</property>
and some additional S3a magic:
<property>
  <name>fs.s3a.endpoint</name>
  <value>s3.eu-central-1.amazonaws.com</value>
</property>
<property>
  <name>com.amazonaws.services.s3.enableV4</name>
  <value>true</value>
</property>
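Editing these properties by hand is easy to get wrong, so a small script can apply the overrides to a copy of core-site.xml instead. The following is only a sketch: `set_properties` is a hypothetical helper, the property names are the ones shown above, and Python's ElementTree drops comments and the XML declaration when it rewrites the file.

```python
import xml.etree.ElementTree as ET


def set_properties(core_site_xml, overrides):
    """Return core-site.xml text with the given Hadoop properties set.

    Existing <property> entries are updated in place; properties that
    are not present yet are appended to <configuration>.
    """
    root = ET.fromstring(core_site_xml)
    remaining = dict(overrides)
    for prop in root.findall("property"):
        name = prop.findtext("name")
        if name in remaining:
            value = prop.find("value")
            if value is None:
                value = ET.SubElement(prop, "value")
            value.text = remaining.pop(name)
    # Whatever was not found above is new and gets appended.
    for name, text in remaining.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = text
    return ET.tostring(root, encoding="unicode")


S3A_OVERRIDES = {
    "fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "fs.s3n.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "fs.s3a.endpoint": "s3.eu-central-1.amazonaws.com",
}
```

Calling `set_properties(original_text, S3A_OVERRIDES)` returns the modified XML as a string, which can then be written next to the other *-site.xml files.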
Patching and recompiling Druid
After this last change our task didn't even manage to start the MapReduce job and failed with:
Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V
If you want more details, go to HADOOP-12420 and http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/
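The `(I)V` at the end of the error is a JVM method descriptor: the caller expects a method that takes a single `int` and returns `void`. To our understanding of the linked issue, the SDK bundled with Druid declares that parameter as `long` (descriptor `(J)V`), so code compiled against the older SDK cannot link. The decoder below is a small sketch for reading such descriptors; the function names are ours and it covers only the descriptor grammar from the JVM specification.

```python
# Descriptor codes for primitive types and void (JVM specification, section 4.3).
PRIMITIVES = {
    "B": "byte", "C": "char", "D": "double", "F": "float",
    "I": "int", "J": "long", "S": "short", "Z": "boolean", "V": "void",
}


def _read_type(desc, pos):
    """Decode one type starting at desc[pos]; return (java_name, next_pos)."""
    dims = 0
    while desc[pos] == "[":  # each '[' adds one array dimension
        dims += 1
        pos += 1
    if desc[pos] == "L":  # object type, written as Lfully/qualified/Name;
        end = desc.index(";", pos)
        name = desc[pos + 1:end].replace("/", ".")
        pos = end + 1
    else:
        name = PRIMITIVES[desc[pos]]
        pos += 1
    return name + "[]" * dims, pos


def describe(method_descriptor):
    """Return (return_type, [parameter_types]) for a method descriptor."""
    if not method_descriptor.startswith("("):
        raise ValueError("not a method descriptor: " + method_descriptor)
    pos, params = 1, []
    while method_descriptor[pos] != ")":
        name, pos = _read_type(method_descriptor, pos)
        params.append(name)
    return_type, _ = _read_type(method_descriptor, pos + 1)
    return return_type, params
```

For the error above, `describe("(I)V")` yields a `void` method with one `int` parameter, while a `long` parameter would be written `(J)V`.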
One of the proposed solutions was to replace Druid's aws-sdk-java-1.10.21 with aws-sdk-java-1.7.4 and "not a more recent version".
Fortunately, at the end of the HADOOP-12420 thread a pull request is mentioned, which led us to #643.
So we've decided to just recompile the
aws-java-sdk-s3-1.10.21.jar with this backward compatibility enabling patch and replace the vanilla file in
This required restarting all of the Druid middle managers. The MapReduce job finally took off without errors: it connected to S3 and accessed the source segments. But we weren't out of the woods yet. The last MapReduce job finished composing segments, but when it was about to upload them this happened:
2017-05-26T16:55:59,095 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1495712572601_0016_r_000000_0, Status : FAILED Error: io.druid.java.util.common.IAE: Unknown file system scheme [s3a] at io.druid.indexer.JobHelper.serializeOutIndex(JobHelper.java:449)
Looking at JobHelper.serializeOutIndex revealed how the indexer composes
loadSpec for new segments from
outputFS which in this instance was
Without further ado we assembled this simple patch, recompiled Druid, reinstalled it, restarted the middle managers and voilà.
Whoa, after that the job finally succeeded!
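For readers curious about the kind of change involved, the logic can be modelled in a few lines. The sketch below is not the patch itself (which is Java inside Druid); it is a simplified Python model of a scheme-to-loadSpec mapping in which `s3a` is treated like `s3` and `s3n`. The function name, the exact loadSpec keys and the set of other schemes are assumptions based on our reading of JobHelper.

```python
from urllib.parse import urlparse


def load_spec_for(index_out_uri):
    """Map a segment output URI to a Druid-style loadSpec dict.

    Simplified model only: handling "s3a" in the same branch as
    "s3" and "s3n" is the assumed fix being illustrated here.
    """
    uri = urlparse(index_out_uri)
    if uri.scheme in ("hdfs", "viewfs", "maprfs"):
        return {"type": "hdfs", "path": index_out_uri}
    if uri.scheme in ("s3", "s3n", "s3a"):
        # Bucket is the URI host; key is the path without its leading slash.
        return {"type": "s3_zip", "bucket": uri.netloc, "key": uri.path[1:]}
    if uri.scheme == "file":
        return {"type": "local", "path": uri.path}
    raise ValueError("Unknown file system scheme [%s]" % uri.scheme)
```

With this mapping, an output URI such as `s3a://SOME_BUCKET_NAME/path/index.zip` produces the same kind of `s3_zip` loadSpec as its `s3n` equivalent instead of raising the "Unknown file system scheme" error.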
- Rafal Michalski, email@example.com
- Jan Kogut, firstname.lastname@example.org
If you'd like even more details or support just drop us a line.