S3 External Tables

S3 is Amazon’s Simple Storage Service, an inexpensive cloud storage offering that has quickly become a go-to option for cold data and backups. Greenplum now has External Tables that can read and write data in S3, so you can leverage this popular storage service with Greenplum. Here is how you do it!

Configuration
1. You will need an Amazon account with your Access Key ID and Secret Access Key. If you have the aws CLI installed and configured, just cat ~/.aws/credentials

2. Get your default region. This is in ~/.aws/config

3. You’ll need a bucket in this region, which you can create with the AWS web interface.

4. You’ll need a configuration file. Here is an example (be sure to change the secret and accessid).

[default]
secret = <secret>
accessid = <accessid>
threadnum = 4
chunksize = 67108864
low_speed_limit = 10240
low_speed_time = 60
encryption = true

5. Copy the configuration to every Segment Host in your cluster.

for i in $(cat segment_hosts.txt); do scp s3_demo.conf $i:/home/gpadmin; done

Writing to S3

1. Create the Writable External Table with the S3 protocol, an AWS URL that includes the correct region, and the configuration file that is present on every Segment Host.

CREATE WRITABLE EXTERNAL TABLE public.demo_write
(id int, fname text, lname text)
LOCATION ('s3://s3-us-east-1.amazonaws.com/pivotalguru/demo config=/home/gpadmin/s3_demo.conf')
FORMAT 'TEXT' (DELIMITER '|' NULL AS '');

2. Execute an INSERT statement:

INSERT INTO demo_write 
SELECT i, 'Jon_' || i, 'Roberts_' || i
FROM generate_series(1,10000) as i;

Note: Each Segment will create a file in your S3 bucket using the prefix given in the LOCATION of the Writable External Table. In this demo, each file is prefixed with “demo”. An example filename is “demo9767abbb3.txt”.

Reading from S3
1. Create the External Table with the same location and configuration as before.

CREATE EXTERNAL TABLE public.demo_read
(id int, fname text, lname text)
LOCATION ('s3://s3-us-east-1.amazonaws.com/pivotalguru/demo config=/home/gpadmin/s3_demo.conf')
FORMAT 'TEXT' (DELIMITER '|' NULL AS '');

2. Select the data.

gpadmin=# select * from demo_read limit 10;
 id  |  fname  |    lname    
-----+---------+-------------
  58 | Jon_58  | Roberts_58
  90 | Jon_90  | Roberts_90
 122 | Jon_122 | Roberts_122
 191 | Jon_191 | Roberts_191
 207 | Jon_207 | Roberts_207
 239 | Jon_239 | Roberts_239
 271 | Jon_271 | Roberts_271
 319 | Jon_319 | Roberts_319
 335 | Jon_335 | Roberts_335
 351 | Jon_351 | Roberts_351
(10 rows)

Time: 1139.538 ms

Tips
1. An S3 External Table that references a single file will only use a single Segment to read the data. Instead, try to have at least one file per Segment for an S3 External Table.
2. S3 External Tables support gzip compression only.
3. Use S3 External Tables for cold storage, or to create a backup of a table or query with a Writable External Table (see the sketch below).
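
For example, a minimal sketch of tip 3: archive an existing table to S3 by inserting into the Writable External Table defined earlier. The source table name here is hypothetical, and its columns must match the external table definition.

-- Hypothetical example: back up a table to S3 through the Writable External Table
INSERT INTO public.demo_write (id, fname, lname)
SELECT id, fname, lname
FROM public.customer_archive;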

Greenplum Append Optimized Tables

Greenplum has two major table storage techniques. The first is inherited from PostgreSQL, uses Multi-Version Concurrency Control (MVCC), and is referred to as HEAP storage. The second is a more efficient technique called Append Optimized (AO). This blog post discusses the performance benefits of AO over HEAP tables.

HEAP
This is the default storage technique when you create a table in Greenplum. It handles UPDATE and DELETE commands just like PostgreSQL: the old row version is marked invalid and, for an UPDATE, a new version is inserted into the table. This hides the deleted or stale row from new queries while still providing read consistency to sessions that started querying the table before the UPDATE or DELETE.

Here is an example of a HEAP table.

CREATE TABLE heap_table
(id int,
 fname text,
 lname text,
 address1 text,
 address2 text,
 city text,
 state text,
 zip text)
DISTRIBUTED BY (id);

But how does this HEAP table store data? How much space does it use?

INSERT INTO heap_table (id, fname, lname, address1, address2, city, state, zip) 
SELECT i, 'Jon_' || i, 'Roberts_' || i, i || ' Main Street', 'Apartment ' || i, 'New York', 'NY', i::text
FROM generate_series(1, 10000) AS i;

ANALYZE heap_table;

SELECT sotdsize
FROM gp_toolkit.gp_size_of_table_disk
WHERE sotdschemaname = 'public'
AND sotdtablename = 'heap_table';

1114112

So it uses 1,114,112 bytes in my small single node test for this generated data of 10,000 records.
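
Because of MVCC, an UPDATE or DELETE leaves the old row versions on disk until VACUUM makes that space reusable. A quick, hedged illustration on the demo table (the sizes you see will vary):

-- every row gets a new version, so expect the on-disk size to grow (roughly double here)
UPDATE heap_table SET city = 'Boston';

SELECT sotdsize
FROM gp_toolkit.gp_size_of_table_disk
WHERE sotdschemaname = 'public'
AND sotdtablename = 'heap_table';

-- VACUUM marks the dead versions' space as reusable; VACUUM FULL rewrites the table
-- back to its compact size if you want to continue with the comparisons below
VACUUM FULL heap_table;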

AO
AO storage was originally designed to be Append Only, but starting with Greenplum version 4.3, these tables became Append Optimized because they now allow UPDATE and DELETE like a HEAP table while retaining the efficient storage.

Create the same table and data but as an AO table.

CREATE TABLE ao_table
(id int,
 fname text,
 lname text,
 address1 text,
 address2 text,
 city text,
 state text,
 zip text)
WITH (appendonly=true)
DISTRIBUTED BY (id);


INSERT INTO ao_table (id, fname, lname, address1, address2, city, state, zip) 
SELECT i, 'Jon_' || i, 'Roberts_' || i, i || ' Main Street', 'Apartment ' || i, 'New York', 'NY', i::text
FROM generate_series(1, 10000) AS i;

ANALYZE ao_table;

SELECT sotdsize
FROM gp_toolkit.gp_size_of_table_disk
WHERE sotdschemaname = 'public'
AND sotdtablename = 'ao_table';

973016

This table uses only 973,016 bytes, a 12.6% reduction in disk space. For large tables, the performance difference will be more noticeable because there is less data to scan from disk.

AO tables can also be compressed, which HEAP tables cannot. What does that look like?

CREATE TABLE ao_compressed_table
(id int,
 fname text,
 lname text,
 address1 text,
 address2 text,
 city text,
 state text,
 zip text)
WITH (appendonly=true, compresstype=quicklz)
DISTRIBUTED BY (id);


INSERT INTO ao_compressed_table (id, fname, lname, address1, address2, city, state, zip) 
SELECT i, 'Jon_' || i, 'Roberts_' || i, i || ' Main Street', 'Apartment ' || i, 'New York', 'NY', i::text
FROM generate_series(1, 10000) AS i;

ANALYZE ao_compressed_table;

SELECT sotdsize
FROM gp_toolkit.gp_size_of_table_disk
WHERE sotdschemaname = 'public'
AND sotdtablename = 'ao_compressed_table';

234344

The size dropped further to 234,344 bytes, a huge decrease in disk usage and another 75.9% reduction! Compression trades CPU cycles for disk I/O, which in most cases provides better query times.

For the last test, make the table column oriented as well. This feature, too, is only available for AO tables.

CREATE TABLE ao_co_compressed_table
(id int,
 fname text,
 lname text,
 address1 text,
 address2 text,
 city text,
 state text,
 zip text)
WITH (appendonly=true, compresstype=quicklz, orientation=column)
DISTRIBUTED BY (id);


INSERT INTO ao_co_compressed_table (id, fname, lname, address1, address2, city, state, zip) 
SELECT i, 'Jon_' || i, 'Roberts_' || i, i || ' Main Street', 'Apartment ' || i, 'New York', 'NY', i::text
FROM generate_series(1, 10000) AS i;

ANALYZE ao_co_compressed_table;

SELECT sotdsize
FROM gp_toolkit.gp_size_of_table_disk
WHERE sotdschemaname = 'public'
AND sotdtablename = 'ao_co_compressed_table';

196840

It dropped again! This time to 196,840 bytes, another 16% reduction.

The total reduction from the HEAP table to the compressed, column-oriented AO table is 82.33%!
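
To see all four results side by side, you can query gp_toolkit once for the whole set:

SELECT sotdtablename, sotdsize
FROM gp_toolkit.gp_size_of_table_disk
WHERE sotdschemaname = 'public'
AND sotdtablename IN ('heap_table', 'ao_table', 'ao_compressed_table', 'ao_co_compressed_table')
ORDER BY sotdsize DESC;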

Summary
Use AO tables. These are faster than HEAP tables and also let you compress and column orient your larger tables for even better performance.

Pivotal HDB 2.0.1 Configuration Tips

Here are some tips for configuring Pivotal HDB (based on Apache HAWQ) with Ambari.

Temp Directories
A Hadoop cluster is typically configured with JBOD, so utilize all of the data disks for temp space.

Here is an example of the “HAWQ Master Temp Directories” entry when the Master and Standby nodes each have 8 disks:

/data1/hawq/master_tmp,/data2/hawq/master_tmp,/data3/hawq/master_tmp,/data4/hawq/master_tmp,/data5/hawq/master_tmp,/data6/hawq/master_tmp,/data7/hawq/master_tmp,/data8/hawq/master_tmp

Here is an example of the “HAWQ Segment Temp Directories” entry when each Data Node has 8 disks:

/data1/hawq/segment_tmp,/data2/hawq/segment_tmp,/data3/hawq/segment_tmp,/data4/hawq/segment_tmp,/data5/hawq/segment_tmp,/data6/hawq/segment_tmp,/data7/hawq/segment_tmp,/data8/hawq/segment_tmp

VM Overcommit
Set vm.overcommit_memory to 2.

VM Overcommit Ratio
2GB – 64GB: set the Overcommit Ratio to 50
>= 64GB of RAM: set the Overcommit Ratio to 100

Swap Space
2GB – 8GB: set swap space equal to RAM
8GB – 64GB: set swap space to 0.5 * RAM
>= 64GB: set swap space to 4GB

Segment Memory Usage Limit
Step 1: Calculate total memory (RAM * overcommit_ratio_percentage + SWAP)
Step 2: Calculate total memory used by other activities (2GB for OS, 2GB for Data Node, 2GB for Node Manager, 1GB for PXF)
Step 3: Subtract other memory from total memory to get the value for the Segment Memory Usage Limit

Example 1:
RAM: 256GB
SWAP: 4GB
Other: 7GB
Overcommit Ratio: 100

Using Yarn: ((256 * 1) + 4) – 7 = 253
Using Default Resource Manager: (256 * 1) – 7 = 249

Example 2:
RAM: 64GB
SWAP: 32GB
Other: 7GB
Overcommit Ratio: 50

Using Yarn: ((64 * 0.5) + 32) – 7 = 57
Using Default Resource Manager: (64 * 0.5) – 7 = 25

HDFS core-site.xml
Add:

ipc.client.connect.timeout = 300000

Change:

ipc.client.connection.maxidletime = 3600000

Optional HAWQ hawq-site.xml
Add:

hawq_rm_stmt_vseg_memory = 1gb 

By default, this is set to 128mb which is great for a high level of concurrency. If you need to utilize more memory in the cluster for each query, you can increase this value considerably. Here are the acceptable values:

128mb, 256mb, 512mb, 1gb, 2gb, 4gb, 8gb, 16gb

Alternatively, you can set this at the session level instead of for the entire database, as shown below.
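
For example, to raise the per-virtual-segment memory for just your current session:

SET hawq_rm_stmt_vseg_memory = '2gb';

-- and to return to the server default later in the session:
RESET hawq_rm_stmt_vseg_memory;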

Operating System gpadmin account
Log into the Master and Standby nodes and execute the following:

echo "source /usr/local/hawq/greenplum_path.sh" >> ~/.bashrc

Now set the database password. Below, I am using ‘password’ as the password so set this based on your organization’s password policy. By default, gpadmin doesn’t have a password set at all.

psql -c "alter user gpadmin password 'password'"

Enable encrypted password authentication. This assumes you are using the default /data/hawq/master path. Adjust if needed. This allows you to connect to the database remotely with an encrypted password.

echo "host all all 0.0.0.0/0 md5" >> /data/hawq/master/pg_hba.conf
hawq stop cluster -u -a

Outsourcer 5.2.0 and Pivotal HDB 2.0.1

Pivotal HDB 2.0.1 (based on Apache HAWQ)
The newest release of Pivotal HDB is fantastic! It adds new features and resolves some issues as well. Here are the release notes.

JDBC Bug
One resolved issue affected Outsourcer: a JDBC problem documented here: HAWQ-738. Pivotal HDB 2.0.0 was released in May, and I found that Outsourcer suffered from stability issues under load and was generally slower navigating the UI than with HDB 1.3 or Greenplum Database. The issue was quickly resolved that same month, but the fix wasn’t publicly available until October of 2016 with this new release.

Quicklz
Quicklz is a compression library that uses GPL licensing. It is bundled with Greenplum Database and with Pivotal HDB 1.3 and 2.0.0. Starting with Pivotal HDB 2.0.1, Quicklz has been removed. Why? Because of that GPL license: HAWQ is an Apache project.

In HDB 1.3 and 2.0.0, the guidance was to use Quicklz compression for row-oriented tables, but starting with 2.0.1, you should use Snappy compression. It is an easy change too:

CREATE TABLE mytable (id INT, fname TEXT) 
WITH (APPENDONLY=TRUE, COMPRESSTYPE=SNAPPY) 
DISTRIBUTED RANDOMLY;

Note: When upgrading to 2.0.1, be sure to first change your Quicklz-compressed tables to either Zlib, Parquet with Snappy, or no compression at all. Then upgrade, and afterwards you can change back to row orientation with Snappy compression. More details are in this section of the release notes.
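
Here is a minimal sketch of that pre-upgrade step, assuming a row-oriented Quicklz table named my_quicklz_table (a hypothetical name); validate the row counts before dropping anything.

-- rebuild the table with zlib (Parquet with Snappy or no compression also work)
CREATE TABLE my_quicklz_table_zlib
WITH (APPENDONLY=TRUE, COMPRESSTYPE=ZLIB)
AS SELECT * FROM my_quicklz_table
DISTRIBUTED RANDOMLY;

DROP TABLE my_quicklz_table;
ALTER TABLE my_quicklz_table_zlib RENAME TO my_quicklz_table;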

Future Releases
Future releases of Pivotal HDB should come more quickly now that the licensing hurdle has been cleared. This means developers can focus on enhancements and fixes rather than licensing.

Outsourcer 5.2.0
This new release officially supports Pivotal HDB 2.0.1. It changes the compression for row-oriented tables to Snappy instead of Quicklz. If you are using Pivotal HDB 2.0.0 or Greenplum Database, Outsourcer will still use Quicklz compression.

Please consider upgrading from Pivotal HDB 2.0.0 to 2.0.1, especially if you are using Outsourcer. When I test Outsourcer with Pivotal HDB 2.0, I use build 22425 rather than build 22126, which is what you can download from Pivotal’s site. 22126 has the JDBC bug while 22425 and newer builds do not. And when you upgrade to 2.0.1, also upgrade to Outsourcer 5.2.0.

Download 5.2.0!
Documentation
Source Code

New Project: gpresize

I was working with a customer recently who wanted to try more segments per host than were originally configured in their cluster, without adding more hosts. This is possible with gpexpand, but only if mirroring has not been enabled. Disabling mirroring, however, is not a supported feature of Greenplum, though it is possible. So, to facilitate this analysis, I created “gpresize”.

Steps
– Remove mirrors (an unsupported operation in Greenplum)
– Back up the database with gpcrondump
– Expand the cluster using gpexpand
– Shrink the database by reinitializing it and restoring from backup
– Add mirroring back using gpaddmirrors and gpinitstandby

Please don’t use this in a Production cluster. I’m using unsupported commands to do this work, and it is intended for evaluation purposes only: it helps you easily increase the number of segments per host and then revert back if needed.

Github: https://github.com/pivotalguru/gpresize

Hive SQL Language Support

Hive is probably the most popular SQL engine for Hadoop, but not because it is fast or because it supports the SQL used by most organizations. It is just the oldest. Or is it?

Pivotal HDB, powered by Apache HAWQ (incubating), is much faster and has much better SQL language support. Pivotal HDB has its roots in Greenplum Database, which has been around longer, so the SQL language support is much more mature.

Performance
In all of the performance tests I’ve done comparing Pivotal HDB with Apache Hive, the results are dramatic. Pivotal HDB loads data significantly faster and executes queries significantly faster too. And yes, I’ve tested it with Tez and even with LLAP. Hive doesn’t come close to the performance of Pivotal HDB. More will be published on this subject soon.

SQL Language Support
The real reason for this post is Hive’s lack of SQL language support. This makes it more difficult to write SQL in this environment, and it makes it more difficult to migrate from a legacy RDBMS to Hadoop. Here are some examples.

Example 1: Subqueries in the WHERE Clause
This is query 6 from the TPC-DS benchmark:

select  a.ca_state state, count(*) cnt
 from customer_address a
     ,customer c
     ,store_sales s
     ,date_dim d
     ,item i
 where       a.ca_address_sk = c.c_current_addr_sk
 	and c.c_customer_sk = s.ss_customer_sk
 	and s.ss_sold_date_sk = d.d_date_sk
 	and s.ss_item_sk = i.i_item_sk
 	and d.d_month_seq = 
 	     (select distinct (d_month_seq)
 	      from date_dim
               where d_year = 1998
 	        and d_moy = 2 )
 	and i.i_current_price > 1.2 * 
             (select avg(j.i_current_price) 
 	     from item j 
 	     where j.i_category = i.i_category)
 group by a.ca_state
 having count(*) >= 10
 order by cnt 
 limit 100;

This query won’t work in Hive because of the two subqueries in the WHERE clause. You will have to rewrite this query like this:

select  a.ca_state state, count(*) cnt
 from customer_address a
     ,customer c
     ,store_sales s
     ,date_dim d
     ,item i
     , (select distinct (d_month_seq) as d_month_seq
 	      from date_dim
               where d_year = 1998
 	        and d_moy = 2 ) as sq1
     , (select j.i_category, avg(j.i_current_price) as avg_i_current_price
 	     from item j 
        group by j.i_category) as sq2
 where       a.ca_address_sk = c.c_current_addr_sk
 	and c.c_customer_sk = s.ss_customer_sk
 	and s.ss_sold_date_sk = d.d_date_sk
 	and s.ss_item_sk = i.i_item_sk
 	and d.d_month_seq = sq1.d_month_seq
        and sq2.i_category = i.i_category
 	and i.i_current_price > 1.2 * sq2.avg_i_current_price
 group by a.ca_state
 having count(*) >= 10
 order by cnt 
 limit 100;

It took me a few minutes to rewrite this, and I’m still not 100% sure it is right. In a conversion effort, I would have to execute it in the legacy RDBMS and check that the results match Hive. If the legacy RDBMS is going away, then I would have a very difficult time validating even this single query.

If I were doing this for benchmarking purposes, I would have to execute TPC-DS in another database that does support this query, with the same data, to validate the results. Clearly, the lack of SQL language support is a problem.

Example 2: Subqueries in the SELECT Clause
This is query 9 from the TPC-DS benchmark:

select case when (select count(*) 
                  from store_sales 
                  where ss_quantity between 1 and 20) > 31003
            then (select avg(ss_ext_list_price) 
                  from store_sales 
                  where ss_quantity between 1 and 20) 
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 1 and 20) end bucket1 ,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 21 and 40) > 24212
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 21 and 40) 
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 21 and 40) end bucket2,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 41 and 60) > 28398
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 41 and 60)
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 41 and 60) end bucket3,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 61 and 80) > 21646
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 61 and 80)
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 61 and 80) end bucket4,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 81 and 100) > 4078
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 81 and 100)
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 81 and 100) end bucket5
from reason
where r_reason_sk = 1

You get this error:

FAILED: ParseException line 2:18 cannot recognize input near '(' 'select' 'count' in expression specification (state=42000,code=40000)

The fix is to move all of those subqueries from the SELECT to the FROM section of the query. Or you can write an elaborate CASE statement. Again, this takes time to rewrite and even more time to validate.
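
Here is a hedged sketch of that rewrite for the first bucket only; the remaining buckets follow the same pattern of cross joining one-row derived tables:

select case when b1.cnt > 31003 then b1.avg_list
            else b1.avg_paid end bucket1
from reason
cross join (select count(*) cnt,
                   avg(ss_ext_list_price) avg_list,
                   avg(ss_net_paid_inc_tax) avg_paid
            from store_sales
            where ss_quantity between 1 and 20) b1
where r_reason_sk = 1;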

Example 3: Correlated Subqueries
This is query 10 from the TPC-DS benchmark:

select  
  cd_gender,
  cd_marital_status,
  cd_education_status,
  count(*) cnt1,
  cd_purchase_estimate,
  count(*) cnt2,
  cd_credit_rating,
  count(*) cnt3,
  cd_dep_count,
  count(*) cnt4,
  cd_dep_employed_count,
  count(*) cnt5,
  cd_dep_college_count,
  count(*) cnt6
 from
  customer c,customer_address ca,customer_demographics
 where
  c.c_current_addr_sk = ca.ca_address_sk and
  ca_county in ('Clinton County','Platte County','Franklin County','Louisa County','Harmon County') and
  cd_demo_sk = c.c_current_cdemo_sk and 
  exists (select *
          from store_sales,date_dim
          where c.c_customer_sk = ss_customer_sk and
                ss_sold_date_sk = d_date_sk and
                d_year = 2002 and
                d_moy between 3 and 3+3) and
   (exists (select *
            from web_sales,date_dim
            where c.c_customer_sk = ws_bill_customer_sk and
                  ws_sold_date_sk = d_date_sk and
                  d_year = 2002 and
                  d_moy between 3 ANd 3+3) or 
    exists (select * 
            from catalog_sales,date_dim
            where c.c_customer_sk = cs_ship_customer_sk and
                  cs_sold_date_sk = d_date_sk and
                  d_year = 2002 and
                  d_moy between 3 and 3+3))
 group by cd_gender,
          cd_marital_status,
          cd_education_status,
          cd_purchase_estimate,
          cd_credit_rating,
          cd_dep_count,
          cd_dep_employed_count,
          cd_dep_college_count
 order by cd_gender,
          cd_marital_status,
          cd_education_status,
          cd_purchase_estimate,
          cd_credit_rating,
          cd_dep_count,
          cd_dep_employed_count,
          cd_dep_college_count
limit 100;

This query correlates data between customer and the tables in each subquery. Rewriting this can be tricky because you need to make sure you don’t duplicate data when joining to the subqueries.
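
One common workaround, sketched here for the first EXISTS only, is to turn the correlated subquery into a derived table of distinct customer keys and join to it; the OR of the web_sales and catalog_sales branches needs the same treatment (for example, LEFT JOINs plus a filter that at least one of them matched):

-- sketch: emulate the first exists() with a join to a de-duplicated
-- list of customers that had a store sale in the window
select count(*)
from customer c
join (select distinct ss_customer_sk
      from store_sales, date_dim
      where ss_sold_date_sk = d_date_sk
        and d_year = 2002
        and d_moy between 3 and 3+3) ss_cust
  on c.c_customer_sk = ss_cust.ss_customer_sk;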

Example 4: INTERSECT
INTERSECT is a SQL feature similar to UNION or EXCEPT. It returns the distinct values that appear in the results of both queries. More information on this feature: https://www.postgresql.org/docs/8.2/static/sql-select.html#SQL-INTERSECT

This is query 8 from the TPC-DS benchmark:

select  s_store_name
      ,sum(ss_net_profit)
 from store_sales
     ,date_dim
     ,store,
     (select ca_zip
     from (
     (SELECT substr(ca_zip,1,5) ca_zip
      FROM customer_address
      WHERE substr(ca_zip,1,5) IN (
                          '89436','30868','65085','22977','83927','77557',
                          '58429','40697','80614','10502','32779',
                          '91137','61265','98294','17921','18427',
                          '21203','59362','87291','84093','21505',
                          '17184','10866','67898','25797','28055',
                          '18377','80332','74535','21757','29742',
                          '90885','29898','17819','40811','25990',
                          '47513','89531','91068','10391','18846',
                          '99223','82637','41368','83658','86199',
                          '81625','26696','89338','88425','32200',
                          '81427','19053','77471','36610','99823',
                          '43276','41249','48584','83550','82276',
                          '18842','78890','14090','38123','40936',
                          '34425','19850','43286','80072','79188',
                          '54191','11395','50497','84861','90733',
                          '21068','57666','37119','25004','57835',
                          '70067','62878','95806','19303','18840',
                          '19124','29785','16737','16022','49613',
                          '89977','68310','60069','98360','48649',
                          '39050','41793','25002','27413','39736',
                          '47208','16515','94808','57648','15009',
                          '80015','42961','63982','21744','71853',
                          '81087','67468','34175','64008','20261',
                          '11201','51799','48043','45645','61163',
                          '48375','36447','57042','21218','41100',
                          '89951','22745','35851','83326','61125',
                          '78298','80752','49858','52940','96976',
                          '63792','11376','53582','18717','90226',
                          '50530','94203','99447','27670','96577',
                          '57856','56372','16165','23427','54561',
                          '28806','44439','22926','30123','61451',
                          '92397','56979','92309','70873','13355',
                          '21801','46346','37562','56458','28286',
                          '47306','99555','69399','26234','47546',
                          '49661','88601','35943','39936','25632',
                          '24611','44166','56648','30379','59785',
                          '11110','14329','93815','52226','71381',
                          '13842','25612','63294','14664','21077',
                          '82626','18799','60915','81020','56447',
                          '76619','11433','13414','42548','92713',
                          '70467','30884','47484','16072','38936',
                          '13036','88376','45539','35901','19506',
                          '65690','73957','71850','49231','14276',
                          '20005','18384','76615','11635','38177',
                          '55607','41369','95447','58581','58149',
                          '91946','33790','76232','75692','95464',
                          '22246','51061','56692','53121','77209',
                          '15482','10688','14868','45907','73520',
                          '72666','25734','17959','24677','66446',
                          '94627','53535','15560','41967','69297',
                          '11929','59403','33283','52232','57350',
                          '43933','40921','36635','10827','71286',
                          '19736','80619','25251','95042','15526',
                          '36496','55854','49124','81980','35375',
                          '49157','63512','28944','14946','36503',
                          '54010','18767','23969','43905','66979',
                          '33113','21286','58471','59080','13395',
                          '79144','70373','67031','38360','26705',
                          '50906','52406','26066','73146','15884',
                          '31897','30045','61068','45550','92454',
                          '13376','14354','19770','22928','97790',
                          '50723','46081','30202','14410','20223',
                          '88500','67298','13261','14172','81410',
                          '93578','83583','46047','94167','82564',
                          '21156','15799','86709','37931','74703',
                          '83103','23054','70470','72008','49247',
                          '91911','69998','20961','70070','63197',
                          '54853','88191','91830','49521','19454',
                          '81450','89091','62378','25683','61869',
                          '51744','36580','85778','36871','48121',
                          '28810','83712','45486','67393','26935',
                          '42393','20132','55349','86057','21309',
                          '80218','10094','11357','48819','39734',
                          '40758','30432','21204','29467','30214',
                          '61024','55307','74621','11622','68908',
                          '33032','52868','99194','99900','84936',
                          '69036','99149','45013','32895','59004',
                          '32322','14933','32936','33562','72550',
                          '27385','58049','58200','16808','21360',
                          '32961','18586','79307','15492'))
     intersect
     (select ca_zip
      from (SELECT substr(ca_zip,1,5) ca_zip,count(*) cnt
            FROM customer_address, customer
            WHERE ca_address_sk = c_current_addr_sk and
                  c_preferred_cust_flag='Y'
            group by ca_zip
            having count(*) > 10)A1))A2) V1
 where ss_store_sk = s_store_sk
  and ss_sold_date_sk = d_date_sk
  and d_qoy = 1 and d_year = 2002
  and (substr(s_zip,1,2) = substr(V1.ca_zip,1,2))
 group by s_store_name
 order by s_store_name
 limit 100;

The fix is to replace the INTERSECT with a UNION ALL of the two (already distinct) lists inside another subquery, then GROUP BY the zip code and keep only the values that appear in both.
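
A hedged sketch of that pattern, with the long zip code IN list elided:

select ca_zip
from (select distinct substr(ca_zip,1,5) ca_zip
      from customer_address
      -- where substr(ca_zip,1,5) in ( ...the long zip list from the original query... )
      union all
      select ca_zip
      from (select substr(ca_zip,1,5) ca_zip, count(*) cnt
            from customer_address, customer
            where ca_address_sk = c_current_addr_sk
              and c_preferred_cust_flag = 'Y'
            group by substr(ca_zip,1,5)
            having count(*) > 10) a1) a2
group by ca_zip
having count(*) = 2;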

Summary
Of the 99 TPC-DS queries, 19 contain SQL that is not supported by Hive. Pivotal HDB can execute all 99 TPC-DS queries without modification. So not only is it much faster, it is also easier to migrate to from a legacy RDBMS and easier to start using because the SQL language support is so robust.

GPLink 0.1.30 Supports Hive!

Enhancements
GPLink has been tested successfully with Hive. This is most useful in a Greenplum environment where you want to query a Hive table. HAWQ has the PXF protocol, which supports Hive and is very performant, so please use PXF in HAWQ to access Hive. But with Greenplum, gplink now supports Hive!

Hive
This is covered in the gplink README file, but the configuration is a bit more involved than for a typical JDBC driver. This is because of how Hive is configured for logging. Plus, this configuration has changed across Hive versions, making it more complex to set up.

You will need the following jar files from your Hadoop cluster. I tested with a Hortonworks cluster running Hive 1.2.1.2.4:
/usr/hdp/2.4.2.0-258/hive/lib/hive-jdbc.jar
/usr/hdp/2.4.2.0-258/hadoop/client/hadoop-common.jar
/usr/hdp/2.4.2.0-258/hadoop/client/log4j.jar
/usr/hdp/2.4.2.0-258/hadoop/client/slf4j-api.jar
/usr/hdp/2.4.2.0-258/hadoop/client/slf4j-log4j12.jar

Older versions of Hive may have duplicate SLF4J bindings and fail to work
properly. The error message is, “Class path contains multiple SLF4J bindings”. If
you get this, remove the slf4j-log4j12.jar file from the jar/ directory, source the
gplink_path.sh file, and then try again.

Bug Fixes
The extraProps value for external connections wasn’t being parsed correctly. The extraProps setting is most useful for Oracle, where it sets the fetch size. If you are using Oracle as a source with gplink, be sure to upgrade to this new version and change your connection from “extraProperies” or “extraProperties” to “extraProps”.

Link
Download 0.1.30 here.

GPLink 0.1.26

GPLink has a new version with the only change being the name of a script variable. The variable was renamed because it is shared between this project and another one I work on (TPC-DS). This common variable name would cause issues with the TPC-DS scripts when logging activities.

0.1.26

Loading Data into HAWQ

Loading data into the database is required before you can start using it, but how? There are several approaches to this basic requirement, each tackling the problem in a different way. This lets you choose the loading method that best matches your use case.

Table Setup
This table will be used for the testing in HAWQ. I created it in a single-node VM running Hortonworks HDP with HAWQ 2.0 installed. I’m using the default Resource Manager too.

CREATE TABLE test_data
(id int,
 fname text,
 lname text)
 DISTRIBUTED RANDOMLY;

Singleton
Let’s start with probably the worst way. Sometimes this approach is fine because you have very little data to load, but in most cases, avoid singleton inserts. This approach inserts just a single tuple in a single transaction.

head si_test_data.sql
insert into test_data (id, fname, lname) values (1, 'jon_00001', 'roberts_00001');
insert into test_data (id, fname, lname) values (2, 'jon_00002', 'roberts_00002');
insert into test_data (id, fname, lname) values (3, 'jon_00003', 'roberts_00003');
insert into test_data (id, fname, lname) values (4, 'jon_00004', 'roberts_00004');
insert into test_data (id, fname, lname) values (5, 'jon_00005', 'roberts_00005');
insert into test_data (id, fname, lname) values (6, 'jon_00006', 'roberts_00006');
insert into test_data (id, fname, lname) values (7, 'jon_00007', 'roberts_00007');
insert into test_data (id, fname, lname) values (8, 'jon_00008', 'roberts_00008');
insert into test_data (id, fname, lname) values (9, 'jon_00009', 'roberts_00009');
insert into test_data (id, fname, lname) values (10, 'jon_00010', 'roberts_00010');

This repeats for 10,000 tuples.

time psql -f si_test_data.sql > /dev/null
real	5m49.527s

As you can see, this is pretty slow and not recommended for inserting large amounts of data. Nearly 6 minutes to load 10,000 tuples is crawling.
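
If you are stuck generating INSERT statements, batching many rows into each statement (and transaction) helps considerably; a quick sketch:

-- one statement, many tuples: far fewer round trips and transactions
INSERT INTO test_data (id, fname, lname) VALUES
 (1, 'jon_00001', 'roberts_00001'),
 (2, 'jon_00002', 'roberts_00002'),
 (3, 'jon_00003', 'roberts_00003');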

COPY
If you are familiar with PostgreSQL then you will feel right at home with this technique. This time, the data is in a file named test_data.txt and it is not wrapped with an insert statement.

head test_data.txt
1|jon_00001|roberts_00001
2|jon_00002|roberts_00002
3|jon_00003|roberts_00003
4|jon_00004|roberts_00004
5|jon_00005|roberts_00005
6|jon_00006|roberts_00006
7|jon_00007|roberts_00007
8|jon_00008|roberts_00008
9|jon_00009|roberts_00009
10|jon_00010|roberts_00010
COPY test_data FROM '/home/gpadmin/test_data.txt' WITH DELIMITER '|';
COPY 10000
Time: 128.580 ms

This method is significantly faster, but it loads the data through the master. This means it doesn’t scale well, since the master becomes the bottleneck, but it does let you load data from any host on your network that has access to the master.

gpfdist
gpfdist is a web server that serves POSIX files for the segments to fetch. Segment processes get the data directly from gpfdist, bypassing the master. This enables you to scale by adding more gpfdist processes and/or more segments.

gpfdist -p 8888 &
[1] 128836
[gpadmin@hdb ~]$ Serving HTTP on port 8888, directory /home/gpadmin

Now you’ll need to create a new external table to read the data from gpfdist.

CREATE EXTERNAL TABLE gpfdist_test_data
(id int,
 fname text,
 lname text)
LOCATION ('gpfdist://hdb:8888/test_data.txt')
FORMAT 'TEXT' (DELIMITER '|');

And to load the data.

INSERT INTO test_data SELECT * FROM gpfdist_test_data;
INSERT 0 10000
Time: 98.362 ms

gpfdist is blazing fast and scales easily. You can add more than one gpfdist location in the external table, use wildcards, use different formats, and much more. The downside is that the file must be on a host that all segments can reach, and you have to run a separate gpfdist process on that host.
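
For example, here is a hedged sketch of an external table that reads wildcarded files from two gpfdist processes (the second host, hdb2, is hypothetical):

CREATE EXTERNAL TABLE gpfdist_test_data_multi
(id int,
 fname text,
 lname text)
LOCATION ('gpfdist://hdb:8888/test_data*.txt',
          'gpfdist://hdb2:8888/test_data*.txt')
FORMAT 'TEXT' (DELIMITER '|');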

gpload
gpload is a utility that automates the loading process by using gpfdist. Review the documentation for more on this utility. Technically, it is the same as gpfdist and external tables but just automates the commands for you.

PXF
PXF is HAWQ’s extension framework for reading and writing data in HDFS (and other sources such as Hive and HBase) through external tables. Like gpfdist, the work is done by each segment, so it scales and executes in parallel.

For this example, I’ve loaded the test data into HDFS.

hdfs dfs -cat /test_data/* | head
1|jon_00001|roberts_00001
2|jon_00002|roberts_00002
3|jon_00003|roberts_00003
4|jon_00004|roberts_00004
5|jon_00005|roberts_00005
6|jon_00006|roberts_00006
7|jon_00007|roberts_00007
8|jon_00008|roberts_00008
9|jon_00009|roberts_00009
10|jon_00010|roberts_00010

The external table definition.

CREATE EXTERNAL TABLE et_test_data
(id int,
 fname text,
 lname text)
LOCATION ('pxf://hdb:51200/test_data?Profile=HdfsTextSimple')
FORMAT 'TEXT' (DELIMITER '|');

And now to load it.

INSERT INTO test_data SELECT * FROM et_test_data;
INSERT 0 10000
Time: 227.599 ms

PXF is probably the best way to load data when using the “Data Lake” design. You load your raw data into HDFS and then consume it with a variety of tools in the Hadoop ecosystem. PXF can also read and write other formats.
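
For example, here is a hedged sketch of writing back out to HDFS through PXF, reusing the host, port, and HdfsTextSimple profile from the example above (the output path is hypothetical, and writable-profile support can vary by version):

CREATE WRITABLE EXTERNAL TABLE et_test_data_out
(id int,
 fname text,
 lname text)
LOCATION ('pxf://hdb:51200/test_data_out?Profile=HdfsTextSimple')
FORMAT 'TEXT' (DELIMITER '|');

INSERT INTO et_test_data_out SELECT * FROM test_data;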

Outsourcer and gplink
Last but not least are the software programs I created. Outsourcer automates table creation and loads data directly into Greenplum or HAWQ using gpfdist. It sources data from SQL Server and Oracle, as these are the two most common OLTP databases.

gplink is another tool that reads external data, but this technique can connect to any valid JDBC source. It doesn’t automate as many of the steps as Outsourcer does, but it is a convenient tool for getting data from a JDBC source.

You might be thinking that sqoop does this, but not exactly. gplink and Outsourcer load data into HAWQ and Greenplum tables, are optimized for these databases, and fix data for you automatically. Both remove null and newline characters and escape the escape and delimiter characters. With sqoop, you would have to read the data from HDFS using PXF and then fix any errors that could be in the files.

Both tools are linked above.

Summary
This post gives a brief description on the various ways to load data into HAWQ. Pick the right technique for your use case. As you can see, HAWQ is very flexible and can handle a variety of ways to load data.