如何用 sysbench 并行装载 PostgreSQL 测试数据

2 minute read

背景

本文参考老唐的使用sysbench和sqlldr并行装载Oracle测试数据而成。

http://blog.osdba.net/538.html

sysbench原来自带的lua数据装载脚本是使用以下方式串行装载的,速度比较慢(比单条insert快,但是比COPY慢)。

insert into table1 values (),(),()....      
insert into table2 values (),(),()....      
...  
insert into tablen values (),(),()....      

使用prepare导入数据的用法举例

./sysbench_pg --test=lua/oltp.lua --db-driver=pgsql --pgsql-host=127.0.0.1 --pgsql-port=1921 --pgsql-user=postgres --pgsql-password=postgres --pgsql-db=postgres --oltp-tables-count=64 --oltp-table-size=1000000 --num-threads=64 prepare      

prepare 表示装载数据,但是它串行的。

sysbench0.5中可以在命令行中指定测试时启动的并行线程数,这个测试过程是使用run命令,而且是多线程并发的,所以我们可以使用sysbench的run命令来造数据,而不再使用其提供的prepare命令的方法来造数据。run命令会根据命令行参数–num-threads来指定并发线程数的多少。

在sysbench中自定义的lua脚本中要求实现以下几个函数:

function thread_init(thread_id): 此函数在线程创建后只被执行一次    
function event(thread_id): 每执行一次就会被调用一次。    

由上可以知道,本次造数据的脚本我们只需要实现thread_init()函数就可以了。

生成测试数据的脚本沿用老唐提供的代码:

#include <stdio.h>  
#include <stdlib.h>  
#include <time.h>  
#include <stdint.h>  
#include <sys/time.h>  
uint64_t my_rand(struct random_data * r1, struct random_data * r2)  
{  
    uint64_t rand_max = 100000000000LL;  
    uint64_t result;  
    uint32_t u1, u2;  
    random_r(r1, &u1);  
    random_r(r2, &u2);  
    result = (int64_t)u1 * (int64_t)u2;  
    result = result % rand_max;  
    return result;  
}  
int main(int argc, char *argv[])  
{  
    struct timeval tpstart;  
    struct random_data r1, r2;  
    int i;  
    int r;  
    int max_value;  
    char rand_state1[128];  
    char rand_state2[128];  
    if (argc !=2)  
    {  
        printf("Usage: %s <rownums>\n", argv[0]);  
        return 1;  
    }  
    max_value = atoi(argv[1]);  
    gettimeofday(&tpstart,NULL);  
    initstate_r(tpstart.tv_usec,rand_state1,sizeof(rand_state1),&r1);  
    srandom_r(tpstart.tv_usec, &r1);  
    gettimeofday(&tpstart,NULL);  
    initstate_r(tpstart.tv_usec,rand_state2,sizeof(rand_state1),&r2);  
    srandom_r(tpstart.tv_usec, &r2);  
    for (i=1; i<max_value+1; i++)  
    {  
        r = my_rand(&r1, &r2) % max_value;   
        printf("%d,%d,%011llu-%011llu-%011llu-%011llu-%011llu-%011llu-%011llu-%011llu-%011llu-%011llu,%011llu-%011llu-%011llu-%011llu-%011llu\n",  
                i,  
                r,  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2),  
                 my_rand(&r1, &r2)  
              );  
    }  
    return 0;  
}  

编译此C语言程序的方法如下:

gcc gendata.c -o gendata    

新建一个copy.lua的脚本,内容如下

调用 common.lua中的 set_vars() 继承来自 common.lua 的全局变量。

函数 copydata(table_id) : 创建表,创建管道,将管道数据传输到psql -c “copy …“客户端的方式导入数据。

函数 create_index(table_id) : 创建索引,调整SEQUENCE next val。

注意咯, oltp_tables_count 必须是 num_threads 的倍数,在 thread_init 中, 以num_threads 为步调,以thread_id+1为起始值,设置i的值,并调用copydata(table_id)和create_index(table_id)。

$ vi lua/copy.lua  
pathtest = string.match(test, "(.*/)") or ""  
  
dofile(pathtest .. "common.lua")  
  
function copydata(table_id)  
  local query  
  
  query = [[  
CREATE UNLOGGED TABLE sbtest]] .. table_id .. [[ (  
id SERIAL NOT NULL,  
k INTEGER,  
c CHAR(120) DEFAULT '' NOT NULL,  
pad CHAR(60) DEFAULT '' NOT NULL,  
PRIMARY KEY (id)  
) ]]  
  
  db_query(query)  
  
  os.execute ('export PGPASSWORD=' .. pgsql_password)  
  os.execute ('rm -f sbtest' .. table_id .. '.dat')  
  os.execute ('mknod sbtest' .. table_id .. '.dat p')  
  os.execute ('./gendata ' .. oltp_table_size .. ' >> sbtest'..table_id ..'.dat &')  
  os.execute ('cat sbtest' .. table_id .. '.dat | psql -h ' .. pgsql_host .. ' -p ' .. pgsql_port .. ' -U ' .. pgsql_user .. ' -d ' .. pgsql_db .. ' -c "copy sbtest' .. table_id .. ' from stdin with csv"')  
  os.execute ('rm -f sbtest' .. table_id .. '.dat')  
end  
  
function create_index(table_id)  
  db_query("select setval('sbtest" .. table_id .. "_id_seq', " .. (oltp_table_size+1) .. ")" )  
  db_query("CREATE INDEX k_" .. table_id .. " on sbtest" .. table_id .. "(k)")  
end  
  
function thread_init(thread_id)  
   set_vars()  
  
   print("thread prepare"..thread_id)  
  
   for i=thread_id+1, oltp_tables_count, num_threads  do  
     copydata(i)  
     create_index(i)  
   end  
end  
  
function event(thread_id)  
   os.exit()  
end  

用法,必须把psql放到路径中,因为lua中需要用到psql命令

export PATH=/home/digoal/pgsql9.5/bin:$PATH  

生成数据,速度比以前快多了

./sysbench_pg --test=lua/copy.lua \  
  --db-driver=pgsql \  
  --pgsql-host=127.0.0.1 \  
  --pgsql-port=1921 \  
  --pgsql-user=postgres \  
  --pgsql-password=postgres \  
  --pgsql-db=postgres \  
  --oltp-tables-count=64 \  
  --oltp-table-size=1000000 \  
  --num-threads=64 \  
  run  

清除数据, drop table

./sysbench_pg --test=lua/copy.lua \  
  --db-driver=pgsql \  
  --pgsql-host=127.0.0.1 \  
  --pgsql-port=1921 \  
  --pgsql-user=postgres \  
  --pgsql-password=postgres \  
  --pgsql-db=postgres \  
  --oltp-tables-count=64 \  
  --oltp-table-size=1000000 \  
  --num-threads=64 \  
  cleanup  

lua全局变量代码:

sysbench/scripting/lua/src/lua.h:#define lua_register(L,n,f) (lua_pushcfunction(L, (f)), lua_setglobal(L, (n)))  
sysbench/scripting/lua/src/lua.h:#define lua_setglobal(L,s)     lua_setfield(L, LUA_GLOBALSINDEX, (s))  
sysbench/scripting/lua/src/lbaselib.c:  lua_setglobal(L, "_G");  
sysbench/scripting/lua/src/lbaselib.c:  lua_setglobal(L, "_VERSION");  /* set global _VERSION */  
sysbench/scripting/lua/src/lbaselib.c:  lua_setglobal(L, "newproxy");  /* set global `newproxy' */  
sysbench/scripting/script_lua.c:    lua_setglobal(state, opt->name);  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand_uniq");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rnd");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand_str");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand_uniform");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand_gaussian");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "sb_rand_special");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_connect");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_disconnect");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_query");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_bulk_insert_init");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_bulk_insert_next");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_bulk_insert_done");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_prepare");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_bind_param");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_bind_result");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_execute");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_close");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_store_results");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "db_free_results");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "DB_ERROR_NONE");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "DB_ERROR_DEADLOCK");  
sysbench/scripting/script_lua.c:  lua_setglobal(state, "DB_ERROR_FAILED");  
sysbench/scripting/script_lua.c:  lua_setglobal(L, "db_driver");  

传入参数,可以把sysbench_pg的参数-替换成_在lua脚本中使用这些变量,例子

--pgsql-host=127.0.0.1  -> 对应lua中的变量名 pgsql_host  
--pgsql-port=1921   -> 对应lua中的变量名 pgsql_port  
--pgsql-user=postgres   -> 对应lua中的变量名 pgsql_user  
--pgsql-password=postgres   -> 对应lua中的变量名 pgsql_password  
--pgsql-db=postgres   -> 对应lua中的变量名 pgsql_db  
--oltp-tables-count=64   -> 对应lua中的变量名 oltp_tables_count  
--oltp-table-size=1000000   -> 对应lua中的变量名 oltp_table_size  
--num-threads=64  -> 对应lua中的变量名 num_threads  

Flag Counter

digoal’s 大量PostgreSQL文章入口