Computation in parallel
- Using multiple client sessions. For native PostgreSQL parallelization see docs.
- Implemented using Python multiprocessing module.
Client application
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import psycopg2
import psycopg2.extras
from multiprocessing import Pool
def single_query(_sql, verbose=True):
    """Run one SQL command in its own autocommit session and return its rows.

    A new connection is opened for every call, so the function can be used
    as a worker in a multiprocessing pool (one client session per task).

    Args:
        _sql: SQL command to execute.
        verbose: when True, the fetched rows are included in the progress
            message; when False only a short status line is printed.

    Returns:
        The list of rows from ``fetchall()`` when the command produced a
        result set, ``None`` when it did not, ``1`` when the connection
        could not be established and ``2`` when the command failed.
    """
    # For network authentication use e.g.
    # 'host=localhost port=5433 dbname=nfi_esta user=vagrant'
    # (the password can be stored in ~/.pgpass).
    connstr = 'dbname=nfi_esta user=vagrant'  # local linux based authentication (without password)
    try:
        conn = psycopg2.connect(connstr)
    except psycopg2.DatabaseError as e:
        print("nfiesta_parallel.single_query(), not possible to connect DB")
        print(e)
        return 1

    cur = None
    try:
        # autocommit: do not wrap the command in a transaction
        conn.set_session(isolation_level=None, readonly=None, deferrable=None, autocommit=True)
        cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        cmd = '%s' % (_sql)
        try:
            cur.execute(cmd)
        except psycopg2.Error as e:
            print("nfiesta_parallel.single_query(), SQL error:")
            print(e)
            return 2

        # cur.description is None for commands that return no result set
        if cur.description is not None:
            path_row_list = cur.fetchall()
            if verbose:
                print('%s\tnfiesta_parallel.py result not None: %s' % (datetime.datetime.now(), path_row_list))
            else:
                print('%s\tnfiesta_parallel.py result not None' % (datetime.datetime.now()))
        else:
            path_row_list = None
            print('%s\tnfiesta_parallel.py result None' % (datetime.datetime.now()))
        return path_row_list
    finally:
        # Always release the session, including on the SQL-error path, so
        # failed tasks do not leave connections open on the server.
        if cur is not None:
            cur.close()
        conn.close()
def run_parallel(_sql, _processes, _verbose=True):
    """Generate SQL commands with one query and execute them in parallel.

    ``_sql`` is executed first; the first column of every returned row is
    taken as an SQL command. The commands are then distributed, one at a
    time, over a pool of worker processes, each running ``single_query``.

    Args:
        _sql: generator query whose first result column holds SQL commands.
        _processes: number of worker processes in the pool.
        _verbose: verbosity passed to ``single_query`` for the generator
            query and for every worker.

    Returns:
        The list of per-command results from ``single_query`` (also
        printed), or ``None`` when the generator query failed or produced
        no result set.
    """
    from functools import partial  # local import keeps the block self-contained

    _qrsl = single_query(_sql, verbose=_verbose)
    # single_query returns an error code (1 / 2) or None instead of rows
    # when nothing can be processed; iterating over that would raise.
    if not isinstance(_qrsl, list):
        print("nfiesta_parallel.run_parallel(), no commands to process, generator query returned: %s" % (_qrsl,))
        return None
    _qrs = [q[0] for q in _qrsl]

    worker = partial(single_query, verbose=_verbose)
    with Pool(_processes) as p:
        # chunksize=1: hand out commands one by one so long-running
        # commands do not block a whole pre-assigned chunk.
        res_multi = p.map(worker, _qrs, chunksize=1)
    print(res_multi)
    return res_multi
Example use
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
sys.path.append('./')
from nfiesta_parallel import run_parallel
# Generator query: for every configuration with estimate_type_str = '2p_total'
# it formats one SQL command that computes the estimate and inserts it into
# nfiesta_test.t_result. Each generated command is run by one worker.
sql = '''select format($$
WITH w_conf as (
SELECT *
FROM nfiesta_test.v_conf_overview
WHERE
estimate_conf = %s
)
, w_conf_info as (
select row_number() over() as i, estimate_conf, total_estimate_conf, total_estimate_conf__denom, (select count(*) as cnt from w_conf) from w_conf
)
, w_res as (
select
estimate_conf,
clock_timestamp() as t_start,
(select (point2p, var2p, est_info)::nfiesta_test.estimate_result
from nfiesta_test.fn_2p_total_var(total_estimate_conf)) as res,
clock_timestamp() as t_stop,
version
from
w_conf_info
, (select version
from pg_available_extension_versions
where name = 'nfiesta' and installed) as extension_version
WHERE
nfiesta_test.fn_raise_notice(concat('computing estimate, configuration: '::text, estimate_conf::text, ', ',i::text,' / ', cnt::text)) --PROGRESS MONITORING
)
insert into nfiesta_test.t_result (estimate_conf, point, var, extension_version, calc_started, calc_duration, sampling_units)
select estimate_conf, (res).point, (res).var, version, t_start as calc_started, t_stop - t_start as calc_duration, (res).est_info
from w_res
RETURNING estimate_conf;
$$, estimate_conf)
FROM ( SELECT *
FROM nfiesta_test.v_conf_overview
WHERE estimate_type_str = '2p_total'
) AS estimates_to_compute
;'''

# The guard is required by multiprocessing: with the "spawn" start method
# worker processes re-import this script and must not start a pool again.
if __name__ == '__main__':
    run_parallel(_sql=sql, _processes=3, _verbose=False)
Or, more simply, use fn_make_estimate(...):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
sys.path.append('./')
from nfiesta_parallel import run_parallel
# Generator query: for every configuration with estimate_type_str = '2p_total'
# it formats one call of nfiesta_test.fn_make_estimate(<estimate_conf>).
# Each generated command is run by one worker.
sql = '''select format($$
select estimate_conf from nfiesta_test.fn_make_estimate(%s)
;
$$, estimate_conf)
FROM ( SELECT estimate_conf
FROM nfiesta_test.v_conf_overview
WHERE estimate_type_str = '2p_total'
) AS estimates_to_compute
;'''

# The guard is required by multiprocessing: with the "spawn" start method
# worker processes re-import this script and must not start a pool again.
if __name__ == '__main__':
    run_parallel(_sql=sql, _processes=12, _verbose=True)