
Pivotal HDB 2.0.1 Configuration Tips

Here are some tips for configuring Pivotal HDB (based on Apache HAWQ) with Ambari.

Temp Directories
A Hadoop cluster is typically configured with JBOD, so utilize all data disks for temp space.

Here is an example of the “HAWQ Master Temp Directories” entry when the Master and Standby nodes each have 8 disks:

/data1/hawq/master_tmp,/data2/hawq/master_tmp,/data3/hawq/master_tmp,/data4/hawq/master_tmp,/data5/hawq/master_tmp,/data6/hawq/master_tmp,/data7/hawq/master_tmp,/data8/hawq/master_tmp

Here is an example of the “HAWQ Segment Temp Directories” entry when each Data Node has 8 disks:

/data1/hawq/segment_tmp,/data2/hawq/segment_tmp,/data3/hawq/segment_tmp,/data4/hawq/segment_tmp,/data5/hawq/segment_tmp,/data6/hawq/segment_tmp,/data7/hawq/segment_tmp,/data8/hawq/segment_tmp

VM Overcommit
Set the kernel parameter vm.overcommit_memory to 2.

VM Overcommit Ratio
2GB – 64GB: set the Overcommit Ratio to 50
>= 64GB of RAM: set the Overcommit Ratio to 100
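These map to standard Linux kernel parameters; a minimal sketch of applying them with sysctl (persist the values in /etc/sysctl.conf so they survive reboots, and pick the ratio per the RAM guidance above):

# run as root; vm.overcommit_memory=2 disables heuristic overcommit
sysctl -w vm.overcommit_memory=2
sysctl -w vm.overcommit_ratio=100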

Swap Space
2GB – 8GB: set swap space equal to RAM
8GB – 64GB: set swap space to 0.5 * RAM
>= 64GB: set swap space to 4GB

Segment Memory Usage Limit
Step 1: Calculate total memory (RAM * overcommit_ratio_percentage + SWAP)
Step 2: Calculate total memory used by other activities (2GB for OS, 2GB for Data Node, 2GB for Node Manager, 1GB for PXF)
Step 3: Subtract other memory from total memory to get the value for the Segment Memory Usage Limit

Example 1:
RAM: 256GB
SWAP: 4GB
Other: 7GB
Overcommit Ratio: 100

Using Yarn: ((256 * 1) + 4) – 7 = 253
Using Default Resource Manager: (256 * 1) – 7 = 249

Example 2:
RAM: 64GB
SWAP: 32GB
Other: 7GB
Overcommit Ratio: 50

Using Yarn: ((64 * 0.5) + 32) – 7 = 57
Using Default Resource Manager: (64 * 0.5) – 7 = 25

HDFS core-site.xml
Add:

ipc.client.connect.timeout = 300000

Change:

ipc.client.connection.maxidletime = 3600000
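If you manage core-site.xml directly rather than through Ambari, these entries take the standard Hadoop property form; a sketch:

<property>
  <name>ipc.client.connect.timeout</name>
  <value>300000</value>
</property>
<property>
  <name>ipc.client.connection.maxidletime</name>
  <value>3600000</value>
</property>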

Optional HAWQ hawq-site.xml
Add:

hawq_rm_stmt_vseg_memory = 1gb 

By default, this is set to 128mb, which is great for a high level of concurrency. If you need each query to use more of the cluster’s memory, you can increase this value considerably. Here are the acceptable values:

128mb, 256mb, 512mb, 1gb, 2gb, 4gb, 8gb, 16gb

Alternatively, you can set this at the session level instead of the entire database.
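For example, a session-level override is just a SET (a sketch; the value must be one of the sizes listed above):

-- applies only to the current session
SET hawq_rm_stmt_vseg_memory = '2gb';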

Operating System gpadmin account
Log into the Master and Standby nodes and execute the following:

echo "source /usr/local/hawq/greenplum_path.sh" >> ~/.bashrc

Now set the database password. Below, I am using ‘password’ as the password, so set this based on your organization’s password policy. By default, gpadmin doesn’t have a password set at all.

psql -c "alter user gpadmin password 'password'"

Enable encrypted password authentication. This assumes you are using the default /data/hawq/master path. Adjust if needed. This allows you to connect to the database remotely with an encrypted password.

echo "host all all 0.0.0.0/0 md5" >> /data/hawq/master/pg_hba.conf
hawq stop cluster -u -a
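With that entry in place, you can verify remote access from another machine; a quick check (the master hostname “hdm” here is hypothetical):

psql -h hdm -U gpadmin -d gpadmin -c "SELECT version();"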

Hive SQL Language Support

Hive is probably the most popular SQL engine for Hadoop, but not because it is fast or because it supports the SQL that most organizations use. It is simply the oldest. Or is it?

Pivotal HDB, powered by Apache HAWQ (incubating), is much faster and has much better SQL language support. Pivotal HDB has its roots in Greenplum Database, which has been around longer, so its SQL language support is much more mature.

Performance
In all of the performance tests I’ve done comparing Pivotal HDB with Apache Hive, the results are dramatic. Pivotal HDB loads data significantly faster and executes queries significantly faster too. And yes, I’ve tested it with Tez and even with LLAP. Hive doesn’t come close to the performance of Pivotal HDB. More will be published on this subject soon.

SQL Language Support
The real reason for this post is the lack of SQL language support that Hive has. This makes it more difficult to write SQL in this environment and it makes it more difficult to migrate from a legacy RDBMS to Hadoop. Here are some examples.

Example 1: Subqueries in the WHERE Clause
This is query 6 from the TPC-DS benchmark:

select  a.ca_state state, count(*) cnt
 from customer_address a
     ,customer c
     ,store_sales s
     ,date_dim d
     ,item i
 where       a.ca_address_sk = c.c_current_addr_sk
 	and c.c_customer_sk = s.ss_customer_sk
 	and s.ss_sold_date_sk = d.d_date_sk
 	and s.ss_item_sk = i.i_item_sk
 	and d.d_month_seq = 
 	     (select distinct (d_month_seq)
 	      from date_dim
               where d_year = 1998
 	        and d_moy = 2 )
 	and i.i_current_price > 1.2 * 
             (select avg(j.i_current_price) 
 	     from item j 
 	     where j.i_category = i.i_category)
 group by a.ca_state
 having count(*) >= 10
 order by cnt 
 limit 100;

This query won’t work in Hive because of the two subqueries in the WHERE clause. You will have to rewrite this query like this:

select  a.ca_state state, count(*) cnt
 from customer_address a
     ,customer c
     ,store_sales s
     ,date_dim d
     ,item i
     , (select distinct (d_month_seq) as d_month_seq
 	      from date_dim
               where d_year = 1998
 	        and d_moy = 2 ) as sq1
     , (select j.i_category, avg(j.i_current_price) as avg_i_current_price
 	     from item j 
        group by j.i_category) as sq2
 where       a.ca_address_sk = c.c_current_addr_sk
 	and c.c_customer_sk = s.ss_customer_sk
 	and s.ss_sold_date_sk = d.d_date_sk
 	and s.ss_item_sk = i.i_item_sk
 	and d.d_month_seq = sq1.d_month_seq
        and sq2.i_category = i.i_category
 	and i.i_current_price > 1.2 * sq2.avg_i_current_price
 group by a.ca_state
 having count(*) >= 10
 order by cnt 
 limit 100;

It took me a few minutes to rewrite this and I’m still not 100% sure it is right. In a conversion effort, I would have to execute it in the legacy RDBMS and see if the results are the same as in Hive. If the data in the legacy RDBMS is changing, validating even this single query would be very difficult.

If I was doing this for benchmarking purposes, I would have to execute TPC-DS in another database that does support this query with the same data to validate the results. Clearly, the lack of SQL language support is a problem.

Example 2: Subqueries in the SELECT Clause
This is query 9 from the TPC-DS benchmark:

select case when (select count(*) 
                  from store_sales 
                  where ss_quantity between 1 and 20) > 31003
            then (select avg(ss_ext_list_price) 
                  from store_sales 
                  where ss_quantity between 1 and 20) 
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 1 and 20) end bucket1 ,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 21 and 40) > 24212
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 21 and 40) 
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 21 and 40) end bucket2,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 41 and 60) > 28398
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 41 and 60)
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 41 and 60) end bucket3,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 61 and 80) > 21646
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 61 and 80)
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 61 and 80) end bucket4,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 81 and 100) > 4078
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 81 and 100)
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 81 and 100) end bucket5
from reason
where r_reason_sk = 1

You get this error:

FAILED: ParseException line 2:18 cannot recognize input near '(' 'select' 'count' in expression specification (state=42000,code=40000)

The fix is to move all of those subqueries from the SELECT list to the FROM clause of the query. Or you can write an elaborate CASE statement. Again, this takes time to rewrite and even more time to validate.
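Here is a sketch of one way to rewrite just bucket1, collapsing the three subqueries over the same quantity range into a single derived table (the aliases cnt, avg_list, and avg_paid are mine):

select case when sq.cnt > 31003 then sq.avg_list else sq.avg_paid end bucket1
from reason,
     (select count(*) cnt,
             avg(ss_ext_list_price) avg_list,
             avg(ss_net_paid_inc_tax) avg_paid
      from store_sales
      where ss_quantity between 1 and 20) sq
where r_reason_sk = 1;

The remaining four buckets follow the same pattern.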

Example 3: Correlated Subqueries
This is query 10 from the TPC-DS benchmark:

select  
  cd_gender,
  cd_marital_status,
  cd_education_status,
  count(*) cnt1,
  cd_purchase_estimate,
  count(*) cnt2,
  cd_credit_rating,
  count(*) cnt3,
  cd_dep_count,
  count(*) cnt4,
  cd_dep_employed_count,
  count(*) cnt5,
  cd_dep_college_count,
  count(*) cnt6
 from
  customer c,customer_address ca,customer_demographics
 where
  c.c_current_addr_sk = ca.ca_address_sk and
  ca_county in ('Clinton County','Platte County','Franklin County','Louisa County','Harmon County') and
  cd_demo_sk = c.c_current_cdemo_sk and 
  exists (select *
          from store_sales,date_dim
          where c.c_customer_sk = ss_customer_sk and
                ss_sold_date_sk = d_date_sk and
                d_year = 2002 and
                d_moy between 3 and 3+3) and
   (exists (select *
            from web_sales,date_dim
            where c.c_customer_sk = ws_bill_customer_sk and
                  ws_sold_date_sk = d_date_sk and
                  d_year = 2002 and
                  d_moy between 3 ANd 3+3) or 
    exists (select * 
            from catalog_sales,date_dim
            where c.c_customer_sk = cs_ship_customer_sk and
                  cs_sold_date_sk = d_date_sk and
                  d_year = 2002 and
                  d_moy between 3 and 3+3))
 group by cd_gender,
          cd_marital_status,
          cd_education_status,
          cd_purchase_estimate,
          cd_credit_rating,
          cd_dep_count,
          cd_dep_employed_count,
          cd_dep_college_count
 order by cd_gender,
          cd_marital_status,
          cd_education_status,
          cd_purchase_estimate,
          cd_credit_rating,
          cd_dep_count,
          cd_dep_employed_count,
          cd_dep_college_count
limit 100;

This query correlates data between customer and the tables in each subquery. Rewriting this can be tricky because you need to make sure you don’t duplicate data when joining to the subqueries.
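One common approach is to turn each EXISTS into an inner join against a deduplicated key list; a sketch of the pattern for the first subquery (runnable on the TPC-DS schema, but only illustrating the technique):

-- the DISTINCT prevents the join from duplicating customer rows
select count(*)
from customer c
join (select distinct ss_customer_sk
      from store_sales, date_dim
      where ss_sold_date_sk = d_date_sk
        and d_year = 2002
        and d_moy between 3 and 3+3) s
  on c.c_customer_sk = s.ss_customer_sk;

The OR of the two remaining EXISTS clauses can become a join to the UNION (not UNION ALL) of the equivalent web_sales and catalog_sales lists.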

Example 4: INTERSECT
INTERSECT is a SQL set operation similar to UNION or EXCEPT. It returns the distinct rows that appear in the results of both queries. More information on this feature: https://www.postgresql.org/docs/8.2/static/sql-select.html#SQL-INTERSECT

This is query 8 from the TPC-DS benchmark:

select  s_store_name
      ,sum(ss_net_profit)
 from store_sales
     ,date_dim
     ,store,
     (select ca_zip
     from (
     (SELECT substr(ca_zip,1,5) ca_zip
      FROM customer_address
      WHERE substr(ca_zip,1,5) IN (
                          '89436','30868','65085','22977','83927','77557',
                          '58429','40697','80614','10502','32779',
                          '91137','61265','98294','17921','18427',
                          '21203','59362','87291','84093','21505',
                          '17184','10866','67898','25797','28055',
                          '18377','80332','74535','21757','29742',
                          '90885','29898','17819','40811','25990',
                          '47513','89531','91068','10391','18846',
                          '99223','82637','41368','83658','86199',
                          '81625','26696','89338','88425','32200',
                          '81427','19053','77471','36610','99823',
                          '43276','41249','48584','83550','82276',
                          '18842','78890','14090','38123','40936',
                          '34425','19850','43286','80072','79188',
                          '54191','11395','50497','84861','90733',
                          '21068','57666','37119','25004','57835',
                          '70067','62878','95806','19303','18840',
                          '19124','29785','16737','16022','49613',
                          '89977','68310','60069','98360','48649',
                          '39050','41793','25002','27413','39736',
                          '47208','16515','94808','57648','15009',
                          '80015','42961','63982','21744','71853',
                          '81087','67468','34175','64008','20261',
                          '11201','51799','48043','45645','61163',
                          '48375','36447','57042','21218','41100',
                          '89951','22745','35851','83326','61125',
                          '78298','80752','49858','52940','96976',
                          '63792','11376','53582','18717','90226',
                          '50530','94203','99447','27670','96577',
                          '57856','56372','16165','23427','54561',
                          '28806','44439','22926','30123','61451',
                          '92397','56979','92309','70873','13355',
                          '21801','46346','37562','56458','28286',
                          '47306','99555','69399','26234','47546',
                          '49661','88601','35943','39936','25632',
                          '24611','44166','56648','30379','59785',
                          '11110','14329','93815','52226','71381',
                          '13842','25612','63294','14664','21077',
                          '82626','18799','60915','81020','56447',
                          '76619','11433','13414','42548','92713',
                          '70467','30884','47484','16072','38936',
                          '13036','88376','45539','35901','19506',
                          '65690','73957','71850','49231','14276',
                          '20005','18384','76615','11635','38177',
                          '55607','41369','95447','58581','58149',
                          '91946','33790','76232','75692','95464',
                          '22246','51061','56692','53121','77209',
                          '15482','10688','14868','45907','73520',
                          '72666','25734','17959','24677','66446',
                          '94627','53535','15560','41967','69297',
                          '11929','59403','33283','52232','57350',
                          '43933','40921','36635','10827','71286',
                          '19736','80619','25251','95042','15526',
                          '36496','55854','49124','81980','35375',
                          '49157','63512','28944','14946','36503',
                          '54010','18767','23969','43905','66979',
                          '33113','21286','58471','59080','13395',
                          '79144','70373','67031','38360','26705',
                          '50906','52406','26066','73146','15884',
                          '31897','30045','61068','45550','92454',
                          '13376','14354','19770','22928','97790',
                          '50723','46081','30202','14410','20223',
                          '88500','67298','13261','14172','81410',
                          '93578','83583','46047','94167','82564',
                          '21156','15799','86709','37931','74703',
                          '83103','23054','70470','72008','49247',
                          '91911','69998','20961','70070','63197',
                          '54853','88191','91830','49521','19454',
                          '81450','89091','62378','25683','61869',
                          '51744','36580','85778','36871','48121',
                          '28810','83712','45486','67393','26935',
                          '42393','20132','55349','86057','21309',
                          '80218','10094','11357','48819','39734',
                          '40758','30432','21204','29467','30214',
                          '61024','55307','74621','11622','68908',
                          '33032','52868','99194','99900','84936',
                          '69036','99149','45013','32895','59004',
                          '32322','14933','32936','33562','72550',
                          '27385','58049','58200','16808','21360',
                          '32961','18586','79307','15492'))
     intersect
     (select ca_zip
      from (SELECT substr(ca_zip,1,5) ca_zip,count(*) cnt
            FROM customer_address, customer
            WHERE ca_address_sk = c_current_addr_sk and
                  c_preferred_cust_flag='Y'
            group by ca_zip
            having count(*) > 10)A1))A2) V1
 where ss_store_sk = s_store_sk
  and ss_sold_date_sk = d_date_sk
  and d_qoy = 1 and d_year = 2002
  and (substr(s_zip,1,2) = substr(V1.ca_zip,1,2))
 group by s_store_name
 order by s_store_name
 limit 100;

The fix for this is to change the INTERSECT to a UNION of the two zip lists inside another subquery with a GROUP BY, which leaves you with the distinct list of zip codes that appear in both.
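A sketch of that rewrite pattern, using hypothetical tables t1 and t2 in place of the two zip lists; note that each side must be deduplicated and combined with UNION ALL for the count to work:

-- a zip can only appear twice in the combined list if it came from both
-- sides, which matches the INTERSECT semantics
SELECT zip
FROM (SELECT DISTINCT zip FROM t1
      UNION ALL
      SELECT DISTINCT zip FROM t2) u
GROUP BY zip
HAVING count(*) = 2;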

Summary
Of the 99 TPC-DS queries, 19 contain SQL that is not supported by Hive. Pivotal HDB can execute all 99 TPC-DS queries without modification. So not only is it much faster, it is also easier to migrate from a legacy RDBMS to Hadoop and easier to start using because the SQL language support is so robust.

Loading Data into HAWQ

Loading data into the database is required to start using it, but how? There are several approaches that achieve this basic requirement, each tackling the problem in a different way. This allows you to pick the loading method that best matches your use case.

Table Setup
This table will be used for the testing in HAWQ. I have this table created in a single node VM running Hortonworks HDP with HAWQ 2.0 installed. I’m using the default Resource Manager too.

CREATE TABLE test_data
(id int,
 fname text,
 lname text)
 DISTRIBUTED RANDOMLY;

Singleton
Let’s start with probably the worst way. Sometimes this way is ideal because you have very little data to load, but in most cases, avoid singleton inserts. This approach inserts just a single tuple in a single transaction.

head si_test_data.sql
insert into test_data (id, fname, lname) values (1, 'jon_00001', 'roberts_00001');
insert into test_data (id, fname, lname) values (2, 'jon_00002', 'roberts_00002');
insert into test_data (id, fname, lname) values (3, 'jon_00003', 'roberts_00003');
insert into test_data (id, fname, lname) values (4, 'jon_00004', 'roberts_00004');
insert into test_data (id, fname, lname) values (5, 'jon_00005', 'roberts_00005');
insert into test_data (id, fname, lname) values (6, 'jon_00006', 'roberts_00006');
insert into test_data (id, fname, lname) values (7, 'jon_00007', 'roberts_00007');
insert into test_data (id, fname, lname) values (8, 'jon_00008', 'roberts_00008');
insert into test_data (id, fname, lname) values (9, 'jon_00009', 'roberts_00009');
insert into test_data (id, fname, lname) values (10, 'jon_00010', 'roberts_00010');

This repeats for 10,000 tuples.

time psql -f si_test_data.sql > /dev/null
real	5m49.527s

As you can see, this is pretty slow and not recommended for inserting large amounts of data. Nearly 6 minutes to load 10,000 tuples is crawling.
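If you are stuck with INSERT statements, batching many tuples per statement at least reduces the per-statement and per-transaction overhead; a sketch, assuming multi-row VALUES support in your version:

-- one statement, many tuples
insert into test_data (id, fname, lname) values
(1, 'jon_00001', 'roberts_00001'),
(2, 'jon_00002', 'roberts_00002'),
(3, 'jon_00003', 'roberts_00003');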

COPY
If you are familiar with PostgreSQL then you will feel right at home with this technique. This time, the data is in a file named test_data.txt and it is not wrapped with an insert statement.

head test_data.txt
1|jon_00001|roberts_00001
2|jon_00002|roberts_00002
3|jon_00003|roberts_00003
4|jon_00004|roberts_00004
5|jon_00005|roberts_00005
6|jon_00006|roberts_00006
7|jon_00007|roberts_00007
8|jon_00008|roberts_00008
9|jon_00009|roberts_00009
10|jon_00010|roberts_00010
COPY test_data FROM '/home/gpadmin/test_data.txt' WITH DELIMITER '|';
COPY 10000
Time: 128.580 ms

This method is significantly faster, but it loads the data through the master. This means it doesn’t scale well, as the master will become the bottleneck, but it does allow you to load data from any host on your network as long as it has access to the master.

gpfdist
gpfdist is a web server that serves POSIX files for the segments to fetch. Segment processes get the data directly from gpfdist, bypassing the master. This enables you to scale by adding more gpfdist processes and/or more segments.

gpfdist -p 8888 &
[1] 128836
[gpadmin@hdb ~]$ Serving HTTP on port 8888, directory /home/gpadmin

Now you’ll need to create a new external table to read the data from gpfdist.

CREATE EXTERNAL TABLE gpfdist_test_data
(id int,
 fname text,
 lname text)
LOCATION ('gpfdist://hdb:8888/test_data.txt')
FORMAT 'TEXT' (DELIMITER '|');

And to load the data.

INSERT INTO test_data SELECT * FROM gpfdist_test_data;
INSERT 0 10000
Time: 98.362 ms

gpfdist is blazing fast and scales easily. You can add more than one gpfdist location in the external table, use wild cards, use different formats, and much more. The downside is the file must be on a host that all segments can reach. You also have to create a separate gpfdist process on that host.
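For example, a sketch of an external table that reads from two gpfdist processes with a wildcard pattern (the hostnames etl1 and etl2 are hypothetical):

CREATE EXTERNAL TABLE gpfdist_test_data_multi
(id int,
 fname text,
 lname text)
LOCATION ('gpfdist://etl1:8888/test_data*.txt',
          'gpfdist://etl2:8888/test_data*.txt')
FORMAT 'TEXT' (DELIMITER '|');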

gpload
gpload is a utility that automates the loading process by using gpfdist. Review the documentation for more on this utility. Technically, it is the same as gpfdist and external tables; it just automates the commands for you.
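To make this concrete, here is a minimal control file sketch for the same load as the gpfdist example (connection settings and paths assumed from earlier; verify the exact format against your version’s documentation):

---
VERSION: 1.0.0.1
DATABASE: gpadmin
USER: gpadmin
HOST: hdb
PORT: 5432
GPLOAD:
   INPUT:
    - SOURCE:
         FILE:
           - /home/gpadmin/test_data.txt
    - FORMAT: text
    - DELIMITER: '|'
   OUTPUT:
    - TABLE: test_data
    - MODE: insert

You would then run it with gpload -f test_data.yml.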

Pivotal Extension Framework (PXF)
PXF allows you to read and write data to HDFS using external tables. Like using gpfdist, it is done by each segment so it scales and executes in parallel.

For this example, I’ve loaded the test data into HDFS.

hdfs dfs -cat /test_data/* | head
1|jon_00001|roberts_00001
2|jon_00002|roberts_00002
3|jon_00003|roberts_00003
4|jon_00004|roberts_00004
5|jon_00005|roberts_00005
6|jon_00006|roberts_00006
7|jon_00007|roberts_00007
8|jon_00008|roberts_00008
9|jon_00009|roberts_00009
10|jon_00010|roberts_00010

The external table definition.

CREATE EXTERNAL TABLE et_test_data
(id int,
 fname text,
 lname text)
LOCATION ('pxf://hdb:51200/test_data?Profile=HdfsTextSimple')
FORMAT 'TEXT' (DELIMITER '|');

And now to load it.

INSERT INTO test_data SELECT * FROM et_test_data;
INSERT 0 10000
Time: 227.599 ms

PXF is probably the best way to load data when using the “Data Lake” design. You load your raw data into HDFS and then consume it with a variety of tools in the Hadoop ecosystem. PXF can also read and write other formats.

Outsourcer and gplink
Last but not least are software programs I created. Outsourcer automates the table creation and load of data directly to Greenplum or HAWQ using gpfdist. It sources data from SQL Server and Oracle as these are the two most common OLTP databases.

gplink is another tool that reads external data, but this technique can connect to any valid JDBC source. It doesn’t automate as many of the steps that Outsourcer does, but it is a convenient tool to get data from a JDBC source.

You might be thinking that sqoop does this, but not exactly. gplink and Outsourcer load data into HAWQ and Greenplum tables. They are optimized for these databases and fix data for you automatically. Both remove null and newline characters and escape the escape and delimiter characters. With sqoop, you will have to read the data from HDFS using PXF and then fix the errors that could be in the files.

Both tools are linked above.

Summary
This post gives a brief description on the various ways to load data into HAWQ. Pick the right technique for your use case. As you can see, HAWQ is very flexible and can handle a variety of ways to load data.

Pivotal HDB 2.0 (Apache HAWQ) Table Distribution

Pivotal HDB version 2.0 is very close to being generally available, and there are differences in how table distribution works between this version and 1.3 that are worth mentioning.

Distribution
HDB is a fork of Greenplum Database which is an MPP database. Greenplum distributes or shards the data across multiple “segments” which are located on multiple “segment hosts”. The distribution is typically set by a hash of a column or set of columns.

Example:

CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text)
DISTRIBUTED BY (customer_id);

In Greenplum, this would create a file per segment on each segment host. If you made the table column oriented, the number of files increases to a file per column, per segment. Add in partitioning, which again uses separate files for each partition, and you end up with possibly thousands of files for a single table. For example, a column-oriented table with 100 columns and 30 partitions spread across 128 segments works out to 100 * 30 * 128 = 384,000 files. This is great for an MPP database with a robust optimizer that can skip scanning files it doesn’t need in order to execute the query in the fastest way possible.

HDB 1.3 uses the same design pattern as Greenplum but stores the files in HDFS. Hadoop loves big files but doesn’t work optimally with lots of small files, which meant that you didn’t typically use column orientation and partitions were larger.

HDB 2.0 Distribution
1. There is now a dynamic number of segment processes per host. There is just a single segment directory per data node, and the database will dynamically create the number of buckets as needed.

2. When you create a table with the distribution set (as shown above), the number of buckets is fixed. This is set with the GUC default_hash_table_bucket_number, which sets the number of buckets per host (see the sketch after this list).

3. When you create a random distribution table, the number of buckets is dynamic.
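Since default_hash_table_bucket_number is just a server configuration parameter, you can check its current value from any SQL session with the standard SHOW command:

SHOW default_hash_table_bucket_number;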

So how does this work? Take our example “customer” table above with the distribution set to (customer_id).

INSERT INTO customer SELECT i, 'company_' || i, i || ' main st' 
FROM generate_series(1,1000) AS i;

Query returned successfully: 1000 rows affected, 784 msec execution time.

Now let’s go look at the files.

--use the OID values to find the location in HDFS
SELECT oid FROM pg_database WHERE datname = 'gpadmin';

16508

SELECT c.oid
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

24591

So our data files are in HDFS. This is just a single-node VM, so I’m working with one segment. It has default_hash_table_bucket_number set to 6, so HDB will create 6 buckets of data in HDFS.

hdfs dfs -ls /hawq_default/16385/16508/24591
Found 6 items
-rw-------   1 gpadmin hdfs       6776 2016-05-04 16:51 /hawq_default/16385/16508/24591/1
-rw-------   1 gpadmin hdfs       6768 2016-05-04 16:51 /hawq_default/16385/16508/24591/2
-rw-------   1 gpadmin hdfs       6688 2016-05-04 16:51 /hawq_default/16385/16508/24591/3
-rw-------   1 gpadmin hdfs       6728 2016-05-04 16:51 /hawq_default/16385/16508/24591/4
-rw-------   1 gpadmin hdfs       7600 2016-05-04 16:51 /hawq_default/16385/16508/24591/5
-rw-------   1 gpadmin hdfs       7488 2016-05-04 16:51 /hawq_default/16385/16508/24591/6

Now recreate this table with random distribution, insert the data, and look at the files.

DROP TABLE IF EXISTS customer;
CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text)
DISTRIBUTED RANDOMLY;

INSERT INTO customer SELECT i, 'company_' || i, i || ' main st' 
FROM generate_series(1,1000) AS i;

SELECT c.oid
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

24596

[gpadmin@hdp23 ~]$ hdfs dfs -ls /hawq_default/16385/16508/24596
Found 1 items
-rw-------   1 gpadmin hdfs      41968 2016-05-04 17:02 /hawq_default/16385/16508/24596/1

It only created a single file in HDFS with random distribution.
– This is great for HDFS because there are fewer files for the namenode to track.
– Allows for elasticity of the cluster. Grow or shrink the cluster without having to redistribute the data.
– The optimizer has also been enhanced to dynamically set the number of buckets based on the demand of the query.

As you might be concluding right about now, random distribution is the recommendation for tables in HDB 2.0. You can still set your distribution to a hash of a column or columns, which will use a static number of buckets, but random is recommended.

If you create a table now in HDB 2.0 without setting the distribution, the default will be RANDOM.

Proof

DROP TABLE IF EXISTS customer;
CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text)
 DISTRIBUTED BY (customer_id);

SELECT sub.attname
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
LEFT JOIN 
(SELECT p.attrelid, p.attname
FROM pg_attribute p
JOIN (SELECT localoid, unnest(attrnums) AS attnum FROM gp_distribution_policy) AS g ON g.localoid = p.attrelid AND g.attnum = p.attnum) AS sub
ON c.oid = sub.attrelid 
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

"customer_id"

Now, recreate the table without setting the distribution key.

DROP TABLE IF EXISTS customer;
CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text);

SELECT sub.attname
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
LEFT JOIN 
(SELECT p.attrelid, p.attname
FROM pg_attribute p
JOIN (SELECT localoid, unnest(attrnums) AS attnum FROM gp_distribution_policy) AS g ON g.localoid = p.attrelid AND g.attnum = p.attnum) AS sub
ON c.oid = sub.attrelid 
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

--no data

HAWQ Demystified

The best SQL engine for Hadoop is not an overnight, “me too” product. It seems that it wasn’t too long ago that the Hadoop vendors all but wrote off SQL, but the demand just hasn’t gone away.

HAWQ is the result of many, many years of work leveraging open source software as well as contributing back to the open source community. I think a short history lesson is in order to fully understand how this product came to be.

Greenplum
Greenplum Database began life in 2003. The founders used open source PostgreSQL and released a commercial product soon after.

Bizgres
Bizgres, an open source version of Greenplum, was released in 2005. Very early on, the founders of Greenplum embraced contributing back to the open source community.


MADlib
MADlib was released in 2010 as an open source project which later became an Apache Incubator project.

EMC
Greenplum was acquired by EMC in 2010 and almost immediately, EMC invested heavily into Hadoop. The Greenplum division was agile like a small startup company but with the deep pockets of a multi-billion dollar company.

GPHD
Greenplum released a Hadoop distribution in 2011 and integration between Greenplum Database and HDFS got more robust with the introduction of “gphdfs”. Greenplum supported External Tables to read/write data in parallel to HDFS from several different distributions.

HAWQ
HAWQ, a fork of Greenplum Database, was released in 2013. HAWQ was immediately extremely performant and compliant with the newest SQL syntax. HAWQ borrowed from the 10 years of experience developing Greenplum to provide a robust optimizer designed for HDFS.

Pivotal
2013 also saw Pivotal become a company. EMC contributed Greenplum Database, VMware contributed Gemfire and Cloud Foundry, and GE contributed capital as an active partner. Paul Maritz became the CEO, and the dedication to fully embrace open source became an integral part of the corporate culture.

During the last three years, HAWQ has become an Apache Incubator project. The Pivotal product now goes by the rather boring name “Pivotal HDB” while HAWQ is the name of the Apache project.

Pivotal also made Greenplum and Geode (Gemfire is the commercial product name) open source projects too. Clearly, Pivotal has embraced open source with probably more committers than those other “open source” data companies.

So what now? What is happening in 2016? Well, Pivotal is about to release Pivotal HDB (HAWQ) 2.0. I’ve been testing this product for months on various platforms, and I continue to be amazed by the performance and ease of use.

HAWQ 2.0 embraces Hadoop fully. I believe the two biggest features are elasticity and performance. HAWQ now supports elasticity for growing or shrinking clusters without having to redistribute the data. The performance is also much improved as it better utilizes HDFS and YARN.

Pivotal HDB is certified to run on Hortonworks HDP, with plans to become a first-class citizen of the Open Data Platform (ODPi).

So you may be asking, “is it fast?” and the answer is yes! I haven’t found a SQL engine that is faster and I’ve been doing competitive analysis for months. The other question you may ask is, “can I run my SQL?” and the answer is yes! A major competitor in the SQL on Hadoop landscape requires more tweaking of SQL just to get the SQL to execute and more tuning to get decent performance.

That “other SQL on Hadoop” product can’t do the following things that HAWQ can, as well as many more.

– Can’t handle a comment line at the end of a SQL file (I found that rather strange).
– Won’t do partition elimination through a joined table (e.g. a date dimension).
– Can’t get the number of milliseconds when subtracting from a date; only gets seconds.
– Has “interval” in its SQL dialect but doesn’t have interval as a type.
– No time data type.
– Concatenating strings doesn’t use || or +; you must use the concat() function.
– Doesn’t support INTERSECT or EXCEPT.
– Doesn’t support subqueries that return more than one row.
– Doesn’t support correlated subqueries.
– Doesn’t support GROUP BY ROLLUP.
– Doesn’t support subqueries in the HAVING clause.
– Doesn’t support subqueries in the SELECT list.

HAWQ is the real deal. It is an Apache project, going to be part of ODPi, faster than everyone else, integrated with Apache Ambari, certified with Hortonworks, and the most mature SQL engine for Hadoop.

Analyzedb Performance Metrics

I have already written about analyzedb in this post, but I thought I would write another post about it with performance metrics.

The old method that I used to analyze tables was to analyze every table and partition sequentially. Then I would analyze the root partition of partitioned tables. The script would look like this:

Tables and Partitions

psql -t -A -c "SELECT 'ANALYZE ' || n.nspname || '.' || c.relname || ';' FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname = 'tpcds' AND c.relname NOT IN (SELECT DISTINCT tablename FROM pg_partitions p WHERE schemaname = 'tpcds') ORDER BY 1" | psql -e

Root Partitions of Partitioned Tables

psql -t -A -c "SELECT 'ANALYZE ROOTPARTITION ' || n.nspname || '.' || c.relname || ';' FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname = 'tpcds' AND c.relname IN (SELECT DISTINCT tablename FROM pg_partitions p WHERE schemaname = 'tpcds') ORDER BY 1" | psql -e

The second method is with analyzedb, which can be done with a single line.

analyzedb -d gpadmin -s tpcds --full -a

The default for analyzedb is to use 5 threads, so you can adjust this to maximize performance. You can also take advantage of how analyzedb keeps track of the tables it has analyzed, so it won’t unnecessarily re-analyze tables, which makes the process even faster.
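Raising the thread count is a single flag; a sketch using the parallel-level option (check analyzedb --help in your version for the exact flag and its upper limit):

analyzedb -d gpadmin -s tpcds --full -a -p 10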

Here are some numbers to put this into perspective. I’m using virtual machines with only 1GB of data, but the percentage improvement is what we want to measure.

HAWQ 2.0 Beta
Sequential: 18 minutes and 37 seconds
Analyzedb: 8 minutes and 3 seconds
Improvement: 131% faster!

Greenplum Database 4.3
Sequential: 11 minutes and 25 seconds
Analyzedb: 6 minutes and 59 seconds
Improvement: 63% faster!

If you aren’t using analyzedb to maintain your HAWQ and/or Greenplum databases, start using it now! You’ll see much better performance in keeping your tables’ statistics up to date.

Dear Cloudera, Please Stop!

So I was alerted to this blog post by a colleague, and I was floored by what Cloudera is doing.

Basically, they are creating a new feature for objects that can store multiple levels in a single table. In a traditional relational structure, you may have an Orders table with another table for Order_Details. You would simply JOIN the two tables together when needed.

CREATE SCHEMA example;

CREATE TABLE example.Orders
(order_id int NOT NULL,
 order_date date NOT NULL,
 customer_id int NOT NULL);

CREATE TABLE example.Order_Details
(order_detail_id int NOT NULL,
 order_id int NOT NULL,
 product_id int NOT NULL,
 quantity int NOT NULL,
 price numeric NOT NULL);

And when you want to query both tables:

SELECT o.order_date, sum(od.price)
FROM example.orders o 
JOIN example.order_details od ON o.order_id = od.order_id
GROUP BY o.order_date;

What Cloudera is doing is an object approach that combines both tables into a single table using a feature called a STRUCT. This is similar to a composite type in Greenplum, PostgreSQL, and HAWQ.

--Impala
CREATE TABLE Orders
(order_id int NOT NULL,
 order_date date NOT NULL,
 customer_id int NOT NULL,
 Order_Details ARRAY<STRUCT<
    order_detail_id: int,
    product_id: int,
    quantity: int,
    price: numeric>>
);
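For comparison, here is a rough PostgreSQL sketch of the composite-type equivalent (the type and table names are mine, and support for arrays of composite types varies by PostgreSQL version):

--PostgreSQL sketch
CREATE TYPE order_detail AS
(order_detail_id int,
 product_id int,
 quantity int,
 price numeric);

CREATE TABLE orders_nested
(order_id int,
 order_date date,
 customer_id int,
 order_details order_detail[]);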

I get this approach when working with an OLTP system where developers want to match their classes to the database structure, but I really don’t see the benefit of this in a big data platform. It just makes it difficult for database users to understand and use the data.

Schema.Table
Every database I’ve ever worked with uses Schema.Table to organize table structures, but not Impala! Cloudera has decided to break away from the decades-old standard and use the dot notation for their nested fields stored within a STRUCT. If you adopt this silliness, you are adopting a standard that no one else uses. Why would Cloudera want that? Vendor lock-in, maybe????

--Impala
select * from orders.order_details;

Orders isn’t a schema in the above example; it is a table. Cloudera just broke SQL!

Cartesian Product? Nope, Just Vendor Specific Syntax
Notice how this query appears to reference two different tables without specifying a join.

SELECT o.order_date, sum(od.price)
FROM orders o, o.order_details od 
GROUP BY o.order_date;

They even allow you to do an OUTER JOIN to the STRUCT ARRAY without defining the columns to join on.

SELECT o.order_date, sum(od.price)
FROM orders o
LEFT OUTER JOIN o.order_details od 
GROUP BY o.order_date;

Reasons Why Cloudera is Doing This
Here are my guesses as to why they are doing this.
1. They hate SQL or just don’t understand it.
2. Vendor Lock-In. They need ways to make it difficult for customers to switch to a competitor’s product.
3. Impala is bad at joins. Organizing tables like this makes it easier for Impala’s query optimizer to create a robust query plan.

Please Cloudera, just stop!

TPC-DS Benchmark

TPC
The TPC is a non-profit organization that provides several benchmarks for databases. The two common benchmarks they provide for Data Warehousing and Analytics are the Decision Support (TPC-DS) and Ad-Hoc (TPC-H) benchmarks. More information can be found here: http://www.tpc.org/.

SQL on Hadoop
– HAWQ can execute all 99 queries without modification.
– IBM Big SQL can execute all 99 queries but requires modifying 12 queries (as of the most recent publication I can find).
– Impala can’t run all of the queries, and many require modifications to execute.
– Hive can’t run all of the queries, and many require modifications to execute.

There isn’t much point in comparing HAWQ with these other SQL engines when none of them can execute all of the queries without modification! So… this post will focus on the capabilities of HAWQ as well as the automated TPC-DS benchmark process I have put together.

Sandbox
The tests will use a Hortonworks 2.2 Sandbox with 8GB of RAM and 4 cores dedicated to the VM. The dataset will be 2GB in size which works well in a VM. This is definitely not a huge test but it will demonstrate the capabilities of HAWQ. Again, all 99 queries can run in HAWQ without any modification!

TPC-DS
I’m using the tests provided here: https://github.com/pivotalguru/TPC-DS which I put together to help evaluate different hardware platforms. This script works with HAWQ and with Greenplum database.

This script automates the entire process of generating the data, building the tables, loading the tables, and executing the TPC-DS queries.

Results
1. Compile TPC-DS: 3.4 seconds
2. Generate the Data using dsdgen: 3 minutes 23 seconds
3. Create tables and ensure correct optimizer settings are enabled: 5.5 seconds
4. Load the tables: 2 minutes and 5 seconds
5. Execute all 99 TPC-DS queries: 6 minutes 46 seconds

(chart: TPC-DS 2GB results)

To put this into perspective, I attempted to run a 2GB TPC-DS benchmark in the same VM with another SQL on Hadoop tool and it took 2 hours to just get all of the data loaded! The engine was unable to execute the vendor provided SQL queries either so I gave up on that effort. HAWQ is the way to go!

HAWQ versus Hive on HDP 2.2.4.2 (Part 2)

Part 1 covered the comparison of HAWQ versus Hive using Map-Reduce instead of Tez. Now in Part 2, I will execute the same scripts but with Tez enabled.

Enabling Tez is pretty easy to do with HDP 2.2.4.2. You simply change this in the General section under Hive and restart Hive:

hive.execution.engine = tez

Next, I re-ran the same loads and queries. Overall, it is roughly 25% faster with Tez enabled but it is still much slower than HAWQ.

Here is the graph of results:
(graph: HAWQ vs Hive, Part 2)

Observations
– Loading was slower with Tez enabled but this is probably because I’m testing with a VM.
– Every query was about 40% faster in Hive with Tez versus without.
– HAWQ was about 30 times faster than Hive with Tez and about 57 times faster than Hive with Map-Reduce.
– The execution time reported by Hive was incorrect for each query. For example, I used “time hive -f icd9.sql” to execute the icd9.sql query and capture the timings. Time reported:

real    0m25.060s

but Hive reported:

Time taken: 13.17 seconds, Fetched: 19 row(s)

So another test, but the same result: Hive is much, much slower than HAWQ, even with Tez enabled. If you want Enterprise-level SQL for Hadoop, HAWQ is the best solution.

HAWQ versus Hive on HDP 2.2.4.2 (Part 1)

I recently built a Virtual Machine running CentOS 6.4, the Hortonworks installation of Ambari 1.7, Hortonworks HDP 2.2.4.2, and Pivotal HAWQ 1.3.0.1. If you aren’t already familiar with the Open Data Platform, it is the shared industry effort to standardize the core components of Hadoop (HDFS, YARN, MapReduce, and Ambari) that both Hortonworks and Pivotal are a part of. This means I have HAWQ running on the Hortonworks deployment, and this is a supported configuration.

After I got this built, I decided to put a set of demo scripts I use for HAWQ in the VM. It contains a simple Star Schema of Center for Medicare and Medicaid Services (CMS) data. It consists of 5 Dimensions and a single claims Fact with 1 million claims. (diagram: CMS Star Schema)

The demo does the following:
1. Loads the claims Fact with 1 million claims
2. Creates and loads the 5 Dimensions
3. Creates and loads a single “Sandbox” table that joins all Dimensions to build a 1NF table
4. Executes 3 basic ad-hoc queries

I then converted my scripts to do the same with Hive but I ran into one problem. CMS provides one table with a range of values rather than providing each distinct value. For example:

001|139|Infectious and parasitic diseases
140|239|Neoplasms
240|279|Endocrine, nutritional and metabolic diseases, and immunity disorders
...

With HAWQ, I used generate_series(int, int) in a simple SQL query to generate every value in the range. Unfortunately, Hive doesn’t support this function, so I had two options: write my own UDF or borrow the transformed data from HAWQ. I chose the latter because I could get this done faster.
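The HAWQ side is a one-liner; a sketch assuming hypothetical staging table and column names for the CMS ranges:

-- expands each range row into one row per ICD-9 code value
SELECT generate_series(range_start, range_end) AS icd9_code, description
FROM icd9_ranges;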

And now, the results!

HAWQ Timings
Load CMS Table: 3.1 seconds
Dimensions: 1.6 seconds
Sandbox Table: 3.1 seconds
Gender Query: 0.9 seconds
Location Query: 0.72 seconds
ICD9 Query: 0.95 seconds
Total: 10.37 seconds

Hive Timings
Load CMS Table: 44.5 seconds
Dimensions (4 of the 5): 3 minutes 42.6 seconds
Dimension Fix: 35.7 seconds
Sandbox Table: 45.6 seconds
Gender Query: 47.5 seconds
Location Query: 48.4 seconds
ICD9 Query: 48.8 seconds
Total: 8 minutes 13.2 seconds

This graph is very telling.
(graph: HAWQ vs Hive)

Hive looks to be a good entry-level tool for SQL on Hadoop. The performance isn’t great, but it comes with every distribution of Hadoop and is easier to use than other Hadoop tools. But HAWQ will give you significantly better performance, and this can be on either the Pivotal HD or the Hortonworks distribution. Both distributions are based on the ODP, which delivers the standard Hadoop core, so HAWQ will work on either one.

In my next post, I plan on enabling Tez for Hive to see what the performance benefit will be. It should help, but I don’t think it will come close to the performance of HAWQ. We shall see.

XML Loading Again

I covered this topic here before with a solution that handles very large XML files in Hadoop. This blog post covers parsing XML as it is loaded into HAWQ or Greenplum Database.

sample.xml

<?xml version="1.0"?>
<catalog>
      <large-product>
         <name>foo1</name>
         <price>110</price>
      </large-product>
      <large-product>
         <name>foo2</name>
         <price>120</price>
      </large-product>
      <large-product>
         <name>foo3</name>
         <price>130</price>
      </large-product>
      <large-product>
         <name>foo4</name>
         <price>140</price>
      </large-product>
      <large-product>
         <name>foo5</name>
         <price>150</price>
      </large-product>
      <small-product>
         <name>bar1</name>
         <price>10</price>
      </small-product>
      <small-product>
         <name>bar2</name>
         <price>20</price>
      </small-product>
      <small-product>
         <name>bar3</name>
         <price>30</price>
      </small-product>      
      <small-product>
         <name>bar4</name>
         <price>40</price>
      </small-product>
      <small-product>
         <name>bar5</name>
         <price>50</price>
      </small-product>
</catalog>

sample.xsl

<xsl:stylesheet version="1.0"
xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
<xsl:output omit-xml-declaration="yes"/>
<xsl:template match="catalog"><xsl:for-each select="large-product">
Large|<xsl:value-of select="name"/>|<xsl:value-of select="price"/>
</xsl:for-each>
<xsl:for-each select="small-product">
Small|<xsl:value-of select="name"/>|<xsl:value-of select="price"/>
</xsl:for-each>
</xsl:template>
</xsl:stylesheet>

sample.yml

---
VERSION: 1.0.0.1
TRANSFORMATIONS:
  sample:
     TYPE: input
     CONTENT: data
     COMMAND: /usr/bin/xsltproc sample.xsl %filename%

Start gpfdist:

gpfdist -c sample.yml -p 8080 >> sample.log 2>&1 &

Create External Table

create external table sample
(product_type text, product_name text, product_price int)
location ('gpfdist://bigmac:8080/sample.xml#transform=sample')
format 'text' (delimiter '|' header);

Select the data.

gpdb=# select * from sample; 
product_type | product_name | product_price 
--------------+--------------+---------------
 Large        | foo1         |           110
 Large        | foo2         |           120
 Large        | foo3         |           130
 Large        | foo4         |           140
 Large        | foo5         |           150
 Small        | bar1         |            10
 Small        | bar2         |            20
 Small        | bar3         |            30
 Small        | bar4         |            40
 Small        | bar5         |            50
(10 rows)

This solution works great for parsing reasonably sized XML files into a relational format but if you have very large files, use Hadoop and review my other blog post here.

External Table “TRANSFORM” Option

In the Greenplum Administrator Guide, there is a section that covers loading XML data with gpload. It also mentions that you can create the External Table yourself. This demo will show you how to create an External Table that utilizes this feature so that you can execute any script you want on your ETL server. This demo also works for HAWQ.

XML Parsing is done sequentially from the top to the bottom of a file and the TRANSFORM option was built with this in mind. The gpfdist process will execute a script for you and the output is read by the segments in parallel. Instead of parsing XML with a script, this demo will execute a Unix command in a script to show you how you can leverage this feature to execute virtually any command you want on a remote server.

ETL Server
/data1/demo5/demo5.yml

---
VERSION: 1.0.0.1
TRANSFORMATIONS:
  transform_demo5:
     TYPE: input 
     CONTENT: data
     COMMAND: /bin/bash get_df.sh

As you can see, I created a transform named “transform_demo5” that executes the script “get_df.sh”. So let’s look at get_df.sh.
/data1/demo5/get_df.sh

df -k | awk '{print $1"|"$2"|"$3"|"$4"|"$5"|"$6}' | tail -n +2

This simple command executes the Unix df command and converts that to a pipe delimited file. Executing the command outputs this:

[pivhdsne:demo5]$ df -k | awk '{print $1"|"$2"|"$3"|"$4"|"$5"|"$6}' | tail -n +2
/dev/sda3|47472560|12080036|32981056|27%|/
tmpfs|4030752|0|4030752|0%|/dev/shm
/dev/sda1|99150|48764|45266|52%|/boot

Now start a gpfdist process that uses a configuration file. The “-c” option isn’t well documented for gpfdist, but it is mentioned in the XML parsing documentation.

gpfdist -p 8999 -c /data1/demo5/demo5.yml > demo5.log 2>&1 &

Now create an External Table. Notice the format of using #transform=transform_demo5. The filename of “foo” is ignored but you can reference the filename as a parameter to your transform scripts.

/data1/demo5/get_df.sql

CREATE EXTERNAL TABLE get_df 
(Filesystem text,
 K_blocks int,
 Used int,
 Available int,
 Used_percentage text,
 Mounted_on text)
LOCATION ('gpfdist://pivhdsne:8999/foo#transform=transform_demo5')
FORMAT 'TEXT' (DELIMITER '|');

Now create the table in your database server (Hawq or Greenplum).

psql -f get_df.sql -h gpdbvm43 
Password: 
Timing is on.
CREATE EXTERNAL TABLE
Time: 238.788 ms

Now let’s see the output.

[pivhdsne:demo5]$ psql -h gpdbvm43
Password: 
Timing is on.
psql (8.2.15)
Type "help" for help.

gpadmin=# select * from get_df;
 filesystem | k_blocks |   used   | available | used_percentage | mounted_on 
------------+----------+----------+-----------+-----------------+------------
 /dev/sda3  | 47472560 | 12104924 |  32956168 | 27%             | /
 tmpfs      |  4030752 |        0 |   4030752 | 0%              | /dev/shm
 /dev/sda1  |    99150 |    48764 |     45266 | 52%             | /boot
(3 rows)

Time: 32.149 ms

This is yet another feature of Greenplum and HAWQ that gives you more flexibility in working with data. I’ve seen this feature used to move files, change permissions, get data from running programs, and of course parse XML. Enjoy!

Open Data Platform

Pivotal teams up with Hortonworks for the Open Data Platform. Pivotal is also making the data products (Gemfire, Greenplum Database, and HAWQ) all open source.

http://blog.pivotal.io/big-data-pivotal/news-2/pivotal-big-data-suite-open-agile-cloud-ready

http://hortonworks.com/blog/pivotal-hortonworks-announce-alliance/

http://blog.pivotal.io/big-data-pivotal/news-2/why-the-open-data-platform-is-such-a-big-deal-for-big-data

Outsourcer 4.1.5 released

Bugs Fixed
1. Error messages raised from Oracle and SQL Server could include special characters, which would cause the thread in Outsourcer to die and the status to remain labeled as “processing”. Error messages no longer have escape and quote characters, to prevent this problem.
2. On new installations, the permissions for the Oracle and SQL Server Jar files would remain owned by root. This has now been corrected to be the admin user (gpadmin).
3. On the “Sources” screen, the label for Append-Only is now correct for Greenplum Database 4.3 and reflects that these are really “Append-Optimized” tables.

Enhancements
1. Added External Tables to enable starting and stopping the UI rather than having to ssh to the Master host.
2. “Queue” screen now provides the ability to cancel processing jobs.
3. Stopping the Outsourcer Queue Daemon (Environment screen) now cancels currently running Jobs in the Queue.
4. Starting the Outsourcer Queue Daemon (Environment screen) now makes sure orphaned jobs in the Queue are marked as Failed and cancels any that are currently running.

Download
Documentation
Github

Greenplum Database Connecting to Hortonworks UPDATED

Hortonworks 2.1 is now officially supported by Pivotal in Greenplum Database versions 4.3.3.x and 4.2.8.x by using External Tables.

Greenplum Database 4.3.3 Documentation
Greenplum Database 4.2.8 Documentation

The documentation has the basic configuration but I wanted to provide more detailed instructions.

Hortonworks
For my test, I downloaded the Hortonworks HDP 2.1 Sandbox. Next, I started the instance and I created a user named “gpadmin” just to make it easier for me to connect to the cluster.

Pivotal Greenplum Database
Step 1: Downloaded the Greenplum Database Trial Virtual Machine and started the cluster.

Step 2: As root, add the Hortonworks repo

wget -nv http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.1.7.0/hdp.repo -O /etc/yum.repos.d/hdp.repo

Note: this will need to be done on all nodes in a full Greenplum cluster.

Step 3: As root, install the Hadoop client

yum install hadoop* openssl

Note: this will need to be done on all nodes in a full Greenplum cluster.

Step 4: As root, remove previous OpenJDK and install Oracle JDK. You will need to download the Oracle JDK from Oracle.

yum remove java
rpm -i jdk-7u67-linux-x64.rpm

Note: this will need to be done on all nodes in a full Greenplum cluster.

Step 5: As gpadmin, start the Greenplum database and set the Hadoop target version to Hortonworks

gpconfig -c gp_hadoop_target_version -v hdp2

Step 6: As gpadmin, edit your .bashrc and add the following

export JAVA_HOME=/usr/java/latest
export HADOOP_HOME=/usr/lib/hadoop/client

Note: this will need to be done on all nodes in a full Greenplum cluster.

Step 7: As gpadmin, restart the database

source .bashrc
gpstop -r -a

Step 8: As root, add host entry to the Master and Standby-Master for the Hadoop cluster

echo "192.168.239.219 hdp21" >> /etc/hosts

Note: This will need to be done on all nodes in a full Greenplum cluster and all Hortonworks nodes should be in the hosts file too. DNS can also be used.

Build Tables

DROP EXTERNAL TABLE IF EXISTS ext_foo;
CREATE WRITABLE EXTERNAL TABLE ext_foo
(i int, bar text) LOCATION ('gphdfs://hdp21/foo_bar') FORMAT 'text' (delimiter '|' null 'null');
--Query returned successfully with no result in 60 ms.
INSERT INTO ext_foo SELECT i, 'bar_' || i FROM generate_series(1, 100) AS i;
--Query returned successfully: 100 rows affected, 2636 ms execution time.
DROP EXTERNAL TABLE IF EXISTS ext_get_foo;
CREATE EXTERNAL TABLE ext_get_foo
(i int, bar text) LOCATION ('gphdfs://hdp21/foo_bar') FORMAT 'text' (delimiter '|' null 'null');
--Query returned successfully with no result in 28 ms.
SELECT * FROM ext_get_foo ORDER BY i LIMIT 10;
/*
1;"bar_1"
2;"bar_2"
3;"bar_3"
4;"bar_4"
5;"bar_5"
6;"bar_6"
7;"bar_7"
8;"bar_8"
9;"bar_9"
10;"bar_10"
*/

Maintenance Script for HAWQ and Greenplum database

A common question I get is what maintenance activity is needed in HAWQ and Greenplum database. There are lots of ways to handle this and a lot of customers do it as part of their ETL processing. I put together a script that is pretty generic and should be very useful for most implementations.

The script is available on Github in a new project I created for scripts. As I come up with more scripts for various activities, I will put them in this Github repository.

Link to repository

maintain.sh
Note: Run this script on a regular basis such as weekly or monthly.

Enjoy!

XML Parsing

I recently worked with a customer that receives very large and complex XML files from external partners. This customer wanted the XML files parsed and available for SQL access so they can do reporting and analytics.

There are many ways to handle XML files, but in this case, with very large files, I needed a cluster of machines, and Hadoop is pretty good at that. The processing can be done with MapReduce or a tool like Pig, which simplifies MapReduce.

Solution 1
Steps

  • Load raw file to Hadoop
  • Transform XML to tab delimited file with Pig
  • Create External Table in HAWQ to read file data in Hadoop

Sample XML file.

<?xml version="1.0"?>
<catalog>
      <large-product>
         <name>foo1</name>
         <price>110</price>
      </large-product>
      <large-product>
         <name>foo2</name>
         <price>120</price>
      </large-product>
      <large-product>
         <name>foo3</name>
         <price>130</price>
      </large-product>
      <large-product>
         <name>foo4</name>
         <price>140</price>
      </large-product>
      <large-product>
         <name>foo5</name>
         <price>150</price>
      </large-product>
      <small-product>
         <name>bar1</name>
         <price>10</price>
      </small-product>
      <small-product>
         <name>bar2</name>
         <price>20</price>
      </small-product>
      <small-product>
         <name>bar3</name>
         <price>30</price>
      </small-product>
      <small-product>
         <name>bar4</name>
         <price>40</price>
      </small-product>
      <small-product>
         <name>bar5</name>
         <price>50</price>
      </small-product>
</catalog>

As you can see, there are two record sets, large products and small products, but I just want the small products in a table.

First, put the raw XML data into Hadoop.

hdfs dfs -mkdir /demo4
hdfs dfs -put catalog.xml /demo4

Here is the Pig script.

REGISTER /usr/lib/gphd/pig/piggybank.jar;
A = LOAD '/demo4/catalog.xml' 
USING org.apache.pig.piggybank.storage.XMLLoader('small-product') 
AS (doc:chararray);

clean = foreach A GENERATE FLATTEN(REGEX_EXTRACT_ALL(doc,'<small-product>\\s*<name>(.*)</name>\\s*<price>(.*)</price>\\s*</small-product>'))
AS (name:chararray,price:int);
store clean into '/demo4/alt_small_data';

What Pig does for me here is first isolate just the small-product records, which only requires a single line in the script and is very useful. The next step uses regular expressions to parse each tag. This is very painful to get right because Pig uses MapReduce to parse the data, which is powerful but relatively slow to iterate on until you get it right. Even with a small file, each iteration took at least 30 seconds to execute, and the full file took 22 minutes.

The last step is to create an External Table in HAWQ.

DROP EXTERNAL TABLE IF EXISTS ext_alt_demo4;
CREATE EXTERNAL TABLE ext_alt_demo4
(
  name text, price int
)
 LOCATION (
    'pxf://pivhdsne:50070/demo4/alt_small_data/part*?profile=HdfsTextSimple'
)
 FORMAT 'text' (delimiter E'\t');

And selecting the data in HAWQ.

SELECT * FROM ext_alt_demo4;
 name | price 
------+-------
 bar1 |    10
 bar2 |    20
 bar3 |    30
 bar4 |    40
 bar5 |    50
(5 rows)

Time: 127.334 ms

This was my first approach for XML parsing, until I got frustrated with the many XML tags I had to write regular expressions for. The XML I had wasn’t as neat as my example, so I had to re-run the Pig script over and over again for each slight modification to the parsing logic.

Solution 2
This is the same basic process as Solution 1, but instead of parsing each record with regular expressions in Pig, I will create a single column and do the parsing with SQL in HAWQ.

Here is my Pig script.

REGISTER /usr/lib/gphd/pig/piggybank.jar;
A = LOAD '/demo4/catalog.xml' USING org.apache.pig.piggybank.storage.XMLLoader('small-product') AS (doc:chararray);
clean = foreach A generate REPLACE(REPLACE(doc, '\\u000D', ''), '\\u000A', '');
store clean into '/demo4/small_data';

So instead of using regular expressions, I’m removing the carriage return and newline characters from the XML so that each record is on one row. Then I store that back in Hadoop.

Here is the External Table in HAWQ.

CREATE EXTERNAL TABLE ext_demo4
(
xml_data text
)
LOCATION (
'pxf://pivhdsne:50070/demo4/small_data/part*?profile=HdfsTextSimple'
)
FORMAT 'TEXT' (delimiter E'\t');

I then created a simple SQL function to parse the data.

CREATE OR REPLACE FUNCTION fn_extract_xml_value(p_tag text, p_xml text) RETURNS TEXT AS
$$
SELECT SPLIT_PART(SUBSTRING($2 FROM '<' || $1 || '>(.*)</' || $1 || '>'), '<', 1)
$$
LANGUAGE SQL;
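
Before pointing the function at the External Table, you can sanity check it with a literal. For example:

SELECT fn_extract_xml_value('name', '<small-product><name>bar1</name><price>10</price></small-product>');
-- returns: bar1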

And my SQL statement that parses the data.

SELECT (fn_extract_xml_value('name', xml_data))::text AS name, (fn_extract_xml_value('price', xml_data))::int AS price FROM ext_demo4;                 
 name | price 
------+-------
 bar1 |    10
 bar2 |    20
 bar3 |    30
 bar4 |    40
 bar5 |    50
(5 rows)

Time: 94.887 ms

The benefit of this second approach is the huge speedup when iterating to get the XML parsing correct. Instead of taking several minutes to validate my code in Pig, I could execute a SQL statement that runs in less than 1 second, quickly modify the SQL function, and try again.

Summary
Hadoop is powerful and has become commodity software with many distributions available that are all pretty much the same. The difference in distributions is the software that is unique to each vendor. Some vendors rely on their management tools while Pivotal HD has HAWQ which is the most robust SQL engine for Hadoop. This example shows how you can leverage the built-in functionality of Hadoop plus HAWQ to be more productive compared to using any other Hadoop distribution.

When to ANALYZE in Greenplum and HAWQ?

Table statistics give the cost based optimizer the information needed to build the best query plan possible, and this information is gathered with the ANALYZE command. But when should you execute an ANALYZE on a table? Don’t Greenplum and HAWQ do this automatically?

Greenplum and HAWQ will perform an ANALYZE automatically for you so the query optimizer will have good statistics and build a good plan. Greenplum and HAWQ also allow you to configure this if needed with gp_autostats_mode and gp_autostats_on_change_threshold.

gp_autostats_mode
This specifies when an automatic ANALYZE should take place. The values for this configuration parameter can be:

  • none
  • on_change
  • on_no_stats

on_no_stats
The default is on_no_stats: when you INSERT data into a table that has no statistics (a new table or a table you just ran TRUNCATE on), the database will automatically gather statistics with an ANALYZE.

Typically in an Analytics Data Warehouse, you will insert data once and then read it many times. So the default of on_no_stats will automatically give you statistics as needed with no additional work from you.

none
Self explanatory. You have to execute ANALYZE if you want statistics.

on_change
With this setting, if you perform an INSERT, UPDATE, or DELETE that exceeds the gp_autostats_on_change_threshold value, then an automatic ANALYZE will happen. Note that in HAWQ, you can only INSERT.

In Greenplum, in the rare case where you are doing a DELETE or UPDATE on a table, you will need to execute an ANALYZE statement yourself. Or, you can set the database to use on_change and the ANALYZE will happen automatically.

gp_autostats_on_change_threshold
This is only relevant for on_change and it is the number of rows that need to change before an automatic ANALYZE will happen. The default is over 2 billion rows so if you really want to use on_change, then you will also need to reduce this configuration parameter to something more realistic.
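
Both parameters can also be changed for just your session with SET, which is handy for testing before making a cluster-wide change with gpconfig. A minimal sketch:

SET gp_autostats_mode = 'on_change';
SET gp_autostats_on_change_threshold = 1000000;
SHOW gp_autostats_mode;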

Examples

Example 1 – The default
gp_autostats_mode = on_no_stats
gp_autostats_on_change_threshold = 2147483647

CREATE TABLE my_table AS 
SELECT state, count(*) AS counter
FROM customer
GROUP BY state
DISTRIBUTED BY (state);
--Statistics will be gathered automatically.
CREATE TABLE my_table 
(state varchar(100),
 counter int)
DISTRIBUTED BY (state);

INSERT INTO my_table
SELECT state, count(*) AS counter
FROM customer
GROUP BY state;
--Statistics will be gathered automatically.
TRUNCATE my_table;

INSERT INTO my_table
SELECT state, count(*) AS counter
FROM customer
GROUP BY state;
--Statistics will be gathered automatically.
TRUNCATE my_table;

INSERT INTO my_table
VALUES ('dummy', 0);
--Statistics will be gathered automatically.

INSERT INTO my_table
SELECT state, count(*) AS counter
FROM customer
GROUP BY state;
--Statistics won't be updated.  The planner will think the table has only 1 row.

ANALYZE my_table;
--Statistics manually gathered and correctly shows the correct number of rows in the table.

Example 2 – Using on_change
gp_autostats_mode = on_change
gp_autostats_on_change_threshold = 1000000

CREATE TABLE my_table AS 
SELECT state, count(*) AS counter
FROM customer
GROUP BY state
DISTRIBUTED BY (state);
--Statistics will be gathered automatically only if the number of rows is 1M or more.
CREATE TABLE my_table 
(state varchar(100),
 counter int)
DISTRIBUTED BY (state);

INSERT INTO my_table
SELECT state, count(*) AS counter
FROM customer
GROUP BY state;
--Statistics will be gathered automatically only if the number of rows is 1M or more.
TRUNCATE my_table;

INSERT INTO my_table
SELECT state, count(*) AS counter
FROM customer
GROUP BY state;
--Statistics will be gathered automatically only if the number of rows is 1M or more.
TRUNCATE my_table;

INSERT INTO my_table
VALUES ('dummy', 0);
--Statistics will not be gathered automatically.

INSERT INTO my_table
SELECT state, count(*) AS counter
FROM customer
GROUP BY state;
--Statistics will be gathered automatically only if the number of rows is 1M or more.

Checking the Statistics
And here are a couple of queries you can use to see the statistics information.

SELECT c.oid, c.relpages, c.reltuples 
FROM pg_class c 
JOIN pg_namespace n ON c.relnamespace = n.oid 
WHERE n.nspname = 'public' 
AND c.relname = 'my_table';
--using the oid from the previous query
SELECT * FROM pg_statistic WHERE starelid = 298610;
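
If you’d rather not copy the oid by hand, a regclass cast combines the two steps:

SELECT * FROM pg_statistic WHERE starelid = 'public.my_table'::regclass;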

Summary
Greenplum and HAWQ automatically gather statistics for you in most cases. If you are doing lots of DML activity, you can change the configuration to still automatically gather statistics for you too. These automatic settings make life easier for DBAs, Developers, and Analysts but still give you the flexibility to configure it in the best way for your environment.

Hadoop Distributions

Today’s landscape of Hadoop vendors is mostly comprised of privately held companies that have big investments and partnerships with well established companies. This is very similar to the MPP marketplace in 2010, when privately held MPP database companies were purchased by much larger companies.

Here are a few of the MPP database vendors with their buyers:
Greenplum => EMC
Netezza => IBM
DATAllegro => Microsoft
Aster Data => Teradata
Vertica => HP

Similarly, I think the landscape of Hadoop vendors will change in the near future. Here are the major vendors in this Hadoop space as of September 2014:

Cloudera

  • Private
  • Investments: 2011 – $40M; 2014 – $900M
  • Around 600 employees
  • Founded in 2009
  • Partners with Oracle, Intel (funding), and Amazon (but also competes with Amazon)

Hortonworks

  • Private
  • Investments: 2011 – $23M + $25M
  • 201-500 employees
  • Founded in 2011
  • Partners with Yahoo, Teradata, and SAP

IBM

  • Public
  • $100B Revenue / year
  • 400K employees
  • Founded in 1911

MapR

  • Private
  • Investments: 2009 – $9M; 2014 – $110M
  • 201-500 employees
  • Founded in 2009
  • Partners with Google

Pivotal

  • Private
  • Investments: 2013 – $100M from GE and assets from EMC and VMWare
  • 3000+ employees
  • Founded in 2013 (Pivotal), 2003 (Greenplum), 1998 (VMWare) and 1979 (EMC)
  • Partners with EMC, VMWare, and GE

Amazon

  • Public
  • $75B Revenue / year
  • 132K employees
  • Founded in 1994

Hadoop Vendors Tomorrow
Cloudera => Oracle or Amazon
It will probably be Oracle because of the existing partnership and leadership that came from Oracle but Amazon may want it more. If Oracle doesn’t buy Cloudera, they will probably try to create their own distribution like they did with Linux.

Hortonworks => Teradata
It is only a matter of time before Teradata will have to buy Hortonworks. Microsoft might try to buy Hortonworks or just take a fork of the Windows version to rebrand. Microsoft worked with Sybase a long time ago with SQL Server and then took the code and ran rather than buying Sybase. So because of that history, I think Microsoft won’t buy and Teradata will.

Teradata already bought Aster Data, and Hortonworks would complete their data portfolio: Teradata for the EDW, Aster Data for Data Marts, and Hortonworks for the Data Lake.

MapR => Google
Google will snatch up MapR which will make MapR very happy.

So that leaves IBM and Amazon as the two publicly held companies left. Pivotal is privately held by EMC, VMWare, and GE, and past actions by EMC give every indication that this company will go public and be big.

Post Acquisitions
So after the big shakeup, I think these are the vendors you’ll see still selling Hadoop:

  • Pivotal: 100% Apache based with the best SQL Engine
  • IBM: Big Insights
  • Teradata: Hortonworks
  • Oracle: Cloudera
  • Google: MapR
  • Amazon: Elastic MapReduce

I could be wrong but I really do think there will be a consolidation of vendors in the near future.

Greenplum 4.3 External Tables to Hortonworks 2.0

The following instructions are valid for connecting Greenplum Database to Hortonworks HDP 2.0. Note that this technique is not officially supported by Pivotal, and Pivotal has added support for HDP 2.1 in Greenplum Database versions 4.3.3 and 4.2.8. See this post for more information.

Hortonworks (HDP) 2.0 is based on Apache Hadoop 2.2, which is also what Pivotal HD 2.0 is based on. If you haven’t read it already, I was able to demonstrate how Greenplum can read and write data to a Pivotal HD cluster in parallel. http://www.pivotalguru.com/?p=700. You can use this same basic configuration to integrate Greenplum with Hortonworks 2.0!

Follow the same steps in configuring Greenplum database for Pivotal HD 2.0. We’ll use the same client libraries to connect to HDP 2.0. Next create a user in HDP 2.0 named gpadmin. Lastly, create the same types of External Tables but use the IP address of the HDP cluster.

CREATE WRITABLE EXTERNAL TABLE ext_foo_hdp20
(i int, bar text) LOCATION ('gphdfs://192.168.239.215/user/gpadmin/foo_bar') FORMAT 'text' (delimiter '|' null 'null');

INSERT INTO ext_foo_hdp20 SELECT i, 'bar_' || i FROM generate_series(1, 100) AS i;

CREATE EXTERNAL TABLE ext_get_foo_hdp20
(i int, bar text) LOCATION ('gphdfs://192.168.239.215/user/gpadmin/foo_bar') FORMAT 'text' (delimiter '|' null 'null');

Here are the files Greenplum created in the Hortonworks cluster.

[gpadmin@sandbox ~]$ hadoop version
Hadoop 2.2.0.2.0.6.0-76
Subversion git@github.com:hortonworks/hadoop.git -r 8656b1cfad13b03b29e98cad042626205e7a1c86
Compiled by jenkins on 2013-10-18T00:19Z
Compiled with protoc 2.5.0
From source with checksum d23ee1d271c6ac5bd27de664146be2
This command was run using /usr/lib/hadoop/hadoop-common-2.2.0.2.0.6.0-76.jar
[gpadmin@sandbox ~]$ hdfs dfs -ls /user/gpadmin/foo_bar/
Found 2 items
-rw-r--r--   3 gpadmin gpadmin        490 2014-08-05 06:19 /user/gpadmin/foo_bar/0_1407266192-0000000091
-rw-r--r--   3 gpadmin gpadmin        494 2014-08-05 06:19 /user/gpadmin/foo_bar/1_1407266192-0000000091

Here is the data from HDP in Greenplum.

postgres=# SELECT * FROM ext_get_foo_hdp20 ORDER BY i LIMIT 10;
 i  |  bar   
----+--------
  1 | bar_1
  2 | bar_2
  3 | bar_3
  4 | bar_4
  5 | bar_5
  6 | bar_6
  7 | bar_7
  8 | bar_8
  9 | bar_9
 10 | bar_10
(10 rows)

postgres=# 

I suspect the integration will get better and better between Greenplum and the various Hadoop distributions because for the most part, all of the distributions rely on the Apache distribution of Hadoop. Look for more to come in the future too!

HAWQ with Parquet Files

Parquet is a format for column oriented data to be stored in HDFS. It is part of the Apache distribution and is also available in Pivotal HD and HAWQ. HAWQ can store and read data in the Parquet format and it is also available with the open source components of Pivotal HD such as Pig and MapReduce.

Here is a quick example showing how this works.

First, create a table with some data. You can either use the CTAS method or the more traditional CREATE TABLE and then INSERT. Either method works, and which one you use is up to your preference. This example generates only 100 records of some fake customer data.

CTAS

CREATE TABLE CUSTOMER 
WITH (appendonly=true, orientation=parquet)
AS
SELECT i AS id, 'jon' || i AS fname, 'roberts' || i AS lname, i::text || ' main street'::text AS address, 'new york'::text AS city, 'ny'::text AS state, lpad(i, 5, '0') AS zip
FROM (SELECT generate_series(1, 100) AS i) AS sub
DISTRIBUTED BY (id);

CREATE and then INSERT

CREATE TABLE customer
(
  id integer,
  fname text,
  lname text,
  address text,
  city text,
  state text,
  zip text
)
WITH (APPENDONLY=true, ORIENTATION=parquet, 
  OIDS=FALSE
)
DISTRIBUTED BY (id);

INSERT INTO customer
SELECT i AS id, 'jon' || i AS fname, 'roberts' || i AS lname, i::text || ' main street'::text AS address, 'new york'::text AS city, 'ny'::text AS state, lpad(i, 5, '0') AS zip
FROM (SELECT generate_series(1, 100) AS i) AS sub;

Now you have data in the Parquet format in HAWQ. Pretty easy, huh?
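
If you also want the Parquet files compressed, the WITH clause accepts compression options. This is a sketch; check your HAWQ version’s documentation for the compresstype values supported with Parquet (snappy and gzip, as I recall):

CREATE TABLE customer_compressed
WITH (appendonly=true, orientation=parquet, compresstype=snappy)
AS SELECT * FROM customer
DISTRIBUTED BY (id);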

Next, I’ll use a nifty tool that queries the HAWQ catalog which tells me where the Parquet files are.

gpextract -o customer.yaml -W customer -dgpadmin

And here is the customer.yaml file it created.

DBVersion: PostgreSQL 8.2.15 (Greenplum Database 4.2.0 build 1) (HAWQ 1.2.0.1 build
  8119) on x86_64-unknown-linux-gnu, compiled by GCC gcc (GCC) 4.4.2 compiled on Apr
  23 2014 16:12:32
DFS_URL: hdfs://phd1.pivotalguru.com:8020
Encoding: UTF8
FileFormat: Parquet
Parquet_FileLocations:
  Checksum: false
  CompressionLevel: 0
  CompressionType: null
  EnableDictionary: false
  Files:
  - path: /hawq_data/gpseg0/16385/16554/16622.0
    size: 4493
  - path: /hawq_data/gpseg1/16385/16554/16622.0
    size: 4499
  PageSize: 1048576
  RowGroupSize: 8388608
TableName: public.customer
Version: 1.0.0

Notice the path to the files which are in Hadoop and are in the Parquet format.

Now you can use a tool like Pig to look at the data.

grunt> A = load '/hawq_data/gpseg{0,1}/16385/16554/16622' USING parquet.pig.ParquetLoader();
grunt> describe A;                                                                          
A: {id: int,fname: bytearray,lname: bytearray,address: bytearray,city: bytearray,state: bytearray,zip: bytearray}
grunt> B = foreach A generate id, fname, lname, address, city, state, zip;
grunt> dump B;
(2,jon2,roberts2,2 main street,new york,ny,00002)
(4,jon4,roberts4,4 main street,new york,ny,00004)
(6,jon6,roberts6,6 main street,new york,ny,00006)
(8,jon8,roberts8,8 main street,new york,ny,00008)
.....

Parquet is easy to use in HAWQ and doesn’t lock you into a Pivotal HD and HAWQ only solution. It is easy to use the other tools like Pig or MapReduce to read the Parquet files in your Hadoop cluster. No vendor lock-in.

HAWQ versus Greenplum Database

HAWQ is a port of Greenplum database to work with the HDFS file system from Hadoop. It is only available with the Pivotal HD distribution of Hadoop even though Pivotal HD is 100% Apache Hadoop compliant.

This post will focus on the major differences in the filesystem, External Tables, DDL commands, and DML commands. I’m comparing Greenplum database version 4.3 with HAWQ version 1.2.0.1 which is shipped with Pivotal HD version 2.0.1.

HDFS versus Posix Filesystem
With Greenplum database, each segment’s files are always local to the host. Even in a failover condition when the mirror is acting as the primary, the data is local to the processing.

With HAWQ, HDFS handles high availability by keeping three copies of the data across multiple nodes. Because of this, the mirroring that is built into the Greenplum database is removed from HAWQ. If a node were to fail, Hadoop automatically creates another copy of the data to get back to three replicas. So a segment running on a Hadoop data node may not have the data it needs local to it and will need to get data from other physical nodes.

External Tables
Both HAWQ and Greenplum database have External Tables but differ when accessing external data in Hadoop.

HAWQ has PXF, which is the Pivotal Extension Framework. It has the ability to access files in HDFS stored as plain text but also in Hive, HBase, Avro, and Gemfire XD. You can write your own custom profiles to get data from HDFS. PXF can also gather some statistics about these files so the optimizer is smarter when accessing these External Tables.

Greenplum database doesn’t have PXF but does have GPHDFS. GPHDFS enables Greenplum database to read and write data to HDFS. It doesn’t have built-in capabilities for Avro, Hive, HBase, and Gemfire XD. It also doesn’t have statistics for these External Tables.

HAWQ is great at exploring and transforming data in Hadoop while Greenplum database is great at bulk loading data from Hadoop into the database as well as bulk writing data from Greenplum database into Hadoop. So land all of your data in Hadoop, transform it with SQL and then create data marts in Greenplum database.

Functions
Both offer functions but HAWQ doesn’t have SECURITY DEFINER functions yet.
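
For reference, this is the kind of function that works in Greenplum database but not yet in HAWQ. A minimal example (fn_whoami is just an illustrative name):

CREATE OR REPLACE FUNCTION fn_whoami() RETURNS text AS
$$
SELECT current_user::text
$$
LANGUAGE SQL SECURITY DEFINER;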

DDL Commands
Here is a list of commands that are in Greenplum database but not in HAWQ:

  • CREATE AGGREGATE: user defined aggregate like SUM and COUNT.
  • CREATE CAST: user defined conversion of two datatypes.
  • CREATE CONVERSION: user defined conversion of character set encodings.
  • CREATE DOMAIN: user defined datatype with optional constraints.
  • CREATE INDEX: indexes aren’t supported in HAWQ.
  • CREATE OPERATOR: user defined operator, such as defining != to behave the same as <>.
  • CREATE OPERATOR CLASS: user defined class of how a data type is used within an Index.
  • CREATE RULE: user defined rewrite rule placed on a table or view, such as one enforcing “gender_code in (‘M’, ‘F’)”
  • CREATE TABLESPACE: user defined directory in the Posix filesystem used to store database objects.
  • CREATE TRIGGER: user defined trigger for a table. Note that this is very limited in Greenplum database.

DML Commands
HDFS is designed for “write once, read many” and cannot handle the in-place file changes that DELETE and UPDATE commands require. Because of this, HAWQ doesn’t support UPDATE and DELETE commands while Greenplum database does.
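
The common workaround in HAWQ is to rebuild rather than mutate: write the rows you want to keep into a new table and swap it in. A sketch, assuming a table foo with an integer id column:

CREATE TABLE foo_new AS
SELECT * FROM foo WHERE id > 10
DISTRIBUTED BY (id);
DROP TABLE foo;
ALTER TABLE foo_new RENAME TO foo;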

Summary
There are other small differences between the two products but these are the major ones.

Pivotal HD and Greenplum Database Integration

So you are thinking about building a Hadoop cluster, or you already have one, and you are wondering how to move data between Hadoop and an MPP database like Greenplum Database (GPDB). With GPDB, this can be done in parallel, which makes it the ideal solution for an Analytics, Reporting, or Data Warehousing environment.

In this example, I will generate some dummy data with a SQL statement in GPDB and load it into PHD with an INSERT statement. I will then read the data back out of PHD with a SELECT statement.

One Time Configuration Steps

1. Download Pivotal HD that matches your version.
https://network.gopivotal.com/products/pivotal-hd

2. Copy the file to the Master server in the GPDB cluster. In my example, gpdbvm43 is single node VM with 2 segments.

scp PHD-2.0.1.0-148.tar.gz root@gpdbvm43:/root

3. ssh as gpadmin to GPDB 4.3 instance

vi .bashrc
[add these entries]
export JAVA_HOME=/usr/java/latest
export HADOOP_HOME=/usr/lib/gphd

Note: this needs to be repeated on every host in the cluster. You can use gpscp or gpssh to make these changes to all hosts too.

4. Make sure the database is running.

gpstart -a

5. Change the gp_hadoop_target_version to be compatible with Pivotal HD 1.0 and greater.

gpconfig -c gp_hadoop_target_version -v gphd-2.0

6. Add an entry to your /etc/hosts for all of the hosts in the Hadoop cluster. This needs to be done on all hosts in the GPDB cluster too. I’m using a single node VM of PHD so I just have one entry.

vi /etc/hosts 
[add every node in the PHD cluster]
192.168.239.203 pivhdsne.localdomain pivhdsne

Note: You can then use gpscp to copy the revised hosts file to the other hosts in the cluster.

7. Install the PHD client in the GPDB cluster.

su - 
tar --no-same-owner -zxvf PHD-2.0.1.0-148.tar.gz
cd PHD-2.0.1.0-148/utility/rpm
rpm -i *.rpm
cd ../../zookeeper/rpm
rpm -i *.rpm
cd ../../hadoop/rpm
yum install nc
rpm -i *.rpm
exit

Note: You can use gpscp to copy the tar.gz file to the other hosts in the cluster and then use gpssh to execute these commands. Be sure to source the greenplum_path.sh after connecting as root. “nc” may not be needed on your cluster but was required with my VM.

8. Now that you are gpadmin again, bounce the database.

gpstop -r

Pivotal HD Configuration
You likely already have this done but if you are using the single node VM of Pivotal HD, then you will need to edit your /etc/hosts file there so that Hadoop is accessible remotely.

1. ssh as root to the VM

[root@pivhdsne ~]# ifconfig
eth1      Link encap:Ethernet  HWaddr 00:0C:29:20:A3:8F  
          inet addr:192.168.239.203  Bcast:192.168.239.255  Mask:255.255.255.0
          inet6 addr: fe80::20c:29ff:fe20:a38f/64 Scope:Link
          UP BROADCAST RUNNING MULTICAST  MTU:1500  Metric:1
          RX packets:9168 errors:0 dropped:0 overruns:0 frame:0
          TX packets:1199 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:1000 
          RX bytes:653659 (638.3 KiB)  TX bytes:199320 (194.6 KiB)

lo        Link encap:Local Loopback  
          inet addr:127.0.0.1  Mask:255.0.0.0
          inet6 addr: ::1/128 Scope:Host
          UP LOOPBACK RUNNING  MTU:16436  Metric:1
          RX packets:3779367 errors:0 dropped:0 overruns:0 frame:0
          TX packets:3779367 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:0 
          RX bytes:4407394192 (4.1 GiB)  TX bytes:4407394192 (4.1 GiB)

Now vi /etc/hosts and change 127.0.0.1 to the IP address of the VM.

127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.239.203 pivhdsne pivhdsne.localdomain

GPDB External Tables
1. Example of Writable External Table to PHD.

CREATE WRITABLE EXTERNAL TABLE ext_foo
(i int, bar text) LOCATION ('gphdfs://pivhdsne/foo_bar') FORMAT 'text' (delimiter '|' null 'null');

2. Insert some data into PHD from GPDB.

INSERT INTO ext_foo SELECT i, 'bar_' || i FROM generate_series(1, 100) AS i;

3. Create External Table to view the data in PHD.

CREATE EXTERNAL TABLE ext_get_foo
(i int, bar text) LOCATION ('gphdfs://pivhdsne/foo_bar') FORMAT 'text' (delimiter '|' null 'null');

4. Select the data.

postgres=# SELECT * FROM ext_get_foo ORDER BY i LIMIT 10;
 i  |  bar   
----+--------
  1 | bar_1
  2 | bar_2
  3 | bar_3
  4 | bar_4
  5 | bar_5
  6 | bar_6
  7 | bar_7
  8 | bar_8
  9 | bar_9
 10 | bar_10
(10 rows)

Parallel!
Here is how you can see that it was done in parallel. Notice there are two files in the /foo_bar directory that I specified in the External Writable Table above.

[pivhdsne:~]$ hdfs dfs -ls /foo_bar
Found 2 items
-rw-r--r--   3 gpadmin hadoop        490 2014-05-24 00:19 /foo_bar/0_1400790662-0000004541
-rw-r--r--   3 gpadmin hadoop        494 2014-05-24 00:19 /foo_bar/1_1400790662-0000004541

There are two files because my single node VM of GPDB has two Segments. Each Segment wrote its file to Hadoop at the same time. Completely parallel and scalable.

More proof!

[pivhdsne:~]$ hdfs dfs -cat /foo_bar/0_1400790662-0000004541 | more
1|bar_1
3|bar_3
5|bar_5
7|bar_7
9|bar_9
...

[pivhdsne:~]$ hdfs dfs -cat /foo_bar/1_1400790662-0000004541 | more
2|bar_2
4|bar_4
6|bar_6
8|bar_8
10|bar_10
...

Extra Credit!
Log into HAWQ on the PHD cluster and create an External Table to the same files!

CREATE EXTERNAL TABLE ext_get_foo
(i int, bar text) LOCATION 
('pxf://pivhdsne:50070/foo_bar?profile=HdfsTextSimple') FORMAT 'text' (delimiter '|' null 'null');

gpadmin=# SELECT * FROM ext_get_foo ORDER BY i limit 10;
 i  |  bar   
----+--------
  1 | bar_1
  2 | bar_2
  3 | bar_3
  4 | bar_4
  5 | bar_5
  6 | bar_6
  7 | bar_7
  8 | bar_8
  9 | bar_9
 10 | bar_10
(10 rows)

As you can see, there are lots of ways to move data between Greenplum database and Hadoop to satisfy a large variety of use cases to solve business problems.

Outsourcer 4.1.0 Released!

Outsourcer now supports HAWQ! If you aren’t familiar with HAWQ, it is basically Greenplum Database that uses HDFS to store data. This makes Outsourcer even more useful as you can load data into Hadoop from SQL Server and Oracle by clicking through an easy to use web interface.

There are many more new features like the ability to specify Append-Optimized tables, compression, or column orientation. Check it out!

Zip File
Documentation
GitHub Source

When should I use Greenplum Database versus HAWQ?

I get this question a lot so I thought I would post the answer. Greenplum is a robust MPP database that works very well for Data Marts and Enterprise Data Warehouses, tackling historical Business Intelligence reporting as well as predictive analytical use cases. HAWQ provides the most robust SQL interface for Hadoop and can tackle data exploration and transformation in HDFS.

HAWQ
HAWQ is “HAdoop With Query” and is basically a port of Greenplum to store data natively in HDFS. This gives everyone using Hadoop a friendlier and more robust programming interface. Tools for Hadoop have evolved over time because of the need to get more consumers using Hadoop in a more productive manner.

The HAWQ architecture is currently very similar to Greenplum’s, with each segment having its own files, but instead of storing these files local to each segment (shared nothing), the data is stored in HDFS. Over time, I believe this architecture will change to enable greater elasticity in the number of segments processing data, but right now, Greenplum and HAWQ share this design.

Hadoop Architecture
There is lots of material online about how Hadoop works but I want to point out one of the principles of Hadoop and how it relates to having a relational database store its files in it. The Simple Coherency Model is a “write-once-read-many access model for files” that HDFS uses to simplify the data model in Hadoop. Once you “put” a file into HDFS, you can’t edit it to change the file. You can remove the file but once you write it, that is it.

Database Features
There are many “small” features in Greenplum Database that either aren’t supported yet in HAWQ or never will be because HDFS won’t allow it. These features need to be taken into consideration when choosing which solution to deploy on. For example, here is a short list of features found in Greenplum database but not in HAWQ: LOCK TABLE, CREATE INDEX, CREATE TYPE, CREATE RULE, CREATE AGGREGATE, and CREATE LANGUAGE.

Two more commonly used features of Greenplum that are not in HAWQ are DELETE and UPDATE commands. When a database executes a DELETE or UPDATE, it is editing the file. It basically needs to mark the old row as removed which requires changing the file. This isn’t allowed in HDFS so consequently, UPDATE and DELETE commands are not allowed in HAWQ.

HAWQ Features
Now there are some features of HAWQ that don’t exist in Greenplum database that are also compelling. PXF is the Pivotal Extension Framework that provides enhanced capabilities when working with data in HDFS from HAWQ. Simply put, you can create an External Table to HDFS that reads Text, Hive, HBase, and soon Parquet. These are optimized for these formats and can leverage partitioning and predicate push-down. Another feature of this is the ability to gather statistics on these external tables to HDFS. So if you have a large amount invested in Hadoop already, HAWQ can leverage that investment and extend it with more capabilities.

How does it all fit together?
A lot of companies are embracing Hadoop as it is an inexpensive, commodity hardware based, scalable solution that can handle terabytes to petabytes worth of data. HAWQ makes Hadoop better by providing a robust SQL interface built on over 10 years development of the Greenplum database.

Companies are building their “data lakes” and using Pig, Hive, HBase, and now SQL to explore and transform their data. This is where HAWQ excels.

Once data is transformed, companies are loading the relational data for consumption into Data Marts or Enterprise Data Warehouses. This is where Greenplum database comes in by leveraging a robust MPP database to provide historical Business Intelligence capabilities and forward looking predictive analytics all in-database with SQL.

Seeing “Invisible” Data

Note: Pivotal Support requested that I mention that this GUC, along with all other hidden GUCs, is not supported. Being able to see deleted data has some benefit in extreme situations but can get you into trouble if used improperly. Do not open a support ticket on this GUC or any other hidden GUC as Pivotal Support will not be able to help. It is hidden for a reason!

I stumbled on this neat little feature today from a hidden GUC (Grand Unified Configuration). If you aren’t familiar with the term GUC, it is basically a configuration value that can be set for a session with “SET” or in the postgresql.conf file using gpconfig.

First, let’s create a table and insert some data.

gpdb=# CREATE TABLE foo (id int NOT NULL, bar text NOT NULL) DISTRIBUTED BY (id);
CREATE TABLE
gpdb=# INSERT INTO foo SELECT i, 'test:' || i FROM generate_series(1, 10) AS i;
INSERT 0 10

Now select the data.

gpdb=# SELECT * FROM foo ORDER BY id;
 id |   bar   
----+---------
  1 | test:1
  2 | test:2
  3 | test:3
  4 | test:4
  5 | test:5
  6 | test:6
  7 | test:7
  8 | test:8
  9 | test:9
 10 | test:10
(10 rows)

Now do something like delete all of the data.

gpdb=# DELETE FROM foo;
DELETE 10

Oh no! I need that data still! How can I get it back?

gpdb=# SET gp_select_invisible = TRUE;
SET
gpdb=# SELECT * FROM foo ORDER BY id;
 id |   bar   
----+---------
  1 | test:1
  2 | test:2
  3 | test:3
  4 | test:4
  5 | test:5
  6 | test:6
  7 | test:7
  8 | test:8
  9 | test:9
 10 | test:10
(10 rows)

As you can see, the old data is still there but marked as deleted. You can still see it by turning this hidden GUC on.

Now here it is turned off to see the data normally.

gpdb=# SET gp_select_invisible = FALSE;
SET
gpdb=# SELECT * FROM foo ORDER BY id;
 id | bar 
----+-----
(0 rows)

You can use this to see non-committed data being inserted into a heap table but not an append-only table. Once you vacuum or truncate a table, the invisible data gets removed so you won’t be able to see it anymore. BTW, this works in HAWQ too!
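
To demonstrate that cleanup behavior, run a VACUUM and the deleted rows are gone for good, even with the GUC turned on:

VACUUM foo;
SET gp_select_invisible = TRUE;
SELECT * FROM foo ORDER BY id;
-- (0 rows)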

Dimensional Modeling

I got an email asking me what my thoughts are on dimensional modeling so I thought it would be better to make a post about this rather than an email. Thanks Allan for the inspiration.

First, I think it would be very wise to read books from Ralph Kimball and Bill Inmon to understand the approach to data modeling for the Enterprise Data Warehouse. Dimensional modeling can be good if not great but it does have drawbacks. First, let’s talk about what it is.

Overview
Dimensional modeling is a data model that is broken up into Dimensions and Facts. Dimensions have hierarchies like Country, Region, State, and City while Facts have measures like quantity, price, cost, and score. This approach makes it simple for a BI tool to take a person’s input in a GUI and translate that to SQL that is reasonably sure it will get the correct result. If you have a good dimensional model then you can switch BI Tools rather easily too.

Building the Model
How to build the model is argued by both Inmon and Kimball fans. One approach is to take various disparate data models from source databases and merge them into a common third normal form (3NF) structure. Then you can create dimensional models “easily” based on the various customers’ needs. The other approach is to skip the 3NF model and build sets of dimensional models with conforming dimensions for the entire enterprise.

Kimball
I have found that the Kimball approach works better for the projects I’ve worked on. He does a great job in his books of stressing the importance of having a strong sponsor so that IT doesn’t build a dimensional model it merely thinks the business wants. That is great advice. The problem is that the “loser” of any dispute over how the model should be built won’t use the model. Terms are frequently overloaded in organizations and mean different things to different people. Getting everyone to agree on virtually every term in an organization is not usually feasible. So you end up with a model that only one group uses and that took too long to build because of the interviews and discussions.

Inmon
This approach builds that 3NF structure that merges the disparate data into a single model. So when you acquire a new system or company, that new data has to fit into this model. How long does that take? How much rework is there every time there is something you hadn’t thought of?

Golden Triangle
A basic concept in project management is the Golden Triangle. It simply refers to the 3 variables in any project: time, resources, and deliverables. You can’t control all 3; you can only control 2. You have to give some with one of these variables.

Most likely you have a fixed set of resources, like 3 developers. You also have a set deliverable: build a dimensional model. The last variable is time, and you can’t control it unless you either add more developers or deliver less of the model. This is key to understanding why a dimensional model has a big flaw: it takes too long to build.

So what do the business users do while they wait for you to build their data model?

Time to Market
Time is everything in organizations. How fast can I get to my data? How fast can my queries execute? Making business users wait for you to build a model eats up time which equates to money. So why don’t you get out of the way?

My approach
I think most business users are smart and I think they know SQL. I’ve never been in an organization that doesn’t have SQL savvy business users. So if they are smart and know how to program enough to get their job done, why don’t you just give them the data?

I think the best model is a simple one. Just replicate the transactional systems into a large database like Greenplum and even Pivotal Hadoop now that it has HAWQ. Business users will query the data and then create their own de-normalized tables. You got out of their way and they got their job done much faster than if you created a dimensional model for them. You might build a dimensional model to make it easier to use with some BI tools after they have access to the data.

Most databases crumble at the thought of letting users just execute any “bad” query they want. That is because most databases are designed for Online Transaction Processing (OLTP) and not big data. Look at my Tuning Guide for Greenplum to see how easy it is to maintain a rather robust database platform.

Direction of Industry
More and more organizations are embracing this approach. With Pivotal Hadoop or Greenplum database, you load all of the data into one cluster that scales easily, get great insights out of the data, have excellent time to market, have little to no development costs, and have very little administrator costs.

Businesses can’t wait for long running queries and they certainly can’t wait for you to build a dimensional model.

Hadoop Data Lake and Transforming Data

A Data Lake is a term used to describe a large enterprise repository of data stored in Hadoop. More and more companies are concluding that a Data Lake is the right solution over a traditional ETL process and a restricted EDW. The Data Lake is inexpensive, scales easily, uses commodity hardware, provides a very flexible schema, and enables an easy way to transform data in parallel.

Schema
So why not use a relational database for this Data Lake? In a relational database, the schema is defined first and then data is forced into it. With Hadoop, you first load the data and then apply a schema as you read it out. This means adding new data to Hadoop is easier and faster because you don’t have to first define the schema.

Parallel
Processing is also done in parallel. You can transform the data using Hadoop tools like Pig and then load it into a relational data store, or just use it in Hadoop.

Greenplum Database External Tables
There are use cases where a relational database like Greenplum database is easier to use and performs better than Hadoop. A great feature of Greenplum database is the ability to create an External Table to Hadoop. These External Tables can be defined to either READ or WRITE data to Hadoop. Because Greenplum is an MPP database, each segment connects to Hadoop to READ/WRITE data. This makes the integration of Greenplum Database with Hadoop much faster than a single threaded approach that you might see with other database products.

Transform Example
So let’s say you get an employee file from a company you just acquired. You want to take this file and then make it available to the Enterprise in Hadoop as well as a data mart in Greenplum database.

We will first load the data into a stage directory, run a program to remove a control character, and then put it in the enterprise directory.

hadoop dfs -mkdir /stage
hadoop dfs -mkdir /enterprise

Next, I use a Pig program to remove a control character. In this example, I’m removing the “null character”.

The name of this file is pig_cleaning.pig.

dirty = load '$STAGE' as (data:chararray);
clean = foreach dirty generate REPLACE(data, '\\u0000', '');
store clean into '$TARGET';

Here is my employees.txt file I created. Notice the ^@ symbols. These are control characters as they appear when using vi. The pig_cleaning.pig script will remove these control characters.

Jon|Roberts|123 Main Street|New York|NY|10202|Sales Engineer|NorthEast
Abby|Cromwell|77 Juniper Drive|Raleigh|NC|27605|CIO|NorthWest
Lilly|Vargas|7894 Rayna Rd|Atlanta|GA|30301|VP Sales|SouthEast
Emily|Palmer|224 Warren Drive|St. Louis|MO|63101|VP Marketing|MidWest
Winston|Rigby|84 West Road|^@|CA|^@|President|West

Now I created a shell script that accepts a filename to load and the name of the directory in Hadoop to store the results in.

The name of this file is load.sh.

#!/bin/bash

# $1 is the filename
# $2 is the target directory name
hadoop dfs -put $1 /stage
pig -param STAGE=/stage/$1 -param TARGET=/enterprise/$2 pig_cleaning.pig
hadoop dfs -rm /stage/$1

This script loads the file into the /stage directory, runs the pig program to clean the file of the null character (^@), stores the output to the /enterprise directory, and then removes the stage file.

Executing the script is as easy as:

./load.sh employees.txt employees

Now what about Greenplum database? Here is how you can READ that data in Hadoop from the database. Note that in this example, I have Hadoop and Greenplum database on the same single host. Typically, these will be on separate hosts and instead of localhost, you would have the name of the NameNode like hdm1.

create schema enterprise;

create external table enterprise.employees
(fname text,
 lname text,
 address1 text,
 city text,
 state text,
 zip text,
 job text,
 region text)
 LOCATION ('gphdfs://localhost:8020/enterprise/employees/part*')
 FORMAT 'TEXT' (delimiter '|');

And now let’s execute a SELECT statement.

gpdb=# select * from enterprise.employees;
  fname  |  lname   |     address1     |   city    | state |  zip  |      job       |  region   
---------+----------+------------------+-----------+-------+-------+----------------+-----------
 Jon     | Roberts  | 123 Main Street  | New York  | NY    | 10202 | Sales Engineer | NorthEast
 Abby    | Cromwell | 77 Juniper Drive | Raleigh   | NC    | 27605 | CIO            | NorthWest
 Lilly   | Vargas   | 7894 Rayna Rd    | Atlanta   | GA    | 30301 | VP Sales       | SouthEast
 Emily   | Palmer   | 224 Warren Drive | St. Louis | MO    | 63101 | VP Marketing   | MidWest
 Winston | Rigby    | 84 West Road     |           | CA    |       | President      | West
(5 rows)
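
And to finish the data mart piece mentioned at the start, a CTAS over the External Table lands the cleaned data in a regular Greenplum table (employees_mart is just an illustrative name):

CREATE TABLE enterprise.employees_mart AS
SELECT fname, lname, city, state, job, region
FROM enterprise.employees
DISTRIBUTED BY (lname);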

Conclusion
Enterprises are using Hadoop to create Data Lakes. Once the data is there, it is fast and easy to transform. And with Greenplum database, it is easy to use SQL tools to access the data.