Commit 612952d5 authored by Jaroslaw Zola's avatar Jaroslaw Zola

Code cleaning and documentation update.

parent 07a487d9
The MIT License (MIT)
Copyright (c) 2016-2018 SCoRe Group http://www.score-group.org/
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
@@ -5,15 +5,15 @@ Frank Schoeneman <fvschoen@buffalo.edu>,
Jaroslaw Zola <jaroslaw.zola@hush.com>
## About
SparkIsomap is a tool to efficiently execute Isomap for learning manifolds from high-dimensional data. Isomap remains an important data analytics technique, as the vast majority of big data, coming from, for example, high performance high fidelity numerical simulations, high resolution scientific instruments or Internet of Things streams and feeds, is a result of complex non-linear processes that can be characterized by complex manifolds. SparkIsomap can be used to process data sets with tens to hundreds of thousands of high dimensional points using relatively small Spark cluster. The method uses Apache Spark, implemented entirely in PySpark, and offloads compute intensive linear algebra routines to BLAS.
SparkIsomap is a tool to efficiently execute Isomap for learning manifolds from high-dimensional data. Isomap remains an important data analytics technique, as the vast majority of big data, coming from, e.g., high-performance high-fidelity numerical simulations, high-resolution scientific instruments or Internet of Things streams, is a result of complex non-linear processes that can be characterized by complex manifolds. SparkIsomap can be used to process datasets with tens to hundreds of thousands of high-dimensional points using a relatively small Spark cluster. The method uses Apache Spark, and is implemented entirely in PySpark with compute-intensive linear algebra routines offloaded to BLAS.
## User Guide
SparkIsomap is entirely self-contained in `SparkIsomap.py`. When executing SparkIsomap, provide the following command line parameters:
SparkIsomap is a Python 2.7 application, self-contained in `SparkIsomap.py`. When executing SparkIsomap, you must provide the following command-line parameters:
* `-f` Input data (in .tsv format).
* `-o` Output file name.
* `-e` Spark event log directory.
* `-e` Spark event log directory (must be created before running).
* `-C` Spark checkpoint directory.
* `-b` Submatrix block size.
* `-p` Number of partitions.
@@ -22,11 +22,11 @@ SparkIsomap is entirely self-contained in `SparkIsomap.py`. When executing Spark
* `-k` Neighborhood size.
* `-d` Reduced dimensionality.
* `-l` Maximum iterations for power iteration.
* `-t` Convergence threshold for power iteration (provide EXPONENT, such that 10^-EXPONENT is convergence threshold).
* `-t` Convergence threshold for power iteration (provide $EXPONENT$ such that $10^{-EXPONENT}$ is the convergence threshold).
Example invocation:
Example invocation (please make sure that Spark is using Python 2.7):
`Python SparkIsomap -k 10 -n 50000 -D 3 -d 2 -l 100 -t 9 -b 1250 -p 1431 -f swiss50k.tsv -o swiss50k_d2.tsv -C chkpt_swiss50 -e elogs/
`spark-submit SparkIsomap.py -k 10 -n 50000 -D 3 -d 2 -l 100 -t 9 -b 1250 -p 1431 -f swiss50k.tsv -o swiss50k_d2.tsv -C chkpt_swiss50 -e elogs/
`
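The file given by `-o` contains the low-dimensional embedding as plain tab-separated values, one point per row with `d` columns (it is written with `np.savetxt`). A minimal sketch for loading and inspecting the result of the example invocation above (file name taken from that example):

```python
import numpy as np

# load the 2D embedding produced by the example invocation above
Y = np.loadtxt("swiss50k_d2.tsv", delimiter="\t")
print(Y.shape)  # expected: (50000, 2)
```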
If you have immediate questions regarding the method or software, please do not hesitate to contact Jaric Zola <jaroslaw.zola@hush.com>.
@@ -35,4 +35,4 @@ If you have immediate questions regarding the method or software, please do not
To cite SparkIsomap, refer to this repository and our paper:
* F. Schoeneman, J. Zola, _Scalable Manifold Learning for Big Data with Apache Spark_, 2018. <http://arxiv.org/abs/1808.10776>.
* F. Schoeneman, J. Zola, _Scalable Manifold Learning for Big Data with Apache Spark_, IEEE International Conference on Big Data (IEEE BigData), pp. 272-281, 2018. <http://arxiv.org/abs/1808.10776>.
@@ -117,15 +117,12 @@ if __name__ == "__main__":
    D = int(args.D)
    if args.b:
        b = int(args.b)
    if args.B:
        B = int(args.B)
    if args.p:
        p = int(args.p)
    if args.t:
        t = int(args.t)
    if args.l:
        l = int(args.l)
    if args.C:
        CHKPT_DIR = args.C
__author__ = "Frank Schoeneman"
__copyright__ = "Copyright (c) 2019 SCoRe Group http://www.score-group.org/"
__license__ = "MIT"
__version__ = "1.0.0"
__maintainer__ = "Frank Schoeneman"
__email__ = "fvschoen@buffalo.edu"
__status__ = "Development"
import numpy as np
import glob
import my_util
def run_APSP_diag_iteration(q, q_, p, rdd_partitioner, apsp_graph, BLOCKS_DIR, sc):

    def doPhase1(((I_, J_), thisBlock), q_):
        diagBlock = my_util.scipy_floyd(thisBlock)
        (shape1, shape2) = map(str, diagBlock.shape)
        blkfname = BLOCKS_DIR+'block_q'+str(q_)+'_'+str(q_)+'_'+str(q_)+'_'+shape1+'_'+shape2+'.csv'
        return ((I_, J_), diagBlock)

    def doPhase2(((I_, J_), thisBlock), q_):
        blkfname = BLOCKS_DIR+'block_q'+str(q_)+'_'+str(q_)+'_'+str(q_)+'_*.csv'
        blkfname = glob.glob(blkfname)
        blkfname = blkfname[0]
        bsize = tuple(map(int, blkfname.split('/')[-1].split('.')[0].split('_')[-2:]))
        diagPass = np.memmap(blkfname, shape=bsize, dtype='float', mode='r')
        if J_ == q_:
            updateP1 = my_util.minmpmatmul(thisBlock, diagPass, thisBlock)
        else:
            updateP1 = my_util.minmpmatmul(diagPass, thisBlock, thisBlock)
        return ((I_, J_), updateP1)

    def doPhase3(((I_, J_), thisBlock), q_):
        if I_ < q_:
            blkfname = BLOCKS_DIR+'block_q'+str(q_)+'_'+str(I_)+'_'+str(q_)+'_*.csv'
            blkfname = glob.glob(blkfname)[0]
            bsize = tuple(map(int, blkfname.split('/')[-1].split('.')[0].split('_')[-2:]))
            blk_L = np.memmap(blkfname, shape=bsize, dtype='float', mode='r')
        else:
            blkfname = BLOCKS_DIR+'block_q'+str(q_)+'_'+str(q_)+'_'+str(I_)+'_*.csv'
            blkfname = glob.glob(blkfname)[0]
            bsize = tuple(map(int, blkfname.split('/')[-1].split('.')[0].split('_')[-2:]))
            blk_L = np.memmap(blkfname, shape=bsize, dtype='float', mode='r').transpose()
        if q_ < J_:
            blkfname = BLOCKS_DIR+'block_q'+str(q_)+'_'+str(q_)+'_'+str(J_)+'_*.csv'
            blkfname = glob.glob(blkfname)[0]
            bsize = tuple(map(int, blkfname.split('/')[-1].split('.')[0].split('_')[-2:]))
            blk_R = np.memmap(blkfname, shape=bsize, dtype='float', mode='r')
        else:
            blkfname = BLOCKS_DIR+'block_q'+str(q_)+'_'+str(J_)+'_'+str(q_)+'_*.csv'
            blkfname = glob.glob(blkfname)[0]
            bsize = tuple(map(int, blkfname.split('/')[-1].split('.')[0].split('_')[-2:]))
            blk_R = np.memmap(blkfname, shape=bsize, dtype='float', mode='r').transpose()
        thisBlock = my_util.minmpmatmul(blk_L, blk_R, thisBlock)
        return ((I_, J_), thisBlock)

    # phase 1: solve shortest paths within the pivot (diagonal) block (q_, q_)
    diagBlock1 = apsp_graph.filter(lambda x : x[0][0] == q_ and x[0][1] == q_)\
                           .map(lambda x : doPhase1(x, q_), preservesPartitioning=False)
    diagBlock = diagBlock1.collectAsMap()
    diagBlock = diagBlock[(q_, q_)]
    (shape1, shape2) = map(str, diagBlock.shape)
    blkfname = BLOCKS_DIR+'block_q'+str(q_)+'_'+str(q_)+'_'+str(q_)+'_'+shape1+'_'+shape2+'.csv'
    diagBlock.tofile(blkfname)
    # phase 2: update blocks in the pivot row/column using the diagonal block
    rowColBlocksRDD = apsp_graph.filter(lambda x : (x[0][0] == q_) ^ (x[0][1] == q_))\
                                .map(lambda x : doPhase2(x, q_), preservesPartitioning=False)
    rowColBlocks = rowColBlocksRDD.collectAsMap()
    for i, j in rowColBlocks:
        shape1, shape2 = map(str, rowColBlocks[(i,j)].shape)
        blkfname = BLOCKS_DIR+'block_q'+str(q_)+'_'+str(i)+'_'+str(j)+'_'+shape1+'_'+shape2+'.csv'
        rowColBlocks[(i,j)].tofile(blkfname)
    # phase 3: update all remaining blocks using the pivot row/column blocks
    p3Blocks = apsp_graph.filter(lambda x : x[0][0] != q_ and x[0][1] != q_)\
                         .map(lambda x : doPhase3(x, q_), preservesPartitioning=False)
    # UNION (phase2 p3Blocks) AS phase3 and update/compute
    apsp_graph = sc.union([diagBlock1, rowColBlocksRDD, p3Blocks])\
                   .partitionBy(p, rdd_partitioner)
    # RESET apspGraph = phase3 (result after update / computation)
    return apsp_graph


def collect_bc_block_fw(apsp_graph, q, p, rdd_partitioner, BLOCKS_DIR, sc):
    for q_ in range(0, q, 1):
        apsp_graph = run_APSP_diag_iteration(q, q_, p, rdd_partitioner, apsp_graph, BLOCKS_DIR, sc)
    return apsp_graph
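For reference, the functions above implement the three phases of a blocked Floyd-Warshall (min-plus) all-pairs shortest paths solve, with the pivot block shared through files on disk. Below is a minimal single-node NumPy sketch of the same three phases on a dense matrix; `minplus` and `blocked_floyd_warshall` are illustrative helpers, not part of the package, standing in for `my_util.minmpmatmul` and the RDD machinery.

# Reference sketch only: three-phase blocked Floyd-Warshall on a dense matrix.
import numpy as np

def minplus(A, B):
    # naive min-plus ("tropical") matrix product; memory-heavy, for illustration only
    return np.min(A[:, :, None] + B[None, :, :], axis=1)

def blocked_floyd_warshall(W, b):
    D = W.copy()
    n = D.shape[0]
    q = (n + b - 1) // b
    blk = lambda i: slice(i * b, min((i + 1) * b, n))
    for k in range(q):
        K = blk(k)
        # phase 1: shortest paths inside the pivot (diagonal) block
        for m in range(D[K, K].shape[0]):
            D[K, K] = np.minimum(D[K, K], D[K, K][:, m:m+1] + D[K, K][m:m+1, :])
        # phase 2: update blocks in the pivot row and column
        for j in range(q):
            if j != k:
                J = blk(j)
                D[K, J] = np.minimum(D[K, J], minplus(D[K, K], D[K, J]))
                D[J, K] = np.minimum(D[J, K], minplus(D[J, K], D[K, K]))
        # phase 3: update all remaining blocks
        for i in range(q):
            for j in range(q):
                if i != k and j != k:
                    I, J = blk(i), blk(j)
                    D[I, J] = np.minimum(D[I, J], minplus(D[I, K], D[K, J]))
    return D

With b equal to the matrix size this degenerates to ordinary Floyd-Warshall; the distributed version differs mainly in that blocks live in an RDD and only the pivot row/column blocks are materialized on disk.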
Example 3D data with 50K points sampled from an isometric Swiss-roll.
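For convenience, a minimal sketch that generates a comparable input file in the format expected by `-f` (one point per line, tab-separated coordinates, no header, as read by `load_data`). The sampling below is the standard Swiss-roll parameterization and is illustrative only; it is not necessarily the procedure used to produce the bundled file, and `make_swiss_roll` is not part of the package.

# Illustrative generator for a 3D Swiss-roll sample in .tsv format.
import numpy as np

def make_swiss_roll(n, seed=0):
    rng = np.random.RandomState(seed)
    t = 1.5 * np.pi * (1.0 + 2.0 * rng.rand(n))   # roll parameter
    h = 21.0 * rng.rand(n)                        # height
    return np.column_stack((t * np.cos(t), h, t * np.sin(t)))

X = make_swiss_roll(50000)
np.savetxt("swiss50k.tsv", X, fmt="%.6f", delimiter="\t")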
import numpy as np
from collections import OrderedDict
from operator import add as _ADD_
def sim_power_iter(dbl_ctr_mtx, n, b, d, t, l, output_file, sc, logger):

    def blockify(D):
        return [((i, 0), D[a:(a+b), :]) for i, a in enumerate(range(0, n, b))]

    def matrix_multiply(Amat, rightMat):
        piVec = rightMat.value
        def multBlocks(((I_, J_), subMatrix)):
            # map block (i, j) as (i, j) and (j, i)
            # multiplied to appropriate block
            # of rightMat (bc)
            yield ((I_, 0), np.dot(subMatrix, piVec[(J_, 0)]))
            if I_ != J_: yield ((J_, 0), np.dot(subMatrix.T, piVec[(I_, 0)]) ) # check performance
        # combineByKey to reduce (+)
        # blocks (i, 0) of new V vector
        # return this resulting RDD
        return Amat.flatMap(lambda x : multBlocks(x)).reduceByKey(_ADD_) # RDD of results

    # initial guess
    Q_, _ = np.linalg.qr(np.eye(n, d), mode='reduced')
    Q_dict = OrderedDict(sorted(blockify(Q_), key=lambda t : t[0]) )
    Q_last = np.vstack(Q_dict.values())

    for i in range(l):
        V_bc = sc.broadcast(Q_dict)
        V_ = matrix_multiply(dbl_ctr_mtx, V_bc)
        V_bc.unpersist()
        V_ = OrderedDict(sorted(V_.collectAsMap().items(), key=lambda t : t[0]) )
        V_ = np.vstack((V_.values()))
        Q_, R_ = np.linalg.qr(V_, mode='reduced')
        err_ = np.sum(np.square(Q_ - Q_last))
        if i % 5 == 0:
            logger.info("Simultaneous Power Iteration error at iteration " \
                        + str(i) + " is = " + str(err_))
        Q_dict = OrderedDict(sorted(blockify(Q_), key=lambda t : t[0]) )
        Q_last = np.vstack(Q_dict.values())
        if err_ < t: break # converged

    # after convergence
    # Columns of Q_ are eigenvectors, Diagonal of R_ contains eigenvalues
    # dCoords = Q_ .* ( ones(n, 1) * sqrt(np.diag(R_)) )
    # dCoords = np.multiply( Q_, np.outer(np.ones((n, 1)), np.diag(R_)) )
    dCoords = Q_ * np.sqrt(np.diag(R_))
    if output_file is not None: np.savetxt(output_file, dCoords, fmt='%6f', delimiter='\t')
    return dCoords
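For reference, `sim_power_iter` above is a block-distributed form of simultaneous power iteration (subspace iteration). A minimal single-node NumPy equivalent is sketched below; `subspace_iteration` is an illustrative helper, not part of the package, and the local matrix product corresponds to what `matrix_multiply` computes across RDD blocks.

# Reference sketch only: subspace iteration for the d leading eigenpairs of a symmetric matrix B.
import numpy as np

def subspace_iteration(B, d, tol=1e-9, max_iter=100):
    n = B.shape[0]
    Q, _ = np.linalg.qr(np.eye(n, d))    # same initial guess as above
    Q_last = Q.copy()
    for i in range(max_iter):
        V = B.dot(Q)                      # done blockwise/distributed in sim_power_iter
        Q, R = np.linalg.qr(V)
        err = np.sum(np.square(Q - Q_last))
        Q_last = Q.copy()
        if err < tol:
            break
    # columns of Q approximate eigenvectors, diag(R) the corresponding eigenvalues
    coords = Q * np.sqrt(np.diag(R))      # embedding, as in dCoords above
    return coords, Q, np.diag(R)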
__author__ = "Frank Schoeneman"
__copyright__ = "Copyright (c) 2019 SCoRe Group http://www.score-group.org/"
__license__ = "MIT"
__version__ = "1.0.0"
__maintainer__ = "Frank Schoeneman"
__email__ = "fvschoen@buffalo.edu"
__status__ = "Development"
import numpy as np
from scipy.spatial.distance import cdist as sp_cdist#, pdist as sp_pdist, squareform
import heapq
from operator import add as _ADD_
from itertools import izip
from pyspark import StorageLevel as stglev
# load data from tsv
def load_data(b, input_file, D, sc):

    def tsv_block((pttnId, linelist), D):
        stringmat = np.fromstring("\t".join([st for st in linelist]), dtype=np.double, sep='\t')
        nx = stringmat.shape[0] / D
        stringmat = stringmat.reshape((nx, D))
        return (pttnId, stringmat)

    # load data into blocks from textfile
    data_blocks = sc.textFile(input_file)\
                    .zipWithIndex()\
                    .map(lambda x : (x[1] / b, x[0]))\
                    .combineByKey(\
                        (lambda x : [x]),\
                        (lambda x, y : x + [y]),\
                        (lambda x, y : x + y))\
                    .map(lambda x : tsv_block(x, D))
    return data_blocks
# subblocks of pwd matrix
def compute_block_pwd(data_blocks, q, p, rdd_partitioner):

    def send_blk_to_blk((blkId, dataSub)):
        [(yield (tuple(sorted((blkId, i))), (dataSub, blkId))) for i in range(0, q, 1)]

    def do_block_pwd(iter_):
        for ((I_, J_), x) in iter_:
            if len(x) == 1:
                yield((I_, J_), sp_cdist(x[0][0], x[0][0], metric='euclidean'))
            elif x[0][1] < x[1][1]:
                yield((I_, J_), sp_cdist(x[0][0], x[1][0], metric='euclidean'))
            else:
                yield((I_, J_), sp_cdist(x[1][0], x[0][0], metric='euclidean'))

    pwd_blocks = data_blocks.flatMap(lambda x : send_blk_to_blk(x))\
                            .combineByKey((lambda x : [x]),\
                                          (lambda x, y : x + [y]),\
                                          (lambda x, y : x + y),
                                          numPartitions=p,
                                          partitionFunc=rdd_partitioner)\
                            .mapPartitions(lambda x : do_block_pwd(x),\
                                           preservesPartitioning=False)
    return pwd_blocks
# local knn to block
def min_local_knn(pwd_blocks, k):

    def comp_sub_knn(((i_, j_), localPwdMat), k):
        selfDist = False
        if i_ == j_:
            selfDist = True
        if k < localPwdMat.shape[1]:
            argpk = np.argpartition(localPwdMat, k, axis=1)
            [(yield ((i_, local_i), (j_, rowentry))) for (local_i, rowentry) in enumerate(izip(localPwdMat[np.arange(localPwdMat.shape[0])[:, None], argpk[:, :k]], argpk[:, :k]))]
        else:
            argpk = np.argpartition(localPwdMat, localPwdMat.shape[1]-1, axis=1)
            [(yield ((i_, local_i), (j_, rowentry))) for (local_i, rowentry) in enumerate(izip(localPwdMat[np.arange(localPwdMat.shape[0])[:, None], argpk], argpk))]
        # also yield for (j, i) [upper triangular]
        if (k < localPwdMat.shape[0]) and not selfDist:
            argpkT = np.argpartition(localPwdMat.T, k, axis=1)
            [(yield ((j_, local_i), (i_, rowentry))) for (local_i, rowentry) in enumerate(izip(localPwdMat.T[np.arange(localPwdMat.shape[1])[:, None], argpkT[:, :k]], argpkT[:, :k]))]
        elif not selfDist:
            argpkT = np.argpartition(localPwdMat.T, localPwdMat.shape[0]-1, axis=1)
            [(yield ((j_, local_i), (i_, rowentry))) for (local_i, rowentry) in enumerate(izip(localPwdMat.T[np.arange(localPwdMat.shape[1])[:, None], argpkT], argpkT))]

    # (block_i, local_i), (block_j, (dists, [args -- local_j?]))
    local_min_knn = pwd_blocks.flatMap(lambda x : comp_sub_knn(x, k),\
                                       preservesPartitioning=False)
    return local_min_knn
# compute global knn
def row_red_all_knn(local_min_k, k, rdd_partitioner, p):

    def zip_sub_row((block_j, (dvals, didx))):
        return zip( dvals, zip(block_j * np.ones(len(didx), dtype=int), didx))
        # returns [(distval, (block_j, local_j))]

    def merge_and_take_k_small_M(x, y, k):
        if isinstance(y, tuple):
            (block_j, (dvals, didx)) = y
            y = zip( dvals, zip(block_j * np.ones(len(didx), dtype=int), didx))
        mergedL = x + y
        heapq.heapify(mergedL)
        return heapq.nsmallest(k, mergedL)

    def merge_and_take_k_small(x, y, k):
        mergedL = x + y
        heapq.heapify(mergedL)
        return heapq.nsmallest(k, mergedL)

    row_red_knn = local_min_k.combineByKey(\
        (lambda x : zip_sub_row(x)),\
        (lambda x, y : merge_and_take_k_small_M(x, y, k)),\
        (lambda x, y : merge_and_take_k_small(x, y, k)),\
        numPartitions=p,\
        partitionFunc=rdd_partitioner)
    return row_red_knn
# change key to send knn val back to block
def re_assign_key(row_red_knn):

    def changeKey(((block_i, local_i), list_of_kNN_from_row)): # (dist, (block_j, didx))
        for (distval, (block_j, local_j)) in list_of_kNN_from_row:
            if block_i <= block_j:
                block_key = (block_i, block_j)
                mat_entry = (local_i, local_j, distval)
                yield (block_key, mat_entry)
                if block_i == block_j:
                    mat_entry2 = (local_j, local_i, distval)
                    yield (block_key, mat_entry2)
            else:
                block_key = (block_j, block_i)
                mat_entry = (local_j, local_i, distval)
                yield (block_key, mat_entry)

    # change key to ((block_i, block_j), (local_i, local_j, distval))
    block_red_knn = row_red_knn.flatMap(lambda x : changeKey(x),\
                                        preservesPartitioning=False)
    return block_red_knn
# final knn step
def reduce_global_knn(pwd_blocks, block_red_knn, p, rdd_partitioner):

    def mat_comb(x):
        if isinstance(x, np.ndarray):
            x[:, :] = np.inf
            return x
        else: return [x]

    def update_mat_list(M_, L_):
        r_i, c_j, d_v = zip(*L_)
        M_[r_i, c_j] = d_v
        return M_

    def merge_mat_kNN(x, y):
        if isinstance(x, np.ndarray):
            return update_mat_list(x, [y])
        if isinstance(y, np.ndarray):
            return update_mat_list(y, x)
        return [y] + x

    def merge_mat_K_comb(x, y):
        if isinstance(x, np.ndarray):
            return update_mat_list(x, y)
        if isinstance(y, np.ndarray):
            return update_mat_list(y, x)
        return y + x

    global_knn = pwd_blocks.union(block_red_knn)\
                           .combineByKey(\
                               (lambda x : mat_comb(x)),\
                               (lambda x, y : merge_mat_kNN(x, y)),\
                               (lambda x, y : merge_mat_K_comb(x, y)),\
                               numPartitions=p,\
                               partitionFunc=rdd_partitioner)
    return global_knn
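Taken together, min_local_knn, row_red_all_knn, re_assign_key and reduce_global_knn turn the blocked pairwise-distance matrix into a symmetric k-nearest-neighbor graph in which non-neighbor entries are infinite. A minimal single-node sketch of that end result is given below; `knn_graph` is an illustrative helper, not part of the package.

# Reference sketch only: symmetric kNN graph with np.inf marking missing edges.
import numpy as np
from scipy.spatial.distance import cdist

def knn_graph(X, k):
    pwd = cdist(X, X, metric='euclidean')
    n = pwd.shape[0]
    G = np.full((n, n), np.inf)
    idx = np.argpartition(pwd, k, axis=1)[:, :k]   # k smallest per row (includes self, distance 0)
    rows = np.arange(n)[:, None]
    G[rows, idx] = pwd[rows, idx]
    G = np.minimum(G, G.T)                         # symmetrize, mirroring re_assign_key
    return G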
# square feature matrix
def square_matrix_values(apsp_graph):

    def do_square_block(iter_):
        for ((I_, J_), blockMat) in iter_:
            yield ((I_, J_), np.square(blockMat, out=blockMat))

    apsp_graph = apsp_graph.mapPartitions(lambda x : do_square_block(x),\
                                          preservesPartitioning=False)
    return apsp_graph
def double_center(n, feat_matrix, p, sc):

    def comp_norm_sums(((I_, J_), localMat)):
        if I_ == J_:
            yield (J_, np.sum(localMat, axis=0))
        else:
            yield (J_, np.sum(localMat, axis=0))
            yield (I_, np.sum(localMat, axis=1))

    def grand_sum(iter_):
        partTotalSum = 0.
        for (J_, subColSum) in iter_:
            partTotalSum += np.sum(subColSum)
        yield partTotalSum

    col_sums = feat_matrix.flatMap(lambda x : comp_norm_sums(x),\
                                   preservesPartitioning=False )\
                          .combineByKey((lambda x : x),\
                                        (lambda x, y: x + y),\
                                        (lambda x, y: x + y),\
                                        numPartitions=p)\
                          .persist(stglev.MEMORY_AND_DISK) # used twice below
    grand_mean = col_sums.mapPartitions(lambda x : grand_sum(x)).reduce(_ADD_) / (n*n)
    cRM = col_sums\
        .mapPartitions(lambda x : [(yield ((x_[0], x_[1] / (1. * n) ))) for x_ in x])\
        .collectAsMap()
    means_bc = sc.broadcast((grand_mean, cRM))

    def dbl_center(iter_):
        gMean, colRowMeans = means_bc.value
        for ((I_, J_), locSubMat) in iter_:
            locSubMat -= colRowMeans[J_]
            locSubMat -= colRowMeans[I_].reshape((locSubMat.shape[0], 1))
            locSubMat += gMean
            yield ((I_, J_), -.5*locSubMat) # * -.5

    dbl_cent_matrix = feat_matrix.mapPartitions(lambda x : dbl_center(x))\
                                 .persist(stglev.MEMORY_AND_DISK)
    col_sums.unpersist()
    means_bc.unpersist()
    return dbl_cent_matrix
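For reference, double_center above applies the classical MDS double centering, B = -1/2 (D2 - row means - column means + grand mean), computed blockwise with broadcast means. A dense single-node equivalent, illustrative only (`double_center_dense` is not part of the package):

# Reference sketch only: double centering of a dense squared-distance matrix D2.
import numpy as np

def double_center_dense(D2):
    row_means = D2.mean(axis=1, keepdims=True)
    col_means = D2.mean(axis=0, keepdims=True)
    grand_mean = D2.mean()
    return -0.5 * (D2 - row_means - col_means + grand_mean)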
__author__ = "Frank Schoeneman"
__copyright__ = "Copyright (c) 2019 SCoRe Group http://www.score-group.org/"
__license__ = "MIT"
__version__ = "1.0.0"
__maintainer__ = "Frank Schoeneman"
__email__ = "fvschoen@buffalo.edu"
__status__ = "Development"
import argparse
import time
import os
import logging
import my_util
import init_matrix
import collect_bc
import eigensolvers
from pyspark import SparkContext, SparkConf
from pyspark import StorageLevel as stglev
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--num_points", help="Number of points.", type=int, required=True)
    parser.add_argument("-b", "--block_size", help="Submatrix block size.", type=int, required=True)
    parser.add_argument("-p", "--partitions", help="Number of partitions.", type=int, required=True)
    parser.add_argument("-F", "--partitioner", help="Partitioning Function. [md or ph]", type=str, required=True)
    parser.add_argument("-f", "--input_file", help="Input data. (.tsv format)", type=str, required=True)
    parser.add_argument("-D", "--highdim", help="Input data dimensionality.", type=int, required=True)
    parser.add_argument("-k", "--ngbr_size", help="Neighborhood size.", type=int, required=True)
    parser.add_argument("-d", "--lowdim", help="Reduced dimensionality.", type=int, required=True)
    parser.add_argument("-l", "--powiters", help="Maximum iterations for power iteration.", type=int, required=True)
    parser.add_argument("-t", "--conv", help="Convergence threshold for power iteration.", type=int, required=True)
    parser.add_argument("-o", "--output_file", help="Output file name.", type=str, required=False)
    parser.add_argument("-e", "--log_dir", help="Spark event log dir.", type=str, required=False)
    args = parser.parse_args()

    n = args.num_points
    b = args.block_size
    p = args.partitions
    D = args.highdim
    k = args.ngbr_size
    d = args.lowdim
    l = args.powiters
    t = args.conv
    F = args.partitioner.lower()
    input_file = args.input_file

    q, N = my_util.block_vars(n, b)
    t = 10**(-1.*t)
    rdd_partitioner = my_util.verify_partitioner(F, q)

    conf = SparkConf()
    # optional log for history server
    save_history = args.log_dir is not None
    if save_history:
        conf.set("spark.eventLog.enabled", "true")\
            .set("spark.eventLog.dir", args.log_dir)\
            .set("spark.history.fs.logDirectory", args.log_dir)

    write_file = args.output_file is not None
    output_file = None
    if write_file: output_file = args.output_file

    sc = SparkContext(conf=conf)
    log4jLogger = sc._jvm.org.apache.log4j
    logger = log4jLogger.LogManager.getLogger("IsomapSpark")
    logger.setLevel(sc._jvm.org.apache.log4j.Level.ALL)
    logger.info('n: {}, b: {}, q: {}, p: {}, partitioner: {}'.format(n, b, q, p, F))
    logger.info('D: {}, d: {}, k: {}, l: {}, t: {}'.format(D, d, k, l, t))

    ti = time.time()
    t0 = time.time()
    # load data, construct pwd matrix, compute knn, return adj mtx
    data_blocks = init_matrix.load_data(b, input_file, D, sc)
    pwd_blocks = init_matrix.compute_block_pwd(data_blocks, q, p, rdd_partitioner)
    local_min_knn = init_matrix.min_local_knn(pwd_blocks, k)
    row_red_knn = init_matrix.row_red_all_knn(local_min_knn, k, rdd_partitioner, p)
    block_red_knn = init_matrix.re_assign_key(row_red_knn)
    global_knn = init_matrix.reduce_global_knn(pwd_blocks, block_red_knn, p, rdd_partitioner)
    global_knn.persist(stglev.MEMORY_AND_DISK)
    global_knn.count()
    t1 = time.time()
    logger.info("Time for kNN search: " + str(t1-t0) + " s.")

    t0 = time.time()
    # run apsp-solver Blocked-CB
    blocks_dir = 'testdir/'
    os.system("mkdir "+blocks_dir)
    apsp_graph = collect_bc.collect_bc_block_fw(global_knn, q, p, rdd_partitioner, blocks_dir, sc)