Parallel

Computation in parallel

  • Estimates are computed in parallel by running multiple client sessions; this is independent of native PostgreSQL query parallelization (see the PostgreSQL documentation). A minimal sketch of the pattern follows this list.
  • The parallel driver is implemented with the Python multiprocessing module.
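
A minimal sketch of the multiple-client-sessions pattern (the commands here are placeholders, not part of nFIESTA): every worker process opens its own database connection and executes one SQL command.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Minimal sketch of the multiple-client-sessions pattern: each worker process
# opens its own connection; the commands below are placeholders.
from multiprocessing import Pool

import psycopg2

def run_one(sql):
    conn = psycopg2.connect('dbname=nfi_esta user=vagrant')  # one connection per worker process
    try:
        conn.autocommit = True
        with conn.cursor() as cur:
            cur.execute(sql)
            return cur.fetchall() if cur.description is not None else None
    finally:
        conn.close()

if __name__ == '__main__':
    commands = ['select pg_sleep(1)'] * 3  # three independent commands
    with Pool(processes=3) as pool:
        print(pool.map(run_one, commands))  # the three sleeps run concurrently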

Client application (nfiesta_parallel.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import psycopg2
import psycopg2.extras
from multiprocessing import Pool

def single_query(_sql, verbose=True):
    # Open a dedicated connection, execute one SQL command and return the fetched
    # rows; returns None for commands without a result set, or an integer error
    # code if the connection or the command fails.
    try:
        #connstr = 'host=localhost port=5433 dbname=nfi_esta user=vagrant' # for network authentication (password can be stored in ~/.pgpass)
        connstr = 'dbname=nfi_esta user=vagrant' # for local linux based authentication (without password)
        conn = psycopg2.connect(connstr)
    except psycopg2.DatabaseError as e:
        print("nfiesta_parallel.single_query(): could not connect to the database")
        print(e)
        return 1

    conn.set_session(isolation_level=None, readonly=None, deferrable=None, autocommit=True) # autocommit: do not wrap each command in an explicit transaction
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cmd = '%s' % (_sql)

    try:
        #print("%s\tnfiesta_parallel.py processing command: %s" % (datetime.datetime.now(), cmd))
        cur.execute(cmd)
        #for notice in conn.notices: print(notice)
        #del conn.notices[:]
    except psycopg2.Error as e:
        print("nfiesta_parallel.single_query(), SQL error:")
        print(e)
        return 2
    if cur.description is not None:  # the command returned a result set (SELECT, or INSERT ... RETURNING)
        path_row_list = cur.fetchall()
        if verbose:
            print('%s\tnfiesta_parallel.py result not None: %s' % (datetime.datetime.now(), path_row_list))
        else:
            print('%s\tnfiesta_parallel.py result not None' % (datetime.datetime.now()))
    else:
        # the command returned no result set (e.g. plain INSERT or UPDATE without RETURNING)
        path_row_list = None
        print('%s\tnfiesta_parallel.py result None' % (datetime.datetime.now()))
    cur.close()
    conn.close()
    return path_row_list

def run_parallel(_sql, _processes, _verbose=True):
    # step 1: run the outer query; each returned row carries one generated SQL
    # command in its first column
    _qrsl = single_query(_sql, verbose=_verbose)
    _qrs = [q[0] for q in _qrsl]
    # step 2: distribute the generated commands over a pool of worker processes,
    # each command running in its own database session
    with Pool(_processes) as p:
        res_multi = p.map(single_query, _qrs, chunksize=1)
    print(res_multi)
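
The outer query passed to run_parallel must return one complete SQL command per row in its first column; run_parallel executes it once and then distributes the generated commands over the worker pool. A hypothetical smoke test of the driver (pg_sleep commands, not part of nFIESTA) could look like this:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
sys.path.append('./')
from nfiesta_parallel import run_parallel

# generates four commands, each sleeping i seconds; with two workers the
# wall-clock time stays well below the 10 s a serial run would need
sql = "select format('select pg_sleep(%s)', i) from generate_series(1, 4) as i;"

run_parallel(_sql=sql, _processes=2, _verbose=True)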

Example use

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
sys.path.append('./')
from nfiesta_parallel import run_parallel

sql = '''select format($$
	WITH w_conf as (
			SELECT *
			FROM nfiesta_test.v_conf_overview
			WHERE
				estimate_conf = %s
	)
	, w_conf_info as (
			select row_number() over() as i, estimate_conf, total_estimate_conf, total_estimate_conf__denom, (select count(*) as cnt from w_conf) from w_conf
	)
	, w_res as (
		select 
			estimate_conf,
			clock_timestamp() as t_start, 
			(select (point2p, var2p, est_info)::nfiesta_test.estimate_result
			from nfiesta_test.fn_2p_total_var(total_estimate_conf)) as res, 
			clock_timestamp() as t_stop,
			version
		from
			w_conf_info
			, (select version
			from pg_available_extension_versions
			where name = 'nfiesta' and installed) as extension_version
		WHERE
			nfiesta_test.fn_raise_notice(concat('computing estimate, configuration: '::text, estimate_conf::text, ', ',i::text,' / ', cnt::text)) --PROGRESS MONITORING	
	)
	insert into nfiesta_test.t_result (estimate_conf, point, var, extension_version, calc_started, calc_duration, sampling_units)
	select estimate_conf, (res).point, (res).var, version, t_start as calc_started, t_stop - t_start as calc_duration, (res).est_info
	from w_res
	RETURNING estimate_conf;
$$, estimate_conf) 
	FROM (	SELECT *
		FROM nfiesta_test.v_conf_overview
		WHERE estimate_type_str = '2p_total'
	) AS estimates_to_compute
;'''

run_parallel(_sql=sql, _processes=3, _verbose=False)
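
After the run finishes, the stored estimates can be inspected through the same helper; a quick check against the nfiesta_test.t_result table filled by the INSERT above might be:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
sys.path.append('./')
from nfiesta_parallel import single_query

# number of stored estimates and their summed computation time
single_query('select count(*), sum(calc_duration) from nfiesta_test.t_result;')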

Or, more simply, use fn_make_estimate(...):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
sys.path.append('./')
from nfiesta_parallel import run_parallel

sql = '''select format($$
        select estimate_conf from nfiesta_test.fn_make_estimate(%s)
        ;
        $$, estimate_conf) 
        FROM (  SELECT estimate_conf
                FROM nfiesta_test.v_conf_overview
                WHERE estimate_type_str = '2p_total'
        ) AS estimates_to_compute
;'''

run_parallel(_sql=sql, _processes=12, _verbose=True)
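
The examples above hard-code 3 and 12 worker processes. As a rule of thumb (an assumption, not an nFIESTA recommendation), the pool size can be derived from the client's CPU count, keeping the database server's cores and its max_connections setting in mind:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import multiprocessing
import sys
sys.path.append('./')
from nfiesta_parallel import run_parallel

sql = '''select format($$
        select estimate_conf from nfiesta_test.fn_make_estimate(%s)
        ;
        $$, estimate_conf)
        FROM (  SELECT estimate_conf
                FROM nfiesta_test.v_conf_overview
                WHERE estimate_type_str = '2p_total'
        ) AS estimates_to_compute
;'''

# leave one core free for the client script itself
run_parallel(_sql=sql, _processes=max(1, multiprocessing.cpu_count() - 1), _verbose=False)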