Commit 9b5d1d04 authored by Jon Tavernier

lower mem usage, increase parallelism

parent e2ba6ddf
@@ -5,7 +5,6 @@ Start with ./submit_job.sh!"""
import textwrap
- from pyspark import SparkConf
from pyspark.sql import SparkSession
from helpers import db_connections
@@ -15,6 +14,7 @@ from helpers import db_connections
spark = SparkSession \
.builder \
.appName("ReadParallelPostgres") \
.config("spark.executor.memory", "512m") \
.getOrCreate()
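
Note: a quick, hedged way to confirm the new executor memory setting took effect (this sketch assumes the `spark` session built above):

# sketch: read the launch-time conf back on the driver.
# SparkConf.get() returns the value set via .config() above.
print(spark.sparkContext.getConf().get("spark.executor.memory"))  # expect "512m"
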
# avoid putting a semicolon here since spark requires
@@ -28,9 +28,6 @@ query = textwrap.dedent("""\
from public.jt_orders o
""")
- # this is one way to define a data frame.
- # I put it in this function to ignore it.
- # see the other way below, which I liked because it seems
@@ -60,8 +57,8 @@ df = spark.read.format("jdbc") \
.option("dbtable", f"({query}) as subq") \
.option("partitionColumn", 'chunk_id') \
.option("lowerBound", 1) \
.option("upperBound", 4) \
.option("numPartitions", 2) \
.option("upperBound", 5) \
.option("numPartitions", 4) \
.option("user", db_connections['postgres']['properties']['user']) \
.option("password", db_connections['postgres']['properties']['password']) \
.option("fetchsize", 1000) \
@@ -77,9 +74,12 @@ df.printSchema()
summary = df.groupBy(['chunk_id']).count()
summary.show()
print("Writing JSON ")
+ # this will also execute the four partition queries again.
+ # avoid looking at this path in Finder:
+ # spark will get confused by macOS's .DS_Store file
df.drop('chunk_id').write.json(
- '/tmp/jt_orders.json',
+ '/tmp/jt_orders/',
mode='overwrite'
)
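
Given the comment above about the write re-executing the source queries, one option (a sketch, not part of this commit) is to persist the frame so the summary actions and the JSON write share a single JDBC read; MEMORY_AND_DISK keeps it friendly to the lowered 512m executors:

# sketch: cache once so the actions above and the JSON write don't each
# re-run the four partition queries against Postgres.
from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)  # spill to disk rather than OOM
df.groupBy('chunk_id').count().show()
df.drop('chunk_id').write.json('/tmp/jt_orders/', mode='overwrite')
df.unpersist()
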