Sunday, October 13, 2013

PIG

Pig Basics

  • Pig is an alternative abstraction on top of MapReduce
  • Uses a dataflow scripting language called Pig Latin
  • The Pig interpreter runs on the client machine
  • It takes a Pig Latin script and turns it into a series of MapReduce jobs
  • Submits those jobs to the cluster
  • Developed at Yahoo!; now open source


Need for Pig


  • MapReduce requires Java programmers
  • MapReduce can require multiple stages to arrive at a solution
  • Users have to reinvent common functionality (join, filter, etc.)
  • MapReduce has a long development cycle with rigorous testing stages

  • Pig opens the system to users familiar with scripting languages such as Python, Ruby, and PHP
  • 10 lines of Pig Latin can replace roughly 200 lines of Java (see the sketch below)
  • What takes 4 hours to write in Java can take 15 minutes in Pig Latin
  • Provides common operations such as join, filter, group, and sort
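
To make the conciseness claim concrete, here is a minimal word-count sketch in Pig Latin (the input and output paths are hypothetical); the equivalent Java MapReduce program typically runs to well over a hundred lines:

lines   = LOAD 'input/books' AS (line:chararray);                   -- one line per tuple
words   = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;   -- split lines into words
grouped = GROUP words BY word;                                      -- collect identical words
counts  = FOREACH grouped GENERATE group, COUNT(words);             -- count each group
STORE counts INTO 'output/wordcount';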



Installing and Running Pig
Installation is straightforward. Java 6 is a prerequisite (and on Windows, you will need
Cygwin). Download a stable release from http://pig.apache.org/releases.html, and unpack
the tarball in a suitable place on your workstation:
% tar xzf pig-x.y.z.tar.gz
It’s convenient to add Pig’s binary directory to your command-line path. For example:
% export PIG_INSTALL=/home/tom/pig-x.y.z
% export PATH=$PATH:$PIG_INSTALL/bin
You also need to set the JAVA_HOME environment variable to point to a suitable Java
installation.
Try typing pig -help to get usage instructions.

Execution Types
Pig has two execution types or modes: local mode and MapReduce mode.

Local mode
In local mode, Pig runs in a single JVM and accesses the local filesystem. This mode is suitable only for small datasets and for trying out Pig. It is started with the -x local option:
% pig -x local
grunt>

MapReduce mode
In MapReduce mode, Pig translates queries into MapReduce jobs and runs them on a
Hadoop cluster. The cluster may be a pseudo- or fully distributed cluster. MapReduce
mode (with a fully distributed cluster) is what you use when you want to run Pig on
large datasets.
To use MapReduce mode, you first need to check that the version of Pig you downloaded
is compatible with the version of Hadoop you are using. Pig releases will only
work against particular versions of Hadoop; this is documented in the release notes.
Pig honors the HADOOP_HOME environment variable for finding which Hadoop client to
run. However, if it is not set, Pig will use a bundled copy of the Hadoop libraries. Note
that these may not match the version of Hadoop running on your cluster, so it is best
to explicitly set HADOOP_HOME.
Next, you need to point Pig at the cluster’s namenode and jobtracker. If the installation
of Hadoop at HADOOP_HOME is already configured for this, then there is nothing more to
do. Otherwise, you can set HADOOP_CONF_DIR to a directory containing the Hadoop site
file (or files) that define fs.default.name and mapred.job.tracker.
Alternatively, you can set these two properties in the pig.properties file in Pig’s conf
directory (or the directory specified by PIG_CONF_DIR). Here's an example for a
pseudo-distributed setup:
fs.default.name=hdfs://localhost/
mapred.job.tracker=localhost:8021
Once you have configured Pig to connect to a Hadoop cluster, you can launch Grunt in MapReduce mode:
% pig -x mapreduce
grunt>

Running Pig Programs
There are three ways of executing Pig programs, all of which work in both local and
MapReduce mode:
Script
Pig can run a script file that contains Pig commands. For example, pig
script.pig runs the commands in the local file script.pig. Alternatively, for very
short scripts, you can use the -e option to run a script specified as a string on the
command line.
Grunt
Grunt is an interactive shell for running Pig commands. Grunt is started when no
file is specified for Pig to run, and the -e option is not used. It is also possible to
run Pig scripts from within Grunt using run and exec.
Embedded
You can run Pig programs from Java using the PigServer class, much like you can
use JDBC to run SQL programs from Java. For programmatic access to Grunt, use
PigRunner.
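
As a quick illustration (script.pig is a placeholder name), the script and Grunt styles look like this:
% pig script.pig                                      # Script, MapReduce mode
% pig -x local script.pig                             # Script, local mode
% pig -x local -e "A = LOAD 'sample.txt'; DUMP A;"    # -e: script given as a string
grunt> exec script.pig                                -- run a script in a fresh Grunt context
grunt> run script.pig                                 -- run a script in the current context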

EXAMPLE:
sample.txt (fields are tab-separated):
1950    0       1
1950    22      1
1950    -11     1
1949    111     1
1949    78      1

grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'
>> AS (year:chararray, temperature:int, quality:int);

grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)

grunt> DESCRIBE records;
records: {year: chararray,temperature: int,quality: int}

grunt> filtered_records = FILTER records BY temperature != 9999 AND (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grunt> DUMP filtered_records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)

grunt> grouped_records = GROUP filtered_records BY year;
grunt> DUMP grouped_records;
(1949,{(1949,111,1),(1949,78,1)})
(1950,{(1950,0,1),(1950,22,1),(1950,-11,1)})

grunt> DESCRIBE grouped_records;
grouped_records: {group: chararray,filtered_records: {year: chararray,
temperature: int,quality: int}}

grunt> max_temp = FOREACH grouped_records GENERATE group, MAX(filtered_records.temperature);

grunt> DUMP max_temp;
(1949,111)
(1950,22)


Multiquery execution
Pig executes a whole script at once rather than statement by statement, so when multiple STORE statements share input, it combines them into fewer MapReduce jobs. In the following script, B and C both derive from A, so A is read only once:
A = LOAD 'input/pig/multiquery/A';
B = FILTER A BY $1 == 'banana';
C = FILTER A BY $1 != 'banana';
STORE B INTO 'output/b';
STORE C INTO 'output/c';


Pig Latin relational operators
Loading and Storing: LOAD, STORE, DUMP
Filtering: FILTER, FOREACH…GENERATE, MAPREDUCE, STREAM, SAMPLE
Grouping and joining: JOIN, COGROUP, GROUP, CROSS
Sorting: ORDER, LIMIT
Combining and Splitting: UNION, SPLIT


Pig Latin diagnostic operators
DESCRIBE
Prints a relation’s schema
EXPLAIN
Prints the logical and physical plans
ILLUSTRATE
Shows a sample execution of the logical plan, using a generated subset of the input
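
For example, applied to the max_temp relation built earlier (output elided):
grunt> EXPLAIN max_temp;
grunt> ILLUSTRATE max_temp;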


Pig Latin macro and UDF statements
REGISTER
Registers a JAR file with the Pig runtime
DEFINE
Creates an alias for a macro, a UDF, streaming script, or a command specification
IMPORT
Imports macros defined in a separate file into a script


Pig Latin types


NUMERIC
int - 32-bit signed integer
long - 64-bit signed integer
float - 32-bit floating-point number
double - 64-bit floating-point number
TEXT
chararray - character array (string)
BINARY
bytearray - byte array (blob)
COMPLEX
tuple - a sequence of fields of any type, e.g. (1,'pomegranate')
bag - an unordered collection of tuples, possibly with duplicates, e.g. {(1,'pomegranate'),(2)}
map - a set of key-value pairs; keys must be character arrays, values may be any type, e.g. ['a'#'pomegranate']
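
Complex types can be declared in a LOAD schema. A minimal sketch (the file name and field names are hypothetical):
grunt> A = LOAD 'data.txt'
>> AS (t:tuple(x:int, s:chararray), b:bag{item:tuple(v:int)}, m:map[]);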


Validation and nulls
Pig does not reject a record whose field cannot be interpreted as the declared type; it substitutes a null instead. Consider a corrupt file where the third line has a non-numeric temperature:
1950    0       1
1950    22      1
1950    e       1
1949    111     1
1949    78      1

grunt> records = LOAD 'input/ncdc/micro-tab/sample_corrupt.txt'
>> AS (year:chararray, temperature:int, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,,1)
(1949,111,1)
(1949,78,1)

grunt> corrupt_records = FILTER records BY temperature is null;
grunt> DUMP corrupt_records;
(1950,,1)

grunt> grouped = GROUP corrupt_records ALL;
grunt> all_grouped = FOREACH grouped GENERATE group, COUNT(corrupt_records);
grunt> DUMP all_grouped;
(all,1)

grunt> SPLIT records INTO good_records IF temperature is not null,
>> bad_records IF temperature is null;
grunt> DUMP good_records;
(1950,0,1)
(1950,22,1)
(1949,111,1)
(1949,78,1)
grunt> DUMP bad_records;
(1950,,1)
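
Instead of splitting out bad records, you can substitute a sentinel for missing values with Pig's bincond (?:) operator. A sketch, using 9999 as an assumed sentinel:
grunt> fixed_records = FOREACH records GENERATE year,
>> (temperature is null ? 9999 : temperature) AS temperature, quality;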

Missing fields
grunt> A = LOAD 'input/pig/corrupt/missing_fields';
grunt> DUMP A;
(2,Tie)
(4,Coat)
(3)
(1,Scarf)
grunt> B = FILTER A BY SIZE(TOTUPLE(*)) > 1;
grunt> DUMP B;
(2,Tie)
(4,Coat)
(1,Scarf)

Functions in Pig come in four types:
Eval function
A function that takes one or more expressions and returns another expression. An
example of a built-in eval function is MAX, which returns the maximum value of the
entries in a bag. Some eval functions are aggregate functions, which means they
operate on a bag of data to produce a scalar value; MAX is an example of an aggregate
function. Furthermore, many aggregate functions are algebraic, which means that
the result of the function may be calculated incrementally. In MapReduce terms,
algebraic functions make use of the combiner and are much more efficient to
calculate. MAX is an algebraic function,
whereas a function to calculate the median of a collection of values is an example
of a function that is not algebraic.
Filter function
A special type of eval function that returns a logical boolean result. As the name
suggests, filter functions are used in the FILTER operator to remove unwanted
rows. They can also be used in other relational operators that take boolean conditions
and, in general, in any expression using boolean or conditional expressions. An
example of a built-in filter function is IsEmpty, which tests whether a bag or a map
contains any items.
Load function
A function that specifies how to load data into a relation from external storage.
Store function
A function that specifies how to save the contents of a relation to external storage.
Often, load and store functions are implemented by the same type. For example,
PigStorage, which loads data from delimited text files, can store data in the same
format.

Pig’s built-in functions
Eval: AVG, CONCAT, COUNT, COUNT_STAR, DIFF, MAX, MIN, SIZE, SUM, TOBAG, TOKENIZE, TOMAP, TOP, TOTUPLE
Filter: IsEmpty
Load/Store: PigStorage, BinStorage, TextLoader, JsonLoader/JsonStorage, HBaseStorage
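
As one illustration, COUNT ignores null values while COUNT_STAR counts every tuple. Applied to the corrupt records loaded above (one null temperature in five records), we would expect:
grunt> g = GROUP records ALL;
grunt> c = FOREACH g GENERATE COUNT(records.temperature), COUNT_STAR(records);
grunt> DUMP c;
(4,5)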

Macros
Macros provide a way to package reusable pieces of Pig Latin code from within Pig
Latin itself. For example, we can extract the part of our Pig Latin program that groups
a relation and then finds the maximum value in each group by defining a macro as
follows:
DEFINE max_by_group(X, group_key, max_field) RETURNS Y {
    A = GROUP $X BY $group_key;
    $Y = FOREACH A GENERATE group, MAX($X.$max_field);
};
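
The macro is then invoked like any other Pig Latin statement, and IMPORT pulls it in if it is defined in a separate file (the file name here is hypothetical):
grunt> IMPORT 'max_temp.macro';
grunt> max_temp = max_by_group(filtered_records, year, temperature);
grunt> DUMP max_temp;
(1949,111)
(1950,22)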

User-Defined Functions
Suppose we want to encapsulate the quality test in a filter UDF, changing this line:
filtered_records = FILTER records BY temperature != 9999 AND
(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
to:
filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);

import java.io.IOException;
import org.apache.pig.FilterFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;

// A filter UDF that returns true only for "good" quality codes.
public class IsGoodQuality extends FilterFunc {
    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        // Defensively handle empty or null input tuples.
        if (tuple == null || tuple.size() == 0) {
            return false;
        }
        try {
            Object object = tuple.get(0);
            if (object == null) {
                return false;
            }
            // The quality field was declared as int, so the value is an Integer.
            int i = (Integer) object;
            return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
        } catch (ExecException e) {
            throw new IOException(e);
        }
    }
}

grunt> REGISTER pig-examples.jar;
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>> com.hadoopbook.pig.IsGoodQuality(quality);

grunt> DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
grunt> filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);


Loading and Storing Data
STORE writes a relation to the filesystem; here PigStorage writes the fields of each tuple separated by colons:
grunt> STORE A INTO 'out' USING PigStorage(':');
grunt> cat out
Joe:cherry:2
Ali:apple:3
Joe:banana:2
Eve:apple:7
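
The same storage function works for loading; a sketch that reads the colon-delimited output back with an assumed schema:
grunt> A2 = LOAD 'out' USING PigStorage(':')
>> AS (name:chararray, fruit:chararray, n:int);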

FOREACH...GENERATE
grunt> DUMP A;
(Joe,cherry,2)
(Ali,apple,3)
(Joe,banana,2)
(Eve,apple,7)
grunt> B = FOREACH A GENERATE $0, $2+1, 'Constant';
grunt> DUMP B;
(Joe,3,Constant)
(Ali,4,Constant)
(Joe,3,Constant)
(Eve,8,Constant)

STREAM
The STREAM operator allows you to transform data in a relation using an external
program or script. It is named by analogy with Hadoop Streaming, which provides a
similar capability for MapReduce.
STREAM can use built-in commands with arguments. Here is an example that uses the
Unix cut command to extract the second field of each tuple in A. Note that the command
and its arguments are enclosed in backticks:
grunt> C = STREAM A THROUGH `cut -f 2`;
grunt> DUMP C;
(cherry)
(apple)
(banana)
(apple)
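
STREAM can also run a custom program shipped to the cluster. A sketch using a hypothetical Python script (DEFINE gives the command an alias; SHIP copies the script to the cluster):
grunt> DEFINE upper `python to_upper.py` SHIP ('to_upper.py');
grunt> C = STREAM A THROUGH upper;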

The join and grouping examples that follow use two relations, A and B:
grunt> DUMP A;
(2,Tie)
(4,Coat)
(3,Hat)
(1,Scarf)
grunt> DUMP B;
(Joe,2)
(Hank,4)
(Ali,0)
(Eve,3)
(Hank,2)


JOIN
grunt> C = JOIN A BY $0, B BY $1;
grunt> DUMP C;
(2,Tie,Joe,2)
(2,Tie,Hank,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)

LEFT OUTER JOIN
grunt> C = JOIN A BY $0 LEFT OUTER, B BY $1;
grunt> DUMP C;
(1,Scarf,,)
(2,Tie,Joe,2)
(2,Tie,Hank,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)


COGROUP
grunt> D = COGROUP A BY $0, B BY $1;
grunt> DUMP D;
(0,{},{(Ali,0)})
(1,{(1,Scarf)},{})
(2,{(2,Tie)},{(Joe,2),(Hank,2)})
(3,{(3,Hat)},{(Eve,3)})
(4,{(4,Coat)},{(Hank,4)})

grunt> E = COGROUP A BY $0 INNER, B BY $1;
grunt> DUMP E;
(1,{(1,Scarf)},{})
(2,{(2,Tie)},{(Joe,2),(Hank,2)})
(3,{(3,Hat)},{(Eve,3)})
(4,{(4,Coat)},{(Hank,4)})

grunt> F = FOREACH E GENERATE FLATTEN(A), B.$0;
grunt> DUMP F;
(1,Scarf,{})
(2,Tie,{(Joe),(Hank)})
(3,Hat,{(Eve)})
(4,Coat,{(Hank)})

Using a combination of COGROUP, INNER, and FLATTEN (which removes nesting),
it's possible to simulate an (inner) JOIN:
grunt> G = COGROUP A BY $0 INNER, B BY $1 INNER;
grunt> H = FOREACH G GENERATE FLATTEN($1), FLATTEN($2);
grunt> DUMP H;
(2,Tie,Joe,2)
(2,Tie,Hank,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)
This gives the same result as JOIN A BY $0, B BY $1.

CROSS
grunt> I = CROSS A, B;
grunt> DUMP I;
(2,Tie,Joe,2)
(2,Tie,Hank,4)
(2,Tie,Ali,0)
(2,Tie,Eve,3)
(2,Tie,Hank,2)
(4,Coat,Joe,2)
(4,Coat,Hank,4)
(4,Coat,Ali,0)
(4,Coat,Eve,3)
(4,Coat,Hank,2)
(3,Hat,Joe,2)
(3,Hat,Hank,4)
(3,Hat,Ali,0)
(3,Hat,Eve,3)
(3,Hat,Hank,2)
(1,Scarf,Joe,2)
(1,Scarf,Hank,4)
(1,Scarf,Ali,0)
(1,Scarf,Eve,3)
(1,Scarf,Hank,2)

Sorting Data
Relations are unordered in Pig. Consider a relation A:
grunt> DUMP A;
(2,3)
(1,2)
(2,4)

ORDER sorts a relation; here by the first field ascending and, within ties, by the second field descending:
grunt> B = ORDER A BY $0, $1 DESC;
grunt> DUMP B;
(1,2)
(2,4)
(2,3)

grunt> D = LIMIT B 2;
grunt> DUMP D;
(1,2)
(2,4)

Combining and Splitting Data
grunt> DUMP A;
(2,3)
(1,2)
(2,4)
grunt> DUMP B;
(z,x,8)
(w,y,1)
A and B have different numbers of fields; Pig lets you union them anyway, but the resulting relation has no declared schema:
grunt> C = UNION A, B;
grunt> DUMP C;
(2,3)
(1,2)
(2,4)
(z,x,8)
(w,y,1)

Parallelism
To explicitly set the number of reducers you want for each job, you can use a PARALLEL
clause for operators that run in the reduce phase. These include all the grouping and
joining operators (GROUP, COGROUP, JOIN, CROSS), as well as DISTINCT and
ORDER. The following line sets the number of reducers to 30 for the GROUP:
grouped_records = GROUP records BY year PARALLEL 30;
Alternatively, you can set the default_parallel option, and it will take effect for all
subsequent jobs:
grunt> set default_parallel 30





