YARN Virtual Memory issue in Hadoop

We often get into YARN container memory issue like the error message below. This may happen if you are running various Hadoop applications like Hive, Shell, Pig, Sqoop, or Spark, either running from a command line (CLI) or running from a Oozie workflow.

Current usage: 135.2 MB of 2 GB physical memory used; 6.4 GB of 4.2 GB virtual memory used. Killing container.

I read many blogs, StackOverflow posts, Hadoop/YARN documentation, and they suggest to set the one or more of following parameters.

In mapred-site.xml:


<name>mapreduce.map.memory.mb</name>
<value>4096</value>

<name>mapreduce.reduce.memory.mb</name>
<value>8192</value>

<name>mapreduce.map.java.opts</name>
<value>-Xmx3072m</value>

<name>mapreduce.reduce.java.opts</name>
<value>-Xmx6144m</value>

In yarn-site.xml:


<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>

<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>

I was running my applications on AWS EMR (Elastic MapReduce – AWS’s Hadoop distribution) from an Oozie workflow, and none of those above settings helped. I was setting those parameters only on master node and restarting the YARN process. But when the application, especially a Shell script that can run on any slave nodes, is running on a slave node, the YARN settings on master node didn’t help. I had to set those parameters on every slave node (Node Manager) of the cluster.

And that can be done using configuration like below. This configuration has to be set while launching the EMR cluster. This can be directly set on the EMR console or load from a JSON file. This configuration setting sets the parameters on yarn-site.xml on all slave nodes.

[
 {
   "Classification": "yarn-site",
   "Properties": {
     "yarn.nodemanager.vmem-pmem-ratio": "10",
     "yarn.nodemanager.vmem-check-enabled": "false"
 }
]
Advertisements

AWS Solutions Architect – Associate Exam (2018 version)

I recently passed new version of AWS Solutions Architect – Associate exam released in Feb 2018.
I would like to post some details and share my experience.

Number of questions: 65
Time: 130 min
Cost: $150

There were questions on following topics:

  • ELB (Elastic Load Balancers)
  • Autoscaling
  • Networking topics (Subnets, Internet Gateways, VPC)
  • Storage solutions (EBS, EFS, S3)
  • Databases (RDS, DynamoDB, Redshift)
  • Lambda

My preparation

My experience

  • The time given was plenty
  • Overall, the exam is not very easy
  • Need to go through lot of documentation and practice on AWS
  • Some experience also would help though not necessary

Hope you find this information helpful.

Good luck!

S3 and Data Transfer issue from Spark

Spark is a general purpose distributed high performance computation engine that has APIs in many major languages like Java, Scala, Python.

S3 is Amazon Simple Storage Service for storing objects in a highly durable and reliable manner at very low cost.

Spark is used in combination with S3 for reading input and saving output data.

Spark can apply many transformations on input data, and finally store the data in some bucket on S3.

While processing large input data and storing output data on S3, I found that it is very fast in processing the data, but it’s very slow in writing the output data to S3.

Instead, I found that it’s very fast storing the data first on local HDFS (on Hadoop cluster), and then copy the data back to S3 from HDFS using s3-dist-cp (Amazon version of Hadoop’s distcp).

In one of the cases, to process data of 1TB, it took about 1.5 hrs to process, but about 4 hours to copy the output data to S3.
But with the above solution, it just took less than 5 min to copy the data to S3, saving lot of time and money.

s3-dist-cp command can be run from master node using the format below.

s3-dist-cp --src /input/data --dest s3://my-bucket/output

OR

hadoop jar s3-dist-cp.jar --src /input/data --dest s3://my-bucket/output

Connect to Redshift using JDBC

Redshift is a data warehouse from Amazon that supports SQL. It’s a managed solution on AWS cloud.

We can connect to Redshift database from Java using JDBC and query tables in just like any other database.
I am showing here code that connects to Redshift and queries table schema information.


package com.hadoopnotes;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class RedshiftTest {

	public static void main(String[] args) {

		Connection connection;
		String redshiftUrl = "jdbc:redshift://ip_address:5439/dbatabase";

		try {
			DriverManager.registerDriver(new com.amazon.redshift.jdbc41.Driver());

			Properties driverProperties = new Properties();
			driverProperties.put("user", "myuser");
			driverProperties.put("password", "mypassword");

			connection = DriverManager.getConnection(redshiftUrl, driverProperties);

			String schemaSql = "set search_path to myschema";
			String sql = "select \"column\", type from pg_table_def where schemaname='myschema' and tablename = 'mytable'";

			System.out.println("SQL:" + sql);
			Statement stmt = null;

			try {
				stmt = connection.createStatement();
				stmt.execute(schemaSql);
				System.out.println("Running query now...");
				ResultSet rs = stmt.executeQuery(sql);

				while (rs.next()) {
					// this prints column name and its type
					System.out.println(rs.getString(1) + "--" + rs.getString(2));
				}
			} finally {
				if (stmt != null)
					stmt.close();
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
}

If using Maven to build this Java project, add the following dependency.

<dependency>
     <groupId>com.amazon.redshift</groupId>
     <artifactId>redshift-jdbc41</artifactId>
     <version>1.2.10.1009</version>
</dependency>

Build project and run it.

mvn package
java -cp target/myjar.jar com.hadoopnotes.RedshiftTest

Or, from command line, compile and run:

javac -cp redshift-jdbc41-1.2.10.1009.jar com/hadoopnotes/RedshiftTest.java
java -cp .:redshift-jdbc41-1.2.10.1009.jar com.hadoopnotes.RedshiftTest

MapReduce Program using Maven

Maven is a build management tool used to setup a Java project and create JAR files. It uses pom.xml file to setup dependencies a project needs, compile, and build final artifact like JAR file.

Eclipse is an IDE (Integrated Development Environment) often used by Java developers to make development and debugging easier. Install it from eclipse.org website. Maven usually comes with Eclipse.

Steps to create and run MapReduce program using Maven and Eclipse:

    • In Eclipse IDE, create a new Maven project (New -> Project -> Maven Project)
    • Create the following WordCount.java code in the package
package com.hadoopnotes.MapReduceSample;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

		@Override
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			StringTokenizer str = new StringTokenizer(value.toString());

			while (str.hasMoreTokens()) {
				String word = str.nextToken();

				context.write(new Text(word), new IntWritable(1));
			}
		}
	}

	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable i : values) {
				sum += i.get();
			}

			context.write(key, new IntWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {

		if (args.length != 2) {
			System.err.println("Usage: WordCount <InPath> <OutPath>");
			System.exit(2);
		}

		Configuration conf = new Configuration();
		Job job = new Job(conf, "WordCount");

		job.setJarByClass(WordCount.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		job.setNumReduceTasks(1);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
    • Update pom file as below
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hadoopnotes</groupId>
    <artifactId>MapReduceSample</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>MapReduceSample</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
    • Right click on the project, and then Run As -> Maven install. This will generate jar file in target folder:┬áMapReduceSample-0.0.1-SNAPSHOT.jar
    • Copy this jar file to Hadoop cluster (like master node on AWS EMR)
    • Create an input file input.txt. Write some text into it.
    • Copy this file to HDFS: hdfs dfs -put input.txt /user/hadoop/
    • Run the WordCount program using below command.

$ hadoop jar MapReduceSample-0.0.1-SNAPSHOT.jar com.hadoopnotes.MapReduceSample.WordCount input.txt output4

  • Check output:

    hdfs dfs -cat /user/hadoop/output4/part-r-00000

 

Text Processing using AWK

I came across a scenario where in I had to parse a file that has many INSERT hql commands in a single file and I had to create a separate file for each INSERT command.

My input file insert_all.hql is as below.

INSERT INTO TABLE table1
SELECT
 col1,
 col2,
 col3
FROM table11
WHERE date = '2017-07-12'
;

INSERT INTO TABLE table2
SELECT
 col1,
 col2,
 col3
FROM table22
WHERE date = '2017-07-12'
;

INSERT INTO TABLE table3
SELECT
 col1,
 col2,
 col3
FROM table33
WHERE date = '2017-07-12'

You can do this manually by creating each file and copy the required code from original file to target file for each table. But it’s error-prone and tedious.

I wrote a little script using Linux/Unix tool called awk. awk is very good text processing tool that can be used for this kind of text parsing tasks.

awk '
> BEGIN {RS=";"}
> { content=$0;
> if(content ~ /INSERT INTO TABLE ([A-Za-z].*)/) filename=$4;
> print content > filename;
> close filename;
> }
>' insert_all.hql

Explanation

  • In the BEGIN section, RS (Record separator) is initialized as semicolon. The default is newline, but this doesn’t work for us here.
  • Next section, I am copying the content of that record to a variable.
  • Next, I am extracting table name using REGEX. filename is table name
  • Next, I am copying the content into the file
  • This will generate files like table1, table2, etc.

You can run this code from a terminal on Mac or a Linux machine.

 

Hive Date Functions

Hive provides a SQL-like interface to data stored on HDFS. Some of the rare date functions are given below (Tested on version 2.1.0).

  • Get last date of month
$ hive -e 'select last_day("2017-05-01")'
OK
2017-05-31
Time taken: 2.15 seconds, Fetched: 1 row(s)
  • Get last day of month (Monday=1, Sunday=7)
$ hive -e 'select date_format(last_day("2017-05-01"), "u")'
OK
3
Time taken: 2.46 seconds, Fetched: 1 row(s)
  • Add days to a date
$ hive -e 'select date_add(last_day("2017-04-03"), 1)'
OK
2017-05-01
Time taken: 2.425 seconds, Fetched: 1 row(s)
  • Get first day of last month (Monday=1, Sunday=7)
$ hive -e 'select date_format(date_add(last_day("2017-04-03"), 1), "u")'
OK
1
Time taken: 2.301 seconds, Fetched: 1 row(s)

P.S. Find Hive version using command: hive –version

$ hive --version
Hive 2.1.0-amzn-0