PostgreSQL pgbench tpcb 数据生成与SQL部分源码解读

4 minute read

背景

pgbench是PG的一款测试工具,内置的测试CASE为tpcb测试。同时支持用户自己写测试CASE。

大量自定义CASE参考

https://github.com/digoal/blog/blob/master/201711/readme.md

本文为pgbench 内置tpcb的解读。

源码

src/bin/pgbench/pgbench.c

表结构

/*
 * Create pgbench's standard tables
 *
 * Builds one CREATE TABLE statement per entry of DDLs[] and runs it on "con"
 * through executeStatement().  The statement text depends on settings that
 * are declared outside this function: scale (selects int vs. bigint account
 * IDs), fillfactor, tablespace and unlogged_tables.
 *
 * If the tablespace name cannot be escaped, the libpq error message is
 * printed to stderr and the program exits with status 1.
 */
static void
initCreateTables(PGconn *con)
{
        /*
         * The scale factor at/beyond which 32-bit integers are insufficient for
         * storing TPC-B account IDs.
         *
         * Although the actual threshold is 21474, we use 20000 because it is
         * easier to document and remember, and isn't that far away from the real
         * threshold.
         */
#define SCALE_32BIT_THRESHOLD 20000

        /*
         * Note: TPC-B requires at least 100 bytes per row, and the "filler"
         * fields in these table declarations were intended to comply with that.
         * The pgbench_accounts table complies with that because the "filler"
         * column is set to blank-padded empty string. But for all other tables
         * the columns default to NULL and so don't actually take any space.  We
         * could fix that by giving them non-null default values.  However, that
         * would completely break comparability of pgbench results with prior
         * versions. Since pgbench has never pretended to be fully TPC-B compliant
         * anyway, we stick with the historical behavior.
         */
        struct ddlinfo
        {
                const char *table;              /* table name */
                const char *smcols;             /* column decls if accountIDs are 32 bits */
                const char *bigcols;    /* column decls if accountIDs are 64 bits */
                int                     declare_fillfactor;     /* nonzero: add "with (fillfactor=N)" */
        };
        static const struct ddlinfo DDLs[] = {
                {
                        "pgbench_history",
                        "tid int,bid int,aid    int,delta int,mtime timestamp,filler char(22)",
                        "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
                        0
                },
                {
                        "pgbench_tellers",
                        "tid int not null,bid int,tbalance int,filler char(84)",
                        "tid int not null,bid int,tbalance int,filler char(84)",
                        1
                },
                {
                        "pgbench_accounts",
                        "aid    int not null,bid int,abalance int,filler char(84)",
                        "aid bigint not null,bid int,abalance int,filler char(84)",
                        1
                },
                {
                        "pgbench_branches",
                        "bid int not null,bbalance int,filler char(88)",
                        "bid int not null,bbalance int,filler char(88)",
                        1
                }
        };
        int                     i;

        fprintf(stderr, "creating tables...\n");

        for (i = 0; i < lengthof(DDLs); i++)
        {
                char            opts[256];
                char            buffer[256];
                const struct ddlinfo *ddl = &DDLs[i];
                const char *cols;

                /* Construct new create table statement. */
                opts[0] = '\0';
                if (ddl->declare_fillfactor)
                        snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
                                         " with (fillfactor=%d)", fillfactor);
                if (tablespace != NULL)
                {
                        char       *escape_tablespace;

                        escape_tablespace = PQescapeIdentifier(con, tablespace,
                                                                                                   strlen(tablespace));

                        /*
                         * PQescapeIdentifier() returns NULL on failure; passing NULL
                         * to a "%s" conversion is undefined behavior, so report the
                         * connection's error message and give up instead.
                         */
                        if (escape_tablespace == NULL)
                        {
                                fprintf(stderr, "%s", PQerrorMessage(con));
                                exit(1);
                        }

                        snprintf(opts + strlen(opts), sizeof(opts) - strlen(opts),
                                         " tablespace %s", escape_tablespace);
                        PQfreemem(escape_tablespace);
                }

                /* Use the bigint column declarations once scale reaches the threshold. */
                cols = (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols;

                snprintf(buffer, sizeof(buffer), "create%s table %s(%s)%s",
                                 unlogged_tables ? " unlogged" : "",
                                 ddl->table, cols, opts);

                executeStatement(con, buffer);
        }
}

tpcb 记录数算法

1、系数:

#define nbranches       1                       /* Makes little sense to change this.  Change  
                                                                 * -s instead */  
#define ntellers        10  
#define naccounts       100000  

2、记录数算法:

nbranches * scale  
  
ntellers * scale  
  
naccounts * scale  

3、如果要写入1万亿数据,那么设置 scale = 10000000

此时:

tellers = 1亿条  
  
branches = 1000万条  
  
accounts = 1万亿条  

初始化数据

只有pgbench_accounts用了copy协议,另外两个表数据相对较少,用的是INSERT

	/*  
         * fill branches, tellers, accounts in that order in case foreign keys  
         * already exist  
         */  
        for (i = 0; i < nbranches * scale; i++)  
        {  
                /* "filler" column defaults to NULL */  
                snprintf(sql, sizeof(sql),  
                                 "insert into pgbench_branches(bid,bbalance) values(%d,0)",  
                                 i + 1);  
                executeStatement(con, sql);  
        }  
  
        for (i = 0; i < ntellers * scale; i++)  
        {  
                /* "filler" column defaults to NULL */  
                snprintf(sql, sizeof(sql),  
                                 "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",  
                                 i + 1, i / ntellers + 1);  
                executeStatement(con, sql);  
        }  
.....  
        /*  
         * accounts is big enough to be worth using COPY and tracking runtime  
         */  
        res = PQexec(con, "copy pgbench_accounts from stdin");  
        if (PQresultStatus(res) != PGRES_COPY_IN)  
        {  
                fprintf(stderr, "%s", PQerrorMessage(con));  
                exit(1);  
        }  
        PQclear(res);  
  
        INSTR_TIME_SET_CURRENT(start);  
  
        for (k = 0; k < (int64) naccounts * scale; k++)  
        {  
                int64           j = k + 1;  
  
                /* "filler" column defaults to blank padded empty string */  
                snprintf(sql, sizeof(sql),  
                                 INT64_FORMAT "\t" INT64_FORMAT "\t%d\t\n",  
                                 j, k / naccounts + 1, 0);  
                if (PQputline(con, sql))  
                {  
                        fprintf(stderr, "PQputline failed\n");  
                        exit(1);  
                }  
  
                /*  
                 * If we want to stick with the original logging, print a message each  
                 * 100k inserted rows.  
                 */  
                if ((!use_quiet) && (j % 100000 == 0))  
                {  
                        INSTR_TIME_SET_CURRENT(diff);  
                        INSTR_TIME_SUBTRACT(diff, start);  
  
                        elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);  
                        remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;  
  
                        fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",  
                                        j, (int64) naccounts * scale,  
                                        (int) (((int64) j * 100) / (naccounts * (int64) scale)),  
                                        elapsed_sec, remaining_sec);  
                }  
                /* let's not call the timing for each row, but only each 100 rows */  
                else if (use_quiet && (j % 100 == 0))  
                {  
                        INSTR_TIME_SET_CURRENT(diff);  
                        INSTR_TIME_SUBTRACT(diff, start);  
  
                        elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);  
                        remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;  
  
                        /* have we reached the next interval (or end)? */  
                        if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))  
                        {  
                                fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)\n",  
                                                j, (int64) naccounts * scale,  
                                                (int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec);  
  
                                /* skip to the next interval */  
                                log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);  
                        }  
                }  
  
        }  
        if (PQputline(con, "\\.\n"))  
        {  
                fprintf(stderr, "very last PQputline failed\n");  
                exit(1);  
        }  
        if (PQendcopy(con))  
        {  
                fprintf(stderr, "PQendcopy failed\n");  
                exit(1);  
        }  
  
        executeStatement(con, "commit");  
}  

如果你要测试tpcb, 并且生成的数据量特别庞大(比如我最近在生成1万亿的CASE,实际上tellers, branches两张表也分别有1亿和1000万条记录。),可以修改一下pgbench的源码,全部改成COPY协议。

COPY写入时,可以达到120万行/s左右的写入速度。

tpcb 读写测试

\set aid random(1, 100000 * :scale)  
\set bid random(1, 1 * :scale)  
\set tid random(1, 10 * :scale)  
\set delta random(-5000, 5000)  
BEGIN;  
UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;  
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;  
UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;  
UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;  
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);  
END;  

tpcb 只读测试

\set aid random(1, 100000 * :scale)  
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;  

Flag Counter

digoal’s 大量PostgreSQL文章入口