PostgreSQL Notify/Listen Like ESB

1 minute read

背景

ESB : 基于消息的调用企业服务的通信模块.

上个礼拜和一位老朋友吃饭,聊到他现在在做的一个产品。才初略的对ESB有点了解。

发现PostgreSQL独有的Notify和Listen与总线实现的东西类似。一般用于监测表的改变,配合触发器使用,通知接收方表的数据有改变,接收方及时采取刷新动作。

下面来简单的了解一下,详细的请参看PG源码src/backend/commands/async.c部分。

LISTEN

  • 用来注册一个消息通道。

  • 如果在事务中执行listen, 那么事务必须成功commit, listen才生效.

  • session 退出后listen自动释放.

NOTIFY

  • 用来往消息通道发送异步消息。

  • 如果在事务中执行notify, 那么必须等事务成功commit之后这个消息才会塞进消息队列。

  • 消息在事务和事务之间的间隙传递。因此如果有一个listen的session中跑了一个很长的事务, 那么要等这个事务结束才能接收到这个过程中发出的notify。

  • 一个消息如果在发出notify之前有监听者, 必须等这些监听者都接收到了这个消息才从消息队列中去除。

UNLISTEN

  • 释放已经注册的消息通道。

不同的驱动接收异步消息的接口不一样,

The method a client application must use to detect notification events depends on which PostgreSQL application programming interface it uses. With the libpq library, the application issues LISTEN as an ordinary SQL command, and then must periodically call the function PQnotifies to find out whether any notification events have been received. Other interfaces such as libpgtcl provide higher-level methods for handling notify events; indeed, with libpgtcl the application programmer should not even issue LISTEN orUNLISTEN directly. See the documentation for the interface you are using for more details.

用法举例

SESSION A :

注册一个监听通道’digoal’,

blss=> listen digoal;  
LISTEN  

SESSION C :

往监听通道’digoal’发送异步消息,

blss=> notify digoal,'1';  
NOTIFY  
blss=> notify digoal,'2';  
NOTIFY  

SESSION A :

接收异步消息,(注意异步消息在事务间隙传递,因此BEGIN的时候产生了这个间隙,消息被推送到接收方)

blss=> begin;  
BEGIN  
Asynchronous notification "digoal" with payload "1" received from server process with PID 25964.  
Asynchronous notification "digoal" with payload "2" received from server process with PID 25964.  

SESSION B :

注册一个监听通道’digoal’,

blss=> listen digoal;  
LISTEN  
blss=> begin;  
BEGIN  
blss=>   

未接收到之前的异步消息, 原因是已经移除了。

SESSION A :

开启一个长事务,

blss=> select 1;  
 ?column?   
----------  
        1  
(1 row)  

SESSION C :

往监听通道’digoal’发送异步消息,

blss=> notify digoal,'3';  
NOTIFY  
blss=> notify digoal,'4';  
NOTIFY  

SESSION B :

接收异步消息,

blss=> end;  
COMMIT  
blss=>   
blss=> begin;  
BEGIN  
Asynchronous notification "digoal" with payload "3" received from server process with PID 25964.  
Asynchronous notification "digoal" with payload "4" received from server process with PID 25964.  

SESSION D :

注册一个监听通道’digoal’,

blss=> listen digoal;  
LISTEN  
blss=> begin;  
BEGIN  
blss=>   
blss=> end;  
COMMIT  
blss=> begin;  
BEGIN  
blss=>  

虽然消息还在队列中,但是无法接收到。因为这个监听是在notify发出消息之后启动的。只能接收后面的消息。

SESSION A :

关闭长事务, 接收消息。

blss=> end;  
COMMIT  
blss=> begin;  
BEGIN  
Asynchronous notification "digoal" with payload "3" received from server process with PID 25964.  
Asynchronous notification "digoal" with payload "4" received from server process with PID 25964.  

相关数据库函数 :

   Schema   |         Name          | Result data type | Argument data types |  Type    
------------+-----------------------+------------------+---------------------+--------  
 pg_catalog | pg_listening_channels | SETOF text       |                     | normal  
 pg_catalog | pg_notify             | void             | text, text          | normal  

参考

http://www.postgresql.org/docs/9.1/static/sql-listen.html

http://www.postgresql.org/docs/9.1/static/sql-unlisten.html

http://www.postgresql.org/docs/9.1/static/sql-notify.html

http://jdbc.postgresql.org/documentation/83/listennotify.html

src/backend/commands/async.c

Flag Counter

digoal’s 大量PostgreSQL文章入口