PostgreSQL , 实时推荐系统 , 用户画像 , 标签 , TAG , 比特位
查询需求: 包含,不包含,或,与。
并发几百,RT 毫秒级。
那么可以使用APPID+USERID作为PK,存储为多个表来实现无限个TAG的需求。 以单表1000个列为例,10个表就能满足1万个TAG的需求。
create table t_tags_1(appid int, userid int8, tag1 boolean, tag2 boolean, ...... tag1000 boolean); .....create table t_tags_10(appid int, userid int8, tag9001 boolean, tag9002 boolean, ...... tag10000 boolean);
1. 优点
2. 缺点
性能不一定好,特别是查询组合条件多的话,性能会下降比较明显,例如(tag1 and tag2 and (tag4 or tag5) or not tag6) 。
1. 数据结构
2. 按APPID分区,随机分片
3. query语法
3.1 包含array2指定的所有TAG
array1 @> array2
3.2 包含array2指定的TAG之一
array1 && array2
3.3 不包含array2指定的所有tag
not array1 && array2
4. 例子
create table t_arr(appid int, userid int8, tags int2[]) with(parallel_workers=128);create index idx_t_array_tags on t_arr using gin (tags) with (fastupdate=on, gin_pending_list_limit= 1024000000);create index idx_t_arr_uid on t_arr(userid);819200KB约缓冲10000条80K的数组记录,可以自行调整.
1.每个USERID包含10000个TAG(极限)。insert into t_arr select 1, 2000000000*random(),(select array_agg(10000*random()) from generate_series(1,10000));nohup pgbench -M prepared -n -r -f ./test.sql -P 1 -c 50 -j 50 -t 2000000 > ./arr.log 2>&1 &
5. 优点
6. 缺点
1. 数据结构
2. 按APPID分区,随机分片
3. query语法
3.1 包含bit2指定的所有TAG(需要包含的TAG对应的BIT设置为1,其他为0)
bitand(bit1,bit2) = bit2
3.2 包含bit2指定的TAG之一(需要包含的TAG对应的BIT设置为1,其他为0)
bitand(bit1,bit2) > 0
3.3 不包含bit2指定的所有tag (需要包含的TAG对应的BIT设置为1,其他为0)
bitand(bit1,bit2) = zerobit(10000)
4. 例子
create table t_bit(appid int, userid int8, tags varbit) ; create index idx_t_bit_uid on t_bit(userid);
每个USERID对应10000位随机的比特值date;for ((i=1;i<=50;i++)); do psql -c "insert into t_bit select 1, 2000000000*random(), \(select (string_agg(mod((2*random())::int,2)::text,''))::varbit from generate_series(1,10000)) tags \from generate_series(1,2000000)" ; done; date127GB, 每秒插入24.5万, 326MB/s
插入速度(批量)每秒插入24.5万, 326MB/s更新、删除 tag速度create or replace function randbit(int) returns varbit as $$ select (string_agg(mod((2*random())::int,2)::text,''))::varbit from generate_series(1,$1);$$ language sql strict volatile;create or replace function zerobit(int) returns varbit as $$ select (string_agg('0',''))::varbit from generate_series(1,$1);$$ language sql strict immutable;update t_bit set tags=randbit(10000) where userid=:id;每秒更新、删除1万 记录,响应时间约4毫秒查询速度do language plpgsql $$declare sql text; bit1 varbit := randbit(10000); bit2 varbit := randbit(10000); bit3 varbit := randbit(10000); zbit varbit := zerobit(10000);begin set max_parallel_workers_per_gather =27; sql := 'select * from t_bit where bitand(tags,'''||bit1::text||''')='''||bit1::text||''' and bitand(tags,'''||bit2::text||''')>bit''0'' and bitand(tags,'''||bit3::text||''')='''||zbit::text||''''; raise notice '%', sql; -- execute sql;end;$$;开27个并行, 17秒。
5. 优点
6. 缺点
PostgreSQL 9.6 支持CPU并行计算,1亿用户时,可以满足20秒内返回,但是会消耗很多的CPU资源,因此查询的并行度不能做到很高。
data -> 明细表 -> 增量聚合 -> appid, tagid, userid_bits
1. query
1.1 包含这些tags的用户
userids (bitand) userids
1.2 不包含这些tags的用户
userids (bitor) userids
1.3 包含这些tags之一的用户
userids (bitor) userids
2. 优点
3. 缺点
本文会用到几个新增的FUNCTION,这几个function很有用,同时会加入阿里云的RDS PostgreSQL中。
get_bit (varbit, int, int) returns varbit 从指定位置开始获取N个BIT位,返回varbit 例如 get_bit('111110000011', 3, 5) 返回11000set_bit_array (varbit, int, int, int[]) returns varbit 将指定位置的BIT设置为0|1,超出原始长度的部分填充0|1 例如 set_bit_array('111100001111', 0, 1, array[1,15]) 返回 1011000011111110bit_count (varbit, int, int, int) returns int 从第n位开始,统计N个BIT位中有多少个0|1,如果N超出长度,则只计算已经存在的。 例如 bit_count('1111000011110000', 1, 5, 4) 返回 1 (0001)bit_count (varbit, int) returns int 统计整个bit string中1|0的个数。 例如 bit_count('1111000011110000', 1) 返回 8bit_fill (int, int) returns varbit 填充指定长度的0 或 1 例如 bit_fill(0,10) 返回 '0000000000'bit_rand (int, int, float) returns varbit 填充指定长度的随机BIT,并指定1或0的随机比例 例如 bit_rand(10, 1, 0.3) 可能返回 '0101000001' bit_posite (varbit, int, boolean) returns int[] 返回 1|0 的位置信息,下标从0开始计数, true时正向返回,false时反向返回 例如 bit_posite ('11110010011', 1, true) 返回 [0,1,2,3,6,9,10] bit_posite ('11110010011', 1, false) 返回 [10,9,6,3,2,1,0]bit_posite (varbit, int, int, boolean) returns int[] 返回 1|0 的位置信息,下标从0开始计数,true时正向返回,false时反向返回,返回N个为止 例如 bit_posite ('11110010011', 1, 3, true) 返回 [0,1,2] bit_posite ('11110010011', 1, 3, false) 返回 [10,9,6] get_bit_2 (varbit, int, int) returns int 返回指定位置的bit, 下标从0开始,如果超出BIT位置,返回指定的0或1. 例如 get_bit_2('111110000011', 100, 0) 返回 0 (100已经超出长度,返回用户指定的0)
userid int8表示,可以超过40亿。
rowid int表示,也就是说单个APPID不能允许超过20亿的用户,从0开始自增,配合BIT下标的表示。
appid int表示,不会超过40亿个。
1. 字典表, rowid决定MAP顺序,使用窗口查询返回顺序。
drop table IF EXISTS t_userid_dic;create table IF NOT EXISTS t_userid_dic(appid int not null, rowid int not null, userid int8 not null, unique (appid,userid), unique (appid,rowid));
create or replace function f_uniq(i_appid int, i_userid int8) returns int as $$declare newid int; i int := 0; res int; stack1 text; stack2 text; stack3 text; stack4 text; stack5 text; stack6 text; stack7 text; stack8 text; stack9 text; stack10 text;begin loop if i>0 then perform pg_sleep(random()); else i := i+1; end if; -- 获取已有的最大ID+1 (即将插入的ID) select max(rowid)+1 into newid from t_userid_dic where appid=i_appid; if newid is not null then -- 获取AD LOCK (乘以appid, 为了避免不同ID的冲突, 算法可以再改进) if pg_try_advisory_xact_lock(i_appid::int8 * newid) then -- 插入 insert into t_userid_dic (appid, rowid, userid) values (i_appid, newid, i_userid); -- 返回此次获取到的UID return newid; else -- 没有获取到AD LOCK则继续循环 continue; end if; else -- 表示这是第一条记录,获取AD=0 的LOCK if pg_try_advisory_xact_lock(0 * i_appid::int8) then insert into t_userid_dic (appid, rowid, userid) values (i_appid, 0, i_userid); return 0; else continue; end if; end if; end loop; exception -- 只忽略违反唯一约束的错误,其他错误继续报 when SQLSTATE '23505' then /* -- 参考 https://www.postgresql.org/docs/9.6/static/plpgsql-control-structures.html#PLPGSQL-ERROR-TRAPPING GET STACKED DIAGNOSTICS stack1 = RETURNED_SQLSTATE, stack2 = COLUMN_NAME, stack3 = CONSTRAINT_NAME, stack4 = PG_DATATYPE_NAME, stack5 = MESSAGE_TEXT, stack6 = TABLE_NAME, stack7 = SCHEMA_NAME, stack8 = PG_EXCEPTION_DETAIL, stack9 = PG_EXCEPTION_HINT, stack10 = PG_EXCEPTION_CONTEXT; -- 如果因为瞬态导致PK冲突了,继续调用(注意如果appid+userid重复,会无限循环,所以不建议使用) -- select f_uniq(i_appid, i_userid) into res; raise notice 'RETURNED_SQLSTATE, %', stack1; raise notice 'COLUMN_NAME, %', stack2; raise notice 'CONSTRAINT_NAME, %', stack3; raise notice 'PG_DATATYPE_NAME, %', stack4; raise notice 'MESSAGE_TEXT, %', stack5; raise notice 'TABLE_NAME, %', stack6; raise notice 'SCHEMA_NAME, %', stack7; raise notice 'PG_EXCEPTION_DETAIL, %', stack8; raise notice 'PG_EXCEPTION_HINT, %', stack9; raise notice 'PG_EXCEPTION_CONTEXT, %', stack10; -- 没有插入成功,返回NULL */ return null;end;$$ language plpgsql strict;
压测以上函数是否能无缝插入,压测时raise notice可以去掉。
$ vi test.sql\set appid random(1,1000)\set userid random(1,2000000000)select f_uniq(:appid, :userid);164个并发 $ pgbench -M prepared -n -r -P 1 -f ./test.sql -c 164 -j 164 -T 10progress: 10.0 s, 85720.5 tps, lat 1.979 ms stddev 34.808transaction type: ./test.sqlscaling factor: 1query mode: preparednumber of clients: 164number of threads: 164duration: 10 snumber of transactions actually processed: 827654latency average = 2.039 mslatency stddev = 35.674 mstps = 75435.422933 (including connections establishing)tps = 75483.813182 (excluding connections establishing)script statistics: - statement latencies in milliseconds: 0.001 \set appid random(1,1000) 0.000 \set userid random(1,2000000000) 2.034 select f_uniq(:appid, :userid);
postgres=# select count(*) from t_userid_dic ; count -------- 825815(1 row)postgres=# select appid,count(*),max(rowid) from t_userid_dic group by 1; appid | count | max -------+-------+----- 251 | 857 | 857 106 | 854 | 854 681 | 816 | 816 120 | 826 | 826 285 | 815 | 815 866 | 778 | 778 264 | 873 | 873......postgres=# select * from (select appid,count(*),max(rowid) from t_userid_dic group by 1) t where max<>count; appid | count | max -------+-------+-----(0 rows)
生成1亿测试用户,APPID=1, 用于后面的测试
truncate t_userid_dic;insert into t_userid_dic select 1, generate_series(1,100000000), generate_series(1,100000000);
2. 实时变更表
drop table IF EXISTS t_user_tags;create table IF NOT EXISTS t_user_tags(id serial8 primary key, appid int, userid int8, tag int, ins boolean, dic boolean default false); create index idx_t_user_tags_id on t_user_tags(id) where dic is false;-- ins = true表示新增tag, =false 表示删除tag。 -- dic = true表示该记录设计的USERID已经合并到用户字典表 -- 这张表的设计有优化余地,例如最好切换使用,以清理数据,比如每天一张,保存31天。
生成1.5千万测试数据(APPID=1 , USERID 总量20亿,随机产生, 新增tagid 范围1-10000, 删除tagid 范围1-1000)
insert into t_user_tags (appid,userid,tag,ins) select 1, 2000000000*random(),10000*random(),true from generate_series(1,10000000);insert into t_user_tags (appid,userid,tag,ins) select 1, 2000000000*random(),5000*random(),false from generate_series(1,5000000);
3. tag + userids bitmap 表,这个是最关键的表,查询量很大,从t_user_tags增量合并进这个表。
drop table IF EXISTS t_tags;create table IF NOT EXISTS t_tags(id serial primary key, appid int, tag int, userids varbit);-- id可以用于advisory lock, 并行更新create unique index idx_t_tags_uk on t_tags(tag,appid);
-- 生成随机bit CREATE OR REPLACE FUNCTION public.randbit(integer) RETURNS bit varying LANGUAGE plpgsql STRICTAS $function$declare res varbit;begin select (string_agg(mod((2*random())::int,2)::text,''))::varbit into res from generate_series(1,$1); if res is not null then return res; else return ''::varbit; end if;end;$function$create sequence seq;-- 并行插入10000条记录$ vi test.sqlinsert into t_tags(appid,tag,userids) select 1,nextval('seq'::regclass),randbit(100000000);$ pgbench -M simple -n -r -f ./test.sql -c 50 -j 50 -t 200
测试方法很简单: 包含所有,不包含,包含任意。
1. 包含以下TAG的用户ID
userids (bitand) userids
-- 获取最大BIT的长度 with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (?,?,...) ) select appid,userid from t_userid_dic, tmp where appid = ? and rowid = any ( ( -- 正向取出1万个bit=1的用户, 如果要反向取,使用false(表示取最近的用户数据) -- rowid从0开始计数,刚好与bit下标起始位置匹配 select bit_posite(res, 1, 10000, true) from ( select t1.userids & t2.userids & t3.userids & t4.userids AS res -- & ...... from -- 根据最大长度, BIT补齐 ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t1 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t2 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t3 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t4 -- ...... ) t )::int[]);
with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (226833, 226830, 226836, 226834) ) select appid,userid from t_userid_dic, tmp where appid = 1 and rowid = any ( ( select bit_posite(res, 1, 10000, true) from ( select t1.userids & t2.userids & t3.userids & t4.userids as res from ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226833 ) t1 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226830 ) t2 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226836 ) t3 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226834 ) t4 ) t )::int[]);
2. 不包含以下TAG的用户
userids (bitor) userids
-- 获取最大BIT的长度 with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (?,?,...) ) select appid,userid from t_userid_dic, tmp where appid = ? and rowid = any ( ( -- 正向取出1万个bit=0的用户, 如果要反向取,使用false(表示取最近的用户数据) select bit_posite(res, 0, 10000, true) from ( select t1.userids | t2.userids | t3.userids | t4.userids AS res -- | ...... from -- 根据最大长度, BIT补齐 ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t1 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t2 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t3 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t4 -- ...... ) t )::int[]);
with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (226833, 226830, 226836, 226834) ) select appid,userid from t_userid_dic, tmp where appid = 1 and rowid = any ( ( select bit_posite(res, 0, 10000, true) from ( select t1.userids | t2.userids | t3.userids | t4.userids as res from ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226833 ) t1 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226830 ) t2 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226836 ) t3 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226834 ) t4 ) t )::int[]);
3. 包含以下任意TAG
userids (bitor) userids
-- 获取最大BIT的长度 with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (?,?,...) ) select appid,userid from t_userid_dic, tmp where appid = ? and rowid = any ( ( -- 正向取出1万个bit=1的用户, 如果要反向取,使用false(表示取最近的用户数据) select bit_posite(res, 1, 10000, true) from ( select t1.userids | t2.userids | t3.userids | t4.userids AS res -- | ...... from -- 根据最大长度, BIT补齐 ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t1 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t2 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t3 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags where tag = ? ) t4 -- ...... ) t )::int[]);
with tmp as ( select max(bit_length(userids)) maxlen from t_tags where tag in (226833, 226830, 226836, 226834) ) select appid,userid from t_userid_dic, tmp where appid = 1 and rowid = any ( ( select bit_posite(res, 1, 10000, true) from ( select t1.userids | t2.userids | t3.userids | t4.userids as res from ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226833 ) t1 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226830 ) t2 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226836 ) t3 , ( select set_bit_array(userids, 0, 0, array[tmp.maxlen-1]) userids from t_tags,tmp where tag = 226834 ) t4 ) t )::int[]);
1. 结合bit_posite,可以实现正向取若干用户,反向取若干用户(例如有100万个结果,本次推广只要1万个用户,而且要最近新增的1万个用户,则反向取1万个用户即可)。
2. 结合get_bit则可以实现截取某一段BIT,再取得结果,很好用哦。
postgres=# \d+ t_user_tags Table "public.t_user_tags" Column | Type | Modifiers | Storage | Stats target | Description --------+---------+----------------------------------------------------------+---------+--------------+------------- id | bigint | not null default nextval('t_user_tags_id_seq'::regclass) | plain | | appid | integer | | plain | | userid | bigint | | plain | | tag | integer | | plain | | ins | boolean | | plain | | dic | boolean | default false | plain | | Indexes: "t_user_tags_pkey" PRIMARY KEY, btree (id) "idx_t_user_tags_id" btree (id) WHERE dic IS FALSE
$ vi test.sql\set appid random(1,1000) \set userid random(1,2000000000) \set new_tag random(1,10000) \set old_tag random(8001,10000) insert into t_user_tags (appid,userid,tag,ins) values (:appid, :userid, :new_tag, true); insert into t_user_tags (appid,userid,tag,ins) values (:appid, :userid, :old_tag, false); $ pgbench -M prepared -n -r -P 1 -f ./test.sql -c 128 -j 128 -T 120postgres=# select count(*) from t_user_tags; count ---------- 14721724(1 row)
transaction type: ./test.sqlscaling factor: 1query mode: preparednumber of clients: 128number of threads: 128duration: 120 snumber of transactions actually processed: 7360862latency average = 2.085 mslatency stddev = 1.678 mstps = 61326.338528 (including connections establishing)tps = 61329.196790 (excluding connections establishing)script statistics: - statement latencies in milliseconds: 0.001 \set appid random(1,1000) 0.000 \set userid random(1,2000000000) 0.000 \set new_tag random(1,10000) 0.000 \set old_tag random(8001,10000) 1.042 insert into t_user_tags (appid,userid,tag,ins) values (:appid, :userid, :new_tag, true); 1.037 insert into t_user_tags (appid,userid,tag,ins) values (:appid, :userid, :old_tag, false);
1. 更新用户字典表t_userid_dic,
2. 批量获取并删除t_user_tags的记录,
3. 合并标签数据到t_tags。
create or replace function update_dict(v_rows int) returns void as $$declare min_id int; -- 边界id f_uniq_res int8;begin if v_rows<1 then raise notice 'v_rows must >=1'; return; end if; -- 查找APPID, tag对应最小边界, 没有记录直接返回 select min(id) into min_id from t_user_tags where dic=false; if not found then raise notice 'no data'; return; end if; -- 插入本次需要处理的临时数据到数组 -- f_uniq失败也不会报错,这里需要修改一下f_uniq仅对UK冲突不处理,其他错误还是需要处理的,否则t_user_tags改了,但是USER可能没有进入字典。 with tmp as (update t_user_tags t set dic=true where id>=min_id and id<=min_id+v_rows returning *) select count(*) into f_uniq_res from (select f_uniq(appid,userid) from (select appid,userid from tmp group by 1,2) t) t;end;$$ language plpgsql;
由于批量操作,可能申请大量的ad lock, 所以需要增加max_locks_per_transaction, 数据库参数调整
postgres=# select update_dict(200000); update_dict -------------(1 row)Time: 8986.175 ms多执行几次 Time: 9395.991 msTime: 10798.631 msTime: 10726.547 msTime: 10620.055 ms每秒约处理2万 验证字典更新是否准确 postgres=# select count(*) from t_userid_dic ; count --------- 1399501(1 row)Time: 110.656 mspostgres=# select count(*) from (select appid,userid from t_user_tags where dic=true group by 1,2) t; count --------- 1399501(1 row)Time: 2721.264 mspostgres=# select * from t_userid_dic order by appid,rowid limit 10; appid | rowid | userid -------+-------+------------ 1 | 0 | 1802787010 1 | 1 | 1342147584 1 | 2 | 1560458710 1 | 3 | 1478701081 1 | 4 | 1826138023 1 | 5 | 182295180 1 | 6 | 1736227913 1 | 7 | 512247294 1 | 8 | 686842950 1 | 9 | 1940486738(10 rows)postgres=# select min(rowid),max(rowid),count(*),appid from t_userid_dic group by appid; min | max | count | appid -----+---------+---------+------- 1 | 1399501 | 1399501 | 1(1 row)Time: 369.562 ms
$ vi test.sqlselect update_dict(1000);$ pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 100
postgres=# select count(*) from t_userid_dic ; count --------- 1533534(1 row)postgres=# select count(*) from (select appid,userid from t_user_tags where dic=true group by 1,2) t; count --------- 1533534(1 row)postgres=# select * from t_userid_dic order by appid,rowid limit 10; appid | rowid | userid -------+-------+------------ 1 | 0 | 1802787010 1 | 1 | 1342147584 1 | 2 | 1560458710 1 | 3 | 1478701081 1 | 4 | 1826138023 1 | 5 | 182295180 1 | 6 | 1736227913 1 | 7 | 512247294 1 | 8 | 686842950 1 | 9 | 1940486738(10 rows)postgres=# select min(rowid),max(rowid),count(*),appid from t_userid_dic group by appid; min | max | count | appid -----+---------+---------+------- 1 | 1533534 | 1533534 | 1(1 row)
-- 不要对同一个APPID并行使用APPID与APPID+tag的模式.
create or replace function merge_tags( v_appid int, -- 输入本次要处理的appid v_tag int, -- 输入本次要处理的tags , 输入null 则处理该APPID的所有tag, 注意不要使用strict function, 否则遇到NULL参数会直接返回NULL v_rows int -- 输入要本次处理多少条记录,即合并量 ) returns void as $$declare min_id int; -- 边界id ad_lockid int8; -- 可以有效避免不同的APPID在并行操作时冲突 -- 循环 i_tag int; i_userid_del int8[]; i_userid_add int8[]; -- 将userid[]转换为ROWID[] i_rowid_del int[]; i_rowid_add int[]; -- 临时存储, 用于debug i_userids_del varbit := '0'::varbit; i_userids_add varbit := '0'::varbit;begin -- 计算ad_lockid if v_tag is not null then ad_lockid := (v_appid+1)^2 + (v_tag+1)^2; else ad_lockid := (v_appid+1)^2; end if; -- 获取并行保护锁 if not pg_try_advisory_xact_lock(ad_lockid) then raise notice 'cann''t parallel merge same tag with ad_lockid: %', ad_lockid; return; end if; -- 处理行数 if v_rows<1 then raise notice 'v_rows must >=1'; return; end if; -- 判断是否处理单个tag if v_tag is not null then -- raise notice 'v_tag: %', v_tag; -- 处理单个tag -- 查找APPID, tag对应最小边界, 没有记录直接返回 select min(id) into min_id from t_user_tags where dic=true and appid=v_appid and tag=v_tag; if not found then raise notice 'no data for appid:% , tag:% ', v_appid, v_tag; return; end if; -- 获取实时数据,删除实时数据,聚合实时数据,合并到TAGS -- 建议RDS PostgreSQL新增delete|update limit语法,抛弃with id>=min_id for update -- 这里有个BUG, 合并没有考虑到TAG的新增或删除的先后顺序,例如某APPID的某用户新增了一个TAG,然后又把这个TAG删除了。最后合并的结果会变成有这个TAG。 -- 使用window函数可以解决这个问题, 将array_agg这个QUERY改掉, 在聚合前,先合并数据,使用ins, row_number() over (partition by appid,userid,tag order by id desc) rn .... where rn=1, 每一个appid,userid,tag取最后一个ins的值。 for i_tag, i_userid_del, i_userid_add in with tmp as (select * from t_user_tags t where dic=true and appid=v_appid and tag=v_tag and id>=min_id order by id limit v_rows for update), tmp1 as (delete from t_user_tags t where exists (select 1 from tmp where tmp.id=t.id)) select tag, array_agg(case when not ins then userid else null end) userid_del, array_agg(case when ins then userid else null end) userid_add from tmp group by tag loop -- 获取ROWID execute format('select coalesce(array_agg(rowid), array[]::int[]) from t_userid_dic where appid=%L and userid = any (%L)', v_appid, array_remove(i_userid_del, null) ) into i_rowid_del; execute format('select coalesce(array_agg(rowid), array[]::int[]) from t_userid_dic where appid=%L and userid = any (%L)', v_appid, array_remove(i_userid_add, null) ) into i_rowid_add; -- 判断TAG是否存在, 存在则更新,否则插入 perform 1 from t_tags where appid=v_appid and tag=i_tag; if found then update t_tags set userids = set_bit_array( set_bit_array(userids, 0, 0, i_rowid_del), 1, 0, i_rowid_add )::text::varbit where appid=v_appid and tag=i_tag; else insert into t_tags(appid, tag, userids) values (v_appid, i_tag, set_bit_array( set_bit_array('0'::varbit, 0, 0, i_rowid_del), 1, 0, i_rowid_add )::text::varbit); end if; end loop; else -- 处理所有tag -- 查找APPID最小边界, 没有记录直接返回 select min(id) into min_id from t_user_tags where dic=true and appid=v_appid; if not found then raise notice 'no data for appid:%', v_appid; return; end if; -- 获取实时数据,删除实时数据,聚合实时数据,合并到TAGS -- 这里有个BUG, 合并没有考虑到TAG的新增或删除的先后顺序,例如某APPID的某用户新增了一个TAG,然后又把这个TAG删除了。最后合并的结果会变成有这个TAG。 -- 使用window函数可以解决这个问题, 将array_agg这个QUERY改掉, 在聚合前, 先合并数据, 使用ins, row_number() over (partition by appid,userid,tag order by id desc) rn .... where rn=1, 每一个appid,userid,tag取最后一个ins的值。 for i_tag, i_userid_del, i_userid_add in with tmp as (select * from t_user_tags t where dic=true and appid=v_appid and id>=min_id order by id limit v_rows for update), tmp1 as (delete from t_user_tags t where exists (select 1 from tmp where tmp.id=t.id)) select tag, array_agg(case when not ins then userid else null end) userid_del, array_agg(case when ins then userid else null end) userid_add from tmp group by tag loop execute format('select coalesce(array_agg(rowid), array[]::int[]) from t_userid_dic where appid=%L and userid = any (%L)', v_appid, array_remove(i_userid_del, null) ) into i_rowid_del; execute format('select coalesce(array_agg(rowid), array[]::int[]) from t_userid_dic where appid=%L and userid = any (%L)', v_appid, array_remove(i_userid_add, null) ) into i_rowid_add; -- 判断TAG是否存在, 存在则更新,否则插入 perform 1 from t_tags where appid=v_appid and tag=i_tag; if found then update t_tags set userids = set_bit_array( set_bit_array(userids, 0, 0, i_rowid_del), 1, 0, i_rowid_add )::text::varbit where appid=v_appid and tag=i_tag; else insert into t_tags(appid, tag, userids) values (v_appid, i_tag, set_bit_array( set_bit_array('0'::varbit, 0, 0, i_rowid_del), 1, 0, i_rowid_add )::text::varbit); end if; end loop; end if;end;$$ language plpgsql;-- 不要使用strict
$ vi test.sql\set tag random(1,10000)select merge_tags(1,:tag,10000);$ pgbench -M prepared -n -r -P 1 -f ./test.sql -c 128 -j 128 -T 100每秒约合并15万记录。
truncate t_tags;select update_dict(100000);create table check_merge1 as select tag,count(*) cnt from (select tag,userid from t_user_tags where dic=true order by id limit 10000) t group by tag;select merge_tags(1,null,10000);。。。。。。select merge_tags(1,null,10000);create table check_merge2 as select tag, count_bit(userids,1) cnt from t_tags; postgres=# select t1.*,t2.* from check_merge1 t1 full outer join check_merge2 t2 on (t1.tag=t2.tag and t1.cnt=t2.cnt) where t1.* is null or t2.* is null; tag | cnt | tag | cnt -----+-----+-----+-----(0 rows)Time: 5.133 ms如果有结果说明合并有问题。
找到userid对应的rowid, 根据userids rowid位置的bit,判断是否有该tag.
select tag from (select tag, get_bit(t1.userids, t2.rowid-1) bt from t_tags t1, (select rowid from t_userid_dic where userid=?) t2) t where bt=1;例子,注意对齐(或者改造get_bit函数,支持没有BIT位的操作)postgres=# \set FETCH_COUNT 1postgres=# select tag from (select tag, get_bit(t1.userids, t2.rowid-1) bt from t_tags t1, (select rowid from t_userid_dic where userid=100000) t2) t where bt=1; tag -------- 226813 226824 226818 226810 226782 226790 226792 226787 226803 226826(10 rows)Time: 152.636 ms
并行相关参数配置 postgres=# show parallel_tuple_cost; parallel_tuple_cost --------------------- 0postgres=# show parallel_setup_cost ; parallel_setup_cost --------------------- 0postgres=# show max_parallel_workers_per_gather ; max_parallel_workers_per_gather --------------------------------- 27postgres=# show max_worker_processes ; max_worker_processes ---------------------- 128postgres=# show force_parallel_mode ; force_parallel_mode --------------------- onpostgres=# alter table t_tags set (parallel_workers=27);ALTER TABLE返回用户数组,并行化之后,每个tag处理约耗时0.76毫秒。 如果用游标返回,可以快速得到第一个用户。 postgres=# explain (analyze,verbose,costs,buffers,timing) select array_agg(tag) from t_tags where get_bit(userids,10000)=1; QUERY PLAN --------------------------------------------------------------------------------------------------------------------- Aggregate (cost=1047.68..1047.69 rows=1 width=32) (actual time=7176.745..7176.745 rows=1 loops=1) Output: array_agg(tag) Buffers: shared hit=15912565 -> Gather (cost=0.00..1047.55 rows=50 width=4) (actual time=8.940..7175.486 rows=4957 loops=1) Output: tag Workers Planned: 27 Workers Launched: 27 Buffers: shared hit=15912565 -> Parallel Seq Scan on public.t_tags (cost=0.00..1047.55 rows=2 width=4) (actual time=51.974..6023.333 rows=177 loops=28) Output: tag Filter: (get_bit((t_tags.userids)::"bit", 10000) = 1) Rows Removed by Filter: 180 Buffers: shared hit=15909973 Worker 0: actual time=24.376..5791.799 rows=158 loops=1 Buffers: shared hit=528366 Worker 1: actual time=23.143..6749.264 rows=198 loops=1 Buffers: shared hit=632954 Worker 2: actual time=55.201..6400.872 rows=186 loops=1 Buffers: shared hit=604388 Worker 3: actual time=23.456..5351.070 rows=152 loops=1 Buffers: shared hit=482151 Worker 4: actual time=71.248..6179.161 rows=181 loops=1 Buffers: shared hit=580237 Worker 5: actual time=124.402..5395.424 rows=140 loops=1 Buffers: shared hit=493010 Worker 6: actual time=111.926..6013.077 rows=168 loops=1 Buffers: shared hit=553851 Worker 7: actual time=24.483..7170.148 rows=202 loops=1 Buffers: shared hit=677578 Worker 8: actual time=23.284..5448.081 rows=139 loops=1 Buffers: shared hit=487985 Worker 9: actual time=54.677..7057.927 rows=233 loops=1 Buffers: shared hit=666715 Worker 10: actual time=73.070..6615.151 rows=177 loops=1 Buffers: shared hit=622393 Worker 11: actual time=25.978..5977.110 rows=182 loops=1 Buffers: shared hit=552329 Worker 12: actual time=22.975..5366.569 rows=150 loops=1 Buffers: shared hit=480447 Worker 13: actual time=76.756..6940.743 rows=201 loops=1 Buffers: shared hit=655799 Worker 14: actual time=54.590..5362.862 rows=161 loops=1 Buffers: shared hit=482488 Worker 15: actual time=106.099..5454.446 rows=153 loops=1 Buffers: shared hit=494638 Worker 16: actual time=53.649..6048.233 rows=165 loops=1 Buffers: shared hit=553771 Worker 17: actual time=23.089..5810.984 rows=160 loops=1 Buffers: shared hit=532711 Worker 18: actual time=55.039..5981.338 rows=165 loops=1 Buffers: shared hit=542380 Worker 19: actual time=24.163..6187.498 rows=182 loops=1 Buffers: shared hit=571046 Worker 20: actual time=23.965..6119.395 rows=194 loops=1 Buffers: shared hit=566214 Worker 21: actual time=106.038..6238.629 rows=187 loops=1 Buffers: shared hit=582724 Worker 22: actual time=54.568..6488.311 rows=183 loops=1 Buffers: shared hit=613989 Worker 23: actual time=24.021..5368.295 rows=152 loops=1 Buffers: shared hit=488385 Worker 24: actual time=53.327..5658.396 rows=178 loops=1 Buffers: shared hit=515591 Worker 25: actual time=23.201..5358.615 rows=142 loops=1 Buffers: shared hit=483975 Worker 26: actual time=109.940..5560.662 rows=163 loops=1 Buffers: shared hit=505844 Planning time: 0.081 ms Execution time: 7637.509 ms(69 rows)Time: 7638.100 mspostgres=# select tag from t_tags where get_bit(userids,10000)=1; tag -------- 226813 226824Cancel request sentERROR: canceling statement due to user requestTime: 17.521 msprofile,get_bit的瓶颈在memcpy,这个可以通过优化PG内核改进。 3647.00 34.2% memcpy /lib64/libc-2.12.so
如果一个APPID 1万个TAG,1亿用户,只占用120GB。
参考 MADlib、R ,聚类分析 PostgreSQL都可以非常好的支持。