Pivotal HDB 2.0.1 Configuration Tips

Here are some tips for configuring Pivotal HDB (based on Apache HAWQ) with Ambari.

Temp Directories
A Hadoop cluster is typically configured with JBOD, so use all of the data disks for temp space.

Here is an example of the “HAWQ Master Temp Directories” entry when the Master and Standby nodes each have 8 disks:

/data1/hawq/master_tmp,/data2/hawq/master_tmp,/data3/hawq/master_tmp,/data4/hawq/master_tmp,/data5/hawq/master_tmp,/data6/hawq/master_tmp,/data7/hawq/master_tmp,/data8/hawq/master_tmp

Here is an example of the “HAWQ Segment Temp Directories” entry when each Data Node has 8 disks:

/data1/hawq/segment_tmp,/data2/hawq/segment_tmp,/data3/hawq/segment_tmp,/data4/hawq/segment_tmp,/data5/hawq/segment_tmp,/data6/hawq/segment_tmp,/data7/hawq/segment_tmp,/data8/hawq/segment_tmp

VM Overcommit
Set the kernel parameter vm.overcommit_memory to 2.

VM Overcommit Ratio
2GB – 64GB of RAM: set the Overcommit Ratio to 50
>= 64GB of RAM: set the Overcommit Ratio to 100

Swap Space
2GB – 8GB: set swap space equal to RAM
8GB – 64GB: set swap space to 0.5 * RAM
>= 64GB: set swap space to 4GB
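The swap sizing rules can be sketched as a small Python helper. This is an illustrative sketch, not part of any HAWQ tooling; note that the published ranges overlap at exactly 64GB, and this sketch treats 64GB under the 0.5 * RAM rule to match Example 2 below (64GB RAM, 32GB swap):

```python
def swap_size_gb(ram_gb):
    """Suggested swap size from the sizing rules above.

    Illustrative helper only. The ranges overlap at exactly 64GB;
    here 64GB follows the 0.5 * RAM rule, matching Example 2 below.
    """
    if ram_gb < 2:
        raise ValueError("sizing rules above start at 2GB of RAM")
    if ram_gb <= 8:
        return ram_gb          # 2GB - 8GB: swap equal to RAM
    if ram_gb <= 64:
        return 0.5 * ram_gb    # 8GB - 64GB: half of RAM
    return 4                   # above that: fixed 4GB
```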

Segment Memory Usage Limit
Step 1: Calculate total memory (RAM * overcommit_ratio_percentage + SWAP)
Step 2: Calculate total memory used by other activities (2GB for OS, 2GB for Data Node, 2GB for Node Manager, 1GB for PXF)
Step 3: Subtract other memory from total memory to get the value for the Segment Memory Usage Limit

Example 1:
RAM: 256GB
SWAP: 4GB
Other: 7GB
Overcommit Ratio: 100

Using YARN: ((256 * 1) + 4) – 7 = 253
Using Default Resource Manager: (256 * 1) – 7 = 249

Example 2:
RAM: 64GB
SWAP: 32GB
Other: 7GB
Overcommit Ratio: 50

Using YARN: ((64 * 0.5) + 32) – 7 = 57
Using Default Resource Manager: (64 * 0.5) – 7 = 25
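The three-step calculation can be sketched as a small Python helper. This is an illustrative sketch, not HAWQ tooling; following the examples, it assumes swap is counted only when YARN is the resource manager:

```python
def segment_memory_limit_gb(ram_gb, swap_gb, overcommit_ratio,
                            other_gb=7, using_yarn=True):
    """Segment Memory Usage Limit per the three steps above.

    Step 1: total memory = RAM * (overcommit_ratio / 100), plus swap
    when YARN is the resource manager (as in the examples).
    Steps 2-3: subtract memory used by other activities
    (2GB OS + 2GB Data Node + 2GB Node Manager + 1GB PXF = 7GB).
    """
    total = ram_gb * (overcommit_ratio / 100)
    if using_yarn:
        total += swap_gb
    return total - other_gb

# Example 1: 256GB RAM, 4GB swap, ratio 100
assert segment_memory_limit_gb(256, 4, 100) == 253
assert segment_memory_limit_gb(256, 4, 100, using_yarn=False) == 249
# Example 2: 64GB RAM, 32GB swap, ratio 50
assert segment_memory_limit_gb(64, 32, 50) == 57
```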

HDFS core-site.xml
Add:

ipc.client.connect.timeout = 300000

Change:

ipc.client.connection.maxidletime = 3600000

Optional HAWQ hawq-site.xml
Add:

hawq_rm_stmt_vseg_memory = 1gb 

By default, this is set to 128mb which is great for a high level of concurrency. If you need to utilize more memory in the cluster for each query, you can increase this value considerably. Here are the acceptable values:

128mb, 256mb, 512mb, 1gb, 2gb, 4gb, 8gb, 16gb

Alternatively, you can set this at the session level instead of for the entire database.

Operating System gpadmin account
Log into the Master and Standby nodes and execute the following:

echo "source /usr/local/hawq/greenplum_path.sh" >> ~/.bashrc

Now set the database password. Below, I am using ‘password’ as the password; set this based on your organization’s password policy. By default, gpadmin doesn’t have a password set at all.

psql -c "alter user gpadmin password 'password'"

Enable encrypted password authentication. This assumes you are using the default /data/hawq/master path. Adjust if needed. This allows you to connect to the database remotely with an encrypted password.

echo "host all all 0.0.0.0/0 md5" >> /data/hawq/master/pg_hba.conf
hawq stop cluster -u -a

Hive SQL Language Support

Hive is probably the most popular SQL engine for Hadoop, but not because it is fast or because it supports the SQL most organizations use. It is simply the oldest. Or is it?

Pivotal HDB, powered by Apache HAWQ (incubating), is much faster and has much better SQL language support. Pivotal HDB has its roots in Greenplum Database, which has been around far longer, so its SQL language support is much more mature.

Performance
In all of the performance tests I’ve done comparing Pivotal HDB with Apache Hive, the results are dramatic. Pivotal HDB loads data significantly faster and executes queries significantly faster too. And yes, I’ve tested it with Tez and even with LLAP. Hive doesn’t come close to the performance of Pivotal HDB. More will be published on this subject soon.

SQL Language Support
The real reason for this post is Hive’s lack of SQL language support. This makes it more difficult to write SQL in this environment and more difficult to migrate from a legacy RDBMS to Hadoop. Here are some examples.

Example 1: Subqueries in the WHERE Clause
This is query 6 from the TPC-DS benchmark:

select  a.ca_state state, count(*) cnt
 from customer_address a
     ,customer c
     ,store_sales s
     ,date_dim d
     ,item i
 where       a.ca_address_sk = c.c_current_addr_sk
 	and c.c_customer_sk = s.ss_customer_sk
 	and s.ss_sold_date_sk = d.d_date_sk
 	and s.ss_item_sk = i.i_item_sk
 	and d.d_month_seq = 
 	     (select distinct (d_month_seq)
 	      from date_dim
               where d_year = 1998
 	        and d_moy = 2 )
 	and i.i_current_price > 1.2 * 
             (select avg(j.i_current_price) 
 	     from item j 
 	     where j.i_category = i.i_category)
 group by a.ca_state
 having count(*) >= 10
 order by cnt 
 limit 100;

This query won’t work in Hive because of the two subqueries in the WHERE clause. You will have to rewrite this query like this:

select  a.ca_state state, count(*) cnt
 from customer_address a
     ,customer c
     ,store_sales s
     ,date_dim d
     ,item i
     , (select distinct (d_month_seq) as d_month_seq
 	      from date_dim
               where d_year = 1998
 	        and d_moy = 2 ) as sq1
     , (select j.i_category, avg(j.i_current_price) as avg_i_current_price
 	     from item j 
        group by j.i_category) as sq2
 where       a.ca_address_sk = c.c_current_addr_sk
 	and c.c_customer_sk = s.ss_customer_sk
 	and s.ss_sold_date_sk = d.d_date_sk
 	and s.ss_item_sk = i.i_item_sk
 	and d.d_month_seq = sq1.d_month_seq
        and sq2.i_category = i.i_category
 	and i.i_current_price > 1.2 * sq2.avg_i_current_price
 group by a.ca_state
 having count(*) >= 10
 order by cnt 
 limit 100;

It took me a few minutes to rewrite this and I’m still not 100% sure it is right. In a conversion effort, I would have to execute it in the legacy RDBMS and check that the results match those in Hive. If the legacy RDBMS is being retired, validating even a single query becomes very difficult.

If I was doing this for benchmarking purposes, I would have to execute TPC-DS in another database that does support this query with the same data to validate the results. Clearly, the lack of SQL language support is a problem.

Example 2: Subqueries in the SELECT Clause
This is query 9 from the TPC-DS benchmark:

select case when (select count(*) 
                  from store_sales 
                  where ss_quantity between 1 and 20) > 31003
            then (select avg(ss_ext_list_price) 
                  from store_sales 
                  where ss_quantity between 1 and 20) 
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 1 and 20) end bucket1 ,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 21 and 40) > 24212
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 21 and 40) 
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 21 and 40) end bucket2,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 41 and 60) > 28398
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 41 and 60)
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 41 and 60) end bucket3,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 61 and 80) > 21646
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 61 and 80)
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 61 and 80) end bucket4,
       case when (select count(*)
                  from store_sales
                  where ss_quantity between 81 and 100) > 4078
            then (select avg(ss_ext_list_price)
                  from store_sales
                  where ss_quantity between 81 and 100)
            else (select avg(ss_net_paid_inc_tax)
                  from store_sales
                  where ss_quantity between 81 and 100) end bucket5
from reason
where r_reason_sk = 1

You get this error:

FAILED: ParseException line 2:18 cannot recognize input near '(' 'select' 'count' in expression specification (state=42000,code=40000)

The fix is to move all of those subqueries from the SELECT to the FROM section of the query. Or you can write an elaborate CASE statement. Again, this takes time to rewrite and even more time to validate.

Example 3: Correlated Subqueries
This is query 10 from the TPC-DS benchmark:

select  
  cd_gender,
  cd_marital_status,
  cd_education_status,
  count(*) cnt1,
  cd_purchase_estimate,
  count(*) cnt2,
  cd_credit_rating,
  count(*) cnt3,
  cd_dep_count,
  count(*) cnt4,
  cd_dep_employed_count,
  count(*) cnt5,
  cd_dep_college_count,
  count(*) cnt6
 from
  customer c,customer_address ca,customer_demographics
 where
  c.c_current_addr_sk = ca.ca_address_sk and
  ca_county in ('Clinton County','Platte County','Franklin County','Louisa County','Harmon County') and
  cd_demo_sk = c.c_current_cdemo_sk and 
  exists (select *
          from store_sales,date_dim
          where c.c_customer_sk = ss_customer_sk and
                ss_sold_date_sk = d_date_sk and
                d_year = 2002 and
                d_moy between 3 and 3+3) and
   (exists (select *
            from web_sales,date_dim
            where c.c_customer_sk = ws_bill_customer_sk and
                  ws_sold_date_sk = d_date_sk and
                  d_year = 2002 and
                  d_moy between 3 ANd 3+3) or 
    exists (select * 
            from catalog_sales,date_dim
            where c.c_customer_sk = cs_ship_customer_sk and
                  cs_sold_date_sk = d_date_sk and
                  d_year = 2002 and
                  d_moy between 3 and 3+3))
 group by cd_gender,
          cd_marital_status,
          cd_education_status,
          cd_purchase_estimate,
          cd_credit_rating,
          cd_dep_count,
          cd_dep_employed_count,
          cd_dep_college_count
 order by cd_gender,
          cd_marital_status,
          cd_education_status,
          cd_purchase_estimate,
          cd_credit_rating,
          cd_dep_count,
          cd_dep_employed_count,
          cd_dep_college_count
limit 100;

This query correlates data between customer and the tables in each subquery. Rewriting this can be tricky because you need to make sure you don’t duplicate data when joining to the subqueries.

Example 4: INTERSECT
INTERSECT is a SQL feature similar to UNION or EXCEPT. It returns the distinct rows that appear in the results of both queries. More information on this feature: https://www.postgresql.org/docs/8.2/static/sql-select.html#SQL-INTERSECT

This is query 8 from the TPC-DS benchmark:

select  s_store_name
      ,sum(ss_net_profit)
 from store_sales
     ,date_dim
     ,store,
     (select ca_zip
     from (
     (SELECT substr(ca_zip,1,5) ca_zip
      FROM customer_address
      WHERE substr(ca_zip,1,5) IN (
                          '89436','30868','65085','22977','83927','77557',
                          '58429','40697','80614','10502','32779',
                          '91137','61265','98294','17921','18427',
                          '21203','59362','87291','84093','21505',
                          '17184','10866','67898','25797','28055',
                          '18377','80332','74535','21757','29742',
                          '90885','29898','17819','40811','25990',
                          '47513','89531','91068','10391','18846',
                          '99223','82637','41368','83658','86199',
                          '81625','26696','89338','88425','32200',
                          '81427','19053','77471','36610','99823',
                          '43276','41249','48584','83550','82276',
                          '18842','78890','14090','38123','40936',
                          '34425','19850','43286','80072','79188',
                          '54191','11395','50497','84861','90733',
                          '21068','57666','37119','25004','57835',
                          '70067','62878','95806','19303','18840',
                          '19124','29785','16737','16022','49613',
                          '89977','68310','60069','98360','48649',
                          '39050','41793','25002','27413','39736',
                          '47208','16515','94808','57648','15009',
                          '80015','42961','63982','21744','71853',
                          '81087','67468','34175','64008','20261',
                          '11201','51799','48043','45645','61163',
                          '48375','36447','57042','21218','41100',
                          '89951','22745','35851','83326','61125',
                          '78298','80752','49858','52940','96976',
                          '63792','11376','53582','18717','90226',
                          '50530','94203','99447','27670','96577',
                          '57856','56372','16165','23427','54561',
                          '28806','44439','22926','30123','61451',
                          '92397','56979','92309','70873','13355',
                          '21801','46346','37562','56458','28286',
                          '47306','99555','69399','26234','47546',
                          '49661','88601','35943','39936','25632',
                          '24611','44166','56648','30379','59785',
                          '11110','14329','93815','52226','71381',
                          '13842','25612','63294','14664','21077',
                          '82626','18799','60915','81020','56447',
                          '76619','11433','13414','42548','92713',
                          '70467','30884','47484','16072','38936',
                          '13036','88376','45539','35901','19506',
                          '65690','73957','71850','49231','14276',
                          '20005','18384','76615','11635','38177',
                          '55607','41369','95447','58581','58149',
                          '91946','33790','76232','75692','95464',
                          '22246','51061','56692','53121','77209',
                          '15482','10688','14868','45907','73520',
                          '72666','25734','17959','24677','66446',
                          '94627','53535','15560','41967','69297',
                          '11929','59403','33283','52232','57350',
                          '43933','40921','36635','10827','71286',
                          '19736','80619','25251','95042','15526',
                          '36496','55854','49124','81980','35375',
                          '49157','63512','28944','14946','36503',
                          '54010','18767','23969','43905','66979',
                          '33113','21286','58471','59080','13395',
                          '79144','70373','67031','38360','26705',
                          '50906','52406','26066','73146','15884',
                          '31897','30045','61068','45550','92454',
                          '13376','14354','19770','22928','97790',
                          '50723','46081','30202','14410','20223',
                          '88500','67298','13261','14172','81410',
                          '93578','83583','46047','94167','82564',
                          '21156','15799','86709','37931','74703',
                          '83103','23054','70470','72008','49247',
                          '91911','69998','20961','70070','63197',
                          '54853','88191','91830','49521','19454',
                          '81450','89091','62378','25683','61869',
                          '51744','36580','85778','36871','48121',
                          '28810','83712','45486','67393','26935',
                          '42393','20132','55349','86057','21309',
                          '80218','10094','11357','48819','39734',
                          '40758','30432','21204','29467','30214',
                          '61024','55307','74621','11622','68908',
                          '33032','52868','99194','99900','84936',
                          '69036','99149','45013','32895','59004',
                          '32322','14933','32936','33562','72550',
                          '27385','58049','58200','16808','21360',
                          '32961','18586','79307','15492'))
     intersect
     (select ca_zip
      from (SELECT substr(ca_zip,1,5) ca_zip,count(*) cnt
            FROM customer_address, customer
            WHERE ca_address_sk = c_current_addr_sk and
                  c_preferred_cust_flag='Y'
            group by ca_zip
            having count(*) > 10)A1))A2) V1
 where ss_store_sk = s_store_sk
  and ss_sold_date_sk = d_date_sk
  and d_qoy = 1 and d_year = 2002
  and (substr(s_zip,1,2) = substr(V1.ca_zip,1,2))
 group by s_store_name
 order by s_store_name
 limit 100;

The fix for this is to change the INTERSECT to a UNION and wrap it in another subquery that also does a GROUP BY, so you end up with a distinct list of zip codes.
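To see why the rewrite works, here is an illustrative Python sketch (the zip lists are made up): INTERSECT returns the distinct values present in both inputs, which you can emulate by de-duplicating each side, unioning everything, and keeping the values that appear twice. That is the logic behind the UNION-plus-GROUP-BY rewrite.

```python
from collections import Counter

def intersect(q1, q2):
    """INTERSECT semantics: distinct values present in both query results."""
    return set(q1) & set(q2)

def intersect_via_union(q1, q2):
    """Hive-friendly emulation: SELECT DISTINCT each side, UNION ALL them,
    then GROUP BY the value HAVING count(*) = 2."""
    counts = Counter(set(q1)) + Counter(set(q2))
    return {value for value, n in counts.items() if n == 2}

# Hypothetical zip lists standing in for the two subqueries:
all_zips = ['89436', '30868', '30868', '65085']
preferred_zips = ['30868', '65085', '11111']
assert intersect(all_zips, preferred_zips) == \
       intersect_via_union(all_zips, preferred_zips) == {'30868', '65085'}
```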

Summary
Of the 99 TPC-DS queries, 19 contain SQL that is not supported by Hive. Pivotal HDB can execute all 99 TPC-DS queries without modification. So not only is it much faster, it is also easier to migrate from a legacy RDBMS to Hadoop and easier to start using because the SQL language support is so robust.

Loading Data into HAWQ

Loading data into the database is required before you can start using it, but how? There are several approaches to this basic requirement, each solving the problem in a different way, so you can pick the technique that best matches your use case.

Table Setup
This table will be used for the testing in HAWQ. I have this table created in a single node VM running Hortonworks HDP with HAWQ 2.0 installed. I’m using the default Resource Manager too.

CREATE TABLE test_data
(id int,
 fname text,
 lname text)
 DISTRIBUTED RANDOMLY;

Singleton
Let’s start with probably the worst way. Sometimes this way is ideal because you have very little data to load, but in most cases, avoid singleton inserts. This approach inserts just a single tuple in a single transaction.

head si_test_data.sql
insert into test_data (id, fname, lname) values (1, 'jon_00001', 'roberts_00001');
insert into test_data (id, fname, lname) values (2, 'jon_00002', 'roberts_00002');
insert into test_data (id, fname, lname) values (3, 'jon_00003', 'roberts_00003');
insert into test_data (id, fname, lname) values (4, 'jon_00004', 'roberts_00004');
insert into test_data (id, fname, lname) values (5, 'jon_00005', 'roberts_00005');
insert into test_data (id, fname, lname) values (6, 'jon_00006', 'roberts_00006');
insert into test_data (id, fname, lname) values (7, 'jon_00007', 'roberts_00007');
insert into test_data (id, fname, lname) values (8, 'jon_00008', 'roberts_00008');
insert into test_data (id, fname, lname) values (9, 'jon_00009', 'roberts_00009');
insert into test_data (id, fname, lname) values (10, 'jon_00010', 'roberts_00010');

This repeats for 10,000 tuples.
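For reference, a short Python sketch that generates a file like si_test_data.sql (the naming pattern follows the head output above):

```python
# Build the 10,000 singleton INSERT statements shown above,
# one statement (and therefore one transaction) per tuple.
lines = [
    "insert into test_data (id, fname, lname) "
    f"values ({i}, 'jon_{i:05d}', 'roberts_{i:05d}');"
    for i in range(1, 10001)
]
with open("si_test_data.sql", "w") as f:
    f.write("\n".join(lines) + "\n")
```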

time psql -f si_test_data.sql > /dev/null
real	5m49.527s

As you can see, this is pretty slow and not recommended for inserting large amounts of data. Nearly 6 minutes to load 10,000 tuples is crawling.

COPY
If you are familiar with PostgreSQL then you will feel right at home with this technique. This time, the data is in a file named test_data.txt and it is not wrapped with an insert statement.

head test_data.txt
1|jon_00001|roberts_00001
2|jon_00002|roberts_00002
3|jon_00003|roberts_00003
4|jon_00004|roberts_00004
5|jon_00005|roberts_00005
6|jon_00006|roberts_00006
7|jon_00007|roberts_00007
8|jon_00008|roberts_00008
9|jon_00009|roberts_00009
10|jon_00010|roberts_00010
COPY test_data FROM '/home/gpadmin/test_data.txt' WITH DELIMITER '|';
COPY 10000
Time: 128.580 ms

This method is significantly faster, but it loads the data through the master. This means it doesn’t scale well, as the master becomes the bottleneck, but it does allow you to load data from a host anywhere on your network so long as it has access to the master.

gpfdist
gpfdist is a web server that serves POSIX files for the segments to fetch. Segment processes get the data directly from gpfdist, bypassing the master. This enables you to scale by adding more gpfdist processes and/or more segments.

gpfdist -p 8888 &
[1] 128836
[gpadmin@hdb ~]$ Serving HTTP on port 8888, directory /home/gpadmin

Now you’ll need to create a new external table to read the data from gpfdist.

CREATE EXTERNAL TABLE gpfdist_test_data
(id int,
 fname text,
 lname text)
LOCATION ('gpfdist://hdb:8888/test_data.txt')
FORMAT 'TEXT' (DELIMITER '|');

And to load the data.

INSERT INTO test_data SELECT * FROM gpfdist_test_data;
INSERT 0 10000
Time: 98.362 ms

gpfdist is blazing fast and scales easily. You can add more than one gpfdist location in the external table, use wild cards, use different formats, and much more. The downside is the file must be on a host that all segments can reach. You also have to create a separate gpfdist process on that host.

gpload
gpload is a utility that automates the loading process by using gpfdist. Review the documentation for more on this utility. Technically, it is the same as using gpfdist with external tables; it just automates the commands for you.

Pivotal Extension Framework (PXF)
PXF allows you to read and write data to HDFS using external tables. Like using gpfdist, it is done by each segment so it scales and executes in parallel.

For this example, I’ve loaded the test data into HDFS.

hdfs dfs -cat /test_data/* | head
1|jon_00001|roberts_00001
2|jon_00002|roberts_00002
3|jon_00003|roberts_00003
4|jon_00004|roberts_00004
5|jon_00005|roberts_00005
6|jon_00006|roberts_00006
7|jon_00007|roberts_00007
8|jon_00008|roberts_00008
9|jon_00009|roberts_00009
10|jon_00010|roberts_00010

The external table definition.

CREATE EXTERNAL TABLE et_test_data
(id int,
 fname text,
 lname text)
LOCATION ('pxf://hdb:51200/test_data?Profile=HdfsTextSimple')
FORMAT 'TEXT' (DELIMITER '|');

And now to load it.

INSERT INTO test_data SELECT * FROM et_test_data;
INSERT 0 10000
Time: 227.599 ms

PXF is probably the best way to load data when using the “Data Lake” design. You load your raw data into HDFS and then consume it with a variety of tools in the Hadoop ecosystem. PXF can also read and write other formats.

Outsourcer and gplink
Last but not least are software programs I created. Outsourcer automates the table creation and load of data directly to Greenplum or HAWQ using gpfdist. It sources data from SQL Server and Oracle as these are the two most common OLTP databases.

gplink is another tool that can read external data, but this technique can connect to any valid JDBC source. It doesn’t automate as many of the steps that Outsourcer does, but it is a convenient tool to get data from a JDBC source.

You might be thinking that Sqoop does this, but not exactly. gplink and Outsourcer load data into HAWQ and Greenplum tables. They are optimized for these databases and fix data for you automatically: both remove null and newline characters and escape the escape and delimiter characters. With Sqoop, you would have to read the data from HDFS using PXF and then fix the errors that could be in the files.

Both tools are linked above.

Summary
This post gives a brief description on the various ways to load data into HAWQ. Pick the right technique for your use case. As you can see, HAWQ is very flexible and can handle a variety of ways to load data.

Pivotal HDB 2.0 (Apache HAWQ) Table Distribution

Pivotal HDB version 2.0 is very close to being generally available, and there are differences in how table distribution works between this version and 1.3 that are worth mentioning.

Distribution
HDB is a fork of Greenplum Database which is an MPP database. Greenplum distributes or shards the data across multiple “segments” which are located on multiple “segment hosts”. The distribution is typically set by a hash of a column or set of columns.

Example:

CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text)
DISTRIBUTED BY (customer_id);

In Greenplum, this would create a file in each segment on each segment host. If you made the table column oriented, the number of files increases to one file per column, per segment. Add in partitioning, which again uses separate files for each partition, and you can end up with thousands of files for a single table. This is great for an MPP database with a robust optimizer that can skip scanning files it doesn’t need in order to execute the query in the fastest way possible.

HDB 1.3 uses the same design pattern as Greenplum but stores the files in HDFS. Hadoop loves big files but doesn’t work optimally with lots of small files, which meant that you typically didn’t use column orientation and partitions were larger.

HDB 2.0 Distribution
1. There is now a dynamic number of segment processes per host. There is just a single segment directory per data node, and the database dynamically creates the number of buckets as needed.

2. When you create a table with the distribution set (as shown above), the number of buckets is fixed. This is set with the GUC default_hash_table_bucket_number which sets the number of buckets per host.

3. When you create a random distribution table, the number of buckets is dynamic.
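The idea behind hash distribution can be sketched in a few lines of Python. This is illustrative only; HAWQ's actual distribution hash function differs:

```python
import hashlib

def bucket_for(customer_id, num_buckets):
    """Map a distribution-key value to a bucket by hashing it.
    (Illustrative stand-in for HAWQ's real distribution hash.)"""
    digest = hashlib.md5(str(customer_id).encode()).hexdigest()
    return int(digest, 16) % num_buckets

# The same customer_id always lands in the same bucket, which is
# what makes co-located joins on the distribution key possible.
assert bucket_for(42, 6) == bucket_for(42, 6)

# 1000 customer_ids spread across all 6 buckets, as with
# default_hash_table_bucket_number set to 6.
assert {bucket_for(i, 6) for i in range(1, 1001)} == set(range(6))
```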

So how does this work? Take our example “customer” table above with the distribution set to (customer_id).

INSERT INTO customer SELECT i, 'company_' || i, i || ' main st' 
FROM generate_series(1,1000) AS i;

Query returned successfully: 1000 rows affected, 784 msec execution time.

Now let’s go look at the files.

--use the OID values to find the location in HDFS
SELECT oid FROM pg_database WHERE datname = 'gpadmin';

16508

SELECT c.oid
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

24591

So our data files are in HDFS. This is just a single-node VM with one segment. It has default_hash_table_bucket_number set to 6, so HDB will create 6 buckets of data in HDFS.

hdfs dfs -ls /hawq_default/16385/16508/24591
Found 6 items
-rw-------   1 gpadmin hdfs       6776 2016-05-04 16:51 /hawq_default/16385/16508/24591/1
-rw-------   1 gpadmin hdfs       6768 2016-05-04 16:51 /hawq_default/16385/16508/24591/2
-rw-------   1 gpadmin hdfs       6688 2016-05-04 16:51 /hawq_default/16385/16508/24591/3
-rw-------   1 gpadmin hdfs       6728 2016-05-04 16:51 /hawq_default/16385/16508/24591/4
-rw-------   1 gpadmin hdfs       7600 2016-05-04 16:51 /hawq_default/16385/16508/24591/5
-rw-------   1 gpadmin hdfs       7488 2016-05-04 16:51 /hawq_default/16385/16508/24591/6

Now recreate this table with random distribution, insert the data, and look at the files.

DROP TABLE IF EXISTS customer;
CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text)
DISTRIBUTED RANDOMLY;

INSERT INTO customer SELECT i, 'company_' || i, i || ' main st' 
FROM generate_series(1,1000) AS i;

SELECT c.oid
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

24596

[gpadmin@hdp23 ~]$ hdfs dfs -ls /hawq_default/16385/16508/24596
Found 1 items
-rw-------   1 gpadmin hdfs      41968 2016-05-04 17:02 /hawq_default/16385/16508/24596/1

With random distribution, it created only a single file in HDFS.
– This is great for HDFS because there are fewer files for the namenode to track.
– It allows for elasticity of the cluster: grow or shrink the cluster without having to redistribute the data.
– The optimizer has also been enhanced to dynamically set the number of buckets based on the demands of the query.

As you might be concluding right about now, RANDOM DISTRIBUTION is the recommendation for tables in HDB 2.0. You can still set your distribution to a hash of a column or columns, which will use a static number of buckets, but random is recommended.

If you create a table now in HDB 2.0 without setting the distribution, the default will be RANDOM.

Proof

DROP TABLE IF EXISTS customer;
CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text)
 DISTRIBUTED BY (customer_id);

SELECT sub.attname
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
LEFT JOIN 
(SELECT p.attrelid, p.attname
FROM pg_attribute p
JOIN (SELECT localoid, unnest(attrnums) AS attnum FROM gp_distribution_policy) AS g ON g.localoid = p.attrelid AND g.attnum = p.attnum) AS sub
ON c.oid = sub.attrelid 
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

"customer_id"

Now, recreate the table without setting the distribution key.

DROP TABLE IF EXISTS customer;
CREATE TABLE customer 
(customer_id int,
 customer_name text,
 customer_address text);

SELECT sub.attname
FROM pg_namespace n
JOIN pg_class c ON n.oid = c.relnamespace
LEFT JOIN 
(SELECT p.attrelid, p.attname
FROM pg_attribute p
JOIN (SELECT localoid, unnest(attrnums) AS attnum FROM gp_distribution_policy) AS g ON g.localoid = p.attrelid AND g.attnum = p.attnum) AS sub
ON c.oid = sub.attrelid 
WHERE n.nspname = 'public'
AND c.relname = 'customer'
AND c.relkind = 'r';

--no data

HAWQ Demystified

The best SQL engine for Hadoop is not an overnight, “me too” product. Not too long ago, the Hadoop vendors had all but written off SQL, but the demand just hasn’t gone away.

HAWQ is the result of many, many years of work leveraging open source software as well as contributing back to the open source community. I think a short history lesson is in order to fully understand how this product came to be.

greenplum
Greenplum Database began life in 2003. The founders used open source PostgreSQL and released a commercial product soon after.

bizgres
Bizgres, an open source version of Greenplum, was released in 2005. Very early on, the founders of Greenplum embraced contributing back to the open source community.


MADlib
MADlib was released in 2010 as an open source project and later became an Apache Incubator project.

EMC
Greenplum was acquired by EMC in 2010 and almost immediately, EMC invested heavily into Hadoop. The Greenplum division was agile like a small startup company but with the deep pockets of a multi-billion dollar company.

GPHD
Greenplum released a Hadoop distribution in 2011 and integration between Greenplum Database and HDFS got more robust with the introduction of “gphdfs”. Greenplum supported External Tables to read/write data in parallel to HDFS from several different distributions.

HAWQ
HAWQ, a fork of Greenplum Database, was released in 2013. HAWQ was immediately extremely performant and compliant with the newest SQL syntax. HAWQ borrowed from the 10 years experience of developing Greenplum to provide a robust optimizer designed for HDFS.

Pivotal
2013 also saw Pivotal become a company. EMC contributed Greenplum Database, VMware contributed GemFire and Cloud Foundry, and GE contributed capital as an active partner. Paul Maritz became the CEO, and the dedication to fully embrace open source became an integral part of the corporate culture.

During the last three years, HAWQ has become an Apache Incubator project. The Pivotal product now has the rather boring name of “Pivotal HDB” while HAWQ is the name of the Apache project.

Pivotal also made Greenplum and Geode (GemFire is the commercial product name) open source projects. Clearly, Pivotal has embraced open source, with probably more committers than those other “open source” data companies.

So what now? What is happening in 2016? Well, Pivotal is about to release Pivotal HDB (HAWQ) 2.0. I’ve been testing this product for months on various platforms, and I keep being amazed by the performance and ease of use.

HAWQ 2.0 embraces Hadoop fully. I believe the two biggest features are elasticity and performance. HAWQ now supports elasticity for growing or shrinking clusters without having to redistribute the data. The performance is also much improved as it better utilizes HDFS and YARN.

Pivotal HDB is certified to run on Hortonworks HDP, with plans to become a first-class citizen of the Open Data Platform (ODPi).

So you may be asking, “is it fast?” and the answer is yes! I haven’t found a SQL engine that is faster, and I’ve been doing competitive analysis for months. The other question you may ask is, “can I run my SQL?” and the answer is yes! A major competitor in the SQL on Hadoop landscape requires more tweaking of SQL just to get it to execute, and more tuning to get decent performance.

That “other SQL on Hadoop” product can’t do the following things (among many others) that HAWQ can:

– Can’t handle a comment line at the end of a SQL file (I found that rather strange).
– Won’t do partition elimination through a joined table (e.g. a date dimension).
– Can’t get the number of milliseconds when subtracting from a date; only seconds.
– Has “interval” in its SQL dialect but doesn’t have an interval data type.
– Has no time data type.
– Doesn’t support || or + for concatenating strings; you must use the concat() function.
– Doesn’t support INTERSECT or EXCEPT.
– Doesn’t support subqueries that return more than one row.
– Doesn’t support correlated subqueries.
– Doesn’t support GROUP BY ROLLUP.
– Doesn’t support subqueries in the HAVING clause.
– Doesn’t support subqueries in the SELECT list.

HAWQ is the real deal. It is an Apache project, going to be part of ODPi, faster than everyone else, integrated with Apache Ambari, certified with Hortonworks, and the most mature SQL engine for Hadoop.

Analyzedb Performance Metrics

I have already written about analyzedb in this post, but I thought I would write another one with performance metrics.

The old method that I used to analyze tables was to analyze every table and partition sequentially. Then I would analyze the root partition of partitioned tables. The script would look like this:

Tables and Partitions

psql -t -A -c "SELECT 'ANALYZE ' || n.nspname || '.' || c.relname || ';' FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname = 'tpcds' AND c.relname NOT IN (SELECT DISTINCT tablename FROM pg_partitions p WHERE schemaname = 'tpcds') ORDER BY 1" | psql -e

Root Partitions of Partitioned Tables

psql -t -A -c "SELECT 'ANALYZE ROOTPARTITION ' || n.nspname || '.' || c.relname || ';' FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid WHERE n.nspname = 'tpcds' AND c.relname IN (SELECT DISTINCT tablename FROM pg_partitions p WHERE schemaname = 'tpcds') ORDER BY 1" | psql -e

The second method, analyzedb, does all of this with a single command.

analyzedb -d gpadmin -s tpcds --full -a

The default for analyzedb is 5 parallel processes; you can adjust this with the -p option (a value from 1 to 10) to maximize performance. You can also take advantage of the fact that analyzedb keeps track of the tables it has analyzed, so it won’t unnecessarily re-analyze unchanged tables, which makes the process even faster.

Here are some numbers to put this into perspective. I’m using virtual machines with only 1GB of data, but the percentage improvement is what we want to measure.

HAWQ 2.0 Beta
Sequential: 18 minutes and 37 seconds
Analyzedb: 8 minutes and 3 seconds
Improvement: 131% faster!

Greenplum Database 4.3
Sequential: 11 minutes and 25 seconds
Analyzedb: 6 minutes and 59 seconds
Improvement: 63% faster!
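The “faster” percentages above are just the ratio of the sequential time to the analyzedb time. A quick sketch of the arithmetic (Python used purely for illustration):

```python
def pct_faster(sequential_s: float, analyzedb_s: float) -> float:
    """How much faster analyzedb is than the sequential run, as a percentage."""
    return (sequential_s / analyzedb_s - 1) * 100

# HAWQ 2.0 Beta: 18m37s sequential vs. 8m3s with analyzedb
hawq = pct_faster(18 * 60 + 37, 8 * 60 + 3)

# Greenplum Database 4.3: 11m25s sequential vs. 6m59s with analyzedb
gpdb = pct_faster(11 * 60 + 25, 6 * 60 + 59)

print(f"HAWQ: {hawq:.0f}% faster, Greenplum: {gpdb:.0f}% faster")
# → HAWQ: 131% faster, Greenplum: 63% faster
```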

If you aren’t using analyzedb to maintain your HAWQ and/or Greenplum databases, start using it now! You’ll see much better performance in keeping your tables’ statistics up to date.

Dear Cloudera, Please Stop!

So I was alerted to this blog post by a colleague, and I was floored by what Cloudera is doing.

Basically, they are creating a new feature that stores multiple levels of data in a single table. In a traditional relational design, you might have an Orders table with a separate Order_Details table, and you would simply JOIN the two tables together when needed.

CREATE SCHEMA example;

CREATE TABLE example.Orders
(order_id int NOT NULL,
 order_date date NOT NULL,
 customer_id int NOT NULL);

CREATE TABLE example.Order_Details
(order_detail_id int NOT NULL,
 order_id int NOT NULL,
 product_id int NOT NULL,
 quantity int NOT NULL,
 price numeric NOT NULL);

And when you want to query both tables:

SELECT o.order_date, sum(od.price)
FROM example.orders o 
JOIN example.order_details od ON o.order_id = od.order_id
GROUP BY o.order_date;

What Cloudera is doing instead is an object approach that combines both tables into a single table using a feature called a STRUCT. This is similar to a composite type in Greenplum, PostgreSQL, and HAWQ.

--Impala
CREATE TABLE Orders
(order_id int NOT NULL,
 order_date date NOT NULL,
 customer_id int NOT NULL,
 Order_Details ARRAY<STRUCT<
    order_detail_id: int,
    product_id: int,
    quantity: int,
    price: decimal(10,2)>>
);

I get this approach when working with an OLTP system where developers want to match their classes to the database structure, but I really don’t see the benefit in a big data platform. It just makes the data more difficult for database users to understand and use.

Schema.Table
Every database I’ve ever worked with uses schema.table to organize table structures, but not Impala! Cloudera has decided to break away from the decades-old standard and use dot notation for nested fields stored within a STRUCT. If you adopt this silliness, you are adopting a standard that no one else uses. Why would Cloudera want that? Vendor lock-in, maybe?

--Impala
select * from orders.order_details;

In the example above, orders isn’t a schema; it’s a table. Cloudera just broke SQL!

Cartesian Product? Nope, Just Vendor Specific Syntax
Notice how this query appears to reference two different tables without specifying a join.

SELECT o.order_date, sum(od.price)
FROM orders o, o.order_details od 
GROUP BY o.order_date;

They even allow you to do an OUTER JOIN to the STRUCT ARRAY without defining the columns to join on.

SELECT o.order_date, sum(od.price)
FROM orders o
LEFT OUTER JOIN o.order_details od 
GROUP BY o.order_date;

Reasons Why Cloudera is Doing This
Here are my guesses as to why they are doing this.
1. They hate SQL or just don’t understand it.
2. Vendor Lock-In. They need ways to make it difficult for customers to switch to a competitor’s product.
3. Impala is bad at joins. Organizing tables like this makes it easier for Impala’s query optimizer to create a robust query plan.

Please Cloudera, just stop!

TPC-DS Benchmark

TPC
The TPC is a non-profit organization that provides several benchmarks for databases. The two common benchmarks they provide for Data Warehousing and Analytics are the Decision Support (TPC-DS) and Ad-Hoc (TPC-H) benchmarks. More information can be found here: http://www.tpc.org/.

SQL on Hadoop
– HAWQ can execute all 99 queries without modification.
– IBM Big SQL can execute all 99 queries but requires modifying 12 queries (as of the most recent publication I can find).
– Impala can’t run all of the queries and many require modifications to execute.
– Hive can’t run all of the queries and many require modifications to execute.

There isn’t much point in comparing HAWQ with these other SQL engines when none of them can execute all of the queries without modification! So… this post will focus on the capabilities of HAWQ as well as the automated TPC-DS benchmark process I have put together.

Sandbox
The tests use a Hortonworks 2.2 Sandbox with 8GB of RAM and 4 cores dedicated to the VM. The dataset is 2GB, which works well in a VM. This is definitely not a huge test, but it demonstrates the capabilities of HAWQ. Again, all 99 queries run in HAWQ without any modification!

TPC-DS
I’m using the tests provided here: https://github.com/pivotalguru/TPC-DS which I put together to help evaluate different hardware platforms. This script works with HAWQ and with Greenplum database.

This script automates the entire process of generating the data, building the tables, loading the tables, and executing the TPC-DS queries.

Results
1. Compile TPC-DS: 3.4 seconds
2. Generate the Data using dsdgen: 3 minutes 23 seconds
3. Create tables and ensure correct optimizer settings are enabled: 5.5 seconds
4. Load the tables: 2 minutes and 5 seconds
5. Execute all 99 TPC-DS queries: 6 minutes 46 seconds
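Adding the steps up, the entire run, from compiling the kit to finishing the 99th query, comes in at just over 12 minutes. A quick tally (Python used purely for illustration):

```python
# Step timings from the results above, in seconds
steps = {
    "compile TPC-DS": 3.4,
    "generate data (dsdgen)": 3 * 60 + 23,
    "create tables": 5.5,
    "load tables": 2 * 60 + 5,
    "execute 99 queries": 6 * 60 + 46,
}

total_s = sum(steps.values())
print(f"End to end: {int(total_s // 60)}m {total_s % 60:.1f}s")
# → End to end: 12m 22.9s
```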


To put this into perspective, I attempted to run a 2GB TPC-DS benchmark in the same VM with another SQL on Hadoop tool, and it took 2 hours just to get all of the data loaded! That engine was unable to execute the vendor-provided SQL queries either, so I gave up on that effort. HAWQ is the way to go!