博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
PostgreSQL 高并发任务分配系统 实践
阅读量:6314 次
发布时间:2019-06-22

本文共 3978 字,大约阅读时间需要 13 分钟。

标签

PostgreSQL , 高并发消费 , pg_try_advisory_xact_lock , 秒杀 , 任务分配


背景

给任务分配线程ID,或让线程去抢占任务执行,是任务分配系统中的基本需求。

目的是能够快速的消耗掉所有的任务,同又要保证两点:

1、所有任务都被领取。

2、每个任务只能被一个线程领取。

3、每个线程同一时间只能领取一个任务。

实际上在数据库中, 就是一个高并发的,实时更新系统,设计时要尽量避免冲突,提高处理吞吐。

PostgreSQL的UDF,advisory lock是一个很好的功能点,可以实现高并发、高可靠的任务分配。

其中,秒杀例子:

给任务分配唯一JAVA线程

例子1

功能描述:

有1000个java线程/进程,需要为具体的某个任务选举出一个master,并把选举结果写入到table中,记录任务ID与master线程/线程ID。

或者说:1每个任务的选举都被投票一次;2每个任务都只有一个master。

如果某个线程已经是某个任务的master,那这个线程/进程不参与选举。

1、JAVA线程与任务ID对应关系表

create table java_pool (    tid int primary key ,    -- JAVA 线程ID    taskid int unique        -- 任务 ID  );

2、插入1000个线程ID

insert into java_pool select generate_series(1,1000);

3、输入任务ID,返回JAVA线程ID,表示这个任务分配给某个JAVA线程ID。

create or replace function set_master(v_taskid int) returns int as $$  declare    res int;   begin    -- set lock_timeout = '10 ms';    -- 使用adlock,消除唯一约束时的等待以及更新时的锁等待。    update java_pool set taskid=$1    where tid in    (select tid from java_pool where pg_try_advisory_xact_lock(tid) and pg_try_advisory_xact_lock($1) and taskid is null limit 1)    and pg_try_advisory_xact_lock($1)    returning tid into res;        return res;      exception when unique_violation then      return -1;  -- this task already set other tid    when others then      return -2;  -- lock timeout, other session is setting the same taskid.  end;  $$ language plpgsql strict;

释放TID

update java_pool set taskid=null where tid=? and pg_try_advisory_xact_lock(?) and taskid is not null;

4、压测

vi test.sql    \set taskid random(100,100000)  \set tid random(1,1000)  select set_master(:taskid);  update java_pool set taskid=null where tid=:tid and pg_try_advisory_xact_lock(:tid) and taskid is not null;

5、压测结果

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T  120      transaction type: ./test.sql  scaling factor: 1  query mode: prepared  number of clients: 32  number of threads: 32  duration: 120 s  number of transactions actually processed: 6876049  latency average = 0.558 ms  latency stddev = 0.508 ms  tps = 57298.823054 (including connections establishing)  tps = 57301.470302 (excluding connections establishing)

例子2

功能描述:

在一个table中,每一行记录了一个任务,需要把每个任务分配一个java执行线程/进程。

总的线程数/进程数多于任务数,并要求在table中记录当前任务分配到的线程/进程ID。

或者说,是多个java线程/进程需要争抢一个任务,需要某个方式实现: 1每个任务都被争抢到;2每个任务只被一个java线程/进程争抢到。

10万任务(已知)

100万线程(ID未知)

1、建表

create table task_pool (    taskid int primary key ,    -- 任务ID    tid int unique              -- JAVA 线程ID  );

2、插入10万任务ID

insert into task_pool select generate_series(1,100000);

3、输入JAVA线程ID,返回任务ID。表示这个任务分配给某个JAVA线程ID。

create or replace function set_tid(v_tid int) returns int as $$  declare    res int;  begin    -- set lock_timeout = '10 ms';    update task_pool set tid=$1    where taskid in    (select taskid from task_pool where pg_try_advisory_xact_lock(taskid) and pg_try_advisory_xact_lock($1) and tid is null limit 1)    and pg_try_advisory_xact_lock($1)    returning taskid into res;        return res;      exception when unique_violation then      return -1;  -- this task already set other tid    when others then      return -2;  -- lock timeout, other session is setting the same taskid.  end;  $$ language plpgsql strict;

4、压测

vi test.sql    \set tid random(1,1000000)  select set_tid(:tid);

5、压测结果,约6秒分配完10万任务。

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120    progress: 1.0 s, 42693.2 tps, lat 0.733 ms stddev 0.333  progress: 2.0 s, 41831.7 tps, lat 0.765 ms stddev 0.285  progress: 3.0 s, 38475.0 tps, lat 0.832 ms stddev 1.544  progress: 4.0 s, 39560.5 tps, lat 0.809 ms stddev 0.276  progress: 5.0 s, 36850.0 tps, lat 0.868 ms stddev 0.317  progress: 6.0 s, 32344.5 tps, lat 0.989 ms stddev 0.720  progress: 7.0 s, 16541.2 tps, lat 1.934 ms stddev 0.579  progress: 8.0 s, 17078.0 tps, lat 1.875 ms stddev 0.575

例子3

如果任务ID和JAVA 线程ID都不是预先生成的,那么同样可以使用类似的功能点提高并发和可靠性。

使用pg_try_advisory_xact_lock来提高并发,降低等待。

insert into tbl select $1,$2 where pg_try_advisory_xact_lock($1) and pg_try_advisory_xact_lock($2) returning *;  根据结果判定是否锁定任务和JAVA线程ID

转载地址:http://bgexa.baihongyu.com/

你可能感兴趣的文章
[Erlang 0035] Erlang SMP
查看>>
6.4. ruby
查看>>
【Swift 3.0】iOS 国际化切换语言
查看>>
零元学Expression Blend 4 - Chapter 26 教你如何使用RaidoButton以及布局容器的活用
查看>>
BZOJ 1411&&Vijos 1544 : [ZJOI2009]硬币游戏【递推,快速幂】
查看>>
ECJTUACM16 Winter vacation training #1 题解&源码
查看>>
jsp中如何整合CKEditor+CKFinder实现文件上传
查看>>
前后台交互经常使用的技术汇总(后台:Java技术,前台:Js或者Jquery)
查看>>
【java规则引擎】之规则引擎解释
查看>>
Radware:金融机构如何应对日益猖獗的网络攻击
查看>>
Java中注释的使用是有原则的
查看>>
SanDisk研发32nm工艺最小尺寸NAND闪存
查看>>
阿里云推视频云解决方案 窄带高清省流量
查看>>
数据中心集约化发展的“大小精灵”
查看>>
大数据难懂?从奥巴马竞选中轻松读懂大数据
查看>>
前端任务构建利器Gulp.js使用指南
查看>>
公司如何组建数据部门?三种数据部门架构优与劣
查看>>
走进大数据之拓扑数据分析方法
查看>>
我是如何击败Java自带排序算法的
查看>>
Gartner::未来五年有颠覆性的IT技术都在这里
查看>>