Pig Basics
Need of Pig
Installing and Running Pig
- Pig is an alternative abstraction on top of MapReduce
- Uses a dataflow scripting language called Pig Latin
- The Pig interpreter runs on the client machine
- Takes the Pig Latin script and turns it into a series of MapReduce jobs
- Submits those jobs to the cluster
- Developed at Yahoo!; now an Apache open source project
Need of Pig
- MapReduce requires Java programming skills
- MapReduce can require multiple stages to arrive at a solution
- Users have to reinvent common functionality (join, filter, etc.)
- MapReduce has a long development cycle with rigorous testing stages
- Pig opens the system to users familiar with scripting languages such as Python, Ruby, and PHP
- 10 lines of Pig Latin is roughly 200 lines of Java
- What takes 4 hours in Java can take 15 minutes in Pig Latin
- Provides common operations like join, filter, group, and sort
Installing and Running Pig
Installation is straightforward. Java 6 is a prerequisite (and on Windows, you will need Cygwin). Download a stable release from http://pig.apache.org/releases.html, and unpack the tarball in a suitable place on your workstation:

% tar xzf pig-x.y.z.tar.gz

It’s convenient to add Pig’s binary directory to your command-line path. For example:

% export PIG_INSTALL=/home/tom/pig-x.y.z
% export PATH=$PATH:$PIG_INSTALL/bin

You also need to set the JAVA_HOME environment variable to point to a suitable Java installation.
Try typing pig -help to get usage instructions.
Execution Types
Pig has two execution types or modes: local mode and MapReduce mode.
Local mode

% pig -x local
grunt>
MapReduce mode
In MapReduce mode, Pig translates queries into MapReduce jobs and runs them on a Hadoop cluster. The cluster may be a pseudo- or fully distributed cluster. MapReduce mode (with a fully distributed cluster) is what you use when you want to run Pig on large datasets.
To use MapReduce mode, you first need to check that the version of Pig you downloaded is compatible with the version of Hadoop you are using. Pig releases will only work against particular versions of Hadoop; this is documented in the release notes.
Pig honors the HADOOP_HOME environment variable for finding which Hadoop client to run. However, if it is not set, Pig will use a bundled copy of the Hadoop libraries. Note that these may not match the version of Hadoop running on your cluster, so it is best to explicitly set HADOOP_HOME.
Next, you need to point Pig at the cluster’s namenode and jobtracker. If the installation of Hadoop at HADOOP_HOME is already configured for this, then there is nothing more to do. Otherwise, you can set HADOOP_CONF_DIR to a directory containing the Hadoop site file (or files) that define fs.default.name and mapred.job.tracker. Alternatively, you can set these two properties in the pig.properties file in Pig’s conf directory (or the directory specified by PIG_CONF_DIR). Here’s an example for a pseudodistributed setup:

fs.default.name=hdfs://localhost/
mapred.job.tracker=localhost:8021
Once you have configured Pig to connect to a Hadoop cluster, you can launch Grunt in MapReduce mode:

% pig -x mapreduce
grunt>
Running Pig Programs
There are three ways of executing Pig programs, all of which work in both local and MapReduce mode:
Script
Pig can run a script file that contains Pig commands. For example, pig script.pig runs the commands in the local file script.pig. Alternatively, for very short scripts, you can use the -e option to run a script specified as a string on the command line.
Grunt
Grunt is an interactive shell for running Pig commands. Grunt is started when no file is specified for Pig to run, and the -e option is not used. It is also possible to run Pig scripts from within Grunt using run and exec.
Embedded
You can run Pig programs from Java using the PigServer class, much like you can use JDBC to run SQL programs from Java. For programmatic access to Grunt, use PigRunner.
EXAMPLE:
sample.txt
1950 0 1
1950 22 1
1950 -11 1
1949 111 1
1949 78 1
grunt> records = LOAD 'input/ncdc/micro-tab/sample.txt'
>>     AS (year:chararray, temperature:int, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
grunt> DESCRIBE records;
records: {year: chararray,temperature: int,quality: int}
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>>     (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grunt> DUMP filtered_records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)
grunt> grouped_records = GROUP filtered_records BY year;
grunt> DUMP grouped_records;
(1949,{(1949,111,1),(1949,78,1)})
(1950,{(1950,0,1),(1950,22,1),(1950,-11,1)})
grunt> DESCRIBE grouped_records;
grouped_records: {group: chararray,filtered_records: {year: chararray,temperature: int,quality: int}}
grunt> max_temp = FOREACH grouped_records GENERATE group,
>>     MAX(filtered_records.temperature);
grunt> DUMP max_temp;
(1949,111)
(1950,22)
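To make the dataflow semantics concrete, the same filter/group/max pipeline can be sketched in plain Python. This is only an illustrative analogue of what Pig does, using the five sample records above:

```python
records = [
    ("1950", 0, 1), ("1950", 22, 1), ("1950", -11, 1),
    ("1949", 111, 1), ("1949", 78, 1),
]

# FILTER records BY temperature != 9999 AND quality in the good set
filtered = [r for r in records
            if r[1] != 9999 and r[2] in (0, 1, 4, 5, 9)]

# GROUP filtered_records BY year
grouped = {}
for r in filtered:
    grouped.setdefault(r[0], []).append(r)

# FOREACH grouped_records GENERATE group, MAX(filtered_records.temperature)
max_temp = {year: max(t for _, t, _ in rows)
            for year, rows in grouped.items()}
print(max_temp)  # {'1950': 22, '1949': 111}
```

Each Pig operator corresponds to one small step: FILTER is a comprehension, GROUP builds a dict of bags, and FOREACH…GENERATE projects each group to (group, MAX).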
Multiquery execution

A = LOAD 'input/pig/multiquery/A';
B = FILTER A BY $1 == 'banana';
C = FILTER A BY $1 != 'banana';
STORE B INTO 'output/b';
STORE C INTO 'output/c';
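Pig runs this whole script as a single plan: relation A is read once, and both STORE outputs are produced in the same pass. A plain-Python sketch of that behavior (the data values here are made up for illustration):

```python
A = [("a", "banana"), ("b", "apple"), ("c", "banana")]

B, C = [], []
for t in A:                 # A is scanned only once
    if t[1] == "banana":
        B.append(t)         # rows that would go to 'output/b'
    else:
        C.append(t)         # rows that would go to 'output/c'

print(B)  # [('a', 'banana'), ('c', 'banana')]
print(C)  # [('b', 'apple')]
```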
Pig Latin relational operators
- Loading and Storing: LOAD, STORE, DUMP
- Filtering: FILTER, FOREACH…GENERATE, MAPREDUCE, STREAM, SAMPLE
- Grouping and joining: JOIN, COGROUP, GROUP, CROSS
- Sorting: ORDER, LIMIT
- Combining and Splitting: UNION, SPLIT
Pig Latin diagnostic operators
- DESCRIBE: Prints a relation’s schema
- EXPLAIN: Prints the logical and physical plans
- ILLUSTRATE: Shows a sample execution of the logical plan, using a generated subset of the input
Pig Latin macro and UDF statements
- REGISTER: Registers a JAR file with the Pig runtime
- DEFINE: Creates an alias for a macro, a UDF, a streaming script, or a command specification
- IMPORT: Imports macros defined in a separate file into a script
Pig Latin types
- Numeric: int (32-bit), long (64-bit), float (32-bit), double (64-bit)
- Text: chararray
- Binary: bytearray
- Complex:
  - tuple: a sequence of fields of any type, e.g. (1,'pomegranate')
  - bag: an unordered collection of tuples, possibly with duplicates, e.g. {(1,'pomegranate'),(2)}
  - map: a set of key-value pairs; keys must be character arrays, values may be any type, e.g. ['a'#'pomegranate']
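The complex types map naturally onto familiar container types. A rough Python correspondence (illustrative only):

```python
# Pig complex types and rough Python counterparts:
pig_tuple = (1, "pomegranate")           # tuple:  (1,'pomegranate')
pig_bag = [(1, "pomegranate"), (2,)]     # bag:    {(1,'pomegranate'),(2)}
pig_map = {"a": "pomegranate"}           # map:    ['a'#'pomegranate']

# A bag may contain tuples of differing arity, as the second entry shows.
print(len(pig_bag), pig_map["a"])
```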
Validation and nulls
1950 0 1
1950 22 1
1950 e 1
1949 111 1
1949 78 1
grunt> records = LOAD 'input/ncdc/micro-tab/sample_corrupt.txt'
>>     AS (year:chararray, temperature:int, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,,1)
(1949,111,1)
(1949,78,1)
grunt> corrupt_records = FILTER records BY temperature is null;
grunt> DUMP corrupt_records;
(1950,,1)
grunt> grouped = GROUP corrupt_records ALL;
grunt> all_grouped = FOREACH grouped GENERATE group, COUNT(corrupt_records);
grunt> DUMP all_grouped;
(all,1)
grunt> SPLIT records INTO good_records IF temperature is not null,
>>     bad_records IF temperature is null;
grunt> DUMP good_records;
(1950,0,1)
(1950,22,1)
(1949,111,1)
(1949,78,1)
grunt> DUMP bad_records;
(1950,,1)
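The unparseable value 'e' becomes a null on load, and SPLIT routes rows by the null test. A plain-Python analogue, modeling Pig's null as None:

```python
# None stands in for Pig's null (the corrupt 'e' field failed to parse as int)
records = [("1950", 0, 1), ("1950", 22, 1), ("1950", None, 1),
           ("1949", 111, 1), ("1949", 78, 1)]

# SPLIT records INTO good_records IF temperature is not null,
#                    bad_records IF temperature is null
good_records = [r for r in records if r[1] is not None]
bad_records = [r for r in records if r[1] is None]

print(len(good_records), len(bad_records))  # 4 1
```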
Missing fields

grunt> A = LOAD 'input/pig/corrupt/missing_fields';
grunt> DUMP A;
(2,Tie)
(4,Coat)
(3)
(1,Scarf)
grunt> B = FILTER A BY SIZE(TOTUPLE(*)) > 1;
grunt> DUMP B;
(2,Tie)
(4,Coat)
(1,Scarf)
Functions in Pig come in four types:
Eval function
A function that takes one or more expressions and returns another expression. An example of a built-in eval function is MAX, which returns the maximum value of the entries in a bag. Some eval functions are aggregate functions, which means they operate on a bag of data to produce a scalar value; MAX is an example of an aggregate function. Furthermore, many aggregate functions are algebraic, which means that the result of the function may be calculated incrementally. In MapReduce terms, algebraic functions make use of the combiner and are much more efficient to calculate (see “Combiner Functions” on page 34). MAX is an algebraic function, whereas a function to calculate the median of a collection of values is an example of a function that is not algebraic.
Filter function
A special type of eval function that returns a logical boolean result. As the name suggests, filter functions are used in the FILTER operator to remove unwanted rows. They can also be used in other relational operators that take boolean conditions and, in general, in expressions using boolean or conditional expressions. An example of a built-in filter function is IsEmpty, which tests whether a bag or a map contains any items.
Load function
A function that specifies how to load data into a relation from external storage.
Store function
A function that specifies how to save the contents of a relation to external storage. Often, load and store functions are implemented by the same type. For example, PigStorage, which loads data from delimited text files, can store data in the same format.
Pig’s built-in functions
- Eval: AVG, CONCAT, COUNT, COUNT_STAR, DIFF, MAX, MIN, SIZE, SUM, TOBAG, TOKENIZE, TOMAP, TOP, TOTUPLE
- Filter: IsEmpty
- Load/Store: PigStorage, BinStorage, TextLoader, JsonLoader/JsonStorage, HBaseStorage
Macros
Macros provide a way to package reusable pieces of Pig Latin code from within Pig Latin itself. For example, we can extract the part of our Pig Latin program that performs grouping on a relation and then finds the maximum value in each group, by defining a macro as follows:

DEFINE max_by_group(X, group_key, max_field) RETURNS Y {
    A = GROUP $X BY $group_key;
    $Y = FOREACH A GENERATE group, MAX($X.$max_field);
};
User-Defined Functions
Suppose we want to change the quality filter in our earlier example from:

filtered_records = FILTER records BY temperature != 9999 AND
    (quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);

to:

filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.FilterFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;

public class IsGoodQuality extends FilterFunc {

  @Override
  public Boolean exec(Tuple tuple) throws IOException {
    if (tuple == null || tuple.size() == 0) {
      return false;
    }
    try {
      Object object = tuple.get(0);
      if (object == null) {
        return false;
      }
      int i = (Integer) object;
      return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
    } catch (ExecException e) {
      throw new IOException(e);
    }
  }
}
grunt> REGISTER pig-examples.jar;
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>>     com.hadoopbook.pig.IsGoodQuality(quality);
grunt> DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
grunt> filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
Loading and Storing Data

grunt> STORE A INTO 'out' USING PigStorage(':');
grunt> cat out
Joe:cherry:2
Ali:apple:3
Joe:banana:2
Eve:apple:7
FOREACH...GENERATE

grunt> DUMP A;
(Joe,cherry,2)
(Ali,apple,3)
(Joe,banana,2)
(Eve,apple,7)
grunt> B = FOREACH A GENERATE $0, $2+1, 'Constant';
grunt> DUMP B;
(Joe,3,Constant)
(Ali,4,Constant)
(Joe,3,Constant)
(Eve,8,Constant)
STREAM
The STREAM operator allows you to transform data in a relation using an external program or script. It is named by analogy with Hadoop Streaming, which provides a similar capability for MapReduce.
STREAM can use built-in commands with arguments. Here is an example that uses the Unix cut command to extract the second field of each tuple in A. Note that the command and its arguments are enclosed in backticks:

grunt> C = STREAM A THROUGH `cut -f 2`;
grunt> DUMP C;
(cherry)
(apple)
(banana)
(apple)
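What STREAM does under the hood is pipe tab-separated tuples through the external command's stdin and read tuples back from its stdout. A Python sketch of that mechanism; to stay portable the "external program" here is a small Python one-liner rather than cut, which is an assumption of this illustration:

```python
import subprocess
import sys

A = [("Joe", "cherry", 2), ("Ali", "apple", 3),
     ("Joe", "banana", 2), ("Eve", "apple", 7)]

# Serialize tuples as tab-separated lines, as Pig does for streaming.
stdin_data = "".join("\t".join(map(str, t)) + "\n" for t in A)

# External program that emits field 2 of each line (stand-in for `cut -f 2`).
cmd = [sys.executable, "-c",
       "import sys\n"
       "for line in sys.stdin:\n"
       "    print(line.split('\\t')[1])"]
out = subprocess.run(cmd, input=stdin_data, capture_output=True, text=True)

# Each output line becomes a single-field tuple in relation C.
C = [(line,) for line in out.stdout.splitlines()]
print(C)  # [('cherry',), ('apple',), ('banana',), ('apple',)]
```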
grunt> DUMP A;
(2,Tie)
(4,Coat)
(3,Hat)
(1,Scarf)
grunt> DUMP B;
(Joe,2)
(Hank,4)
(Ali,0)
(Eve,3)
(Hank,2)
JOIN

grunt> C = JOIN A BY $0, B BY $1;
grunt> DUMP C;
(2,Tie,Joe,2)
(2,Tie,Hank,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)

LEFT OUTER JOIN

grunt> C = JOIN A BY $0 LEFT OUTER, B BY $1;
grunt> DUMP C;
(1,Scarf,,)
(2,Tie,Joe,2)
(2,Tie,Hank,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)
COGROUP

grunt> D = COGROUP A BY $0, B BY $1;
grunt> DUMP D;
(0,{},{(Ali,0)})
(1,{(1,Scarf)},{})
(2,{(2,Tie)},{(Joe,2),(Hank,2)})
(3,{(3,Hat)},{(Eve,3)})
(4,{(4,Coat)},{(Hank,4)})
grunt> E = COGROUP A BY $0 INNER, B BY $1;
grunt> DUMP E;
(1,{(1,Scarf)},{})
(2,{(2,Tie)},{(Joe,2),(Hank,2)})
(3,{(3,Hat)},{(Eve,3)})
(4,{(4,Coat)},{(Hank,4)})
grunt> F = FOREACH E GENERATE FLATTEN(A), B.$0;
grunt> DUMP F;
(1,Scarf,{})
(2,Tie,{(Joe),(Hank)})
(3,Hat,{(Eve)})
(4,Coat,{(Hank)})

Using a combination of COGROUP, INNER, and FLATTEN (which removes nesting), it’s possible to simulate an (inner) JOIN:

grunt> G = COGROUP A BY $0 INNER, B BY $1 INNER;
grunt> H = FOREACH G GENERATE FLATTEN($1), FLATTEN($2);
grunt> DUMP H;
(2,Tie,Joe,2)
(2,Tie,Hank,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)

This gives the same result as JOIN A BY $0, B BY $1.
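The COGROUP-then-FLATTEN equivalence can be sketched in plain Python: collect both relations' tuples per key, keep only keys where both bags are non-empty (INNER), then take the cross product of the two bags (FLATTEN). An illustrative analogue using the A and B relations above:

```python
A = [(2, "Tie"), (4, "Coat"), (3, "Hat"), (1, "Scarf")]
B = [("Joe", 2), ("Hank", 4), ("Ali", 0), ("Eve", 3), ("Hank", 2)]

keys = {a[0] for a in A} | {b[1] for b in B}
joined = []
for k in sorted(keys):
    a_group = [a for a in A if a[0] == k]   # A's bag for this key
    b_group = [b for b in B if b[1] == k]   # B's bag for this key
    if a_group and b_group:                 # INNER: drop empty bags
        for a in a_group:                   # FLATTEN both bags:
            for b in b_group:               # cross product within the key
                joined.append(a + b)
print(joined)
```

The result matches DUMP H above: the four joined tuples for keys 2, 3, and 4.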
CROSS

grunt> I = CROSS A, B;
grunt> DUMP I;
(2,Tie,Joe,2)
(2,Tie,Hank,4)
(2,Tie,Ali,0)
(2,Tie,Eve,3)
(2,Tie,Hank,2)
(4,Coat,Joe,2)
(4,Coat,Hank,4)
(4,Coat,Ali,0)
(4,Coat,Eve,3)
(4,Coat,Hank,2)
(3,Hat,Joe,2)
(3,Hat,Hank,4)
(3,Hat,Ali,0)
(3,Hat,Eve,3)
(3,Hat,Hank,2)
(1,Scarf,Joe,2)
(1,Scarf,Hank,4)
(1,Scarf,Ali,0)
(1,Scarf,Eve,3)
(1,Scarf,Hank,2)
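CROSS is simply the cartesian product of the two relations, so its output size is |A| × |B|. In Python terms, itertools.product does the same thing (illustrative analogue):

```python
from itertools import product

A = [(2, "Tie"), (4, "Coat"), (3, "Hat"), (1, "Scarf")]
B = [("Joe", 2), ("Hank", 4), ("Ali", 0), ("Eve", 3), ("Hank", 2)]

# CROSS A, B: every tuple of A paired with every tuple of B
I = [a + b for a, b in product(A, B)]
print(len(I))  # 20 tuples: |A| * |B| = 4 * 5
```

The quadratic output size is why CROSS should be used sparingly on large relations.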
Sorting Data
Relations are unordered in Pig. Consider a relation A:

grunt> DUMP A;
(2,3)
(1,2)
(2,4)
grunt> B = ORDER A BY $0, $1 DESC;
grunt> DUMP B;
(1,2)
(2,4)
(2,3)
grunt> D = LIMIT B 2;
grunt> DUMP D;
(1,2)
(2,4)
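ORDER with a mixed ascending/descending key, followed by LIMIT, corresponds to a keyed sort plus a slice. An illustrative Python analogue of the transcript above:

```python
A = [(2, 3), (1, 2), (2, 4)]

# ORDER A BY $0, $1 DESC: $0 ascending, $1 descending
# (negating the second field flips its sort direction for integers)
B = sorted(A, key=lambda t: (t[0], -t[1]))
D = B[:2]                                   # LIMIT B 2

print(B)  # [(1, 2), (2, 4), (2, 3)]
print(D)  # [(1, 2), (2, 4)]
```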
Combining and Splitting Data

grunt> DUMP A;
(2,3)
(1,2)
(2,4)
grunt> DUMP B;
(z,x,8)
(w,y,1)
grunt> C = UNION A, B;
grunt> DUMP C;
(2,3)
(1,2)
(2,4)
(z,x,8)
(w,y,1)
Parallelism
To explicitly set the number of reducers you want for each job, you can use a PARALLEL clause for operators that run in the reduce phase. These include all the grouping and joining operators (GROUP, COGROUP, JOIN, CROSS), as well as DISTINCT and ORDER. The following line sets the number of reducers to 30 for the GROUP:

grouped_records = GROUP records BY year PARALLEL 30;

Alternatively, you can set the default_parallel option, and it will take effect for all subsequent jobs:

grunt> set default_parallel 30