GPLink 0.1.30 Supports Hive!

Enhancements
GPLink has been tested successfully with Hive. This is most useful in a Greenplum environment where you want to query a Hive table. HAWQ already has the PXF protocol, which supports Hive and performs very well, so please use PXF to access Hive from HAWQ. With Greenplum, however, gplink now supports Hive!

Hive
This is covered in the gplink README file, but the configuration is a bit more involved than for a typical JDBC driver because of how Hive is configured for logging. The configuration has also changed across Hive versions, which makes it even more complex.

You will need the following jar files from your Hadoop cluster.

I tested with a Hortonworks cluster with Hive 1.2.1.2.4.
/usr/hdp/2.4.2.0-258/hive/lib/hive-jdbc.jar
/usr/hdp/2.4.2.0-258/hadoop/client/hadoop-common.jar
/usr/hdp/2.4.2.0-258/hadoop/client/log4j.jar
/usr/hdp/2.4.2.0-258/hadoop/client/slf4j-api.jar
/usr/hdp/2.4.2.0-258/hadoop/client/slf4j-log4j12.jar

Older versions of Hive may have duplicate SLF4J bindings and fail to work properly. The error message is “Class path contains multiple SLF4J bindings”. If you get this error, remove the slf4j-log4j12.jar file from the jar/ directory, source the gplink_path.sh file, and try again.
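If you'd rather check the jar/ directory before sourcing gplink_path.sh, the Python sketch below looks for SLF4J bindings. It is not part of gplink, and find_slf4j_bindings is a hypothetical helper. It assumes SLF4J 1.x, which discovers its binding through the class org/slf4j/impl/StaticLoggerBinder.class. Every jar that contains that class therefore counts as one binding, and the scan also catches bindings bundled inside larger jars.

```python
import sys
import zipfile
from pathlib import Path

# SLF4J 1.x locates its logging binding by loading this class, so each jar
# on the classpath that contains it counts as one binding.
BINDER_CLASS = "org/slf4j/impl/StaticLoggerBinder.class"


def find_slf4j_bindings(jar_dir):
    """Return the names of the jars in jar_dir that contain an SLF4J 1.x binding."""
    bindings = []
    for jar in sorted(Path(jar_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if BINDER_CLASS in zf.namelist():
                    bindings.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that are not valid jar (zip) archives.
            continue
    return bindings


if __name__ == "__main__":
    # Hypothetical usage: python check_slf4j.py /path/to/gplink/jar
    found = find_slf4j_bindings(sys.argv[1] if len(sys.argv) > 1 else "jar")
    if len(found) > 1:
        print("Multiple SLF4J bindings found:", ", ".join(found))
    else:
        print("SLF4J bindings:", found or "none")
```

More than one jar in the output points to the same problem as the error message above. The script name is arbitrary.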

Bug Fixes
The extraProps value for external connections wasn’t being parsed correctly. extraProps is most useful with Oracle for setting the fetch size. If you use Oracle as a source with gplink, be sure to upgrade to this version and rename the property in your connection from “extraProperies” or “extraProperties” to “extraProps”.

Link
Download 0.1.30 here.

GPLink 0.1.26

GPLink has a new version whose only change is the name of a script variable. The variable was renamed because the same name was also used in another project I work on (TPC-DS), and the shared name caused problems with logging in the TPC-DS scripts.

0.1.26

Loading Data into HAWQ

You have to load data into the database before you can use it, but how? There are several approaches to this basic requirement, and each one solves the problem in a different way. This lets you load data in the way that best matches your use case.

Table Setup
This table is used for all of the tests in HAWQ below. I created it in a single-node VM running Hortonworks HDP with HAWQ 2.0 installed, and I’m using the default Resource Manager.

CREATE TABLE test_data
(id int,
 fname text,
 lname text)
 DISTRIBUTED RANDOMLY;

Singleton
Let’s start with probably the worst way. Singleton inserts can be ideal when you have very little data to load, but in most cases you should avoid them. This approach inserts a single tuple in each transaction.

head si_test_data.sql
insert into test_data (id, fname, lname) values (1, 'jon_00001', 'roberts_00001');
insert into test_data (id, fname, lname) values (2, 'jon_00002', 'roberts_00002');
insert into test_data (id, fname, lname) values (3, 'jon_00003', 'roberts_00003');
insert into test_data (id, fname, lname) values (4, 'jon_00004', 'roberts_00004');
insert into test_data (id, fname, lname) values (5, 'jon_00005', 'roberts_00005');
insert into test_data (id, fname, lname) values (6, 'jon_00006', 'roberts_00006');
insert into test_data (id, fname, lname) values (7, 'jon_00007', 'roberts_00007');
insert into test_data (id, fname, lname) values (8, 'jon_00008', 'roberts_00008');
insert into test_data (id, fname, lname) values (9, 'jon_00009', 'roberts_00009');
insert into test_data (id, fname, lname) values (10, 'jon_00010', 'roberts_00010');

This repeats for 10,000 tuples.
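The post doesn't show how si_test_data.sql was created. If you want to reproduce the test, the Python sketch below is one hypothetical way to generate a file in the same shape. The script and the singleton_inserts name are illustrative, not part of any tool mentioned here.

```python
import sys


def singleton_inserts(n):
    """Yield one INSERT statement per row, in the same shape as si_test_data.sql."""
    for i in range(1, n + 1):
        yield (
            "insert into test_data (id, fname, lname) "
            f"values ({i}, 'jon_{i:05d}', 'roberts_{i:05d}');"
        )


if __name__ == "__main__":
    # Defaults to 10,000 rows, the size used in this test.
    rows = int(sys.argv[1]) if len(sys.argv) > 1 else 10000
    for stmt in singleton_inserts(rows):
        print(stmt)
```

For example, python make_si_test_data.py > si_test_data.sql writes all 10,000 statements. The script name is arbitrary.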

time psql -f si_test_data.sql > /dev/null
real	5m49.527s

As you can see, this is very slow and not recommended for inserting large amounts of data. Nearly 6 minutes to load 10,000 tuples is crawling.

COPY
If you are familiar with PostgreSQL then you will feel right at home with this technique. This time, the data is in a file named test_data.txt and it is not wrapped with an insert statement.

head test_data.txt
1|jon_00001|roberts_00001
2|jon_00002|roberts_00002
3|jon_00003|roberts_00003
4|jon_00004|roberts_00004
5|jon_00005|roberts_00005
6|jon_00006|roberts_00006
7|jon_00007|roberts_00007
8|jon_00008|roberts_00008
9|jon_00009|roberts_00009
10|jon_00010|roberts_00010
COPY test_data FROM '/home/gpadmin/test_data.txt' WITH DELIMITER '|';
COPY 10000
Time: 128.580 ms

This method is significantly faster, but it loads the data through the master. Because the master becomes the bottleneck, this approach doesn't scale well. However, it does let you load data from any host on your network, as long as that host can reach the master.
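The same pipe-delimited test_data.txt is used by the COPY and gpfdist examples. A hypothetical Python generator for it could look like the sketch below. delimited_rows is an illustrative name, and the default delimiter matches the DELIMITER '|' used in the SQL.

```python
import sys


def delimited_rows(n, delimiter="|"):
    """Yield rows like test_data.txt: id, fname and lname joined by the delimiter."""
    for i in range(1, n + 1):
        yield delimiter.join((str(i), f"jon_{i:05d}", f"roberts_{i:05d}"))


if __name__ == "__main__":
    rows = int(sys.argv[1]) if len(sys.argv) > 1 else 10000
    for row in delimited_rows(rows):
        print(row)
```

Redirect the output to /home/gpadmin/test_data.txt so that both the COPY command and gpfdist (which serves /home/gpadmin above) can find it.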

gpfdist
gpfdist is a web server that serves POSIX files for the segments to fetch. Segment processes get the data directly from gpfdist and bypass the master when doing so. This lets you scale by adding more gpfdist processes, more segments, or both.

gpfdist -p 8888 &
[1] 128836
[gpadmin@hdb ~]$ Serving HTTP on port 8888, directory /home/gpadmin

Now you’ll need to create a new external table to read the data from gpfdist.

CREATE EXTERNAL TABLE gpfdist_test_data
(id int,
 fname text,
 lname text)
LOCATION ('gpfdist://hdb:8888/test_data.txt')
FORMAT 'TEXT' (DELIMITER '|');

And to load the data.

INSERT INTO test_data SELECT * FROM gpfdist_test_data;
INSERT 0 10000
Time: 98.362 ms

gpfdist is blazing fast and scales easily. You can list more than one gpfdist location in the external table, use wildcards, use different formats, and much more. The downside is that the file must be on a host that all segments can reach, and you have to start a separate gpfdist process on that host.
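The following Python sketch builds external table DDL with several gpfdist locations and a wildcard. The hosts etl1 and etl2 and the helper name gpfdist_external_table are hypothetical. Each gpfdist process expands the wildcard against its own directory, so each host serves its own matching files.

```python
def gpfdist_external_table(name, columns, hosts, file_pattern, delimiter="|"):
    """Build CREATE EXTERNAL TABLE DDL that reads from several gpfdist hosts.

    columns is a list of (name, type) pairs, hosts a list of "host:port"
    strings, and file_pattern may contain a wildcard such as "*.txt".
    """
    col_sql = ",\n ".join(f"{col} {typ}" for col, typ in columns)
    loc_sql = ",\n          ".join(
        f"'gpfdist://{host}/{file_pattern}'" for host in hosts
    )
    return (
        f"CREATE EXTERNAL TABLE {name}\n"
        f"({col_sql})\n"
        f"LOCATION ({loc_sql})\n"
        f"FORMAT 'TEXT' (DELIMITER '{delimiter}');"
    )


if __name__ == "__main__":
    print(gpfdist_external_table(
        "gpfdist_test_data",
        [("id", "int"), ("fname", "text"), ("lname", "text")],
        ["etl1:8888", "etl2:8888"],  # hypothetical ETL hosts running gpfdist
        "test_data*.txt",
    ))
```

Adding a host to the list adds another gpfdist source that the segments read from in parallel.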

gpload
gpload is a utility that automates the loading process using gpfdist. Technically, it does the same thing as gpfdist and external tables; it just runs the commands for you. Review the gpload documentation for more on this utility.

Pivotal Extension Framework (PXF)
PXF allows you to read and write HDFS data using external tables. As with gpfdist, the work is done by each segment, so it scales and executes in parallel.

For this example, I’ve loaded the test data into HDFS.

hdfs dfs -cat /test_data/* | head
1|jon_00001|roberts_00001
2|jon_00002|roberts_00002
3|jon_00003|roberts_00003
4|jon_00004|roberts_00004
5|jon_00005|roberts_00005
6|jon_00006|roberts_00006
7|jon_00007|roberts_00007
8|jon_00008|roberts_00008
9|jon_00009|roberts_00009
10|jon_00010|roberts_00010

The external table definition.

CREATE EXTERNAL TABLE et_test_data
(id int,
 fname text,
 lname text)
LOCATION ('pxf://hdb:51200/test_data?Profile=HdfsTextSimple')
FORMAT 'TEXT' (DELIMITER '|');

And now to load it.

INSERT INTO test_data SELECT * FROM et_test_data;
INSERT 0 10000
Time: 227.599 ms

PXF is probably the best way to load data when using the “Data Lake” design. You load your raw data into HDFS and then consume it with a variety of tools in the Hadoop ecosystem. PXF can also read and write other formats.
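The LOCATION used above follows the pattern pxf://host:port/path?Profile=name. The Python sketch below assembles that string. pxf_location is a hypothetical helper, and the &name=value handling for extra options is an assumption based on that URI pattern. Check the PXF documentation for which options your profile actually accepts.

```python
def pxf_location(host, port, path, profile, **options):
    """Build a PXF LOCATION URI such as pxf://hdb:51200/test_data?Profile=HdfsTextSimple.

    Extra keyword arguments are appended as &name=value pairs (assumed format).
    """
    params = [f"Profile={profile}"] + [f"{k}={v}" for k, v in options.items()]
    return f"pxf://{host}:{port}/{path.lstrip('/')}?" + "&".join(params)


if __name__ == "__main__":
    uri = pxf_location("hdb", 51200, "/test_data", "HdfsTextSimple")
    print(f"LOCATION ('{uri}')")
```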

Outsourcer and gplink
Last but not least are two programs I created. Outsourcer automates table creation and loads data directly into Greenplum or HAWQ using gpfdist. It sources data from SQL Server and Oracle because these are the two most common OLTP databases.

gplink is another tool that can read external data, and it can connect to any valid JDBC source. It doesn’t automate as many steps as Outsourcer does, but it is a convenient way to get data from a JDBC source.

You might be thinking that sqoop does this, but not exactly. gplink and Outsourcer load data into HAWQ and Greenplum tables. They are optimized for these databases and fix data for you automatically: both remove null and newline characters and escape the escape and delimiter characters. With sqoop, you would have to read the data from HDFS using PXF and then fix any errors that could be in the files.
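To illustrate the kind of cleanup described above, here is a minimal Python sketch. It is not the actual gplink or Outsourcer code, and clean_for_text_format and to_text_row are hypothetical names. It assumes the TEXT format with a backslash escape character and a | delimiter. The function drops NUL, \n and \r characters, then escapes the escape character before escaping the delimiter, so the backslashes added for delimiters are not doubled.

```python
def clean_for_text_format(value, delimiter="|", escape="\\"):
    """Prepare one field for a TEXT-format load (sketch, not gplink's code)."""
    # Remove characters that break TEXT-format rows: NUL and newlines.
    for ch in ("\x00", "\r", "\n"):
        value = value.replace(ch, "")
    # Escape the escape character first, then the delimiter.
    value = value.replace(escape, escape + escape)
    return value.replace(delimiter, escape + delimiter)


def to_text_row(fields, delimiter="|"):
    """Join cleaned fields into one delimited row."""
    return delimiter.join(clean_for_text_format(f, delimiter) for f in fields)


if __name__ == "__main__":
    print(to_text_row(["1", "a|b", "c:\\temp", "line1\nline2"]))
    # prints: 1|a\|b|c:\\temp|line1line2
```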

Both tools are linked above.

Summary
This post gives a brief description of the various ways to load data into HAWQ. Pick the right technique for your use case. As you can see, HAWQ is very flexible and can load data in a variety of ways.

GPLink Version 0.1.25

GPLink has a new version that fixes a parsing error. The error occurred if you had started other gpfdist processes on your ETL server with their parameters in a different order than the one GPLink uses. Older versions failed to parse those processes, and this version handles them. As before, gpfdist processes with ports outside the lower and upper port range defined by GPLink are ignored.
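The fix amounts to finding the port regardless of where -p appears on the gpfdist command line. Here is a minimal Python sketch of that idea. It is not GPLink's implementation; gpfdist_port, ports_in_range and the lower/upper bounds are hypothetical names. The sketch assumes the command lines have already been collected, for example from ps output.

```python
import shlex


def gpfdist_port(cmdline):
    """Return the -p port from a gpfdist command line, whatever the option order."""
    args = shlex.split(cmdline)
    for i, arg in enumerate(args[:-1]):
        if arg == "-p" and args[i + 1].isdigit():
            return int(args[i + 1])
    return None  # no explicit -p on this command line


def ports_in_range(cmdlines, lower, upper):
    """Keep only the ports inside [lower, upper]; other gpfdist processes are ignored."""
    ports = []
    for cmd in cmdlines:
        port = gpfdist_port(cmd)
        if port is not None and lower <= port <= upper:
            ports.append(port)
    return ports


if __name__ == "__main__":
    running = [
        "gpfdist -p 8888 -d /data/a",
        "gpfdist -d /data/b -l /tmp/gpfdist.log -p 8999",  # different parameter order
        "gpfdist -d /data/c -p 9500",                      # outside the range below
    ]
    print(ports_in_range(running, 8000, 9000))  # [8888, 8999]
```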

Download Version 0.1.25

Getting the filename with gpfdist

Occasionally, I get a request to add the filename to each row of a file read by gpfdist. Here is a way to do it!

First, create a YML file named “transform_config.yml” with the following:

---
VERSION: 1.0.0.1
TRANSFORMATIONS:
  transformation_input:
     TYPE: input 
     CONTENT: data
     COMMAND: /bin/bash transform.sh

Next, create the “transform.sh” file. This simple example reads all of the .txt files in the directory, but you can also pass a parameter to the script: the filename in the external table’s LOCATION can be passed to it.

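#!/bin/bash
set -e
# Print every line of every .txt file in the current directory,
# prefixed with the file's name and a pipe delimiter.
for i in *.txt; do awk '{print FILENAME"|"$0}' "$i"; done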
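For readers who prefer Python, this sketch produces the same filename|line output as transform.sh. It is an alternative, not what the post uses. You would presumably point COMMAND in transform_config.yml at it instead of the bash script, but treat that as untested. prefix_with_filename is a hypothetical name.

```python
import sys
from pathlib import Path


def prefix_with_filename(directory=".", pattern="*.txt", delimiter="|"):
    """Yield 'filename|line' for each line of each matching file, like transform.sh."""
    for path in sorted(Path(directory).glob(pattern)):
        with path.open() as f:
            for line in f:
                text = line.rstrip("\n")
                yield f"{path.name}{delimiter}{text}"


if __name__ == "__main__":
    # gpfdist reads the transformed rows from this script's stdout.
    for row in prefix_with_filename():
        sys.stdout.write(row + "\n")
```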

Create two test files (test_file_1.txt and test_file_2.txt).

cat test_file_1.txt 
1|foo1
2|foo2
3|foo3
4|foo4
5|foo5
6|foo6
7|foo7
8|foo8
9|foo9
10|foo10
cat test_file_2.txt 
11|foo11
12|foo12
13|foo13
14|foo14
15|foo15
16|foo16
17|foo17
18|foo18
19|foo19
20|foo20

Start gpfdist in the background.

gpfdist -p 8999 -c transform_config.yml > mylog 2>&1 < mylog &

Create the external table, changing the hostname to match your environment. Note that "foo" in the LOCATION is the filename. I'm ignoring it in this example, but this is how you pass a parameter to the script: add %filename% to the command in the YML file (for example, COMMAND: /bin/bash transform.sh %filename%).

CREATE EXTERNAL TABLE ext_transform_table 
(filename text, id int, description text) 
LOCATION ('gpfdist://gpdbsne:8999/foo#transform=transformation_input') 
FORMAT 'text' (DELIMITER '|');

Now select from the external table.

select * from ext_transform_table;
    filename     | id | description 
-----------------+----+-------------
 test_file_1.txt |  1 | foo1
 test_file_1.txt |  2 | foo2
 test_file_1.txt |  3 | foo3
 test_file_1.txt |  4 | foo4
 test_file_1.txt |  5 | foo5
 test_file_1.txt |  6 | foo6
 test_file_1.txt |  7 | foo7
 test_file_1.txt |  8 | foo8
 test_file_1.txt |  9 | foo9
 test_file_1.txt | 10 | foo10
 test_file_2.txt | 11 | foo11
 test_file_2.txt | 12 | foo12
 test_file_2.txt | 13 | foo13
 test_file_2.txt | 14 | foo14
 test_file_2.txt | 15 | foo15
 test_file_2.txt | 16 | foo16
 test_file_2.txt | 17 | foo17
 test_file_2.txt | 18 | foo18
 test_file_2.txt | 19 | foo19
 test_file_2.txt | 20 | foo20
(20 rows)

HAWQ 2.0 Generally Available and Outsourcer 5.1.4

HAWQ, commercially known as Pivotal HDB, just had a major release that I’m really excited about.

Major Features
– Based on Apache HAWQ, plus support for QuickLZ table compression, PL/R, PL/Java, and pgCrypto
– Elastic runtime, which means more segments (resources) can be allocated automatically based on the complexity of the query
– YARN integration
– Dynamic sizing of the cluster
– Block level storage, which enables maximum parallelism
– Single HDFS directory per table, which makes it easier to share and manage data
– Fault tolerance enhancements make it easier and quicker to add or remove data nodes
– HDFS catalog caching
– HCatalog integration which greatly simplifies accessing Hive data
– New management interface with “hawq” commands
– Support for Ambari 2.2.2
– Plugin support for Kerberos
– Better logging for runaway query termination

Product Page
Documentation
Download

Outsourcer 5.1.4
I have also updated Outsourcer to take advantage of HAWQ 2.0/Pivotal HDB 2.0. In HAWQ 2.0/Pivotal HDB 2.0, tables should be distributed randomly to take advantage of many of the new features. Starting with version 5.1.4, Outsourcer makes all tables distributed randomly when the database is HAWQ 2.0/Pivotal HDB 2.0. For Greenplum and HAWQ 1.3, tables are still distributed by the source’s primary key if one is found.

Documentation
Download 5.1.4
Source Code

Pivotal HDB 2.0 (Apache HAWQ) Table Distribution

Pivotal HDB version 2.0 is very close to being generally available, and there are differences in how table distribution works between this version and 1.3 that are worth mentioning.

Distribution
HDB is a fork of Greenplum Database which is an MPP database. Greenplum distributes or shards the data across multiple “segments” which are located on multiple “segment hosts”. The distribution is typically set by a hash of a column or set of columns.

Example:

CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text)
DISTRIBUTED BY (customer_id);

In Greenplum, this creates a file for each segment on each segment host. If you make the table column-oriented, there is one file per column, per segment. Add partitioning, which again uses separate files for each partition, and you can end up with thousands of files for a single table. This is great for an MPP database with a robust optimizer, because it can skip scanning the files it doesn’t need and execute the query in the fastest way possible.
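To put numbers on "thousands of files", here is a small Python estimate based only on the description above. It counts one file per segment, multiplied by the number of columns if the table is column-oriented and by the number of partitions. The cluster size (4 hosts with 4 segments each), 20 columns and 12 partitions are hypothetical, and real clusters may keep additional files.

```python
def estimated_file_count(segment_hosts, segments_per_host, columns=1,
                         partitions=1, column_oriented=False):
    """Rough count of data files for one Greenplum table, per the description above."""
    segments = segment_hosts * segments_per_host
    files_per_segment = columns if column_oriented else 1
    return segments * files_per_segment * partitions


if __name__ == "__main__":
    print(estimated_file_count(4, 4))  # 16: one file per segment
    print(estimated_file_count(4, 4, columns=20, partitions=12,
                               column_oriented=True))  # 3840
```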

HDB 1.3 uses the same design pattern as Greenplum, but it stores the files in HDFS. Hadoop works best with big files and doesn’t handle lots of small files well, so you typically didn’t use column orientation, and partitions were larger.

HDB 2.0 Distribution
1. There is now a dynamic number of segment processes per host. There is just a single segment directory per data node, and the database dynamically creates as many buckets as needed.

2. When you create a table with the distribution set (as shown above), the number of buckets is fixed. This is set with the GUC default_hash_table_bucket_number which sets the number of buckets per host.

3. When you create a random distribution table, the number of buckets is dynamic.
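The conceptual difference between the two policies can be sketched in Python. This is not HAWQ's hash function: zlib.crc32 is a stand-in, and hash_bucket, random_bucket and keys_moved are hypothetical names. With hashing, a row's bucket depends on both its key and the bucket count, so changing the count moves rows. Random placement has no such tie, which is why the bucket count can be chosen dynamically.

```python
import random
import zlib


def hash_bucket(key, bucket_count):
    """Hash distribution (crc32 as a stand-in): a key maps to one bucket for a fixed count."""
    return zlib.crc32(str(key).encode("utf-8")) % bucket_count


def random_bucket(bucket_count, rng):
    """Random distribution: placement does not depend on the row's values."""
    return rng.randrange(bucket_count)


def keys_moved(keys, old_count, new_count):
    """Count the keys whose hash bucket changes when the bucket count changes."""
    return sum(hash_bucket(k, old_count) != hash_bucket(k, new_count) for k in keys)


if __name__ == "__main__":
    ids = range(1, 1001)  # 1,000 ids, as in the customer example
    hashed = [hash_bucket(i, 6) for i in ids]
    print("rows per hash bucket:", [hashed.count(b) for b in range(6)])
    print("keys that move from 6 to 8 buckets:", keys_moved(ids, 6, 8))
```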

So how does this work? Take our example “customer” table above with the distribution set to (customer_id).

INSERT INTO customer SELECT i, 'company_' || i, i || ' main st' 
FROM generate_series(1,1000) AS i;

Query returned successfully: 1000 rows affected, 784 msec execution time.

Now let’s go look at the files.

--use the OID values to find the location in HDFS
SELECT oid FROM pg_database WHERE datname = 'gpadmin';

16508

SELECT c.oid
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

24591

The data files are in HDFS. I’m working with a single-node VM with one segment, and default_hash_table_bucket_number is set to 6, so HDB creates 6 buckets of data in HDFS.

hdfs dfs -ls /hawq_default/16385/16508/24591
Found 6 items
-rw-------   1 gpadmin hdfs       6776 2016-05-04 16:51 /hawq_default/16385/16508/24591/1
-rw-------   1 gpadmin hdfs       6768 2016-05-04 16:51 /hawq_default/16385/16508/24591/2
-rw-------   1 gpadmin hdfs       6688 2016-05-04 16:51 /hawq_default/16385/16508/24591/3
-rw-------   1 gpadmin hdfs       6728 2016-05-04 16:51 /hawq_default/16385/16508/24591/4
-rw-------   1 gpadmin hdfs       7600 2016-05-04 16:51 /hawq_default/16385/16508/24591/5
-rw-------   1 gpadmin hdfs       7488 2016-05-04 16:51 /hawq_default/16385/16508/24591/6
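The path in the listing appears to follow the pattern filespace location / tablespace OID / database OID / table OID, with one numbered file per bucket. The Python sketch below assumes that layout. It treats 16385 as the tablespace OID and assumes the table OID matches the directory name, as it does here. The helper names are hypothetical.

```python
import posixpath


def hawq_table_dir(filespace, tablespace_oid, database_oid, relation_oid):
    """Assumed layout: <filespace>/<tablespace oid>/<database oid>/<relation oid>."""
    return posixpath.join(filespace, str(tablespace_oid),
                          str(database_oid), str(relation_oid))


def bucket_files(table_dir, bucket_count):
    """One numbered file per bucket, starting at 1, as in the listing above."""
    return [posixpath.join(table_dir, str(n)) for n in range(1, bucket_count + 1)]


if __name__ == "__main__":
    table_dir = hawq_table_dir("/hawq_default", 16385, 16508, 24591)
    for path in bucket_files(table_dir, 6):
        print(path)
```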

Now recreate this table with random distribution, insert the data, and look at the files.

DROP TABLE IF EXISTS customer;
CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text)
DISTRIBUTED RANDOMLY;

INSERT INTO customer SELECT i, 'company_' || i, i || ' main st' 
FROM generate_series(1,1000) AS i;

SELECT c.oid
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

24596

[gpadmin@hdp23 ~]$ hdfs dfs -ls /hawq_default/16385/16508/24596
Found 1 items
-rw-------   1 gpadmin hdfs      41968 2016-05-04 17:02 /hawq_default/16385/16508/24596/1

It only created a single file in HDFS with random distribution.
– This is great for HDFS because there are fewer files for the namenode to track.
– Allows for elasticity of the cluster. Grow or shrink the cluster without having to redistribute the data.
– The optimizer has also been enhanced to dynamically set the number of buckets based on the demand of the query.

As you may have concluded by now, random distribution is the recommendation for tables in HDB 2.0. You can still distribute by a hash of one or more columns, which uses a static number of buckets, but random is recommended.

If you create a table now in HDB 2.0 without setting the distribution, the default will be RANDOM.

Proof

DROP TABLE IF EXISTS customer;
CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text)
 DISTRIBUTED BY (customer_id);

SELECT sub.attname
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
LEFT JOIN 
(SELECT p.attrelid, p.attname
FROM pg_attribute p
JOIN (SELECT localoid, unnest(attrnums) AS attnum FROM gp_distribution_policy) AS g ON g.localoid = p.attrelid AND g.attnum = p.attnum) AS sub
ON c.oid = sub.attrelid 
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

"customer_id"

Now, recreate the table without setting the distribution key.

DROP TABLE IF EXISTS customer;
CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text);

SELECT sub.attname
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
LEFT JOIN 
(SELECT p.attrelid, p.attname
FROM pg_attribute p
JOIN (SELECT localoid, unnest(attrnums) AS attnum FROM gp_distribution_policy) AS g ON g.localoid = p.attrelid AND g.attnum = p.attnum) AS sub
ON c.oid = sub.attrelid 
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

--no data (NULL): there is no distribution key, so the table is distributed randomly
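In Python terms, the check done by the query above comes down to the following sketch. distribution_clause is a hypothetical helper. It assumes you have already fetched gp_distribution_policy.attrnums for the table, which is NULL or empty when there is no distribution key, along with a map from attnum to column name taken from pg_attribute.

```python
def distribution_clause(attrnums, attnames):
    """Describe a table's distribution from its gp_distribution_policy attrnums.

    attrnums: list of attribute numbers, or None/empty for no distribution key.
    attnames: mapping of attnum -> column name from pg_attribute.
    """
    if not attrnums:
        return "DISTRIBUTED RANDOMLY"
    cols = ", ".join(attnames[n] for n in attrnums)
    return f"DISTRIBUTED BY ({cols})"


if __name__ == "__main__":
    columns = {1: "customer_id", 2: "customer_name", 3: "customer_address"}
    print(distribution_clause([1], columns))   # DISTRIBUTED BY (customer_id)
    print(distribution_clause(None, columns))  # DISTRIBUTED RANDOMLY
```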