PipelineDB 实践
默认分类
2020-01-15
690
0
目前公司采用Shell、Java、Storm等方式进行实时数据的处理。后两者开发成本较高,因此寻找新的替代方案,使得我们能够快速开发迭代实时需求。
基础使用
连接pipelinedb
# 设置密码环境变量
export PGPASSWORD='password'
# 登录PipelineDB
psql -h pg1922 -p 1922 -U username -d pipeline "$@"
命令帮助
# psql命令帮助
\?
# SQL命令帮助
\h
列出Database
\l 或 \l+
创建Schema
CREATE SCHEMA dw_bihell AUTHORIZATION username;
列出Schema
\dn 或 \dn+
切换Schema
SET search_path TO dw_bihell;
列出表、视图等
# 默认shema下的
\d 或 \d+
# 指定shema
\dp [PATTERN] 或 \z [PATTERN]
比如 \z dw_order.*
Streams
streams可以向Continuous Views推送数据,一个stream或称为事件(event),看着跟常规的表行一样,而且数据写入stream和写入表的接口完全相同,但是steam跟表有着根本的不同。
简单的说,存在于steam中的事件在被所有Continuous Views消费以后就会'消失',无法被用户通过select
语句查询到,即steam专门作为Continuous Views的数据输入源而存在。这跟Storm中的Spout很相似。
创建STREAM
本文实例将接收kafka的数据
-- 语法
CREATE STREAM stream_name ( [
{ column_name data_type [ COLLATE collation ] | LIKE parent_stream } [, ... ]
] )
-- 可以直接支持json数据
CREATE STREAM dw_bihell.rt_oreder_bihell (log json);
-- 或者直接接收文本(kafka发数据的时候根据分隔符分割行)
CREATE STREAM dw_bihell.rt_oreder_bihell (collect_date text,record_status integer,operate_type integer,update_mask integer,order_date text,bill_date text,order_id bigint,order_type)
STREAM 增加字段
ALTER STREAM stream ADD COLUMN x integer;
删除STREAM
DROP STREAM
查询已创建STREAM
SELECT * FROM pipeline_streams() ORDER BY schema;
创建Kafka配置
PipelineDB有个Kafka插件pipeline_kafka使得它可以获取kafka的消息。
添加kafak broker
pipeline_kafka.add_broker (<host>[:<port>])
查看pipeline_kafka配置信息
有个专门的pipeline_kafka schema可以看到目前配置的信息
SET search_path TO pipeline_kafka;
\d+
消费Kafka
语法:
pipeline_kafka.consume_begin ( topic text, stream text, format := ‘text’, delimiter := E’\t’, quote := NULL, escape := NULL, batchsize := 1000, maxbytes := 32000000, parallelism := 1, start_offset := NULL )
实例:
-- 直接传json
SELECT pipeline_kafka.consume_begin ('bihell', 'dw_bihell.rt_oreder_bihell', format := 'json', start_offset := -2);
-- 传文本,并指定分隔符
select pipeline_kafka.consume_begin ('ordermaster','dw_order.rt_stream_order_master_eb', format := 'text', delimiter := E'\u0001')
参考
0条评论