Writing & Submitting Jobs
Writing a PySpark Job
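A minimal PySpark job reads a table from PostgreSQL over JDBC, applies a transformation, and writes the result back. Spark's jdbc data source needs the PostgreSQL JDBC driver on the Spark classpath.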
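from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (
    SparkSession.builder
    .appName("nhic-example-job")
    .getOrCreate()
)

# Read the source table over JDBC.
# The credentials are placeholders; do not hard-code real ones.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db:5432/warehouse")
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "raw.hmis_malaria")
    .option("user", "spark")
    .option("password", "secret")
    .load()
)

# Keep rows with at least one confirmed case and total them per facility.
# The alias replaces Spark's default column name "sum(confirmed_cases)".
result = (
    df.filter(F.col("confirmed_cases") > 0)
    .groupBy("facility_id")
    .agg(F.sum("confirmed_cases").alias("confirmed_cases"))
)

# Write the aggregate back, overwriting the target table.
(
    result.write.format("jdbc")
    .option("url", "jdbc:postgresql://db:5432/warehouse")
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "analytics.malaria_by_facility")
    .option("user", "spark")
    .option("password", "secret")
    .mode("overwrite")
    .save()
)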
spark.stop()
Submitting via Livy REST API
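Livy accepts job submissions as HTTP POST requests to /batches with a JSON body: file is the script to run, args are passed to it as command-line arguments, and conf sets Spark properties.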
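{
  "file": "s3://bucket/jobs/my_job.py",
  "args": ["--date", "2025-01"],
  "conf": {
    "spark.executor.memory": "4g"
  }
}
className is only used for JVM applications (the main class inside a JAR) and is omitted for Python scripts.
The submission response includes the batch id. Poll GET /batches/{id}/state until the batch reaches a terminal state: success if the job completed, or dead or killed if it failed or was stopped.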
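A minimal Python sketch of the submit-then-poll loop, using only the standard library. The Livy address http://livy:8998 and the helper names submit_batch, fetch_state, and wait_for_batch are hypothetical (8998 is Livy's default port), and the set of terminal states is an assumption based on Livy's documented batch and session states.

```python
import json
import time
import urllib.request
from typing import Callable

# Hypothetical Livy address; 8998 is Livy's default port.
LIVY_URL = "http://livy:8998"

# Assumed terminal states: once a batch reaches one, it will not change again.
TERMINAL_STATES = {"success", "dead", "killed", "error"}


def submit_batch(livy_url: str, payload: dict) -> int:
    """POST the payload to /batches and return the new batch id."""
    req = urllib.request.Request(
        f"{livy_url}/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)["id"]


def fetch_state(livy_url: str, batch_id: int) -> str:
    """GET /batches/{id}/state and return the state string."""
    with urllib.request.urlopen(f"{livy_url}/batches/{batch_id}/state") as resp:
        return json.load(resp)["state"]


def wait_for_batch(
    get_state: Callable[[], str],
    poll_seconds: float = 10.0,
    timeout_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call get_state() until it returns a terminal state, then return that state.

    Raises TimeoutError if no terminal state is seen before the deadline.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"batch still {state!r} after {timeout_seconds}s")
        sleep(poll_seconds)


# Example usage (needs a reachable Livy server):
# batch_id = submit_batch(LIVY_URL, {"file": "s3://bucket/jobs/my_job.py", "args": ["--date", "2025-01"]})
# print(wait_for_batch(lambda: fetch_state(LIVY_URL, batch_id)))
```

Passing the state lookup in as a callable keeps the polling loop independent of HTTP, so it can be tested with canned states, and the timeout stops a caller from waiting forever on a batch that never leaves running.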
Triggering from Prefect
Prefect workflows submit Spark jobs as steps in larger pipelines through spark.utils.py (located in apps/analytics/prefect/.prefect/workflows/). Because submission runs inside a Prefect flow, Spark jobs are visible in the Prefect UI and share the scheduling, monitoring, and retry behavior of all other HIC workflows.