Greenplum, PostgreSQL 数据实时订阅的几种方式
背景
通常在一个企业中,对一份数据可能更有多个业务系统需要对其进行处理。
因此数据是流动的,通常会通过消息队列来完成这样的工作。不过呢,这样要求消息队列是最上游。
当无法将消息队列放到最上游时,例如数据先到了数据库,再订阅给其他业务线,怎么办呢?
就比如这里的RDS PG的部分。
下面来探讨一下有多少种方法来实现这个需求:将数据库的变更实时订阅到其他业务线(例如Kafka)。
PostgreSQL , 阿里云RDS for PostgreSQL
PostgreSQL,在阿里云对应RDS for PostgreSQL这个产品。
PostgreSQL的数据变更,如何订阅给下游?下面是可选的方法:
1、触发器,将表的变更记录到一个流水表,然后业务通过读取流水表进行订阅。
《USE hstore store table’s trace record》
2、规则,在需要订阅的表上,创建RULE,将表的变更记录到一个流水表,然后业务通过读取流水表进行订阅。
https://www.postgresql.org/docs/10/static/sql-createrule.html
3、异步消息,使用触发器或RULE,将表的变更写入CHANNEL。(数据库的异步消息通道功能)。
订阅端通过监听CHANNEL,实现对数据的订阅。
https://www.postgresql.org/docs/10/static/sql-notify.html
https://www.postgresql.org/docs/10/static/sql-listen.html
4、WAL 逻辑 decode。
从9.4的版本开始,PostgreSQL支持逻辑复制,将数据变更写入WAL,(类似MySQL的binlog复制)。客户端通过从WAL翻译REDO来实现订阅。
alidecode 是一个翻译wal的插件,用户也可以自己写翻译WAL的插件。
《PostgreSQL 最佳实践 - 逻辑增量复制(MySQL <-> PgSQL <-> PgSQL)》
5、PGQ ,是SKYTOOLS的一个基础功能,在PostgreSQL内部实现了一个异步的队列。用户可以对需要复制的表,创建PGQ,然后写PGQ的消费者来实现订阅。
https://wiki.postgresql.org/wiki/SkyTools
londiste3 就是一个用于复制的PGQ消费者代表程序。
6、confluentinc bottledwater-pg , 基于PG的WAL以及逻辑复制功能,实现的一个主动消费者,将数据自动从WAL翻译,并写入KAFKA队列,实现消息订阅。
https://github.com/confluentinc/bottledwater-pg
《实时数据交换平台 - BottledWater-pg with confluent》
7、时间戳,最传统的方法,用户在写入、删除、更新时,记录数据的写入时间、修改时间。
删除时,逻辑删除(标记字段),并记录删除时间。
通过这个时间戳的推移来订阅数据。
方法优先级
优先使用逻辑DECODE的方法(6、4)。
其次是时间戳(7)。
然后可以考虑PGQ。
然后再考虑异步消息的方法。
最后考虑触发器和规则。
Greenplum , 阿里云HybridDB for PostgreSQL
Greenplum,在阿里云对应HybridDB for PostgreSQL这个产品。
也有若干种订阅方法:
推荐方法:
使用appendonly表,以及时间戳的方法。
阿里云推出了metascan技术,可以在不建索引的情况下,极度高效的实现时间戳的推移订阅。
阿里云订阅套件
1、datax,配置推移字段,推移订阅。
2、cdp
3、d2
4、dts,通过类似binlog的解析来订阅。