Foreign Data

Foreign data wrappers are a feature introduced in PostgreSQL 9.1. They allow executing SQL queries on data that lives outside the database. To use them, you need a foreign data wrapper that converts the foreign data from its original form into PostgreSQL's internal tuple format.
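
In PostgreSQL itself, a wrapper is a C extension that implements scan callbacks such as BeginForeignScan, IterateForeignScan, and EndForeignScan. The Python sketch below illustrates only the conversion step. It reads CSV records and turns each one into a typed tuple according to a column list. The column list and the function name scan_foreign_csv are hypothetical. Treating every empty field as NULL (None) is a simplification.

```python
import csv
import io
from datetime import date

# Hypothetical foreign table definition: (column name, type converter) pairs.
COLUMNS = [
    ("customer_id", str),
    ("review_date", date.fromisoformat),
    ("review_rating", int),
]


def scan_foreign_csv(stream):
    """Yield one typed tuple per CSV record, in the spirit of a scan callback."""
    for record in csv.reader(stream):
        # Simplification: an empty field becomes None, i.e. SQL NULL.
        yield tuple(
            None if field == "" else convert(field)
            for (_, convert), field in zip(COLUMNS, record)
        )


if __name__ == "__main__":
    data = io.StringIO("A1,1998-03-15,5\nB2,,3\n")
    for row in scan_foreign_csv(data):
        print(row)
```
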

For us, the really exciting part is enabling this functionality in CitusDB. We think the ability to run SQL on distributed data, without having to load the data first, provides substantial value. In the following, we describe examples that run queries on distributed flat files, on MongoDB, and on columnar store files. By writing other foreign data wrappers, you can process other data formats as well.

File Foreign Data Wrapper

One example foreign data wrapper that comes with PostgreSQL's contrib package is file_fdw. This wrapper handles flat files with tabular data, and several good tutorials for using it with PostgreSQL already exist.

CitusDB extends this functionality by creating a foreign table for each flat file and distributing SQL queries across these foreign tables. The CitusDB master node can efficiently parallelize filters, groupings, sorts, limits, aggregations, most table joins, and certain subselects.

To demonstrate this with an example, we describe a setup that uses customer reviews data files from Amazon. We start with the commands to install the foreign data wrapper package (citusdb-contrib) on the local node for 64-bit Fedora or Ubuntu systems. If you'd like to run foreign data wrappers on a multi-node cluster, you'll need to repeat these installation steps on all the nodes.

localhost# wget http://packages.citusdata.com/contrib/citusdb-contrib-3.0.0-1.x86_64.rpm
localhost# sudo rpm --install citusdb-contrib-3.0.0-1.x86_64.rpm

OR

localhost# wget http://packages.citusdata.com/contrib/citusdb-contrib-3.0.0-1.amd64.deb
localhost# sudo dpkg --install citusdb-contrib-3.0.0-1.amd64.deb

Then, you need to download customer reviews data for years 1998 and 1999. These data include 1.75M customer reviews, and more data from subsequent years (2000 to 2004) are also available.

localhost# wget http://examples.citusdata.com/customer_reviews_1998.csv.gz
localhost# wget http://examples.citusdata.com/customer_reviews_1999.csv.gz

localhost# gzip -d customer_reviews_1998.csv.gz
localhost# gzip -d customer_reviews_1999.csv.gz

Now, we need to create an extension and a server for foreign files, and then create a distributed foreign table associated with the file server. Note that you need to connect to the master node to issue these commands.

localhost# /opt/citusdb/3.0/bin/psql -h localhost -p 5432 -d postgres

postgres=# CREATE EXTENSION file_fdw;
postgres=# CREATE SERVER file_server FOREIGN DATA WRAPPER file_fdw;

postgres=# CREATE FOREIGN TABLE customer_reviews
(
    customer_id TEXT not null,
    review_date DATE not null,
    review_rating INTEGER not null,
    review_votes INTEGER,
    review_helpful_votes INTEGER,
    product_id CHAR(10) not null,
    product_title TEXT not null,
    product_sales_rank BIGINT,
    product_group TEXT,
    product_category TEXT,
    product_subcategory TEXT,
    similar_product_ids CHAR(10)[]
)
DISTRIBUTE BY APPEND (review_date)
SERVER file_server
OPTIONS (filename '', format 'csv');

The first two commands are standard in PostgreSQL. The last command creates a new distributed foreign table and has almost the same syntax as PostgreSQL's CREATE FOREIGN TABLE command. The only difference is the DISTRIBUTE BY APPEND clause placed before the SERVER clause. Here, the append clause tells the database to extract and keep the minimum and maximum review_date values for each foreign file. The filename option is left empty because the actual file paths are supplied when the files are staged.
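
A natural use of these per-file minimum and maximum values is to skip files that cannot contain matching rows, for example when a query filters on a review_date range. The sketch below assumes hypothetical per-file ranges; the real ranges depend on the staged files. It keeps only files whose [min, max] interval overlaps the half-open range from the WHERE clause. It illustrates the idea only and is not CitusDB's planner code.

```python
from datetime import date


def file_range(review_dates):
    """Return the (min, max) review_date seen in one staged file."""
    return min(review_dates), max(review_dates)


def prune(ranges, lower, upper):
    """Keep files whose [min, max] range overlaps the range [lower, upper)."""
    return [
        name
        for name, (low, high) in ranges.items()
        if high >= lower and low < upper
    ]


# Hypothetical metadata; real values come from the files' actual contents.
ranges = {
    "customer_reviews_1998.csv": file_range([date(1998, 1, 1), date(1998, 12, 31)]),
    "customer_reviews_1999.csv": file_range([date(1999, 1, 1), date(1999, 12, 31)]),
}

# A spring 1998 date filter only needs the 1998 file.
print(prune(ranges, date(1998, 3, 1), date(1998, 6, 1)))
```
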

Now, let's "upload" our customer reviews files to the database by specifying the correct file paths. In practice, the following commands only update table metadata on the master and worker nodes. They initiate file copy operations only if the replication factor is set to more than 1.

postgres=# \STAGE customer_reviews FROM '/home/user/customer_reviews_1998.csv'
postgres=# \STAGE customer_reviews FROM '/home/user/customer_reviews_1999.csv'

Once the \STAGE commands finish, you are ready to run analytic queries on these flat files. CitusDB partitions an incoming analytic query into one query per foreign file, runs these queries in parallel, and merges their results.
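
For an aggregate such as avg(), each per-file query cannot simply return its own average. Averaging averages gives small and large files equal weight. A common approach, sketched below under assumed inputs, is to have each per-file query return a partial (sum, count) pair and merge those pairs on the master. The file contents and function names are hypothetical, and threads stand in for worker nodes.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical per-file data: the review ratings stored in each staged file.
FILES = {
    "customer_reviews_1998.csv": [5, 4, 3],
    "customer_reviews_1999.csv": [2, 5],
}


def partial_avg(ratings):
    """Per-file query: return (sum, count) instead of a final average."""
    return sum(ratings), len(ratings)


def distributed_avg(files):
    """Run one per-file query in parallel, then merge the partial results."""
    with ThreadPoolExecutor() as pool:
        partials = list(pool.map(partial_avg, files.values()))
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count if count else None


print(distributed_avg(FILES))  # 3.8
```

With the sample data, averaging the two per-file averages (4.0 and 3.5) would give 3.75 rather than the correct 3.8.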

-- Find all popular reviews made on the Dune series in the spring of 1998.

SELECT
    customer_id, review_date, review_rating, product_id, product_title
FROM
    customer_reviews
WHERE
    product_title LIKE '%Dune%' AND
    review_votes >= 10 AND
    review_date >= '1998-03-01' AND
    review_date < date '1998-03-01' + interval '3' month;

-- Do we have a correlation between a book's title's length and its review ratings?

SELECT
    width_bucket(length(product_title), 1, 50, 5) title_length_bucket,
    round(avg(review_rating), 2) AS review_average,
    count(*)
FROM
    customer_reviews
WHERE
    product_group = 'Book'
GROUP BY
    title_length_bucket
ORDER BY
    title_length_bucket;
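
The second query relies on width_bucket(operand, low, high, count), which splits the range from low to high into count equal-width buckets. The Python function below mirrors that behavior for low < high to make the bucket boundaries explicit. With (1, 50, 5), each bucket is 9.8 characters wide. Titles shorter than 1 character fall into bucket 0, and titles of 50 or more characters fall into bucket 6. This is an illustration, not PostgreSQL's implementation.

```python
import math


def width_bucket(operand, low, high, count):
    """Bucket number for operand, following width_bucket's rules when low < high."""
    if operand < low:
        return 0  # below the histogram range
    if operand >= high:
        return count + 1  # at or above the upper bound
    return math.floor((operand - low) * count / (high - low)) + 1


# Title lengths around the bucket edges used in the query.
for length in (0, 1, 10, 11, 49, 50, 120):
    print(length, width_bucket(length, 1, 50, 5))
```
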

MongoDB Foreign Data Wrapper

Foreign data wrappers can also enable SQL queries on semi-structured data. For this, the wrapper needs to establish a convention that maps semi-structured data fields into table columns in PostgreSQL. To demonstrate such an example mapping, we implemented an open source wrapper named mongo_fdw.

In the following, we assume that you already have a MongoDB installation, and describe a fresh CitusDB installation from packages. You can also use regular PostgreSQL 9.3 packages; we chose our packages to ensure consistent directory paths.

Now, let's first download some example JSON files and import them into MongoDB. We use customer reviews data from Amazon for this example, and download 1.75M reviews for years 1998 and 1999. More data from subsequent years are also available.

localhost# wget http://examples.citusdata.com/customer_reviews_1998.json.gz
localhost# wget http://examples.citusdata.com/customer_reviews_1999.json.gz

localhost# gzip -d customer_reviews_1998.json.gz
localhost# gzip -d customer_reviews_1999.json.gz

localhost# mongoimport --port 27017 --db test --collection customer_reviews \
           --type json --file customer_reviews_1998.json
localhost# mongoimport --port 27017 --db test --collection customer_reviews \
           --type json --file customer_reviews_1999.json

To install the MongoDB foreign data wrapper, download the mongo_fdw code from our GitHub account and follow the build instructions in its README. We also need to install CitusDB, and packages are available for different platforms and platform versions. If you are using Fedora 12+ or Ubuntu 10.04+ on a 64-bit machine:

localhost# wget http://packages.citusdata.com/readline-6.0/citusdb-3.0.1-1.x86_64.rpm
localhost# sudo rpm --install citusdb-3.0.1-1.x86_64.rpm

OR

localhost# wget http://packages.citusdata.com/readline-6.0/citusdb-3.0.1-1.amd64.deb
localhost# sudo dpkg --install citusdb-3.0.1-1.amd64.deb

After installing CitusDB, you can start the database:

localhost# /opt/citusdb/3.0/bin/pg_ctl -D /opt/citusdb/3.0/data -l logfile start

Next, you can connect to the PostgreSQL database, and create a foreign table for the customer_reviews collection.

localhost# /opt/citusdb/3.0/bin/psql -h localhost -d postgres

postgres=# CREATE EXTENSION mongo_fdw;
postgres=# CREATE SERVER mongo_server FOREIGN DATA WRAPPER mongo_fdw
           OPTIONS (address '127.0.0.1', port '27017');

postgres=# CREATE FOREIGN TABLE customer_reviews
(
    customer_id TEXT,
    review_date TIMESTAMP,
    review_rating INTEGER,
    product_id CHAR(10),
    product_title TEXT,
    product_group TEXT,
    product_category TEXT,
    similar_product_ids CHAR(10)[]
)
SERVER mongo_server
OPTIONS (database 'test', collection 'customer_reviews');

The first command loads the foreign data wrapper extension into the database; this needs to happen only once after a database install. The following commands create a new server and foreign table, and associate the foreign table with the customer_reviews collection. Note that the foreign table schema includes only some of the fields defined in the BSON documents. This is fine, as we don't impose any restrictions on the number or order of column definitions, as long as the column and field names match.

Here, we also specify the default option values for the server and foreign table commands. If your option values match the defaults, you don't need to specify them.

Also note that mongo_fdw requires certain column names to be declared in double quotes. In particular, BSON document keys that contain upper-case letters or that occur within a nested document need to be quoted when declared as column names. For example, a nested field such as "review": { "Votes": 19 } should be declared as "review.Votes" INTEGER in the foreign table schema.
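
The quoting rule follows from how PostgreSQL treats identifiers. Unquoted names are folded to lower case, and a dot cannot appear in an unquoted column name. The mapping itself can be pictured in two steps: flatten each document into dotted keys, then look up each declared column by name. The sketch below is a hypothetical Python illustration of that convention, not mongo_fdw's C code. Fields with no matching column are ignored, and columns with no matching field become None (NULL).

```python
def flatten(document, prefix=""):
    """Flatten nested documents into dotted keys, e.g. "review.Votes"."""
    flat = {}
    for key, value in document.items():
        name = prefix + key
        if isinstance(value, dict):
            flat.update(flatten(value, name + "."))
        else:
            flat[name] = value
    return flat


def to_row(document, columns):
    """Project a document onto the declared column names, matched by name."""
    flat = flatten(document)
    return tuple(flat.get(column) for column in columns)


# Hypothetical document with a nested field and an undeclared field.
doc = {"customer_id": "A1", "review": {"Votes": 19, "rating": 5}, "extra": True}
print(to_row(doc, ["review.Votes", "customer_id", "missing_field"]))
# (19, 'A1', None)
```

Column order in the table definition does not need to follow field order in the document, because every column is resolved by name.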

Once you create the foreign table, you can run SQL queries on it. Below we show a rather simple query, but complex queries that involve subselects, SQL window functions, and collection joins are also possible. In fact, you can even join a MongoDB collection with a PostgreSQL table.

-- Do we have a correlation between a book's title's length and its review ratings?

SELECT
    round(avg(review_rating), 2),
    width_bucket(length(product_title), 1, 50, 5) as title_length,
    count(*)
FROM
    customer_reviews
WHERE
    product_group='Book' AND
    review_date >= '1998-01-01' AND
    review_date < date '1998-01-01' + interval '1 year'
GROUP BY
    title_length
ORDER BY
    title_length;

Under the covers, mongo_fdw first takes all filtering expressions in the WHERE clause and converts them into their equivalents in the MongoDB query language. The wrapper then sends this query to MongoDB, fetches all documents that match the query criteria, and converts these documents into PostgreSQL's internal tuple format. The MongoDB server log shows the resulting query:

[conn2] query test.customer_reviews query: { product_group: "Book", review_date: { $gte: new Date(883612800000), $lt: new Date(915148800000) } }
[conn2] getmore test.customer_reviews query: { product_group: "Book", review_date: { $gte: new Date(883612800000), $lt: new Date(915148800000) } } cursorid:8599789499140355936
...
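
The translation visible in this log can be sketched in Python. The predicate format, the operator table, and the function names are hypothetical. A real wrapper walks PostgreSQL expression trees and builds BSON. This sketch only builds a dictionary and renders timestamps as the millisecond values that appear inside new Date(...) in the log. It treats naive timestamps as UTC, which is an assumption.

```python
from datetime import datetime, timezone

# SQL comparison operators and their MongoDB equivalents; None means equality.
OPERATORS = {"=": None, "<": "$lt", "<=": "$lte", ">": "$gt", ">=": "$gte"}


def to_mongo_value(value):
    """Render datetimes as milliseconds since the Unix epoch, as in new Date(...)."""
    if isinstance(value, datetime):
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
        return int(value.timestamp() * 1000)
    return value


def to_mongo_filter(predicates):
    """Convert ANDed (column, operator, value) predicates into a filter document.

    Assumes a column is never combined with both equality and range predicates.
    """
    query = {}
    for column, op, value in predicates:
        value = to_mongo_value(value)
        mongo_op = OPERATORS[op]
        if mongo_op is None:
            query[column] = value
        else:
            query.setdefault(column, {})[mongo_op] = value
    return query


# The WHERE clause of the example query, expressed as predicates.
print(to_mongo_filter([
    ("product_group", "=", "Book"),
    ("review_date", ">=", datetime(1998, 1, 1)),
    ("review_date", "<", datetime(1999, 1, 1)),
]))
# {'product_group': 'Book', 'review_date': {'$gte': 883612800000, '$lt': 915148800000}}
```
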

As the wrapper converts fetched documents into tuples, the PostgreSQL executor iterates over these tuples and executes the query plan. To see this plan and the estimated query execution costs, you can run EXPLAIN with your queries. Currently, mongo_fdw uses only the document count and the average document size when estimating costs for different query plans. A better cost estimate would also use data distribution statistics, and the foreign data wrapper APIs in PostgreSQL 9.2 and later allow wrappers to collect such statistics.
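
To make the limitation concrete, the sketch below estimates a scan cost from only a document count and an average document size. It borrows PostgreSQL's default seq_page_cost (1.0), cpu_tuple_cost (0.01), and 8 kB block size as an assumption; mongo_fdw's actual formula may differ. The inputs say nothing about how values are distributed, so the estimate stays the same however selective the WHERE clause is. Distribution statistics would fix exactly that gap.

```python
import math

# Assumed constants, borrowed from PostgreSQL's defaults.
SEQ_PAGE_COST = 1.0
CPU_TUPLE_COST = 0.01
BLOCK_SIZE = 8192  # bytes


def estimate_scan_cost(document_count, avg_document_size):
    """Estimate a full-collection scan as if the data were stored in pages."""
    pages = math.ceil(document_count * avg_document_size / BLOCK_SIZE)
    return SEQ_PAGE_COST * pages + CPU_TUPLE_COST * document_count


# Hypothetical statistics: 1.75M documents of about 400 bytes each.
print(estimate_scan_cost(1_750_000, 400))
```
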


Columnar Store Wrapper

Foreign data wrappers can also enable new file storage techniques. One example is cstore_fdw, a foreign data wrapper that stores data on disk in a columnar format and takes advantage of this format when querying the data. The benefits of this columnar storage implementation are covered in detail elsewhere. Briefly, they are faster queries, less disk I/O, and less data stored on disk.

To use cstore_fdw with CitusDB, we first need to install the cstore_fdw extension on all nodes in the CitusDB cluster by following the instructions in the Building section of the cstore_fdw documentation. Those instructions assume that PostgreSQL's pg_config is on the PATH. When building for CitusDB, make sure CitusDB's pg_config comes first on the PATH instead:

PATH=/opt/citusdb/3.0/bin/:$PATH make
sudo PATH=/opt/citusdb/3.0/bin/:$PATH make install

For our example, we will use cstore_fdw to store and query the customer reviews dataset. First, we'll download the CSV files:

localhost# wget http://examples.citusdata.com/customer_reviews_1998.csv.gz
localhost# wget http://examples.citusdata.com/customer_reviews_1999.csv.gz

localhost# gzip -d customer_reviews_1998.csv.gz
localhost# gzip -d customer_reviews_1999.csv.gz

Next, we'll create a distributed version of the customer_reviews table from the previous examples. To do this, we run the following on the CitusDB master node:

-- load extension first time after install
CREATE EXTENSION cstore_fdw;

-- create server object
CREATE SERVER cstore_server FOREIGN DATA WRAPPER cstore_fdw;

-- create foreign table
CREATE FOREIGN TABLE customer_reviews
(
    customer_id TEXT,
    review_date DATE,
    review_rating INTEGER,
    review_votes INTEGER,
    review_helpful_votes INTEGER,
    product_id CHAR(10),
    product_title TEXT,
    product_sales_rank BIGINT,
    product_group TEXT,
    product_category TEXT,
    product_subcategory TEXT,
    similar_product_ids CHAR(10)[]
)
DISTRIBUTE BY APPEND (review_date)
SERVER cstore_server
OPTIONS(filename '', compression 'pglz');

CitusDB takes care of creating the extension on the worker nodes and choosing a filename to store the table data. We can now load data into CitusDB as usual with the \STAGE command:

\STAGE customer_reviews FROM '/home/user/customer_reviews_1998.csv' WITH CSV;
\STAGE customer_reviews FROM '/home/user/customer_reviews_1999.csv' WITH CSV;

We can now query the distributed customer_reviews table. For example, the following SQL query calculates the average review rating by product category without reading unneeded columns from disk:

SELECT 
    product_category, 
    avg(review_rating) as avg_rating 
FROM 
    customer_reviews 
GROUP BY 
    product_category
ORDER BY 
    avg_rating DESC;
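
The sketch below shows why this query benefits from a columnar layout. Once each column's values are stored contiguously, computing the average rating per category touches only the product_category and review_rating columns. The wide product_title values are never read. The rows and category names are made up. This in-memory Python model also ignores cstore_fdw's actual on-disk format and compression.

```python
# Hypothetical sample rows in row-store order: one tuple per review.
COLUMN_NAMES = ("customer_id", "product_category", "review_rating", "product_title")
rows = [
    ("A1", "Fiction", 5, "Dune"),
    ("B2", "Rock", 3, "Abbey Road"),
    ("C3", "Fiction", 4, "Emma"),
]

# Columnar layout: each column's values are stored together.
columns = {name: [row[i] for row in rows] for i, name in enumerate(COLUMN_NAMES)}


def avg_rating_by_category(columns):
    """Average review_rating per product_category, touching only those two columns."""
    sums, counts = {}, {}
    for category, rating in zip(columns["product_category"], columns["review_rating"]):
        sums[category] = sums.get(category, 0) + rating
        counts[category] = counts.get(category, 0) + 1
    averages = {category: sums[category] / counts[category] for category in sums}
    # Equivalent of ORDER BY avg_rating DESC.
    return sorted(averages.items(), key=lambda item: item[1], reverse=True)


print(avg_rating_by_category(columns))
# [('Fiction', 4.5), ('Rock', 3.0)]
```
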

The benefits of columnar storage are most visible with disk-bound workloads. The example customer review data is small enough to fit in memory on many machines, in which case you are unlikely to see significant performance improvements.