Commit e2ba6ddf authored by Jon Tavernier

play with spark parallel read

parent 713e2f22
#!/usr/bin/env python3
"""Playing around with parallel reads and writing as JSON.
Start with ./submit_job.sh!"""
import textwrap
from pyspark import SparkConf
from pyspark.sql import SparkSession
from helpers import db_connections
# Class documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession
# A SparkSession is required.
spark = SparkSession \
    .builder \
    .appName("ReadParallelPostgres") \
    .getOrCreate()
# avoid putting a semicolon here since spark requires
# this query to go in a subquery (see below).
# let's add an arbitrary chunk_id on which we will extract
# in parallel.
query = textwrap.dedent("""\
    select
        order_id, order_number,
        mod(order_id, 4) + 1 as chunk_id
    from public.jt_orders o
""")
# this is one way to define a data frame.
# i put it in this function so it gets ignored.
# see the other way below, which i liked because it seems
# i have more control over options.
def junk():
    # https://spark.apache.org/docs/2.3.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.jdbc
    df = spark.read.jdbc(
        url=db_connections['postgres']['url'],
        table=f"({query}) as subq",
        column='chunk_id',
        lowerBound=1,
        upperBound=4,
        numPartitions=2,
        properties=db_connections['postgres']['properties']
    )
    # since numPartitions is 2, spark will grab the data in two queries.
    # it will NOT run four queries two at a time to grab all data.
    # first query: WHERE chunk_id < 3 or chunk_id is null
    # second query: WHERE chunk_id >= 3
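# rough sketch of how those two WHERE clauses fall out of lowerBound,
# upperBound, and numPartitions. this is my own approximation of spark's
# bucketing logic, not its actual code, but it reproduces the predicates
# noted above for (lower=1, upper=4, numPartitions=2).
def sketch_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    if num_partitions <= 1:
        return ['']  # a single partition scans the whole table, no predicate
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower_clause = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper_clause = f"{column} < {current}" if i < num_partitions - 1 else None
        if lower_clause and upper_clause:
            predicates.append(f"{lower_clause} and {upper_clause}")
        elif upper_clause:
            # the first bucket also sweeps in NULLs so no rows are lost
            predicates.append(f"{upper_clause} or {column} is null")
        else:
            predicates.append(lower_clause)
    return predicates

# sketch_partition_predicates('chunk_id', 1, 4, 2)
# -> ['chunk_id < 3 or chunk_id is null', 'chunk_id >= 3']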
# spark will execute the queries lazily, anytime an action needs the data.
# https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
df = spark.read.format("jdbc") \
    .option("url", db_connections['postgres']['url']) \
    .option("driver", db_connections['postgres']['properties']['driver']) \
    .option("dbtable", f"({query}) as subq") \
    .option("partitionColumn", 'chunk_id') \
    .option("lowerBound", 1) \
    .option("upperBound", 4) \
    .option("numPartitions", 2) \
    .option("user", db_connections['postgres']['properties']['user']) \
    .option("password", db_connections['postgres']['properties']['password']) \
    .option("fetchsize", 1000) \
    .load()
# i only saw one query execute here. maybe the other occurred so
# fast that i missed it, or spark was smart enough to just grab the
# 20 rows show() needs from a single partition.
df.show()
df.printSchema()
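# optional sanity checks i would add here (not in the original script):
# confirm how many jdbc partitions spark created and peek at the physical
# plan for the jdbc scan.
print(df.rdd.getNumPartitions())  # should print 2 for numPartitions=2
df.explain()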
# this will execute two queries.
summary = df.groupBy(['chunk_id']).count()
summary.show()
# this will also execute two queries.
df.drop('chunk_id').write.json(
    '/tmp/jt_orders.json',
    mode='overwrite'
)
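# quick verification (my addition, not part of the original flow): read the
# json back and make sure the row count matches the source table.
written = spark.read.json('/tmp/jt_orders.json')
print(written.count())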
spark.stop()
drop table if exists jt_orders;
create table jt_orders(
    order_id int,
    order_number varchar(20),
    constraint pk_orders primary key (order_id)
);
insert into jt_orders(order_id, order_number)
select i as order_id, 'R' || lpad(cast(i as varchar), 10, '0') as order_number
from (
    select i
    from generate_series(1, 10000000) i
) t;
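-- optional sanity check (my addition, not part of the original setup script):
-- confirm the mod-based buckets are evenly sized, mirroring the chunk_id
-- expression used in the spark query above.
select mod(order_id, 4) + 1 as chunk_id, count(*) as row_count
from jt_orders
group by 1
order by 1;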