Commit 07a487d9 authored by Frank Schoeneman

added Blocked-CB APSP method, improved partitioner, and refactored code.

parent f74b2ec4
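# ===================== collect_bc.py =====================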
__author__ = "Frank Schoeneman"
__copyright__ = "Copyright (c) 2019 SCoRe Group http://www.score-group.org/"
__license__ = "MIT"
__version__ = "1.0.0"
__maintainer__ = "Frank Schoeneman"
__email__ = "fvschoen@buffalo.edu"
__status__ = "Development"
import numpy as np
import glob
import my_util
def run_APSP_diag_iteration(q, q_, p, rdd_partitioner, apsp_graph, BLOCKS_DIR, sc):
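    # One pivot iteration of blocked Floyd-Warshall. Phase 1 solves APSP
    # within the pivot (diagonal) block (q_, q_); phase 2 updates the
    # pivot's row and column blocks against it; phase 3 updates every
    # remaining block via min-plus products of its phase-2 row and column
    # blocks. Blocks needed across phases are exchanged as files in
    # BLOCKS_DIR.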
    def doPhase1(((I_, J_), thisBlock), q_):
        # Phase 1: run Floyd-Warshall to completion on the pivot block.
        # (The driver writes the result to BLOCKS_DIR for later phases.)
        diagBlock = my_util.scipy_floyd(thisBlock)
        return ((I_, J_), diagBlock)
    def doPhase2(((I_, J_), thisBlock), q_):
        # Phase 2: update a pivot-row or pivot-column block against the
        # phase-1 pivot block, memmapped from shared storage.
        blkfname = glob.glob(BLOCKS_DIR+'block_q'+str(q_)+'_'+str(q_)+'_'+str(q_)+'_*.csv')[0]
        # block shape is encoded in the file name as ..._<rows>_<cols>.csv
        bsize = tuple(map(int, blkfname.split('/')[-1].split('.')[0].split('_')[-2:]))
        diagPass = np.memmap(blkfname, shape=bsize, dtype='float', mode='r')
        if J_ == q_:  # pivot-column block: multiply by the pivot on the right
            updateP1 = my_util.minmpmatmul(thisBlock, diagPass, thisBlock)
        else:         # pivot-row block: multiply by the pivot on the left
            updateP1 = my_util.minmpmatmul(diagPass, thisBlock, thisBlock)
        return ((I_, J_), updateP1)
    def doPhase3(((I_, J_), thisBlock), q_):
        # Phase 3: update an off-pivot block using its pivot-row block
        # (I_, q_) and pivot-column block (q_, J_) from phase 2. Only the
        # upper triangle is stored, so mirrored keys are transposed.
        def load_block(i, j):
            blkfname = glob.glob(BLOCKS_DIR+'block_q'+str(q_)+'_'+str(i)+'_'+str(j)+'_*.csv')[0]
            bsize = tuple(map(int, blkfname.split('/')[-1].split('.')[0].split('_')[-2:]))
            return np.memmap(blkfname, shape=bsize, dtype='float', mode='r')
        if I_ < q_:
            blk_L = load_block(I_, q_)
        else:
            blk_L = load_block(q_, I_).transpose()
        if q_ < J_:
            blk_R = load_block(q_, J_)
        else:
            blk_R = load_block(J_, q_).transpose()
        thisBlock = my_util.minmpmatmul(blk_L, blk_R, thisBlock)
        return ((I_, J_), thisBlock)
    # Phase 1: solve the pivot block, then write it to shared storage so
    # phase 2 and 3 tasks can memmap it.
    diagBlock1 = apsp_graph.filter(lambda x : x[0][0] == q_ and x[0][1] == q_)\
        .map(lambda x : doPhase1(x, q_), preservesPartitioning=False)
    diagBlock = diagBlock1.collectAsMap()[(q_, q_)]
    (shape1, shape2) = map(str, diagBlock.shape)
    blkfname = BLOCKS_DIR+'block_q'+str(q_)+'_'+str(q_)+'_'+str(q_)+'_'+shape1+'_'+shape2+'.csv'
    diagBlock.tofile(blkfname)
    # Phase 2: update the pivot row and column blocks and write them to
    # shared storage for phase 3.
    rowColBlocksRDD = apsp_graph.filter(lambda x : (x[0][0] == q_) ^ (x[0][1] == q_))\
        .map(lambda x : doPhase2(x, q_), preservesPartitioning=False)
    rowColBlocks = rowColBlocksRDD.collectAsMap()
    for i, j in rowColBlocks:
        shape1, shape2 = map(str, rowColBlocks[(i, j)].shape)
        blkfname = BLOCKS_DIR+'block_q'+str(q_)+'_'+str(i)+'_'+str(j)+'_'+shape1+'_'+shape2+'.csv'
        rowColBlocks[(i, j)].tofile(blkfname)
    # Phase 3: update all remaining blocks.
    p3Blocks = apsp_graph.filter(lambda x : x[0][0] != q_ and x[0][1] != q_)\
        .map(lambda x : doPhase3(x, q_), preservesPartitioning=False)
    # Reassemble the full block matrix for the next pivot iteration.
    apsp_graph = sc.union([diagBlock1, rowColBlocksRDD, p3Blocks])\
        .partitionBy(p, rdd_partitioner)
    return apsp_graph
def collect_bc_block_fw(apsp_graph, q, p, rdd_partitioner, BLOCKS_DIR, sc):
    # One blocked Floyd-Warshall iteration per pivot block 0..q-1.
    for q_ in range(q):
        apsp_graph = run_APSP_diag_iteration(q, q_, p, rdd_partitioner, apsp_graph, BLOCKS_DIR, sc)
    return apsp_graph
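# ===================== eigensolvers.py =====================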
import numpy as np
from collections import OrderedDict
from operator import add as _ADD_
def sim_power_iter(dbl_ctr_mtx, n, b, d, t, l, output_file, sc, logger):
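    # Simultaneous (block) power iteration: repeatedly multiply the
    # double-centered matrix by the current guess Q_, re-orthonormalize
    # with a reduced QR, and stop when the squared Frobenius change
    # between iterates drops below t, or after l iterations.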
def blockify(D):
return [((i, 0), D[a:(a+b), :]) for i, a in enumerate(range(0, n, b))]
def matrix_multiply(Amat, rightMat):
piVec = rightMat.value
def multBlocks(((I_, J_), subMatrix)):
# map block (i, j) as (i, j) and (j, i)
# multiplied to appropriate block
# of rightMat (bc)
yield ((I_, 0), np.dot(subMatrix, piVec[(J_, 0)]))
if I_ != J_: yield ((J_, 0), np.dot(subMatrix.T, piVec[(I_, 0)]) ) # check performance
# combineByKey to reduce (+)
# blocks (i, 0) of new V vector
# return this resulting RDD
return Amat.flatMap(lambda x : multBlocks(x)).reduceByKey(_ADD_) # RDD of results
# initial guess
Q_, _ = np.linalg.qr(np.eye(n, d), mode='reduced')
Q_dict = OrderedDict(sorted(blockify(Q_), key=lambda t : t[0]) )
Q_last = np.vstack(Q_dict.values())
for i in range(l):
        V_bc = sc.broadcast(Q_dict)
        V_ = matrix_multiply(dbl_ctr_mtx, V_bc)
        # collectAsMap() triggers the job; release the broadcast afterwards
        V_ = OrderedDict(sorted(V_.collectAsMap().items(), key=lambda t : t[0]))
        V_bc.unpersist()
        V_ = np.vstack(V_.values())
Q_, R_ = np.linalg.qr(V_, mode='reduced')
err_ = np.sum(np.square(Q_ - Q_last))
if i % 5 == 0:
logger.info("Simultaneous Power Iteration error at iteration " \
+ str(i) + " is = " +str(err_))
Q_dict = OrderedDict(sorted(blockify(Q_), key=lambda t : t[0]) )
Q_last = np.vstack(Q_dict.values())
if err_ < t: break # converged
    # After convergence: columns of Q_ are eigenvectors and the diagonal
    # of R_ holds the corresponding eigenvalues, so the low-dimensional
    # coordinates are Q_ scaled columnwise by sqrt(diag(R_)).
    dCoords = Q_ * np.sqrt(np.diag(R_))
if output_file is not None: np.savetxt(output_file, dCoords, fmt='%6f', delimiter='\t')
return dCoords
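# ===================== init_matrix.py =====================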
__author__ = "Frank Schoeneman"
__copyright__ = "Copyright (c) 2019 SCoRe Group http://www.score-group.org/"
__license__ = "MIT"
__version__ = "1.0.0"
__maintainer__ = "Frank Schoeneman"
__email__ = "fvschoen@buffalo.edu"
__status__ = "Development"
import numpy as np
from scipy.spatial.distance import cdist as sp_cdist#, pdist as sp_pdist, squareform
import heapq
from operator import add as _ADD_
from itertools import izip
from pyspark import StorageLevel as stglev
# load data from tsv
def load_data(b, input_file, D, sc):
    def tsv_block((pttnId, linelist), D):
        # Parse the collected lines of one block into an (nx, D) matrix.
        stringmat = np.fromstring("\t".join(linelist), dtype=np.double, sep='\t')
        nx = stringmat.shape[0] / D  # rows in this block
        return (pttnId, stringmat.reshape((nx, D)))
# load data into blocks from textfile
data_blocks = sc.textFile(input_file)\
.zipWithIndex()\
.map(lambda x : (x[1] / b, x[0]))\
.combineByKey(\
(lambda x : [x]),\
(lambda x, y : x + [y]),\
(lambda x, y : x + y))\
        .map(lambda x : tsv_block(x, D))
return data_blocks
# subblocks of pwd matrix
def compute_block_pwd(data_blocks, q, p, rdd_partitioner):
    def send_blk_to_blk((blkId, dataSub)):
        # Pair this block with every block id, keyed by the sorted pair so
        # only upper-triangular distance blocks are materialized.
        for i in range(q):
            yield (tuple(sorted((blkId, i))), (dataSub, blkId))
    def do_block_pwd(iter_):
        # A single value means a diagonal block; otherwise order the pair
        # so rows come from the block with the smaller id.
        for ((I_, J_), x) in iter_:
if len(x) == 1:
yield((I_, J_), sp_cdist(x[0][0], x[0][0], metric='euclidean'))
elif x[0][1] < x[1][1]:
yield((I_, J_), sp_cdist(x[0][0], x[1][0], metric='euclidean'))
else:
yield((I_, J_), sp_cdist(x[1][0], x[0][0], metric='euclidean'))
pwd_blocks = data_blocks.flatMap(lambda x : send_blk_to_blk(x))\
.combineByKey((lambda x : [x]),\
(lambda x, y : x + [y]),\
(lambda x, y : x + y),
numPartitions=p,
partitionFunc=rdd_partitioner)\
.mapPartitions(lambda x : do_block_pwd(x),\
preservesPartitioning=False)
return pwd_blocks
# local knn to block
def min_local_knn(pwd_blocks, k):
    def comp_sub_knn(((i_, j_), localPwdMat), k):
        # Partial-sort each row of the block and keep (up to) its k
        # smallest distances, yielding
        # ((block_i, local_i), (block_j, (dists, local_js))).
        def row_knn(M, bi, bj):
            kth = min(k, M.shape[1] - 1)
            argpk = np.argpartition(M, kth, axis=1)[:, :k]
            rows = np.arange(M.shape[0])[:, None]
            for local_i, rowentry in enumerate(izip(M[rows, argpk], argpk)):
                yield ((bi, local_i), (bj, rowentry))
        for entry in row_knn(localPwdMat, i_, j_):
            yield entry
        # also yield for the mirrored key (j_, i_) [upper-triangular storage]
        if i_ != j_:
            for entry in row_knn(localPwdMat.T, j_, i_):
                yield entry
local_min_knn = pwd_blocks.flatMap(lambda x : comp_sub_knn(x, k),\
preservesPartitioning=False)
return local_min_knn
# compute global knn
def row_red_all_knn(local_min_k, k, rdd_partitioner, p):
def zip_sub_row((block_j, (dvals, didx))):
return zip( dvals, zip(block_j * np.ones(len(didx), dtype=int), didx))
# returns [(distval, (block_j, local_j))]
    def merge_and_take_k_small_M(x, y, k):
        # y may arrive as a raw (block_j, (dvals, didx)) value; normalize
        # it to a [(distval, (block_j, local_j))] list before merging.
        if isinstance(y, tuple):
            y = zip_sub_row(y)
        mergedL = x + y
        heapq.heapify(mergedL)
        return heapq.nsmallest(k, mergedL)
def merge_and_take_k_small(x, y, k):
mergedL = x + y
heapq.heapify(mergedL)
return heapq.nsmallest(k, mergedL)
row_red_knn = local_min_k.combineByKey(\
(lambda x : zip_sub_row(x)),\
(lambda x, y : merge_and_take_k_small_M(x, y, k)),\
(lambda x, y : merge_and_take_k_small(x, y, k)),\
numPartitions=p,\
partitionFunc=rdd_partitioner)
return row_red_knn
# change key to send knn val back to block
def re_assign_key(row_red_knn):
def changeKey(((block_i, local_i), list_of_kNN_from_row)): # (dist, (block_j, didx))
for (distval, (block_j, local_j)) in list_of_kNN_from_row:
if block_i <= block_j:
block_key = (block_i, block_j)
mat_entry = (local_i, local_j, distval)
yield (block_key, mat_entry)
if block_i == block_j:
mat_entry2 = (local_j, local_i, distval)
yield (block_key, mat_entry2)
else:
block_key = (block_j, block_i)
mat_entry = (local_j, local_i, distval)
yield (block_key, mat_entry)
    # change key to ((block_i, block_j), (local_i, local_j, distval))
block_red_knn = row_red_knn.flatMap(lambda x : changeKey(x),\
preservesPartitioning=False)
return block_red_knn
# final knn step
def reduce_global_knn(pwd_blocks, block_red_knn, p, rdd_partitioner):
    def mat_comb(x):
        # A pwd block seeds the adjacency block: reset it to inf so only
        # the surviving kNN distances (filled in below) remain finite.
        if isinstance(x, np.ndarray):
            x[:, :] = np.inf
            return x
        return [x]
def update_mat_list(M_, L_):
r_i, c_j, d_v = zip(*L_)
M_[r_i, c_j] = d_v
return M_
def merge_mat_kNN(x, y):
if isinstance(x, np.ndarray):
return update_mat_list(x, [y])
if isinstance(y, np.ndarray):
return update_mat_list(y, x)
return [y] + x
def merge_mat_K_comb(x, y):
if isinstance(x, np.ndarray):
return update_mat_list(x, y)
if isinstance(y, np.ndarray):
return update_mat_list(y, x)
return y + x
global_knn = pwd_blocks.union(block_red_knn)\
.combineByKey(\
(lambda x : mat_comb(x)),\
(lambda x, y : merge_mat_kNN(x, y)),\
(lambda x, y : merge_mat_K_comb(x, y)),\
numPartitions=p,\
partitionFunc=rdd_partitioner)
return global_knn
# square feature matrix
def square_matrix_values(apsp_graph):
    def do_square_block(iter_):
        # Square distances in place (note: this mutates the cached blocks).
        for ((I_, J_), blockMat) in iter_:
            yield ((I_, J_), np.square(blockMat, out=blockMat))
apsp_graph = apsp_graph.mapPartitions(lambda x : do_square_block(x),\
preservesPartitioning=False)
return apsp_graph
def double_center(n, feat_matrix, p, sc):
    def comp_norm_sums(((I_, J_), localMat)):
        # Column sums for block column J_; off-diagonal blocks also emit
        # row sums for block row I_ (upper-triangular storage).
        yield (J_, np.sum(localMat, axis=0))
        if I_ != J_:
            yield (I_, np.sum(localMat, axis=1))
def grand_sum(iter_):
partTotalSum = 0.
for (J_, subColSum) in iter_:
partTotalSum += np.sum(subColSum)
yield partTotalSum
col_sums = feat_matrix.flatMap(lambda x : comp_norm_sums(x),\
preservesPartitioning=False )\
.combineByKey((lambda x : x),\
(lambda x, y: x + y),\
(lambda x, y: x + y),\
numPartitions=p)\
.persist(stglev.MEMORY_AND_DISK) # used twice below
grand_mean = col_sums.mapPartitions(lambda x : grand_sum(x)).reduce(_ADD_) / (n*n)
    cRM = col_sums.mapValues(lambda s : s / (1. * n)).collectAsMap()
means_bc = sc.broadcast((grand_mean, cRM))
    def dbl_center(iter_):
        gMean, colRowMeans = means_bc.value
        for ((I_, J_), locSubMat) in iter_:
            # B = -0.5 * (S - col_means - row_means + grand_mean)
            locSubMat -= colRowMeans[J_]
            locSubMat -= colRowMeans[I_].reshape((locSubMat.shape[0], 1))
            locSubMat += gMean
            yield ((I_, J_), -.5 * locSubMat)
dbl_cent_matrix = feat_matrix.mapPartitions(lambda x : dbl_center(x))\
.persist(stglev.MEMORY_AND_DISK)
col_sums.unpersist()
means_bc.unpersist()
return dbl_cent_matrix
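# Dense-equivalent sketch (illustrative only) of double_center above:
# with J = I - (1/n) * ones, the Gram matrix B = -0.5 * J S J equals
# -0.5 * (S - row_means - col_means + grand_mean).
if __name__ == "__main__":
    S = np.arange(16, dtype=float).reshape(4, 4)
    S = S + S.T  # symmetric stand-in for squared distances
    J = np.eye(4) - np.ones((4, 4)) / 4.
    B1 = -.5 * J.dot(S).dot(J)
    B2 = -.5 * (S - S.mean(axis=1, keepdims=True)
                - S.mean(axis=0, keepdims=True) + S.mean())
    assert np.allclose(B1, B2)


# ===================== driver script (file name not shown) =====================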
__author__ = "Frank Schoeneman"
__copyright__ = "Copyright (c) 2019 SCoRe Group http://www.score-group.org/"
__license__ = "MIT"
__version__ = "1.0.0"
__maintainer__ = "Frank Schoeneman"
__email__ = "fvschoen@buffalo.edu"
__status__ = "Development"
import argparse
import time
import os
import shutil
import logging
import my_util
import init_matrix
import collect_bc
import eigensolvers
from pyspark import SparkContext, SparkConf
from pyspark import StorageLevel as stglev
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-n", "--num_points", help="Number of points.", type=int, required=True)
parser.add_argument("-b", "--block_size", help="Submatrix block size.", type=int, required=True)
parser.add_argument("-p", "--partitions", help="Number of partitions.", type=int, required=True)
parser.add_argument("-F", "--partitioner", help="Partitioning Function. [md or ph]", type=str, required=True)
parser.add_argument("-f", "--input_file", help="Input data. (.tsv format)", type=str, required=True)
parser.add_argument("-D", "--highdim", help="Input data dimensionality.", type=int, required=True)
parser.add_argument("-k", "--ngbr_size", help="Neighborhood size.", type=int, required=True)
parser.add_argument("-d", "--lowdim", help="Reduced dimensionality.", type=int, required=True)
parser.add_argument("-l", "--powiters", help="Maximum iterations for power iteration.", type=int, required=True)
parser.add_argument("-t", "--conv", help="Convergence threshold for power iteration.", type=int, required=True)
parser.add_argument("-o", "--output_file", help="Output file name. ", type=str, required=False)
parser.add_argument("-e", "--log_dir", help="Spark event log dir.", type=str, required=False)
args = parser.parse_args()
n = args.num_points
b = args.block_size
p = args.partitions
D = args.highdim
k = args.ngbr_size
d = args.lowdim
l = args.powiters
t = args.conv
F = args.partitioner.lower()
input_file = args.input_file
q, N = my_util.block_vars(n, b)
t = 10**(-1.*t)
rdd_partitioner = my_util.verify_partitioner(F, q)
conf = SparkConf()
# optional log for history server
save_history = args.log_dir is not None
if save_history:
conf.set("spark.eventLog.enabled", "true")\
.set("spark.eventLog.dir", args.log_dir)\
.set("spark.history.fs.logDirectory", args.log_dir)
write_file = args.output_file is not None
output_file = None
if write_file: output_file = args.output_file
sc = SparkContext(conf=conf)
log4jLogger = sc._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger("IsomapSpark")
logger.setLevel(sc._jvm.org.apache.log4j.Level.ALL)
logger.info('n: {}, b: {}, q: {}, p: {}, partitioner: {}'.format(n, b, q, p, F))
logger.info('D: {}, d: {}, k: {}, l: {}, t: {}'.format(D, d, k, l, t))
ti = time.time()
t0 = time.time()
# load data, construct pwd matrix, compute knn, return adj mtx
data_blocks = init_matrix.load_data(b, input_file, D, sc)
pwd_blocks = init_matrix.compute_block_pwd(data_blocks, q, p, rdd_partitioner)
local_min_knn = init_matrix.min_local_knn(pwd_blocks, k)
row_red_knn = init_matrix.row_red_all_knn(local_min_knn, k, rdd_partitioner, p)
block_red_knn = init_matrix.re_assign_key(row_red_knn)
global_knn = init_matrix.reduce_global_knn(pwd_blocks, block_red_knn, p, rdd_partitioner)
global_knn.persist(stglev.MEMORY_AND_DISK)
global_knn.count()
t1 = time.time()
logger.info("Time for kNN search: " + str(t1-t0) + " s.")
t0 = time.time()
# run apsp-solver Blocked-CB
    blocks_dir = 'testdir/'
    if not os.path.exists(blocks_dir):
        os.mkdir(blocks_dir)
apsp_graph = collect_bc.collect_bc_block_fw(global_knn, q, p, rdd_partitioner, blocks_dir, sc)
apsp_graph.persist(stglev.MEMORY_AND_DISK)
apsp_graph.count()
t1 = time.time()
os.system("rm -r "+blocks_dir)
logger.info("Time for Blocked-CB solution: " + str(t1-t0) + " s.")
t0 = time.time()
feat_matrix = init_matrix.square_matrix_values(apsp_graph)
dbl_ctr_mtx = init_matrix.double_center(n, feat_matrix, p, sc)
t1 = time.time()
logger.info("Time for matrix normalization: " + str(t1-t0) + " s.")
t0 = time.time()
lowd_coords = eigensolvers.sim_power_iter(dbl_ctr_mtx, n, b, d, t, l, output_file, sc, logger)
t1 = time.time()
logger.info("Time for eigensolver: " + str(t1-t0) + " s.")
    tf = time.time()
    logger.info("Time for complete Isomap method: " + str(tf-ti) + " s.")
    logger.info('done!')
    sc.stop()
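# ===================== my_util.py =====================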
__author__ = "Frank Schoeneman"
__copyright__ = "Copyright (c) 2019 SCoRe Group http://www.score-group.org/"
__license__ = "MIT"
__version__ = "1.0.0"
__maintainer__ = "Frank Schoeneman"
__email__ = "fvschoen@buffalo.edu"
__status__ = "Development"
from pyspark.rdd import portable_hash as pyspark_portable_hash
from scipy.sparse.csgraph import floyd_warshall as apsp_fw
import numpy as np
import numba as nb
# Compute the number of blocks per dimension, q,
# and the number of stored (upper-triangular) blocks, N.
def block_vars(n, b):
q = int((n-1) / b) + 1
N = (q * (q+1)) / 2
return q, N
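# Example: n = 1000, b = 256 gives q = 4 and N = 10.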
def verify_partitioner(F, q):
    pttn_correct = F in ('md', 'ph', 'rs')
    assert pttn_correct, 'Error: Unrecognized partitioner \'' + F + '\'. Use \'md\', \'ph\', or \'rs\'.'
    if F == 'md':
        def multi_diag((k1, k2)):
            # Map the upper-triangular block key (k1, k2) to a unique index
            # so blocks on the same matrix diagonal land next to each other.
            return int(k1 - (0.5) * (k1 - k2) * (k1 - k2 + 2 * q + 1))
        return multi_diag
    elif F == 'rs':
        # TODO add rs
        pass
    return pyspark_portable_hash  # otherwise use the default hash partitioner.
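# Worked example for the 'md' partitioner (q = 3): the six
# upper-triangular block keys map to distinct indices, grouped
# by matrix diagonal:
#   (0,0)->0, (1,1)->1, (2,2)->2   (main diagonal)
#   (0,1)->3, (1,2)->4             (first superdiagonal)
#   (0,2)->5                       (second superdiagonal)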
def scipy_floyd(ADJ_MAT):
return apsp_fw(ADJ_MAT, directed=False, unweighted=False, overwrite=True)
'''
Min-plus matrix multiplication of matrices
A and B. Compiled for better performance using numba.
'''
@nb.jit(nopython=True)
def mpmatmul(A, B):
assert A.shape[1] == B.shape[0], 'Matrix dimension mismatch in mpmatmul().'
C = np.zeros((A.shape[0], B.shape[1]))
for i in range(A.shape[0]):
for j in range(B.shape[1]):
somesum = np.inf
for k in range(A.shape[1]):
somesum = min(somesum, A[i, k] + B[k, j])
C[i, j] = somesum
return C
'''
Min-plus matrix multiplication C = A*B followed by an elementwise min:
the returned D_ satisfies D_[i, j] = min(D_[i, j], C[i, j]).
Compiled with numba for better performance. A is made C-contiguous and
B Fortran-contiguous so the innermost loop streams both operands.
'''
def minmpmatmul(A, B, D_):
return _minmpmatmul(np.ascontiguousarray(A), np.asfortranarray(B), D_)
@nb.jit(nopython=True)
def _minmpmatmul(A, B, D_):
assert A.shape[1] == B.shape[0], 'Matrix dimension mismatch in minmpmatmul().'
for i in range(A.shape[0]):
for j in range(B.shape[1]):
somesum = np.inf
for k in range(A.shape[1]):
somesum = min(somesum, A[i, k] + B[k, j])
D_[i, j] = min(D_[i, j], somesum)
return D_
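# Minimal usage sketch (illustrative only, not part of the module):
# min-plus squaring of a 3-node path graph recovers the two-hop
# shortest path 0 -> 2 through node 1.
if __name__ == "__main__":
    A = np.array([[0., 1., np.inf],
                  [1., 0., 1.],
                  [np.inf, 1., 0.]])
    D = minmpmatmul(A, A, A.copy())
    assert D[0, 2] == 2.  # 0 -> 1 -> 2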