PostgreSQL stream computing: using PipelineDB to process Kafka message streams in real time


Background

Kafka is a distributed messaging system with a wide range of uses, such as message management, website activity tracking, metrics collection, log aggregation, stream processing, commit logs, and so on.

For details, see:

http://kafka.apache.org/documentation.html#gettingStarted

For streaming-record scenarios, Kafka can replace raw log files or flow records written to a database. Think of carrier gateway logs, user trails in Internet applications, website browsing trails, or the movement trails of users and assets in O2O applications. These are all time- or location-ordered records that you might store and process with Kafka.

PipelineDB now integrates neatly with Kafka: acting as a real-time consumer of Kafka messages, it computes statistics on the fly and can stand in for Storm in this role, because defining a statistical dimension in PipelineDB takes just one SQL statement, and it can process on the order of ten million records per second.
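To give a sense of how little code that is, here is a minimal sketch (the stream and view names are illustrative only and are not used in the example below): a stream of click events and a continuous view that counts clicks per URL.

CREATE STREAM clicks_stream (uid int, url text);  
CREATE CONTINUOUS VIEW clicks_per_url AS  
    SELECT url, count(*) FROM clicks_stream GROUP BY url;  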

Here is a fuller, end-to-end example:

1. Start an nginx server locally and use siege to simulate HTTP requests; nginx logs each request as JSON to a file.

2. Start a Kafka server locally and use kafkacat to continuously push the nginx access log into Kafka.

3. Subscribe to the Kafka messages from PipelineDB and process them in real time into the desired statistics (visitors per page, latency, and so on).

Install Kafka

http://kafka.apache.org/07/quickstart.html  
  
# wget http://www.us.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz  
# tar -zxvf kafka_2.10-0.8.2.2.tgz  
  
# librdkafka: the C Kafka client library that kafkacat links against  
# git clone https://github.com/edenhill/librdkafka.git  
# cd librdkafka  
./configure  
make  
make install  
  
# yajl: the JSON library used by kafkacat's JSON support  
# git clone https://github.com/lloyd/yajl.git  
# cd yajl  
./configure  
make  
make install  
  
# make the libraries just installed under /usr/local/lib visible to the dynamic linker  
# vi /etc/ld.so.conf  
/usr/local/lib  
# ldconfig  
  
# kafkacat: a command-line producer/consumer for Kafka  
# git clone https://github.com/edenhill/kafkacat.git  
# cd kafkacat  
./configure  
make  
make install  

Install siege and nginx

# yum install -y siege nginx  

Create an nginx config file that writes the access log to /tmp/access.log in JSON format.

cd /tmp  
  
cat <<EOF > nginx.conf  
worker_processes 4;  
pid $PWD/nginx.pid;  
events {}  
http {  
  
    log_format json   
    '{'  
        '"ts": "\$time_iso8601", '  
        '"user_agent": "\$http_user_agent", '  
        '"url": "\$request_uri", '  
        '"latency": "\$request_time",  '  
        '"user": "\$arg_user"'  
    '}';  
  
    access_log $PWD/access.log json;  
    error_log $PWD/error.log;  
  
    server {  
        location ~ ^/ {  
            return 200;  
        }  
    }  
}  
EOF  

Start nginx

nginx -c $PWD/nginx.conf -p $PWD/  
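# -c points at the config file created above; -p sets the nginx prefix (working directory) to the current directory  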

Configure the hostname (Kafka resolves the local hostname, so it needs an /etc/hosts entry)

# hostname  
digoal.org  
# vi /etc/hosts  
127.0.0.1 digoal.org  

Start Kafka

cd /opt/soft_bak/kafka_2.10-0.8.2.2  
bin/zookeeper-server-start.sh config/zookeeper.properties &  
bin/kafka-server-start.sh config/server.properties &  
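# with the stock config files, ZooKeeper listens on port 2181 and the Kafka broker on port 9092  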

Generate a file of random URLs

for x in {0..1000000}; do echo "http://localhost/page$((RANDOM % 100))/path$((RANDOM % 10))?user=$((RANDOM % 100000))" >> urls.txt; done  
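# each line looks like http://localhost/page42/path3?user=12345, with random page, path, and user values  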

Use siege to simulate requests to these URLs; nginx writes the access log to /tmp/access.log.

siege -c32 -b -d0 -f urls.txt >/dev/null 2>&1  
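# -c32: 32 concurrent users, -b: benchmark mode (no delay between requests), -d0: zero delay, -f: read the URL list from a file  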
  
Sample entries from /tmp/access.log (JSON format):  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page68/path7?user=18583", "latency": "0.002",  "user": "18583"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page78/path0?user=24827", "latency": "0.003",  "user": "24827"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page19/path6?user=3988", "latency": "0.003",  "user": "3988"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page55/path2?user=18433", "latency": "0.003",  "user": "18433"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page62/path3?user=10801", "latency": "0.001",  "user": "10801"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page9/path2?user=4915", "latency": "0.001",  "user": "4915"}  
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page10/path2?user=5367", "latency": "0.001",  "user": "5367"}  

Pipe the access log into kafkacat, which pushes it to the Kafka topic logs_topic.

( tail -f /tmp/access.log | kafkacat -b localhost:9092 -t logs_topic ) &  
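# -b is the broker list and -t the target topic; because input arrives on a pipe, kafkacat runs in producer mode  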

The native way to consume these messages looks like this:

# cd /opt/soft_bak/kafka_2.10-0.8.2.2  
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs_topic --from-beginning  
# Ctrl+C  

Next, we use PipelineDB to consume these messages in real time and turn them into the statistics we want.

CREATE EXTENSION pipeline_kafka;  
SELECT kafka_add_broker('localhost:9092');  -- register a Kafka broker (one node of the Kafka cluster)  
CREATE STREAM logs_stream (payload json);  -- create a stream that the Kafka messages will be fed into  
CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream;  -- create a continuous view that consumes and aggregates the stream in real time  
SELECT kafka_consume_begin('logs_topic', 'logs_stream');  -- start consuming the topic logs_topic into logs_stream  
 kafka_consume_begin   
------------------  
 success  
(1 row)  

Query the continuous view to get the current nginx access statistics.

SELECT * FROM message_count;  
 count   
--------  
  24  
(1 row)  
  
SELECT * FROM message_count;  
 count  
--------  
  36  
(1 row)  
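To stop or pause consumption later, pipeline_kafka also provides a matching stop function; the call below is a sketch that assumes its signature mirrors kafka_consume_begin:

SELECT kafka_consume_end('logs_topic', 'logs_stream');  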

Next, a deeper real-time analysis: for each URL, the number of visits, the number of unique users, and the latency that 99% of requests fall under.

/*   
 * This function will strip away any query parameters from each url,  
 * as we're not interested in them.  
 */  
CREATE FUNCTION url(raw text, regex text DEFAULT '\?.*', replace text DEFAULT '')  
    RETURNS text  
AS 'textregexreplace_noopt'    -- textregexreplace_noopt@src/backend/utils/adt/regexp.c  
LANGUAGE internal;  
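  
-- Quick sanity check of the helper above (the regex strips everything from the  
-- first '?' onward); the expected result is shown as a comment:  
SELECT url('/page68/path7?user=18583');  -- => /page68/path7  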
  
CREATE CONTINUOUS VIEW url_stats AS  
    SELECT  
        url,  -- URL with query parameters stripped  
        percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99,  -- latency that 99% of requests fall under  
        count(DISTINCT "user") AS uniques,  -- unique users ("user" is quoted because USER is a reserved word)  
        count(*) AS total_visits  -- total number of visits  
    FROM  
        (SELECT  
            url(payload->>'url'),  -- URL  
            payload->>'user' AS "user",  -- user id  
            (payload->>'latency')::float * 1000 AS latency_ms,  -- latency in milliseconds  
            arrival_timestamp  
        FROM logs_stream) AS unpacked  
    WHERE arrival_timestamp > clock_timestamp() - interval '1 day'  
    GROUP BY url;  
  
CREATE CONTINUOUS VIEW user_stats AS  
    SELECT  
        day(arrival_timestamp),  -- day() buckets the arrival time by day (a PipelineDB date-truncation function)  
        payload->>'user' AS user,  
        sum(CASE WHEN payload->>'url' LIKE '%landing_page%' THEN 1 ELSE 0 END) AS landings,  
        sum(CASE WHEN payload->>'url' LIKE '%conversion%' THEN 1 ELSE 0 END) AS conversions,  
        count(DISTINCT url(payload->>'url')) AS unique_urls,  
        count(*) AS total_visits  
    FROM logs_stream GROUP BY payload->>'user', day;  
  
-- What are the top-10 most visited urls?  
SELECT url, total_visits FROM url_stats ORDER BY total_visits DESC limit 10;  
      url      | total_visits   
---------------+--------------  
 /page62/path4 |        10182  
 /page51/path4 |        10181  
 /page24/path5 |        10180  
 /page93/path3 |        10180  
 /page81/path0 |        10180  
 /page2/path5  |        10180  
 /page75/path2 |        10179  
 /page28/path3 |        10179  
 /page40/path2 |        10178  
 /page74/path0 |        10176  
(10 rows)  
  
  
-- What is the 99th percentile latency across all urls?  
SELECT combine(p99) FROM url_stats;  
     combine        
------------------  
 6.95410494731137  
(1 row)  
  
-- What is the average conversion rate each day for the last month?  
SELECT day, avg(conversions / landings) FROM user_stats GROUP BY day;  
          day           |            avg               
------------------------+----------------------------  
 2015-09-15 00:00:00-07 | 1.7455000000000000000000000  
(1 row)  
  
-- How many unique urls were visited each day for the last week?  
SELECT day, combine(unique_urls) FROM user_stats WHERE day > now() - interval '1 week' GROUP BY day;  
          day           | combine   
------------------------+---------  
 2015-09-15 00:00:00-07 |  100000  
(1 row)  
  
-- Is there a relationship between the number of unique urls visited and the highest conversion rates?  
SELECT unique_urls, sum(conversions) / sum(landings) AS conversion_rate FROM user_stats  
    GROUP BY unique_urls ORDER BY conversion_rate DESC LIMIT 10;  
 unique_urls |  conversion_rate    
-------------+-------------------  
          41 |  2.67121005785842  
          36 |  2.02713894173361  
          34 |  2.02034637010851  
          31 |  2.01958418072859  
          27 |  2.00045348712296  
          24 |  1.99714899522942  
          19 |  1.99438839453606  
          16 |  1.98083502184886  
          15 |  1.87983011139079  
          14 |  1.84906254929873  
(10 rows)  

With PipelineDB + Kafka, the range of possible applications becomes even richer; this is territory PostgreSQL had not covered before.

References

1. 《基于PostgreSQL的流式PipelineDB, 1000万/s实时统计不是梦》 (PipelineDB on PostgreSQL: 10 million records/s real-time statistics)

2. 《PostgreSQL aggregate function 3 : Aggregate Functions for Ordered-Set》

3. https://www.pipelinedb.com/blog/sql-on-kafka

4. 《PostgreSQL aggregate function 4 : Hypothetical-Set Aggregate Functions》

5. http://kafka.apache.org/07/quickstart.html

6. https://cwiki.apache.org/confluence/display/KAFKA/Powered+By
