-- NOTE(review): this drops 'migrations_run' while the function created below is
-- named 'migration_run' -- confirm which name dropall() is expected to receive.
select * from dropall('migrations_run','meta');

-- meta.migration_run
--
-- Runs the pending scripts stored in meta.migration_script, one at a time, over
-- a named dblink connection ('dblink_migration').
--
-- A script is picked up when coalesce(status,0) = 0, its priority lies within
-- [p_priority_start, p_priority_end] and octet_length(body) > 1. Scripts run
-- ordered by priority, sequence, id_migration_script.
--
-- Status values written to meta.migration_script by this function:
--   1 = about to run, 2 = ran without error, 3 = failed (error text is stored).
--
-- Parameters:
--   p_id_migration_model  not referenced by the body (kept for interface compatibility).
--   p_priority_start/end  inclusive priority window of the scripts to run.
--   p_info   (INOUT)      the incoming value is discarded; on return it holds a jsonb
--                         array with one object per processed script
--                         (objectname, objecttype, status and, on failure, error).
--   p_retval (OUT)        0 when the run loop finished, 1 when the outer handler fired.
--   p_errmsg (OUT)        '' on success, otherwise message, context and SQLSTATE.
--
-- A failing script does not abort the run: it is recorded with status 3 and the
-- loop continues until more than G_BREAK_ERR_CNT scripts have failed.
--
-- NOTE(review): SECURITY DEFINER without a pinned search_path -- confirm this is intended.
create or replace function meta.migration_run(
     p_id_migration_model integer default null
    ,p_priority_start integer default 0
    ,p_priority_end integer default 10000
    ,inout p_info jsonb default null
    ,out p_retval integer
    ,out p_errmsg text
)
language plpgsql
volatile
security definer
as $$
declare
    --Error Handling--
    m_funcname text = 'meta.migration_run';
    m_errmsg text;
    m_errcontext text;
    m_errdetail text;
    m_errhint text;
    m_errstate text;
    --Error Handling--
    m_dblink_conn text;
    m_dblink_login text;
    m_gotemplate text;
    m_pid integer;
    r_lp record;
    m_errcnt integer = 0;
    G_BREAK_ERR_CNT constant integer = 500;
begin
    m_dblink_conn = 'dblink_migration';
    p_retval = 0;
    p_errmsg = '';
    m_pid = pg_backend_pid();
    p_info = jsonb_build_array();

    -- Wrapper sent to the remote session for every script. The bracketed
    -- placeholders are substituted with replace() further down (not format()),
    -- so percent signs in this text are literal. The wrapper retries the script
    -- when a lock cannot be obtained and raises if no attempt succeeded, so that
    -- a script which never ran is not recorded as successful.
    m_gotemplate = $CQl$do $OUTERDO$
declare
    m_lock_timeout text = '1500ms';
    m_max_attempts int = 2;
    m_ddl_completed boolean = false;
begin
    set local lock_timeout = '1500ms';
    perform set_config('lock_timeout', m_lock_timeout, false);
    for i in 1..m_max_attempts loop
        begin
            execute $EXECMIGRATION$[proc]$EXECMIGRATION$;
            m_ddl_completed = true;
            exit;
        exception when lock_not_available then
            raise warning 'attempt %/% to lock table "test" failed', i, m_max_attempts;
            perform pg_sleep(0.1);
        end;
    end loop;
    if not m_ddl_completed then
        raise exception 'migration not applied: lock not available after % attempts', m_max_attempts
            using errcode = 'lock_not_available';
    end if;
end
$OUTERDO$;
$CQl$;

    -- NOTE(review): the connection values are concatenated unquoted; a password
    -- containing spaces or quotes would yield an invalid connection string.
    select 'host=127.0.0.1 dbname='||r.dbname||' port=' || r.port || ' user='|| r.loginuser ||' password=' || r.password
      from f_get_local_conn('migration') r
      into m_dblink_login;

    -- Reuse the named connection when it is already open in this session.
    if not exists (select T.text from unnest(dblink_get_connections()) T where T.text = m_dblink_conn) then
        perform dblink_connect(m_dblink_conn, m_dblink_login);
    end if;

    perform log_event(
         m_funcname
        ,format('migrations_run init [%s to %s] with connection %s',p_priority_start,p_priority_end,m_dblink_conn)
        ,bt_enum('eventlog','upgrade')
    );

    -- Tag the remote session with the pid of the calling backend.
    perform dblink_exec(m_dblink_conn, format($S$set application_name to 'MIGRATION:%s'; $S$, m_pid), false);

    for r_lp in (
        select *
              ,''::text as result
              ,clock_timestamp() as startat
          from meta.migration_script s
         where coalesce(s.status,0) = 0
           and s.priority >= p_priority_start
           and s.priority <= p_priority_end
           and octet_length(s.body) > 1
         order by s.priority, s.sequence, s.id_migration_script
    ) loop

        -- Give up once too many scripts have failed.
        if m_errcnt > G_BREAK_ERR_CNT then
            perform log_event(
                 m_funcname
                ,format('migrations_run Max Error Count [%s to %s] Errors %s/%s',p_priority_start,p_priority_end,m_errcnt,G_BREAK_ERR_CNT)
                ,bt_enum('eventlog','upgrade')
            );
            exit;
        end if;

        -- Mark the script as started (status 1). This call sits outside the
        -- per-script handler, so a failure here (e.g. f_can_migrate() returning
        -- false) is caught by the outer handler and ends the whole run.
        perform dblink_exec(m_dblink_conn, format($Q$
            DO $OUTERDO$
            BEGIN
                if not meta.f_can_migrate() then
                    raise exception 'Cannot migrate. Check the f_can_migrate function.';
                end if;
                update meta.migration_script u
                   set status = 1
                      ,changedat = now()
                 where u.id_migration_script = %1$s;
            END;
            $OUTERDO$;
        $Q$, r_lp.id_migration_script), true);

        begin
            -- Placeholders are substituted in this order; the later two are also
            -- replaced inside the script body that was just inserted.
            select dblink_exec(
                       m_dblink_conn
                      ,replace(replace(replace(m_gotemplate,'[proc]', r_lp.body),'[objectname]',r_lp.objectname),'[priority]',r_lp.priority::text)
                      ,true)
              into r_lp.result;

            r_lp.duration = clock_timestamp() - r_lp.startat;

            perform log_event(
                 m_funcname
                ,format('Ran Script %s @%s ',r_lp.objectname,r_lp.priority)
                ,bt_enum('eventlog','upgrade')
            );

            perform dblink_exec(m_dblink_conn, format($Q$
                update meta.migration_script u
                   set status = 2
                      ,changedat = now()
                      ,duration = %2$s::interval
                 where u.id_migration_script = %1$s;
            $Q$, r_lp.id_migration_script, quote_literal( r_lp.duration)), true);

            p_info = jsonb_concat(p_info, jsonb_build_object('objectname',r_lp.objectname,'objecttype', r_lp.objecttype,'status','success'));

        exception when others then
            get stacked diagnostics m_errmsg = message_text;
            raise notice 'Exception % -> %',r_lp.objectname, m_errmsg;

            r_lp.duration = clock_timestamp() - r_lp.startat;

            -- The message literal below deliberately spans two lines (embedded newline).
            perform log_event(
                 m_funcname
                ,format('migrations_run error %s -> %s 
>> %s',r_lp.objecttype,r_lp.objectname,m_errmsg)
                ,bt_enum('eventlog','local error')
            );

            -- Record the failure; fail_on_error is false so a problem while
            -- recording does not raise from inside this handler.
            perform dblink_exec(m_dblink_conn, format($Q$
                update meta.migration_script u
                   set status = 3
                      ,error = %2$s || E' \nResult: ' || %3$s
                      ,changedat = now()
                      ,duration = %4$s::interval
                 where u.id_migration_script = %1$s;
            $Q$, r_lp.id_migration_script, quote_literal(m_errmsg), quote_literal(coalesce(r_lp.result,'')), quote_literal( r_lp.duration) ), false);

            p_info = jsonb_concat(p_info, jsonb_build_object('objectname',r_lp.objectname,'objecttype', r_lp.objecttype,'status','error', 'error',m_errmsg));
            m_errcnt = m_errcnt + 1;
        end;

        -- Publish the final status of this script on the 'upgrade.events' channel
        -- from the remote session.
        perform dblink_exec(m_dblink_conn, format($Q$DO $DDD$
            declare
                m_text text;
            begin
                select pg_notify('upgrade.events', json_build_object('type','upgrade','status',m.status,'objecttype', m.objecttype,'objectname',m.objectname)::text)
                  from meta.migration_script m
                 where m.id_migration_script = %1$s
                  into m_text;
            end;
            $DDD$;$Q$, r_lp.id_migration_script), false);

    end loop;

    perform dblink_disconnect(m_dblink_conn);

exception when others then
    get stacked diagnostics
         m_errmsg = message_text
        ,m_errcontext = pg_exception_context
        ,m_errdetail = pg_exception_detail
        ,m_errhint = pg_exception_hint
        ,m_errstate = returned_sqlstate;

    p_retval = 1;
    p_errmsg = format('%s Context %s State: %s', m_errmsg, m_errcontext, m_errstate);

    -- Close the named connection if it is still open.
    if exists (select T.text from unnest(dblink_get_connections()) T where T.text = m_dblink_conn) then
        perform dblink_disconnect(m_dblink_conn);
    end if;

    perform pg_notify('upgrade.events', json_build_object('type','upgrade','status',3,'error',m_errmsg,'objecttype',m_funcname)::text);
end;
$$;