PostgreSQL streaming: real-time processing of Kafka message streams with PipelineDB
Background
Kafka is a distributed messaging system with a wide range of uses, such as message management, website activity tracking, monitoring data, log aggregation, stream processing, commit logs, and so on.
For details, see:
http://kafka.apache.org/documentation.html#gettingStarted
For stream-of-records scenarios, Kafka can replace raw log files or transaction-style records kept in a database: for example, carrier gateway logs, user trajectories in Internet applications, website browsing trails, or the movement tracks of users and assets in O2O applications. These are all time- or space-ordered streams of records that you might store and process with Kafka.
Now PipelineDB integrates cleanly with Kafka: acting as a real-time consumer of Kafka messages, it computes statistics on the fly and can replace Storm in many of these scenarios, because defining a statistics dimension in PipelineDB takes just one SQL statement, with claimed throughput of roughly ten million rows per second.
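For example, a unique-visitor metric is nothing more than a stream definition plus one continuous view (a minimal sketch; traffic_stream and uid are illustrative names, separate from the walkthrough below):
CREATE STREAM traffic_stream (uid int);
CREATE CONTINUOUS VIEW uv AS -- one SQL statement defines the metric
    SELECT count(DISTINCT uid) FROM traffic_stream;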
Here is a complete end-to-end example:
1. Run an nginx server locally and use siege to simulate HTTP requests; nginx records each request as JSON in a log file.
2. Run a Kafka server locally and use kafkacat to continuously push the nginx access log into Kafka.
3. In PipelineDB, subscribe to the Kafka topic and process the messages in real time into the desired statistics (page visitors, latency, and so on).
Install Kafka
http://kafka.apache.org/07/quickstart.html
# wget http://www.us.apache.org/dist/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
# tar -zxvf kafka_2.10-0.8.2.2.tgz

librdkafka (the C client library for Kafka, which kafkacat links against):
# git clone https://github.com/edenhill/librdkafka.git
# cd librdkafka
# ./configure
# make
# make install

yajl (a JSON library used by kafkacat's JSON support):
# git clone https://github.com/lloyd/yajl.git
# cd yajl
# ./configure
# make
# make install

Make the freshly installed libraries in /usr/local/lib visible to the dynamic linker:
# vi /etc/ld.so.conf
/usr/local/lib
# ldconfig

kafkacat itself:
# git clone https://github.com/edenhill/kafkacat.git
# cd kafkacat
# ./configure
# make
# make install
Install siege and nginx
# yum install -y siege nginx
Create an nginx configuration file that writes the access log to /tmp/access.log in JSON format:
cd /tmp
cat <<EOF > nginx.conf
worker_processes 4;
pid $PWD/nginx.pid;
events {}
http {
    log_format json
        '{'
        '"ts": "\$time_iso8601", '
        '"user_agent": "\$http_user_agent", '
        '"url": "\$request_uri", '
        '"latency": "\$request_time", '
        '"user": "\$arg_user"'
        '}';
    access_log $PWD/access.log json;
    error_log $PWD/error.log;
    server {
        location ~ ^/ {
            return 200;
        }
    }
}
EOF
Start nginx
nginx -c $PWD/nginx.conf -p $PWD/
Configure the hostname
# hostname
digoal.org
# vi /etc/hosts
127.0.0.1 digoal.org
Start Kafka (ZooKeeper first, then the broker)
cd /opt/soft_bak/kafka_2.10-0.8.2.2
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
Generate a file of random URLs
for x in {0..1000000}; do echo "http://localhost/page$((RANDOM % 100))/path$((RANDOM % 10))?user=$((RANDOM % 100000))" >> urls.txt; done
Use siege to request these URLs; nginx writes the access log to /tmp/access.log:
siege -c32 -b -d0 -f urls.txt >/dev/null 2>&1
Sample entries from /tmp/access.log, in JSON format:
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page68/path7?user=18583", "latency": "0.002", "user": "18583"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page78/path0?user=24827", "latency": "0.003", "user": "24827"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page19/path6?user=3988", "latency": "0.003", "user": "3988"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page55/path2?user=18433", "latency": "0.003", "user": "18433"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page62/path3?user=10801", "latency": "0.001", "user": "10801"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page9/path2?user=4915", "latency": "0.001", "user": "4915"}
{"ts": "2015-10-21T11:21:48+08:00", "user_agent": "Mozilla/5.0 (redhat-x86_64-linux-gnu) Siege/3.0.8", "url": "/page10/path2?user=5367", "latency": "0.001", "user": "5367"}
Pipe the access log into kafkacat to push it to the Kafka topic logs_topic:
( tail -f /tmp/access.log | kafkacat -b localhost:9092 -t logs_topic ) &
Consuming the raw messages from Kafka looks like this:
# cd /opt/soft_bak/kafka_2.10-0.8.2.2
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs_topic --from-beginning
# Ctrl+C
Next, we use PipelineDB to consume these messages in real time and turn them into the statistics we need.
CREATE EXTENSION pipeline_kafka;
SELECT kafka_add_broker('localhost:9092'); -- register a Kafka broker (one node of the Kafka cluster)
CREATE STREAM logs_stream (payload json); -- create a stream mapped to the Kafka messages
CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream; -- a continuous view that consumes and aggregates the stream in real time
SELECT kafka_consume_begin('logs_topic', 'logs_stream'); -- start consuming the topic logs_topic into logs_stream
kafka_consume_begin
------------------
success
(1 row)
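To pause or stop ingestion later, pipeline_kafka also provides a matching kafka_consume_end call (a sketch under that assumption; verify the signature against the pipeline_kafka version you installed):
SELECT kafka_consume_end('logs_topic', 'logs_stream'); -- stop consuming logs_topic into logs_stream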
Query the continuous view to get the current nginx access statistics:
SELECT * FROM message_count;
count
--------
24
(1 row)
SELECT * FROM message_count;
count
--------
36
(1 row)
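A continuous view can also be restricted to a sliding window by filtering on arrival_timestamp; this minimal sketch (recent_count is an illustrative name) counts only the last minute of traffic, and url_stats below applies the same idiom over one day:
CREATE CONTINUOUS VIEW recent_count AS
    SELECT COUNT(*) FROM logs_stream
    WHERE arrival_timestamp > clock_timestamp() - interval '1 minute'; -- only events from the last minute are counted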
Next, a deeper real-time analysis: per-URL visit counts, unique users, and the latency below which 99% of requests complete.
/*
* This function will strip away any query parameters from each url,
* as we're not interested in them.
*/
CREATE FUNCTION url(raw text, regex text DEFAULT '\?.*', replace text DEFAULT '')
RETURNS text
AS 'textregexreplace_noopt' -- textregexreplace_noopt@src/backend/utils/adt/regexp.c
LANGUAGE internal;
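A quick sanity check of the helper; the expected result follows directly from the default regex, which strips everything from '?' onward:
SELECT url('/page68/path7?user=18583'); -- returns /page68/path7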
CREATE CONTINUOUS VIEW url_stats AS
SELECT
    url, -- the URL with query parameters stripped
    percentile_cont(0.99) WITHIN GROUP (ORDER BY latency_ms) AS p99, -- latency below which 99% of requests complete
    count(DISTINCT "user") AS uniques, -- number of unique users
    count(*) AS total_visits -- total number of visits
FROM
    (SELECT
        url(payload->>'url'), -- URL
        payload->>'user' AS "user", -- user ID
        (payload->>'latency')::float * 1000 AS latency_ms, -- request latency in milliseconds
        arrival_timestamp
    FROM logs_stream) AS unpacked
WHERE arrival_timestamp > clock_timestamp() - interval '1 day' -- sliding one-day window
GROUP BY url;
CREATE CONTINUOUS VIEW user_stats AS
SELECT
    day(arrival_timestamp), -- day() is PipelineDB's date-truncation function
    payload->>'user' AS "user",
    sum(CASE WHEN payload->>'url' LIKE '%landing_page%' THEN 1 ELSE 0 END) AS landings,
    sum(CASE WHEN payload->>'url' LIKE '%conversion%' THEN 1 ELSE 0 END) AS conversions,
    count(DISTINCT url(payload->>'url')) AS unique_urls,
    count(*) AS total_visits
FROM logs_stream GROUP BY payload->>'user', day;
-- What are the top-10 most visited urls?
SELECT url, total_visits FROM url_stats ORDER BY total_visits DESC LIMIT 10;
url | total_visits
---------------+--------------
/page62/path4 | 10182
/page51/path4 | 10181
/page24/path5 | 10180
/page93/path3 | 10180
/page81/path0 | 10180
/page2/path5 | 10180
/page75/path2 | 10179
/page28/path3 | 10179
/page40/path2 | 10178
/page74/path0 | 10176
(10 rows)
-- What is the 99th percentile latency across all urls?
SELECT combine(p99) FROM url_stats;
combine
------------------
6.95410494731137
(1 row)
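combine() is PipelineDB's aggregate for merging the stored transition states of each group's aggregate, so this returns the 99th percentile computed over all URLs' underlying data rather than a naive average of the per-URL p99 values.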
-- What is the average conversion rate each day for the last month?
SELECT day, avg(conversions / landings) FROM user_stats GROUP BY day;
day | avg
------------------------+----------------------------
2015-09-15 00:00:00-07 | 1.7455000000000000000000000
(1 row)
-- How many unique urls were visited each day for the last week?
SELECT day, combine(unique_urls) FROM user_stats WHERE day > now() - interval '1 week' GROUP BY day;
day | combine
------------------------+---------
2015-09-15 00:00:00-07 | 100000
(1 row)
-- Is there a relationship between the number of unique urls visited and the highest conversion rates?
SELECT unique_urls, sum(conversions) / sum(landings) AS conversion_rate FROM user_stats
GROUP BY unique_urls ORDER BY conversion_rate DESC LIMIT 10;
unique_urls | conversion_rate
-------------+-------------------
41 | 2.67121005785842
36 | 2.02713894173361
34 | 2.02034637010851
31 | 2.01958418072859
27 | 2.00045348712296
24 | 1.99714899522942
19 | 1.99438839453606
16 | 1.98083502184886
15 | 1.87983011139079
14 | 1.84906254929873
(10 rows)
PipelineDB + Kafka makes the range of applications richer still; this is territory PostgreSQL had not previously covered.
References
1. 《基于PostgreSQL的流式PipelineDB, 1000万/s实时统计不是梦》
2. 《PostgreSQL aggregate function 3 : Aggregate Functions for Ordered-Set》
3. https://www.pipelinedb.com/blog/sql-on-kafka
4. 《PostgreSQL aggregate function 4 : Hypothetical-Set Aggregate Functions》
5. http://kafka.apache.org/07/quickstart.html
6. https://cwiki.apache.org/confluence/display/KAFKA/Powered+By