oracle 并行运行 PL/SQL 调用并等待执行完成(fork 和 join)
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/26532122/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Running PL/SQL calls in parallel and wait for execution to finish (fork and join)
提问by schnatterer
In a legacy system there is some PL/SQL procedure that calls the another procedure mutliple times with different parameters. The procedure contains a lot of PL/SQL logic (if, then, else).
在遗留系统中,有一些 PL/SQL 过程使用不同的参数多次调用另一个过程。该过程包含大量 PL/SQL 逻辑(if、then、else)。
As the execution of this procedure takes very long, we thought about using concurrency to speed things up without even touching the actual logic.
由于这个过程的执行时间很长,我们考虑使用并发来加快速度,甚至不触及实际逻辑。
I understand that there are several ways of running (PL/)SQL in parallel on oracle (see bellow).
我知道有几种在 oracle 上并行运行 (PL/)SQL 的方法(见下文)。
However, I wasn't able to find a way to pass different arguments/parameters to a PL/SQL procedure, execute them in parallel and wait until all procedures are finished executing(i.e. I'm looking for mechanism to joinall threads or for a barriermechanism in oracle).
但是,我无法找到一种方法将不同的参数/参数传递给 PL/SQL 过程,并行执行它们并等待所有过程完成执行(即我正在寻找加入所有线程或oracle 中的屏障机制)。
Let's use the following simplified example on the SCOTT Schema:
让我们在 SCOTT Schema 上使用以下简化示例:
DECLARE
PROCEDURE DELETE_BONUS(
in_job IN VARCHAR2)
IS
BEGIN
-- Imagine a lot of IF, ELSEIF, ELSE statements here
DELETE FROM BONUS WHERE JOB=in_job;
END;
BEGIN
INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'A' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'B' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into BONUS(ENAME, JOB) SELECT ROWNUM, 'C' FROM DUAL CONNECT BY LEVEL <= 1000000;
-- TODO execute those in parallel
DELETE_BONUS('A');
DELETE_BONUS('B');
DELETE_BONUS('C');
-- TODO wait for all procedures to finish
EXCEPTION
WHEN OTHERS THEN
RAISE;
END;
/
Here's what I found so far:
这是我到目前为止发现的:
- DBMS_JOB(deprecated)
- DBMS_SCHEDULER(how to wait for jobs to finish? LOCKS?)
- DBMS_SCHEDULER CHAINS(passing parameters/arguments is not really possible?!)
- DBMS_PARALLEL_EXECUTE(can be used to run SQL queries in parallel but not PL/SQL procedures)
- DBMS_JOB(已弃用)
- DBMS_SCHEDULER(如何等待作业完成?锁定?)
- DBMS_SCHEDULER CHAINS(传递参数/参数是不可能的?!)
- DBMS_PARALLEL_EXECUTE(可用于并行运行 SQL 查询,但不能用于 PL/SQL 过程)
Can one of these approaches be used to fork and join the procedure calls? Or is there yet another approach that can?
可以使用这些方法之一来分叉和加入过程调用吗?或者还有另一种方法可以吗?
回答by tbone
Just wanted to add a few notes about DBMS_PARALLEL_EXECUTEpackage from Oracle.
只是想添加一些关于Oracle 的DBMS_PARALLEL_EXECUTE包的注释。
This can be used to do more than update a table, although many of the examples show this simple use case.
尽管许多示例都展示了这个简单的用例,但这不仅可以用于更新表,还可以用于执行更多操作。
The trick is to use an anonymous block instead of a DML statement, and the rest of the examples are still relevant. So, instead of this:
诀窍是使用匿名块而不是 DML 语句,其余示例仍然相关。所以,而不是这个:
l_sql_stmt := 'update EMPLOYEES e
SET e.salary = e.salary + 10
WHERE manager_id between :start_id and :end_id';
We might have this:
我们可能有这样的:
l_sql_stmt := 'BEGIN my_package.some_procedure(:start_id, :end_id); END;';
The rest of the example can be found in the "Chunk by User-Provided SQL" example section
示例的其余部分可以在“用户提供的 SQL 块”示例部分中找到
You will still need to tell Oracle the start/end ids for each process(using CREATE_CHUNKS_BY_SQL), I typically store them in a separate lookup table (if pre-defined) or you can provide a SQL query that returns a set of start/end values. For the latter approach, try using NTILE. For example, using 8 chunks:
您仍然需要告诉 Oracle 每个进程的开始/结束 ID(使用 CREATE_CHUNKS_BY_SQL),我通常将它们存储在单独的查找表中(如果预定义),或者您可以提供一个返回一组开始/结束的 SQL 查询值。对于后一种方法,请尝试使用 NTILE。例如,使用 8 个块:
select min(id) as start_id, max(id) as end_id
from (
select id, ntile(8) over (order by 1) bucket
from some_table
where some_clause...
)
group by bucket
order by bucket;
Hope that helps
希望有帮助
回答by schnatterer
I solved the problem using DBMS_SCHEDULER and PIPEs for synchronization/IPCthat does not rely on polling and does not need additional tables. It still wakes once per finished job, though.
我使用 DBMS_SCHEDULER 和 PIPEs 解决了不依赖轮询且不需要额外表的同步/ IPC的问题。尽管如此,它仍然会在每个完成的工作中唤醒一次。
It's quite some effort, so if some can propose a simpler solution please share it!
这是相当多的努力,所以如果有人可以提出更简单的解决方案,请分享它!
- Define a procedure that calls the actual procedure that can be run from a program/job and handles IPC (write message to pipe when finished).
- Define a program that calls this procedure and defines arguments to be passed to the procedure
- Define a procedure that creates a job from the program, maps parameters to job arguments and runs the job
- Define logic that waits for all jobs to finish: Wait until every job has sent a message on the pipe.
- 定义一个过程,该过程调用可从程序/作业运行并处理 IPC(完成后将消息写入管道)的实际过程。
- 定义一个程序来调用这个过程并定义要传递给该过程的参数
- 定义从程序创建作业、将参数映射到作业参数并运行作业的过程
- 定义等待所有作业完成的逻辑:等到每个作业都在管道上发送消息。
--
-- Define stored procedures to be executed by job
--
/** Actual method that should be run in parallel*/
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS(
in_job IN VARCHAR2)
IS
BEGIN
-- Imagine a lot of IF, ELSEIF, ELSE statements here
DELETE FROM TEST_BONUS WHERE JOB=in_job;
END;
/
/** Stored procedure to be run from the job: Uses pipes for job synchronization, executes PROC_DELETE_TEST_BONUS. */
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS_CONCUR(in_pipe_name IN VARCHAR2,
in_job IN VARCHAR2)
IS
flag INTEGER;
BEGIN
-- Execute actual procedure
PROC_DELETE_TEST_BONUS(in_job);
-- Signal completion
-- Use the procedure to put a message in the local buffer.
DBMS_PIPE.PACK_MESSAGE(SYSDATE ||': Success ' ||in_job);
-- Send message, success is a zero return value.
flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
EXCEPTION
WHEN OTHERS THEN
-- Signal completion
-- Use the procedure to put a message in the local buffer.
DBMS_PIPE.PACK_MESSAGE(SYSDATE ||':Failed ' || in_job);
-- Send message, success is a zero return value.
flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
RAISE;
END;
/
--
-- Run Jobs
--
DECLARE
timestart NUMBER;
duration_insert NUMBER;
jobs_amount NUMBER := 0;
retval INTEGER;
message VARCHAR2(4000);
rows_amount NUMBER;
/** Create and define a program that calls PROG_DELETE_TEST_BONUS_CONCUR to be run as job. */
PROCEDURE create_prog_delete_test_bonus
IS
BEGIN
-- define new in each run in order to ease development. TODO Once it works, no need to redefine for each run!
dbms_scheduler.drop_program(program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', force=> TRUE);
dbms_scheduler.create_program ( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', program_action =>
'PROC_DELETE_TEST_BONUS_CONCUR', program_type => 'STORED_PROCEDURE', number_of_arguments => 2,
enabled => FALSE );
dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR',
argument_position => 1, argument_name => 'in_pipe_name', argument_type => 'VARCHAR2');
dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name=>'PROG_DELETE_TEST_BONUS_CONCUR',
argument_position => 2, argument_name => 'in_job', argument_type => 'VARCHAR2');
dbms_scheduler.enable('PROG_DELETE_TEST_BONUS_CONCUR');
END;
/** "Forks" a job that runs PROG_DELETE_TEST_BONUS_CONCUR */
PROCEDURE RUN_TEST_BONUS_JOB(
in_pipe_name IN VARCHAR2,
in_job IN VARCHAR2,
io_job_amount IN OUT NUMBER)
IS
jobname VARCHAR2(100);
BEGIN
jobname:=DBMS_SCHEDULER.GENERATE_JOB_NAME;
dbms_scheduler.create_job(job_name => jobname, program_name =>
'PROG_DELETE_TEST_BONUS_CONCUR');
dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name =>
'in_pipe_name' , argument_value => in_pipe_name);
dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name =>
'in_job' , argument_value => in_job);
dbms_output.put_line(SYSDATE || ': Running job: '|| jobname);
dbms_scheduler.RUN_JOB(jobname, false );
io_job_amount:= io_job_amount+1;
END;
-- Anonymous "Main" block
BEGIN
create_prog_delete_test_bonus;
-- Define private pipe
retval := DBMS_PIPE.CREATE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME, 100, FALSE);
dbms_output.put_line(SYSDATE || ': Created pipe: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned ' ||retval);
timestart := dbms_utility.get_time();
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'A' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'B' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'C' FROM DUAL CONNECT BY LEVEL <= 1000000;
COMMIT;
duration_insert := dbms_utility.get_time() - timestart;
dbms_output.put_line(SYSDATE || ': Duration (1/100s): INSERT=' || duration_insert);
SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);
timestart := dbms_utility.get_time();
-- -- Process sequentially
-- PROC_DELETE_TEST_BONUS('A');
-- PROC_DELETE_TEST_BONUS('B');
-- PROC_DELETE_TEST_BONUS('C');
-- start concurrent processing
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'A', jobs_amount);
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'B', jobs_amount);
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'C', jobs_amount);
-- "Barrier": Wait for all jobs to finish
for i in 1 .. jobs_amount loop
-- Reset the local buffer.
DBMS_PIPE.RESET_BUFFER;
-- Wait and receive message. Timeout after an hour.
retval := SYS.DBMS_PIPE.RECEIVE_MESSAGE(SYS.DBMS_PIPE.UNIQUE_SESSION_NAME, 3600);
-- Handle errors: timeout, etc.
IF retval != 0 THEN
raise_application_error(-20000, 'Error: '||to_char(retval)||' receiving on pipe. See Job Log in table user_scheduler_job_run_details');
END IF;
-- Read message from local buffer.
DBMS_PIPE.UNPACK_MESSAGE(message);
dbms_output.put_line(SYSDATE || ': Received message on '''|| DBMS_PIPE.UNIQUE_SESSION_NAME ||''' (Status='|| retval ||'): ' || message);
end loop;
dbms_output.put(SYSDATE || ': Duration (1/100s): DELETE=');
dbms_output.put_line(dbms_utility.get_time() - timestart);
SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);
retval :=DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
dbms_output.put_line(systimestamp || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line(SYSDATE || SUBSTR(SQLERRM, 1, 1000) || ' ' ||
SUBSTR(DBMS_UTILITY.FORMAT_ERROR_BACKTRACE, 1, 1000));
retval := DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
dbms_output.put_line(SYSDATE || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);
-- Clean up in case of error
PROC_DELETE_TEST_BONUS('A');
PROC_DELETE_TEST_BONUS('B');
PROC_DELETE_TEST_BONUS('C');
RAISE;
END;
/
You should always keep in mind that the changes executed within the job are committed in a separate transaction.
您应该始终记住,在作业中执行的更改是在单独的事务中提交的。
Just to get a feeling for what this concurrency achieves, here a some averaged measured values: The sequential code in the question takes about 60s to complete, the parallel one about 40s.
It would be an interesting further investigation how this turns out when there are more than the three jobs running in parallel.
只是为了感受一下这种并发实现的效果,这里有一些平均测量值:问题中的顺序代码大约需要 60 秒才能完成,并行代码大约需要 40 秒。
当并行运行的作业超过三个时,结果如何,将是一个有趣的进一步调查。
PS
A helpful query to find out about the status of the jobs is the following
PS
一个有用的查询来了解工作的状态如下
SELECT job_name,
destination,
TO_CHAR(actual_start_date) AS actual_start_date,
run_duration,
TO_CHAR((ACTUAL_START_DATE+run_duration)) AS actual_end_date,
status,
error#,
ADDITIONAL_INFO
FROM user_scheduler_job_run_details
ORDER BY actual_start_date desc;
回答by Dmitriy
If yes, you can try this:
如果是,你可以试试这个:
- create a new table with tasks (and parameter of tasks)
- create procedure that will read parameters from table, pass them to "legacy procedure" and then update task table after processing (using autonomous transactions) to show that processing is ended
- outer procedure, that creates tasks, can scan task table to get information about progress. You can use DBMS_LOCK.SLEEP to wait.
- 创建一个包含任务(和任务参数)的新表
- 创建将从表中读取参数的过程,将它们传递给“遗留过程”,然后在处理后更新任务表(使用自主事务)以显示处理结束
- 创建任务的外部过程可以扫描任务表以获取有关进度的信息。您可以使用 DBMS_LOCK.SLEEP 等待。