S3 Data Transfer Issue from Spark

Spark is a general-purpose, distributed, high-performance computation engine with APIs in major languages such as Java, Scala, and Python.

S3 is Amazon's Simple Storage Service for storing objects in a highly durable and reliable manner at very low cost.

Spark is used in combination with S3 for reading input and saving output data.

Spark can apply many transformations to the input data and finally store the result in an S3 bucket.

While processing a large input data set and storing the output on S3, I found that Spark is very fast at processing the data but very slow at writing the output to S3.

Instead, I found that it is much faster to first store the output on the cluster's local HDFS and then copy it from HDFS to S3 using s3-dist-cp (Amazon's version of Hadoop's distcp).

In one case, processing 1 TB of data took about 1.5 hours, but copying the output data directly to S3 took about 4 hours.
With the approach above, it took less than 5 minutes to copy the data to S3, saving a lot of time and money.
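
As a rough sketch (not the exact job from this post; the class name, paths, and Parquet format below are only placeholders), the Spark job writes its output to an HDFS path instead of an s3:// path, and the copy to S3 happens afterwards with s3-dist-cp:

// Minimal sketch: bucket names, paths, and the Parquet format are illustrative only.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class WriteToHdfsFirst {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder()
				.appName("WriteToHdfsFirst")
				.getOrCreate();

		// Read the input directly from S3 (s3:// works on EMR).
		Dataset<Row> input = spark.read().parquet("s3://my-bucket/input/");

		// ... apply transformations here ...

		// Write the output to HDFS on the cluster -- this is the fast step.
		input.write().parquet("hdfs:///input/data");

		spark.stop();
		// The output is then copied from /input/data to S3 with s3-dist-cp (see below).
	}
}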

The s3-dist-cp command can be run from the master node in either of the following forms.

s3-dist-cp --src /input/data --dest s3://my-bucket/output

OR

hadoop jar s3-dist-cp.jar --src /input/data --dest s3://my-bucket/output
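
Here /input/data is the HDFS path where the Spark job wrote its output, and s3://my-bucket/output is the destination on S3.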

Connect to Redshift using JDBC

Redshift is Amazon's data warehouse that supports SQL. It is a managed solution on the AWS cloud.

We can connect to a Redshift database from Java using JDBC and query tables just like in any other database.
The code below connects to Redshift and queries table schema information.


package com.hadoopnotes;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class RedshiftTest {

	public static void main(String[] args) {

		Connection connection;
		String redshiftUrl = "jdbc:redshift://ip_address:5439/mydatabase";

		try {
			DriverManager.registerDriver(new com.amazon.redshift.jdbc41.Driver());

			Properties driverProperties = new Properties();
			driverProperties.put("user", "myuser");
			driverProperties.put("password", "mypassword");

			connection = DriverManager.getConnection(redshiftUrl, driverProperties);

			String schemaSql = "set search_path to myschema";
			String sql = "select \"column\", type from pg_table_def where schemaname='myschema' and tablename = 'mytable'";

			System.out.println("SQL:" + sql);
			Statement stmt = null;

			try {
				stmt = connection.createStatement();
				stmt.execute(schemaSql);
				System.out.println("Running query now...");
				ResultSet rs = stmt.executeQuery(sql);

				while (rs.next()) {
					// this prints column name and its type
					System.out.println(rs.getString(1) + "--" + rs.getString(2));
				}
			} finally {
				if (stmt != null)
					stmt.close();
				// close the connection as well so JDBC resources are released
				connection.close();
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}
}

If using Maven to build this Java project, add the following dependency.

<dependency>
     <groupId>com.amazon.redshift</groupId>
     <artifactId>redshift-jdbc41</artifactId>
     <version>1.2.10.1009</version>
</dependency>
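
Note: this driver version is not in Maven Central; it is distributed through Amazon's Redshift Maven repository. If Maven cannot resolve the dependency above, add a repository entry along the lines of the following (the URL is the one documented by AWS; verify it for your driver version).

<repositories>
     <repository>
          <id>redshift</id>
          <url>https://s3.amazonaws.com/redshift-maven-repository/release</url>
     </repository>
</repositories>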

Build the project and run it.

mvn package
java -cp target/myjar.jar com.hadoopnotes.RedshiftTest

Or, from the command line, compile and run:

javac -cp redshift-jdbc41-1.2.10.1009.jar com/hadoopnotes/RedshiftTest.java
java -cp .:redshift-jdbc41-1.2.10.1009.jar com.hadoopnotes.RedshiftTest
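
For a hypothetical table with an integer id column and a varchar name column, the output would look roughly like this:

id--integer
name--character varying(256)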

MapReduce Program using Maven

Maven is a build management tool used to set up a Java project and create JAR files. It uses a pom.xml file to declare the dependencies a project needs, compile the code, and build the final artifact, such as a JAR file.

Eclipse is an IDE (Integrated Development Environment) often used by Java developers to make development and debugging easier. Install it from the eclipse.org website. Maven support usually comes bundled with Eclipse.

Steps to create and run MapReduce program using Maven and Eclipse:

    • In Eclipse IDE, create a new Maven project (New -> Project -> Maven Project)
    • Create the following WordCount.java in the package com.hadoopnotes.MapReduceSample:
package com.hadoopnotes.MapReduceSample;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

		@Override
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			StringTokenizer str = new StringTokenizer(value.toString());

			while (str.hasMoreTokens()) {
				String word = str.nextToken();

				context.write(new Text(word), new IntWritable(1));
			}
		}
	}

	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable i : values) {
				sum += i.get();
			}

			context.write(key, new IntWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {

		if (args.length != 2) {
			System.err.println("Usage: WordCount <InPath> <OutPath>");
			System.exit(2);
		}

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "WordCount");

		job.setJarByClass(WordCount.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		job.setNumReduceTasks(1);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
    • Update the pom.xml file as below:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hadoopnotes</groupId>
    <artifactId>MapReduceSample</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>MapReduceSample</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.1</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
    • Right-click on the project and choose Run As -> Maven install. This will generate a JAR file in the target folder: MapReduceSample-0.0.1-SNAPSHOT.jar
    • Copy this JAR file to the Hadoop cluster (for example, the master node on AWS EMR)
    • Create an input file input.txt. Write some text into it.
    • Copy this file to HDFS: hdfs dfs -put input.txt /user/hadoop/
    • Run the WordCount program using the command below.

$ hadoop jar MapReduceSample-0.0.1-SNAPSHOT.jar com.hadoopnotes.MapReduceSample.WordCount input.txt output4

  • Check output:

    hdfs dfs -cat /user/hadoop/output4/part-r-00000
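
For example, if input.txt contains the single line "hello world hello hadoop", the output file will contain each word and its count, tab-separated and sorted by key:

    hadoop	1
    hello	2
    world	1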

 

Text Processing using AWK

I came across a scenario where I had to parse a single file containing many INSERT HQL commands and create a separate file for each INSERT command.

My input file insert_all.hql is as below.

INSERT INTO TABLE table1
SELECT
 col1,
 col2,
 col3
FROM table11
WHERE date = '2017-07-12'
;

INSERT INTO TABLE table2
SELECT
 col1,
 col2,
 col3
FROM table22
WHERE date = '2017-07-12'
;

INSERT INTO TABLE table3
SELECT
 col1,
 col2,
 col3
FROM table33
WHERE date = '2017-07-12'

You could do this manually by creating each file and copying the required statements from the original file into the target file for each table, but that is error-prone and tedious.

Instead, I wrote a little script using the Linux/Unix tool awk. awk is a very good text-processing tool for this kind of parsing task.

awk '
BEGIN { RS=";" }
{
  content = $0;
  if (content ~ /INSERT INTO TABLE ([A-Za-z].*)/) {
    filename = $4;
    print content > filename;
    close(filename);
  }
}' insert_all.hql

Explanation

  • In the BEGIN block, RS (the record separator) is set to a semicolon. The default is newline, which does not work here because each INSERT statement spans several lines.
  • In the main block, the content of each record is copied into a variable.
  • If the record matches the INSERT INTO TABLE pattern, the table name (the fourth field) is used as the output filename.
  • The record's content is then written to that file, and the file is closed.
  • This generates files named table1, table2, and so on.

You can run this code from a terminal on Mac or a Linux machine.

 

Hive Date Functions

Hive provides a SQL-like interface to data stored on HDFS. Some of the less commonly used date functions are given below (tested on Hive 2.1.0).

  • Get the last day of the month
$ hive -e 'select last_day("2017-05-01")'
OK
2017-05-31
Time taken: 2.15 seconds, Fetched: 1 row(s)
  • Get the day of the week of the last day of the month (Monday=1, Sunday=7)
$ hive -e 'select date_format(last_day("2017-05-01"), "u")'
OK
3
Time taken: 2.46 seconds, Fetched: 1 row(s)
  • Add days to a date
$ hive -e 'select date_add(last_day("2017-04-03"), 1)'
OK
2017-05-01
Time taken: 2.425 seconds, Fetched: 1 row(s)
  • Get the day of the week of the first day of the next month (Monday=1, Sunday=7)
$ hive -e 'select date_format(date_add(last_day("2017-04-03"), 1), "u")'
OK
1
Time taken: 2.301 seconds, Fetched: 1 row(s)

P.S. Find the Hive version using the command hive --version:

$ hive --version
Hive 2.1.0-amzn-0

Remove Special Characters from Data using Hive

I have data that contains special characters such as carriage returns (\r, which show up as ^M), newlines (\n), and other non-printable control characters. I need to process this data with Hive for some transformations and later load it into a database such as Oracle (using Sqoop) or Amazon Redshift (using the psql COPY command) for analysis.

Sqoop and the psql COPY command fail to load the data because of these invalid characters.
I was trying to find out how to strip these characters off in Hive before loading the data into the external database.
Hive's regexp_extract function comes to the rescue, but finding the right regex pattern was a challenge.
I finally figured it out after searching online forums; it helps that Hive supports Java regex patterns.
My final Hive query is given below.


SELECT regexp_extract(col1, '\\p{Print}*',0)
FROM table1;
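
Here \p{Print} is the Java regex POSIX class for printable characters; with the greedy * and group 0, regexp_extract returns the leading run of printable characters in col1, that is, everything up to the first non-printable character.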

Increase Hue Oozie Dashboard Job Entries

On the Hue Oozie dashboard, by default you see only 100 workflow job entries.
This doesn't help if you have many jobs running and want to look at jobs that failed or were killed, say, two days ago.

How do you increase the number of entries shown?

Open the file /etc/hue/conf/hue.ini on the Hadoop master node.
Update the following entry in the file to the number you want; here I increased it to 500.

# Maximum of Oozie workflows or coordinators to retrieve in one API call.
oozie_jobs_count=500

And then restart the Hue service using the commands below.

sudo /sbin/stop hue
sudo /sbin/start hue

Note: These steps are for AWS EMR. Other Hadoop distributions may have different ways of setting configuration parameters.