PostgreSQL 9.6 并行计算 优化器算法浅析

10 minute read

背景

之前写过几篇 PostgreSQL 并行计算的文章,文中并没有仔细描述PostgreSQL是如何决策并行计算,以及并行度的。

  • 开源数据库PostgreSQL攻克并行计算难题

https://yq.aliyun.com/articles/44655

  • PostgreSQL 并行计算 在 xfs, ext4 下的表现

https://yq.aliyun.com/articles/53985

PostgreSQL 并不需要用户在SQL中使用HINT来启用并行计算,因为优化器会从成本的角度做出选择,是否使用,以及使用的并行度是多大。

优化器选择并行计算的相关参数

PostgreSQL会通过这些参数来决定是否使用并行,以及该启用几个work process。

  • max_worker_processes (integer)

    很显然,这个参数决定了整个数据库集群允许启动多少个work process,注意如果有standby,standby的参数必须大于等于主库的参数值。

    如果设置为0,表示不允许并行。

Sets the maximum number of background processes that the system can support.   
This parameter can only be set at server start.   
The default is 8.  
#   
When running a standby server, you must set this parameter to the same or higher value than on the master server.   
Otherwise, queries will not be allowed in the standby server.  
  • max_parallel_workers_per_gather (integer)

    这个参数决定了每个Gather node最多允许启用多少个work process。

    同时需要注意,在OLTP业务系统中,不要设置太大,因为每个worker都会消耗同等的work_mem等资源,争抢会比较厉害。

    建议在OLAP中使用并行,并且做好任务调度,减轻冲突。

Sets the maximum number of workers that can be started by a single Gather node.   
Parallel workers are taken from the pool of processes established by max_worker_processes.   
Note that the requested number of workers may not actually be available at run time. -- 因为work process可能被使用了一些,整个系统还能开启的work process=max_worker_processes减去已使用的。    
#   
If this occurs, the plan will run with fewer workers than expected, which may be inefficient.   
The default value is 2.   
Setting this value to 0 disables parallel query execution.   
#   
Note that parallel queries may consume very substantially more resources than non-parallel queries, because each worker process is a completely separate process which has roughly the same impact on the system as an additional user session.   
This should be taken into account when choosing a value for this setting, as well as when configuring other settings that control resource utilization, such as work_mem.   
Resource limits such as work_mem are applied individually to each worker, which means the total utilization may be much higher across all processes than it would normally be for any single process.   
For example, a parallel query using 4 workers may use up to 5 times as much CPU time, memory, I/O bandwidth, and so forth as a query which uses no workers at all.  

例子,WITH语法中,有两个QUERY用来并行计算,虽然设置的max_parallel_workers_per_gather=6,但是由于max_worker_processes=8,所以第一个Gather node用了6个worker process,而另一个Gather实际上只用了2个worker。

postgres=# show max_worker_processes ;  
 max_worker_processes   
----------------------  
 8  
(1 row)  
postgres=# set max_parallel_workers_per_gather=6;  
SET  
postgres=# explain (analyze,verbose,costs,timing,buffers) with t as (select count(*) from test), t1 as (select count(id) from test) select * from t,t1;  
                                                                            QUERY PLAN                                                                              
------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Nested Loop  (cost=159471.81..159471.86 rows=1 width=16) (actual time=7763.033..7763.036 rows=1 loops=1)  
   Output: t.count, t1.count  
   Buffers: shared hit=32940 read=74784  
   CTE t  
     ->  Finalize Aggregate  (cost=79735.90..79735.91 rows=1 width=8) (actual time=4714.114..4714.115 rows=1 loops=1)  
           Output: count(*)  
           Buffers: shared hit=16564 read=37456  
           ->  Gather  (cost=79735.27..79735.88 rows=6 width=8) (actual time=4714.016..4714.102 rows=7 loops=1)  
                 Output: (PARTIAL count(*))  
                 Workers Planned: 6  
                 Workers Launched: 6  
                 Buffers: shared hit=16564 read=37456  
                 ->  Partial Aggregate  (cost=78735.27..78735.28 rows=1 width=8) (actual time=4709.465..4709.466 rows=1 loops=7)  
                       Output: PARTIAL count(*)  
                       Buffers: shared hit=16084 read=37456  
                       Worker 0: actual time=4709.146..4709.146 rows=1 loops=1  
                         Buffers: shared hit=2167 read=5350  
                       Worker 1: actual time=4708.156..4708.156 rows=1 loops=1  
                         Buffers: shared hit=2140 read=5288  
                       Worker 2: actual time=4708.370..4708.370 rows=1 loops=1  
                         Buffers: shared hit=2165 read=4990  
                       Worker 3: actual time=4708.968..4708.969 rows=1 loops=1  
                         Buffers: shared hit=2501 read=5529  
                       Worker 4: actual time=4709.194..4709.195 rows=1 loops=1  
                         Buffers: shared hit=2469 read=5473  
                       Worker 5: actual time=4708.812..4708.813 rows=1 loops=1  
                         Buffers: shared hit=2155 read=5349  
                       ->  Parallel Seq Scan on public.test  (cost=0.00..73696.22 rows=2015622 width=0) (actual time=0.051..2384.380 rows=1728571 loops=7)  
                             Buffers: shared hit=16084 read=37456  
                             Worker 0: actual time=0.046..2385.108 rows=1698802 loops=1  
                               Buffers: shared hit=2167 read=5350  
                             Worker 1: actual time=0.057..2384.698 rows=1678728 loops=1  
                               Buffers: shared hit=2140 read=5288  
                             Worker 2: actual time=0.061..2384.109 rows=1617030 loops=1  
                               Buffers: shared hit=2165 read=4990  
                             Worker 3: actual time=0.046..2387.143 rows=1814780 loops=1  
                               Buffers: shared hit=2501 read=5529  
                             Worker 4: actual time=0.046..2382.491 rows=1794892 loops=1  
                               Buffers: shared hit=2469 read=5473  
                             Worker 5: actual time=0.070..2383.598 rows=1695904 loops=1  
                               Buffers: shared hit=2155 read=5349  
   CTE t1  
     ->  Finalize Aggregate  (cost=79735.90..79735.91 rows=1 width=8) (actual time=3048.902..3048.902 rows=1 loops=1)  
           Output: count(test_1.id)  
           Buffers: shared hit=16376 read=37328  
           ->  Gather  (cost=79735.27..79735.88 rows=6 width=8) (actual time=3048.732..3048.880 rows=3 loops=1)  
                 Output: (PARTIAL count(test_1.id))  
                 Workers Planned: 6  
                 Workers Launched: 2  
                 Buffers: shared hit=16376 read=37328  
                 ->  Partial Aggregate  (cost=78735.27..78735.28 rows=1 width=8) (actual time=3046.399..3046.400 rows=1 loops=3)  
                       Output: PARTIAL count(test_1.id)  
                       Buffers: shared hit=16212 read=37328  
                       Worker 0: actual time=3045.394..3045.395 rows=1 loops=1  
                         Buffers: shared hit=5352 read=12343  
                       Worker 1: actual time=3045.339..3045.340 rows=1 loops=1  
                         Buffers: shared hit=5354 read=12402  
                       ->  Parallel Seq Scan on public.test test_1  (cost=0.00..73696.22 rows=2015622 width=4) (actual time=0.189..1614.261 rows=4033333 loops=3)  
                             Output: test_1.id  
                             Buffers: shared hit=16212 read=37328  
                             Worker 0: actual time=0.039..1617.258 rows=3999030 loops=1  
                               Buffers: shared hit=5352 read=12343  
                             Worker 1: actual time=0.033..1610.934 rows=4012856 loops=1  
                               Buffers: shared hit=5354 read=12402  
   ->  CTE Scan on t  (cost=0.00..0.02 rows=1 width=8) (actual time=4714.120..4714.121 rows=1 loops=1)  
         Output: t.count  
         Buffers: shared hit=16564 read=37456  
   ->  CTE Scan on t1  (cost=0.00..0.02 rows=1 width=8) (actual time=3048.907..3048.908 rows=1 loops=1)  
         Output: t1.count  
         Buffers: shared hit=16376 read=37328  
 Planning time: 0.144 ms  
 Execution time: 7766.458 ms  
(72 rows)  
  • parallel_setup_cost (floating point)

    表示启动woker process的启动成本,因为启动worker进程需要建立共享内存等操作,属于附带的额外成本。

Sets the planner's estimate of the cost of launching parallel worker processes.   
The default is 1000.  
  • parallel_tuple_cost (floating point)

    woker进程处理完后的tuple要传输给上层node,即进程间的row交换成本,按node评估的输出rows来乘。

Sets the planner's estimate of the cost of transferring one tuple from a parallel worker process to another process.    
The default is 0.1.    

代码如下

parallel_tuple_cost : Cost of CPU time to pass a tuple from worker to master backend  
parallel_setup_cost : Cost of setting up shared memory for parallelism  
//  
double          parallel_tuple_cost = DEFAULT_PARALLEL_TUPLE_COST;  
double          parallel_setup_cost = DEFAULT_PARALLEL_SETUP_COST;  
//  
/*  
 * cost_gather  
 *        Determines and returns the cost of gather path.  
 *  
 * 'rel' is the relation to be operated upon  
 * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL  
 * 'rows' may be used to point to a row estimate; if non-NULL, it overrides  
 * both 'rel' and 'param_info'.  This is useful when the path doesn't exactly  
 * correspond to any particular RelOptInfo.  
 */  
void  
cost_gather(GatherPath *path, PlannerInfo *root,  
                        RelOptInfo *rel, ParamPathInfo *param_info,  
                        double *rows)  
{  
        Cost            startup_cost = 0;  
        Cost            run_cost = 0;  
//  
        /* Mark the path with the correct row estimate */  
        if (rows)  
                path->path.rows = *rows;  
        else if (param_info)  
                path->path.rows = param_info->ppi_rows;  
        else  
                path->path.rows = rel->rows;  
//  
        startup_cost = path->subpath->startup_cost;  
//  
        run_cost = path->subpath->total_cost - path->subpath->startup_cost;  
//  
        /* Parallel setup and communication cost. */  
        startup_cost += parallel_setup_cost;  //  累加启动成本  
        run_cost += parallel_tuple_cost * path->path.rows;  //  累加tuple的worker与上层进程间传输成本  
//  
        path->path.startup_cost = startup_cost;  
        path->path.total_cost = (startup_cost + run_cost);  
}  
  • min_parallel_relation_size (integer)

    表的大小,也作为是否启用并行计算的条件,如果小于它,不启用并行计算。

    但是也请注意,还有其他条件决定是否启用并行,所以并不是小于它的表就一定不会启用并行。

Sets the minimum size of relations to be considered for parallel scan.   
The default is 8 megabytes (8MB).  

代码如下

src/backend/optimizer/path/allpaths.c

/*  
 * create_plain_partial_paths  
 *        Build partial access paths for parallel scan of a plain relation  
 */  
static void  
create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)  
{  
        int                     parallel_workers;  
//  
        /*  
         * If the user has set the parallel_workers reloption, use that; otherwise  
         * select a default number of workers.  
         */  
        if (rel->rel_parallel_workers != -1)  // 如果设置了表级的parallel_workers参数,则直接使用这个作为并行度。    
                parallel_workers = rel->rel_parallel_workers;  
        else  // 如果没有设置表级并行度参数,则使用表的大小计算出一个合适的并行度    
        {  
                int                     parallel_threshold;  
//  
                /*  
                 * If this relation is too small to be worth a parallel scan, just  
                 * return without doing anything ... unless it's an inheritance child.  
                 * In that case, we want to generate a parallel path here anyway.  It  
                 * might not be worthwhile just for this relation, but when combined  
                 * with all of its inheritance siblings it may well pay off.  
                 */  
                if (rel->pages < (BlockNumber) min_parallel_relation_size &&  
                        rel->reloptkind == RELOPT_BASEREL)  // 如果表的大小小于设置的min_parallel_relation_size(单位为block),不启用并行  
                        return;  
//  
                /*  
                 * Select the number of workers based on the log of the size of the  
                 * relation.  This probably needs to be a good deal more  
                 * sophisticated, but we need something here for now.  Note that the  
                 * upper limit of the min_parallel_relation_size GUC is chosen to  
                 * prevent overflow here.  
                 */  
		 // 以下算法目前还不完善,根据表的大小计算出需要开多大的并行。 算法如下     
                parallel_workers = 1;  
                parallel_threshold = Max(min_parallel_relation_size, 1);    
                while (rel->pages >= (BlockNumber) (parallel_threshold * 3))  
                {  
                        parallel_workers++;  
                        parallel_threshold *= 3;  
                        if (parallel_threshold > INT_MAX / 3)  
                                break;                  /* avoid overflow */  
                }  
        }  
//  
        /*  
         * In no case use more than max_parallel_workers_per_gather workers.  
         */  
        parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather);  // 根据计算出的并行度值,与max_parallel_workers_per_gather参数比较,取小的。  就是需要开启的并行度。    
//  
        /* If any limit was set to zero, the user doesn't want a parallel scan. */  
        if (parallel_workers <= 0)    
                return;  
//  
        /* Add an unordered partial path based on a parallel sequential scan. */  
        add_partial_path(rel, create_seqscan_path(root, rel, NULL, parallel_workers));  // 根据计算出来的并行度,添加execute worker path。    
}  
  • force_parallel_mode (enum)

    强制开启并行,可以作为测试的目的,也可以作为hint来使用。

Allows the use of parallel queries for testing purposes even in cases where no performance benefit is expected.   
The allowed values of force_parallel_mode are   
off (use parallel mode only when it is expected to improve performance),   
on (force parallel query for all queries for which it is thought to be safe),   
regress (like on, but with additional behavior changes as explained below).   
#   
More specifically, setting this value to on will add a Gather node to the top of any query plan for which this appears to be safe, so that the query runs inside of a parallel worker.   
Even when a parallel worker is not available or cannot be used, operations such as starting a subtransaction that would be prohibited in a parallel query context will be prohibited unless the planner believes that this will cause the query to fail.   
If failures or unexpected results occur when this option is set, some functions used by the query may need to be marked PARALLEL UNSAFE (or, possibly, PARALLEL RESTRICTED).  
#   
Setting this value to regress has all of the same effects as setting it to on plus some additional effects that are intended to facilitate automated regression testing.   
Normally, messages from a parallel worker include a context line indicating that, but a setting of regress suppresses this line so that the output is the same as in non-parallel execution.   
Also, the Gather nodes added to plans by this setting are hidden in EXPLAIN output so that the output matches what would be obtained if this setting were turned off.  
  • parallel_workers (integer), 正式版本已更名,请注意
max_parallel_workers configuration parameter, Asynchronous Behavior  
max_parallel_workers_per_gather configuration parameter, Asynchronous Behavior  

以上都是数据库的参数,parallel_workers是表级参数,可以在建表时设置,也可以后期设置。

代码见create_plain_partial_paths()

create table ... WITH( storage parameter ... )    
#   
This sets the number of workers that should be used to assist a parallel scan of this table.   
If not set, the system will determine a value based on the relation size.   
The actual number of workers chosen by the planner may be less, for example due to the setting of max_worker_processes.    

例子

设置表级并行度  
alter table test set (parallel_workers=0);  
#  
关闭表的并行  
alter table test set (parallel_workers=0);  
#  
重置参数,那么在create_plain_partial_paths中会通过表的pages计算出一个合理的并行度  
alter table test reset (parallel_workers);  

PG优化器如何决定使用并行或者如何计算并行度

其实前面在讲参数时都已经讲到了,这里再总结一下。

1. 决定整个系统能开多少个worker进程

max_worker_processes

2. 计算并行计算的成本,优化器根据CBO原则选择是否开启并行

parallel_setup_cost

parallel_tuple_cost

所以简单QUERY,如果COST本来就很低(比如小于并行计算的启动成本),那么很显然数据库不会对这种QUERY启用并行计算。

3. 强制开启并行的开关

force_parallel_mode

当第二步计算出来的成本大于非并行的成本时,可以通过这种方式强制让优化器开启并行查询。

4. 根据表级parallel_workers参数决定每个Gather node的并行度

取min(parallel_workers, max_parallel_workers_per_gather)

注意正式版本已更名

max_parallel_workers_per_gather (integer)  
Sets the maximum number of workers that can be started by a single Gather node. 
Parallel workers are taken from the pool of processes established by max_worker_processes, 
limited by max_parallel_workers. Note that the requested number of workers may not actually be available at runtime. 
If this occurs, the plan will run with fewer workers than expected, which may be inefficient. 
The default value is 2. Setting this value to 0 disables parallel query execution.  
  
Note that parallel queries may consume very substantially more resources than non-parallel queries, 
because each worker process is a completely separate process which has roughly the same impact on the system as an additional user session. 
This should be taken into account when choosing a value for this setting, as well as when configuring other settings that control resource utilization, such as work_mem. 
Resource limits such as work_mem are applied individually to each worker, which means the total utilization may be much higher across all processes than it would normally be for any single process. 
For example, a parallel query using 4 workers may use up to 5 times as much CPU time, memory, I/O bandwidth, and so forth as a query which uses no workers at all.  
  
For more information on parallel query, see Chapter 15, Parallel Query.  
  
max_parallel_workers (integer)  
Sets the maximum number of workers that the system can support for parallel queries. The default value is 8. 
When increasing or decreasing this value, consider also adjusting max_parallel_workers_per_gather. 
Also, note that a setting for this value which is higher than max_worker_processes will have no effect, 
since parallel workers are taken from the pool of worker processes established by that setting.  

5. 当表没有设置parallel_workers参数并且表的大小大于min_parallel_relation_size是,由算法决定每个Gather node的并行度

相关参数 min_parallel_relation_size

算法见 create_plain_partial_paths

取Min(parallel_workers, max_parallel_workers_per_gather)

注意实际上,每个Gather能开启多少个worker还和PG集群总体剩余可以开启的worker进程数相关。

因此实际开启的可能小于优化器算出来的。从前面的例子中也可以理解。

6. 用户也可以使用hint来控制优化器选择是否强制并行 , 参考pg_hint_plan插件的用法。

如何通过参数设置开多少个并行

加入我要让某个QUERY开启32个并行,如何设置?

1. 足够大的max_worker_processes

max_worker_processes = 128

2. 足够大的max_parallel_workers_per_gather

max_parallel_workers_per_gather = 32

3. 以下设置为0

parallel_tuple_cost = 0
parallel_setup_cost = 0
min_parallel_relation_size = 0

4. 强制并行

force_parallel_mode = on

5. 最容易忽视的参数,表级并行度,如果没有设置,内核会根据表的大小算出一个。

alter table test set (parallel_workers=32);

用法
设置表级并行度
alter table test set (parallel_workers=0);
#
关闭表的并行
alter table test set (parallel_workers=0);
#
重置参数,那么在create_plain_partial_paths中会通过表的pages计算出一个合理的并行度
alter table test reset (parallel_workers);

以上设置过后,并行度会强制为32,(当然,表必须要>=32个数据块,否则没法分配任务了).

祝大家玩得开心,欢迎随时来 阿里云促膝长谈业务需求 ,恭候光临

阿里云的小伙伴们加油,努力 做好内核与服务,打造最贴地气的云数据库

Flag Counter

digoal’s 大量PostgreSQL文章入口