Data Engineer Interview at Amazon

Recently I had an opportunity to interview for a Data Engineer position at Amazon. I'd like to share some details about the process and my experience, in the hope that it helps anyone preparing.

Process

After I applied online, I got a call from a recruiter asking for my availability for a telephonic interview with the hiring manager. The telephonic interview covered my past experience, along with a few questions about complex situations I faced and how I solved them.

Once I cleared that round, I had an onsite interview: five 50-minute rounds, each divided into two sections, a problem-solving exercise and a few behavioral questions.

Problem solving focused on SQL, data modeling, and ETL pipeline design.

Behavioral questions were based on the Amazon Leadership Principles, for example: tell me about a situation where you tried to do something but had to change course midway, and how you communicated that to customers and team members.

My experience

The recruiter was good and guided me through the entire process as needed.

The onsite travel arrangements and communication were good.

The leadership questions were tiring, as many were repeated and I ran out of fresh examples for the same questions asked again and again.

I was surprised to see no questions on AWS or Hadoop technologies, even though they are mentioned in the job requirements.

"No space left on device" error in Spark

Spark is a general-purpose cluster computing framework used to process large datasets. It runs on Standalone, Mesos, and YARN clusters.

While running Spark jobs, you may encounter a disk space error like the one below.

No space left on device.

Cause:

Spark uses disk space on the worker nodes to store shuffle data. If that disk space is not large enough to hold the shuffle files for the data you are processing, Spark throws this error.
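A quick way to confirm this is the cause is to check free space where the shuffle files land on a worker node; a minimal check (on EMR the scratch space typically lives under /mnt; the path is an assumption, so adjust for your cluster):

```shell
# Check free space where shuffle files land. On EMR, worker scratch space
# typically lives under /mnt; fall back to / if that mount doesn't exist.
SCRATCH=/mnt
[ -d "$SCRATCH" ] || SCRATCH=/
df -h "$SCRATCH"
```

If the relevant filesystem shows close to 100% use during the job, the fixes below apply.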

Solution:

I used to get this error on AWS EMR while running Spark jobs. By increasing the EBS volume size on the core nodes using the AWS CLI command below, I was able to fix the issue. Another solution is to increase the cluster size, so that each node holds a smaller share of the shuffle files.

aws emr create-cluster --release-label emr-5.9.0  --use-default-roles --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=d2.xlarge 'InstanceGroupType=CORE,InstanceCount=2,InstanceType=d2.xlarge,EbsConfiguration={EbsOptimized=true,EbsBlockDeviceConfigs=[{VolumeSpecification={VolumeType=gp2,SizeInGB=100}},{VolumeSpecification={VolumeType=io1,SizeInGB=100,Iops=100},VolumesPerInstance=4}]}' --auto-terminate

Reference:

https://docs.aws.amazon.com/cli/latest/reference/emr/create-cluster.html

YARN Virtual Memory issue in Hadoop

We often run into YARN container memory issues like the error message below. This can happen when running various Hadoop applications such as Hive, shell, Pig, Sqoop, or Spark, either from the command line (CLI) or from an Oozie workflow.

Current usage: 135.2 MB of 2 GB physical memory used; 6.4 GB of 4.2 GB virtual memory used. Killing container.

I read many blogs, StackOverflow posts, and the Hadoop/YARN documentation, and they suggest setting one or more of the following parameters. (The 4.2 GB virtual limit in the message above is simply the 2 GB physical allocation multiplied by the default vmem-pmem ratio of 2.1.)

In mapred-site.xml:


<property>
  <name>mapreduce.map.memory.mb</name>
  <value>4096</value>
</property>
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>8192</value>
</property>
<property>
  <name>mapreduce.map.java.opts</name>
  <value>-Xmx3072m</value>
</property>
<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>-Xmx6144m</value>
</property>

In yarn-site.xml:


<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>2.1</value>
</property>
<property>
  <name>yarn.nodemanager.vmem-check-enabled</name>
  <value>false</value>
</property>

I was running my applications on AWS EMR (Elastic MapReduce, AWS's Hadoop distribution) from an Oozie workflow, and none of the above settings helped. I had been setting those parameters only on the master node and restarting the YARN processes. But when an application, especially a shell script that can run on any slave node, executes on a slave node, the YARN settings on the master node do not apply. I had to set those parameters on every slave node (NodeManager) in the cluster.

That can be done using a configuration like the one below, which has to be supplied when launching the EMR cluster. It can be entered directly on the EMR console or loaded from a JSON file, and it sets the parameters in yarn-site.xml on all slave nodes.

[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.vmem-pmem-ratio": "10",
      "yarn.nodemanager.vmem-check-enabled": "false"
    }
  }
]
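For reference, if the JSON above is saved to a local file, it can be supplied at launch with the --configurations flag; a sketch (the instance types and the file name yarn-config.json are placeholders):

```shell
# Launch an EMR cluster with the yarn-site overrides loaded from a local
# JSON file (yarn-config.json is a hypothetical file name).
aws emr create-cluster \
  --release-label emr-5.9.0 \
  --use-default-roles \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large \
                    InstanceGroupType=CORE,InstanceCount=2,InstanceType=m4.large \
  --configurations file://yarn-config.json \
  --auto-terminate
```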

AWS Solutions Architect – Associate Exam (2018 version)

I recently passed the new version of the AWS Solutions Architect – Associate exam, released in February 2018.
I would like to post some details and share my experience.

Number of questions: 65
Time: 130 min
Cost: $150

There were questions on the following topics:

  • ELB (Elastic Load Balancers)
  • Autoscaling
  • Networking topics (Subnets, Internet Gateways, VPC)
  • Storage solutions (EBS, EFS, S3)
  • Databases (RDS, DynamoDB, Redshift)
  • Lambda

My preparation

My experience

  • The time given was plenty
  • Overall, the exam is not very easy
  • You need to go through a lot of documentation and practice on AWS
  • Some hands-on experience would also help, though it is not strictly necessary

Hope you find this information helpful.

Good luck!

S3 and Data Transfer issue from Spark

Spark is a general-purpose, distributed, high-performance computation engine with APIs in major languages like Java, Scala, and Python.

S3 is Amazon's Simple Storage Service for storing objects in a highly durable and reliable manner at very low cost.

Spark is often used in combination with S3: it reads input data from S3, applies transformations, and finally stores the output in an S3 bucket.

While processing large input data and storing the output on S3, I found that Spark is very fast at processing the data but very slow at writing the output to S3.

Instead, I found it much faster to store the data first on local HDFS (on the Hadoop cluster) and then copy it to S3 from HDFS using s3-dist-cp (Amazon's version of Hadoop's distcp).

In one case, processing 1 TB of data took about 1.5 hours, but copying the output to S3 took about 4 hours.
With the approach above, the copy to S3 took less than 5 minutes, saving a lot of time and money.

The s3-dist-cp command can be run from the master node using the format below.

s3-dist-cp --src /input/data --dest s3://my-bucket/output

OR

hadoop jar s3-dist-cp.jar --src /input/data --dest s3://my-bucket/output

Connect to Redshift using JDBC

Redshift is a data warehouse from Amazon that supports SQL. It’s a managed solution on AWS cloud.

We can connect to a Redshift database from Java using JDBC and query tables just like in any other database.
The code below connects to Redshift and queries table schema information.


package com.hadoopnotes;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class RedshiftTest {

	public static void main(String[] args) {

		Connection connection;
		String redshiftUrl = "jdbc:redshift://ip_address:5439/database";

		try {
			DriverManager.registerDriver(new com.amazon.redshift.jdbc41.Driver());

			Properties driverProperties = new Properties();
			driverProperties.put("user", "myuser");
			driverProperties.put("password", "mypassword");

			connection = DriverManager.getConnection(redshiftUrl, driverProperties);

			String schemaSql = "set search_path to myschema";
			String sql = "select \"column\", type from pg_table_def where schemaname='myschema' and tablename = 'mytable'";

			System.out.println("SQL:" + sql);
			Statement stmt = null;

			try {
				stmt = connection.createStatement();
				stmt.execute(schemaSql);
				System.out.println("Running query now...");
				ResultSet rs = stmt.executeQuery(sql);

				while (rs.next()) {
					// this prints column name and its type
					System.out.println(rs.getString(1) + "--" + rs.getString(2));
				}
			} finally {
				if (stmt != null)
					stmt.close();
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
}

If you use Maven to build this Java project, add the following dependency.

<dependency>
     <groupId>com.amazon.redshift</groupId>
     <artifactId>redshift-jdbc41</artifactId>
     <version>1.2.10.1009</version>
</dependency>

Build the project and run it.

mvn package
java -cp target/myjar.jar com.hadoopnotes.RedshiftTest

Or, from command line, compile and run:

javac -cp redshift-jdbc41-1.2.10.1009.jar com/hadoopnotes/RedshiftTest.java
java -cp .:redshift-jdbc41-1.2.10.1009.jar com.hadoopnotes.RedshiftTest

MapReduce Program using Maven

Maven is a build-management tool used to set up a Java project and create JAR files. It uses a pom.xml file to declare the dependencies a project needs, compile the code, and build the final artifact, such as a JAR file.

Eclipse is an IDE (Integrated Development Environment) often used by Java developers to make development and debugging easier. Install it from the eclipse.org website. Maven support usually comes bundled with Eclipse.

Steps to create and run MapReduce program using Maven and Eclipse:

    • In Eclipse IDE, create a new Maven project (New -> Project -> Maven Project)
    • Create the following WordCount.java code in the package com.hadoopnotes.MapReduceSample
package com.hadoopnotes.MapReduceSample;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

		@Override
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			StringTokenizer str = new StringTokenizer(value.toString());

			while (str.hasMoreTokens()) {
				String word = str.nextToken();

				context.write(new Text(word), new IntWritable(1));
			}
		}
	}

	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable i : values) {
				sum += i.get();
			}

			context.write(key, new IntWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {

		if (args.length != 2) {
			System.err.println("Usage: WordCount <InPath> <OutPath>");
			System.exit(2);
		}

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "WordCount");

		job.setJarByClass(WordCount.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		job.setNumReduceTasks(1);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
    • Update the pom.xml file as below
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hadoopnotes</groupId>
    <artifactId>MapReduceSample</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>MapReduceSample</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
    • Right-click the project, then Run As -> Maven install. This generates a JAR file in the target folder: MapReduceSample-0.0.1-SNAPSHOT.jar
    • Copy this JAR file to the Hadoop cluster (such as the master node on AWS EMR)
    • Create an input file input.txt and write some text into it
    • Copy this file to HDFS: hdfs dfs -put input.txt /user/hadoop/
    • Run the WordCount program using the command below.

$ hadoop jar MapReduceSample-0.0.1-SNAPSHOT.jar com.hadoopnotes.MapReduceSample.WordCount input.txt output4

    • Check the output:

    hdfs dfs -cat /user/hadoop/output4/part-r-00000


Text Processing using AWK

I came across a scenario where I had to parse a file containing many INSERT HQL commands and create a separate file for each INSERT command.

My input file insert_all.hql is shown below.

INSERT INTO TABLE table1
SELECT
 col1,
 col2,
 col3
FROM table11
WHERE date = '2017-07-12'
;

INSERT INTO TABLE table2
SELECT
 col1,
 col2,
 col3
FROM table22
WHERE date = '2017-07-12'
;

INSERT INTO TABLE table3
SELECT
 col1,
 col2,
 col3
FROM table33
WHERE date = '2017-07-12'

You could do this manually by creating each file and copying the relevant code from the original file into a target file for each table, but that is error-prone and tedious.

I wrote a little script using the Linux/Unix tool awk, a very good text-processing tool for this kind of parsing task.

awk '
BEGIN {RS=";"}
{ content=$0;
  if (content ~ /INSERT INTO TABLE ([A-Za-z].*)/) filename=$4;
  print content > filename;
  close(filename);
}' insert_all.hql

Explanation

  • In the BEGIN section, RS (the record separator) is set to a semicolon. The default is newline, which doesn't work here because each INSERT statement spans multiple lines.
  • For each record, the content is copied into a variable.
  • If the record matches the INSERT pattern, the table name (the fourth field) becomes the output file name.
  • The record's content is then written to that file, which is closed before the next record is processed.
  • This generates files named table1, table2, and so on.

You can run this from a terminal on a Mac or Linux machine.
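To sanity-check the script, here is a minimal end-to-end run against a made-up two-statement sample file (all file and table names are hypothetical):

```shell
# Build a small sample input with two INSERT statements separated by ';'.
cat > sample.hql <<'EOF'
INSERT INTO TABLE table1
SELECT col1 FROM table11
;
INSERT INTO TABLE table2
SELECT col2 FROM table22
EOF

# Split it: one output file per INSERT, named after the target table.
awk '
BEGIN {RS=";"}
{ content=$0;
  if (content ~ /INSERT INTO TABLE ([A-Za-z].*)/) filename=$4;
  print content > filename;
  close(filename);
}' sample.hql

# Two files, table1 and table2, should now exist.
ls table1 table2
```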


Hive Date Functions

Hive provides a SQL-like interface to data stored on HDFS. Some of the less commonly used date functions are given below (tested on version 2.1.0).

  • Get last date of month
$ hive -e 'select last_day("2017-05-01")'
OK
2017-05-31
Time taken: 2.15 seconds, Fetched: 1 row(s)
  • Get the day of the week of the last day of a month (Monday=1, Sunday=7)
$ hive -e 'select date_format(last_day("2017-05-01"), "u")'
OK
3
Time taken: 2.46 seconds, Fetched: 1 row(s)
  • Add days to a date
$ hive -e 'select date_add(last_day("2017-04-03"), 1)'
OK
2017-05-01
Time taken: 2.425 seconds, Fetched: 1 row(s)
  • Get the day of the week of the first day of the following month (Monday=1, Sunday=7)
$ hive -e 'select date_format(date_add(last_day("2017-04-03"), 1), "u")'
OK
1
Time taken: 2.301 seconds, Fetched: 1 row(s)

P.S. Find the Hive version using the command: hive --version

$ hive --version
Hive 2.1.0-amzn-0

Remove Special Characters from Data using Hive

I have data that contains special characters like carriage returns (\r), newlines (\n), and some non-printable control characters (like ^M). I need to process this data using Hive for some transformations and later load it into a database like Oracle using Sqoop, or into Amazon Redshift using the PSQL COPY command, for analysis.

Sqoop and PSQL COPY fail to load the data because of these invalid characters.
I was trying to find a way to strip these characters in Hive before loading into the external database.
Hive's regexp_extract function comes to the rescue, but finding the right regex pattern was a challenge.
I finally figured it out after searching online forums; helpfully, Hive supports Java regex patterns.
My final Hive query is given below. It extracts the leading run of printable characters from the input text.


SELECT regexp_extract(col1, '\\p{Print}*',0)
FROM table1;
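If you would rather clean the files before they ever reach Hive, a different technique is to delete the offending bytes on the command line with tr (deleting the characters rather than extracting the printable prefix, as the query above does); a sketch with made-up file names:

```shell
# Create a file with an embedded carriage return, then delete every byte
# that is not a printable character, a tab, or a newline.
printf 'good\rdata here\n' > dirty.txt
tr -cd '[:print:]\t\n' < dirty.txt > clean.txt
cat clean.txt
```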