Wednesday, April 24, 2019

Topics, Partitions & Streaming ... Oh My!

It landed on my shoulders to derive the [longitude,latitude] for a list of addresses.  Naturally, PostGIS is the tool to use; however, this list is over 11M rows, and anyone who has used geocode() knows it's not exactly, shall we say, quick.

Concurrent Processing On The Cheap

Breaking the list apart is the right route, as a single geocode() call consumes a core, but only one core.  Here are the basics of how I did it.

  1. Added an id bigint primary key to my table and populated it
  2. Created a query to pull array_agg(id) on distinct addresses
  3. Sent each array_agg(id) as a message to a Kafka topic with 4 partitions
  4. Created a process to consume from a single topic partition
  5. Each process pulled the address for each id in its list
  6. Executed geocode()
  7. Saved the longitude and latitude in a table
Simple, right?  Here's a diagram to help illustrate:


Running one process at a time, whether by script or SQL, would have taken an estimated 5 days.  Because no row has anything to do with any other, breaking the work up and executing 4 concurrent processes took roughly 1 day to finish.  Neat, eh?
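The routing logic behind those steps can be sketched in a few lines of Python.  This is only a simulation of the fan-out, not the actual producer: partition_for mimics a keyed partitioner with a CRC32 hash, and the Kafka and database plumbing is left out.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Mimic a keyed partitioner: hash the distinct address to a partition
    return zlib.crc32(key.encode()) % num_partitions

def fan_out(rows):
    # rows: iterable of (id, address) pairs.
    # Step 2: group ids by distinct address (the array_agg(id) query).
    groups = defaultdict(list)
    for row_id, address in rows:
        groups[address].append(row_id)
    # Step 3: each id-array becomes one message, routed to a partition.
    partitions = defaultdict(list)
    for address, ids in groups.items():
        partitions[partition_for(address)].append(ids)
    return partitions
```

Each of the 4 worker processes then consumes a single partition, runs geocode() once per distinct address, and saves the result for every id in the message.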

Before Kafka and similar technologies, I would have sat down and written a program that did much the same, except it would have needed threads and some form of IPC to hand out work.  Such a tedious and antiquated process, reminiscent of the 1990s.

But let's say we don't want to use Kafka.  Let's also say we don't want to script anything.  How can we keep the data in the database and break the work up in such a way as to accomplish the end goal in the least amount of time?  My answer... better technology selection.

Introducing Greenplum (for those who don't know it)

Greenplum was derived from PostgreSQL 8.2 and was one of the first MPP (Massively Parallel Processing) RDBMSs.  Pivotal, the creator of Greenplum, has since open sourced the project as "gpdb"; it can be found on GitHub here if you're interested: https://github.com/greenplum-db/gpdb.  I've opted to sign up on Pivotal's web site and download the official Greenplum for trial purposes.

The entire premise of Greenplum is that many hands make light work.  No data is stored on the master; rather, each row is hashed and sent to one of many segments (PostgreSQL instances) running on backend nodes.  This effectively shards the data and takes advantage of all available I/O, cores, and memory on every system.  Greenplum is your ultimate horizontally scaling RDBMS, allowing for up to 1,024 backend nodes.  Given the need (and budget), a single cluster can have petabytes of memory and disk, hundreds or perhaps thousands of cores, and IOPS in the millions.
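That hash-and-scatter idea can be sketched in miniature.  This is purely illustrative; Greenplum's real hashing lives inside the planner, and the segment count here is an assumption matching the cluster described later.

```python
import zlib

NUM_SEGMENTS = 8  # e.g. 2 backend nodes x 4 primary segments each

def segment_for(distribution_key: str) -> int:
    # The master hashes the distribution key to pick a segment for each row
    return zlib.crc32(distribution_key.encode()) % NUM_SEGMENTS

def distribute(rows, key):
    # Scatter: shard rows across segments by their distribution key
    segments = [[] for _ in range(NUM_SEGMENTS)]
    for row in rows:
        segments[segment_for(row[key])].append(row)
    return segments

def count_all(segments):
    # Gather: every segment counts its local shard; the master sums them
    return sum(len(shard) for shard in segments)
```

The same key always hashes to the same segment, which is what lets each segment work on its shard independently and in parallel.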

If you're a systems and data geek, you're drooling right now.

Basic architecture looks like this:


Here are the highlights:
  • This illustration shows only two backend nodes, but as mentioned there can be up to 1,024
  • Each backend has two primary segments (pseg) and two secondary segments (sseg)
  • Ideally, every primary segment runs on a LUN and disk separate from all others
  • Primary segments replicate to a secondary segment on a different backend node
  • Secondary (mirror) segments are optional
  • The master database has a standby or master-mirror
Being derived from PostgreSQL 8.2, Greenplum uses WAL file shipping to replicate.  Yuck, right?  Well, the open source project is quickly catching up, and as of this blog the stable branch is up to speed with PostgreSQL 9.4.  I'm not fully familiar with which replication capabilities have changed, but I anticipate streaming replication will become the de facto standard soon enough.

My GIS Configuration

I won't get into setting it up now but will describe the cluster.
  1. A single master
  2. Two nodes
  3. Four primary segments on each node
After creating the database, I downloaded and loaded the Greenplum-safe version of PostGIS.  A couple of other notable add-ons include GPText for full text search capabilities and Apache MADlib for all your math-geeky, statistical needs.

This configuration has effectively 8 PostgreSQL instances.  So how does it run?

After loading the TIGER data (what a bother that was) and then my own data, a single SQL statement that built the address, called geocode(), and inserted the results into a table took close to 14 hours to complete.  I've gone from 5 days, to 1 day, to 14 hours.  Not too shabby.

I don't have a huge budget; in fact, I haven't a budget at all.  This cluster is running on a single host within virtual machines.  I've done what I could to give each node its own disk.  Perhaps given some $$ and time, I could run the same test with Greenplum in AWS.  I've done that in the past with great results.  It would certainly reduce the time even further, but I've reached my objective.

Tuesday, April 23, 2019

PostgreSQL Progress Bar


We've all been there, right?  A large statement (insert, update, delete, or select) taking for... ev... er...  We understand it's hard to predict when these things will complete, but wouldn't it be neat if there were a way to provide a progress bar?

Obviously, there is a solution.  :)

Let's not get ahead of ourselves.  The solution I'm presenting does not include parallelism.  Could it be done?  Certainly, with additional scripting external to the database; however, the problem I want to address is simply providing feedback for a single query.  Keep that objective in mind.


Enter PostgreSQL Version 11 and Transaction Management

Available in PostgreSQL version 11 is the ability to COMMIT and ROLLBACK within a procedure or anonymous code block (DO).  There are limitations to this control, including:

  1. CALL or DO only (sorry FUNCTION)
  2. Loops on CURSOR
  3. UPDATE RETURNING

If your statement can be batched, we can break things up in such a way as to provide the ever elusive SQL progress bar.


What is postgres_fdw doing here?

I'm glad that question occurred to you.  PostgreSQL is an ACID-compliant MVCC RDBMS.  A single query executes in one transaction, and no process outside of that transaction can see what it is doing or, more to the point, what has been done.

What I thought postgres_fdw brought to the table was the ability for a single controlling function to execute DML on a foreign table and thus make the data available to other sessions as it goes.  That was my thinking when I started down this post, but it is simply not the case.  When DML is performed on a foreign table, the remote session is also in a transaction, and not until the local session is COMMIT'd is the remote session also COMMIT'd.

Well, buggers.  At least I tried and learned a little.


The Basics

We need something to operate on:

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";  -- provides uuid_generate_v1()

CREATE TABLE output(
  id        serial primary key,
  created   timestamp default now(),
  uu_id     uuid not null
);

Demo

About time I got to it.  Now, let's say we have that huge statement that takes forever.

INSERT INTO output (id, created, uu_id)
SELECT nextval('output_id_seq'), now(), uuid_generate_v1()
  FROM generate_series(1, 1000, 1);

Okay, it's not that big, but what if it were, took hours to complete, and the boss wanted feedback as it progressed?  Could we use an anonymous code block?

DO $$
DECLARE
  i  int;
BEGIN
  FOR i IN 1..10
  LOOP
    INSERT INTO output (id, created, uu_id)
    SELECT nextval('output_id_seq'), now(), uuid_generate_v1()
      FROM generate_series(1, 100, 1);

    RAISE NOTICE '% pct done, xact id %', i*10, txid_current();
  END LOOP;
END $$;
NOTICE:  10 pct done, xact id 1551874
NOTICE:  20 pct done, xact id 1551874
NOTICE:  30 pct done, xact id 1551874
NOTICE:  40 pct done, xact id 1551874
NOTICE:  50 pct done, xact id 1551874
NOTICE:  60 pct done, xact id 1551874
NOTICE:  70 pct done, xact id 1551874
NOTICE:  80 pct done, xact id 1551874
NOTICE:  90 pct done, xact id 1551874
NOTICE:  100 pct done, xact id 1551874
DO

No.  This doesn't meet the objective, because everything runs in a single transaction and other processes can't see the data.  Note the same xact id, 1551874, at every notice.  We need to COMMIT inside the loop.

DO $$
DECLARE
  i  int;
BEGIN
  FOR i IN 1..10
  LOOP
    INSERT INTO output (id, created, uu_id)
    SELECT nextval('output_id_seq'), now(), uuid_generate_v1()
      FROM generate_series(1, 100, 1);

    RAISE NOTICE '% pct done, xact id %', i*10, txid_current();

    COMMIT;
  END LOOP;
END $$;
NOTICE:  10 pct done, xact id 1551876
NOTICE:  20 pct done, xact id 1551877
NOTICE:  30 pct done, xact id 1551878
NOTICE:  40 pct done, xact id 1551880
NOTICE:  50 pct done, xact id 1551884
NOTICE:  60 pct done, xact id 1551885
NOTICE:  70 pct done, xact id 1551886
NOTICE:  80 pct done, xact id 1551887
NOTICE:  90 pct done, xact id 1551888
NOTICE:  100 pct done, xact id 1551890
DO

What's going on?

  1. Obviously, we're looping
  2. Our batch sizes are 100
  3. Output shows progress and the transaction id
  4. Each batch is COMMIT'd in its own transaction
To wrap up the illustration, here is the list of timestamps for the records created:

SELECT created,count(*) FROM output GROUP BY 1;
          created           | count
----------------------------+-------
 2019-04-23 13:07:41.476109 |   100
 2019-04-23 13:07:41.25541  |   100
 2019-04-23 13:07:41.30827  |   100
 2019-04-23 13:07:41.49809  |   100
 2019-04-23 13:07:41.392188 |   100
 2019-04-23 13:07:41.521092 |   100
 2019-04-23 13:07:41.447075 |   100
 2019-04-23 13:07:41.567168 |   100
 2019-04-23 13:07:41.542118 |   100
 2019-04-23 13:07:41.277067 |   100
(10 rows)

It really is that simple.
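Because each batch commits, a second session can finally deliver that progress bar by polling the table's row count.  Here's a minimal sketch; the fetch_count callable stands in for a real SELECT count(*) FROM output issued over a separate connection, which is an assumption on my part about how you'd wire it up.

```python
import time

def render_bar(done: int, total: int, width: int = 40) -> str:
    # Format a text progress bar, e.g. "[####----] 50%"
    filled = width * done // total
    pct = 100 * done // total
    return "[" + "#" * filled + "-" * (width - filled) + "] " + str(pct) + "%"

def watch(fetch_count, total, poll_seconds=1.0):
    # Poll the committed row count and redraw the bar until the job finishes
    done = fetch_count()
    while done < total:
        print(render_bar(done, total), end="\r", flush=True)
        time.sleep(poll_seconds)
        done = fetch_count()
    print(render_bar(done, total))
```

Run the DO block in one session and the watcher in another; every COMMIT makes the newest batch visible to the poller, and the bar creeps forward.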


Caveats

In addition to the limitations listed above, two things are worth noting.

First, a child stored procedure called by a parent stored procedure or anonymous code block may COMMIT or ROLLBACK as long as CALL is used.  Okay, that's not too hard to remember.

Second, a child stored procedure called by a FUNCTION may not perform transaction control.


I hope you learned something here.  I certainly did.