
Writing & Submitting Jobs

Writing a PySpark Job

A minimal PySpark job reads from PostgreSQL, applies a transformation, and writes the result back.

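```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("nhic-example-job").getOrCreate()

# Read the source table over JDBC. The PostgreSQL JDBC driver must be on the
# Spark classpath. Credentials are placeholders: don't hard-code real ones.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db:5432/warehouse")
    .option("dbtable", "raw.hmis_malaria")
    .option("user", "spark")
    .option("password", "secret")
    .load()
)

# Keep rows with at least one confirmed case, then total them per facility.
# The alias replaces Spark's default column name "sum(confirmed_cases)".
result = (
    df.filter(F.col("confirmed_cases") > 0)
    .groupBy("facility_id")
    .agg(F.sum("confirmed_cases").alias("confirmed_cases"))
)

# Write the aggregate back. "overwrite" replaces the target table
# (Spark drops and recreates it unless the "truncate" option is set).
(
    result.write.format("jdbc")
    .option("url", "jdbc:postgresql://db:5432/warehouse")
    .option("dbtable", "analytics.malaria_by_facility")
    .option("user", "spark")
    .option("password", "secret")
    .mode("overwrite")
    .save()
)

spark.stop()
```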
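When writing test fixtures for a job like this, it can help to compute the expected output by hand. The sketch below is a plain-Python restatement of the filter, groupBy, and sum step, not how the job actually runs; it assumes each row is a dict with `facility_id` and `confirmed_cases` keys, and the function name `malaria_by_facility` is hypothetical.

```python
from collections import defaultdict


def malaria_by_facility(rows):
    """Plain-Python mirror of the job's transformation (hypothetical helper).

    rows: iterable of dicts with "facility_id" and "confirmed_cases" keys,
    standing in for rows of raw.hmis_malaria. Returns {facility_id: total}.
    """
    totals = defaultdict(int)
    for row in rows:
        cases = row["confirmed_cases"]
        # Same predicate as filter(confirmed_cases > 0). A NULL (None) count is
        # dropped, as Spark drops rows where the comparison evaluates to NULL.
        if cases is not None and cases > 0:
            totals[row["facility_id"]] += cases
    return dict(totals)


rows = [
    {"facility_id": "F1", "confirmed_cases": 3},
    {"facility_id": "F1", "confirmed_cases": 0},
    {"facility_id": "F2", "confirmed_cases": 5},
    {"facility_id": "F1", "confirmed_cases": 2},
    {"facility_id": "F3", "confirmed_cases": None},
]
print(malaria_by_facility(rows))  # {'F1': 5, 'F2': 5}
```

Note that a facility whose rows are all zero or NULL (F3 above) produces no output row at all, which matches what the Spark job writes.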

Submitting via Livy REST API

Livy accepts batch job submissions as HTTP POST requests to /batches, with a JSON body and a Content-Type: application/json header.

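```json
{
  "file": "s3://bucket/jobs/my_job.py",
  "args": ["--date", "2025-01"],
  "conf": {
    "spark.executor.memory": "4g"
  }
}
```

For a PySpark job, file points at the .py script and className is omitted: className names the main class of a JVM (Java or Scala) application and does not apply to Python jobs.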
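The submission can be made from Python with only the standard library. The sketch below builds the POST request separately from sending it, so the payload can be inspected or tested without a running server. The host `http://livy:8998` and the helper names `build_batch_request` and `submit_batch` are hypothetical; 8998 is Livy's default port, but your deployment may differ.

```python
import json
import urllib.request

LIVY_URL = "http://livy:8998"  # hypothetical host; 8998 is Livy's default port


def build_batch_request(livy_url, file, args=None, conf=None):
    """Build (but do not send) a POST /batches request for a PySpark script."""
    payload = {"file": file}  # no "className": that field is for JVM jobs only
    if args:
        payload["args"] = list(args)
    if conf:
        payload["conf"] = dict(conf)
    return urllib.request.Request(
        url=f"{livy_url.rstrip('/')}/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_batch(request, timeout=30):
    """Send the request and return the batch id from Livy's JSON response."""
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.load(resp)["id"]
```

Calling `submit_batch(build_batch_request(LIVY_URL, "s3://bucket/jobs/my_job.py", args=["--date", "2025-01"]))` against a reachable Livy server returns the batch id to poll.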

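Livy's response to the submission includes the new batch's id. Poll GET /batches/{id}/state until the state is terminal: success if the job completed, or dead or killed if it failed or was stopped.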
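A polling loop for this can look like the sketch below. The helper names, the 10-second interval, and the one-hour cap are hypothetical choices; `fetch_state` and `sleep` are parameters so the loop can be exercised without a live Livy server.

```python
import json
import time
import urllib.request

# Terminal batch states; any other state means the job is still in flight.
TERMINAL_STATES = {"success", "dead", "killed"}


def fetch_batch_state(livy_url, batch_id, timeout=30):
    """GET /batches/{id}/state and return its "state" field."""
    url = f"{livy_url.rstrip('/')}/batches/{batch_id}/state"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)["state"]


def wait_for_batch(livy_url, batch_id, poll_seconds=10.0, max_wait_seconds=3600.0,
                   fetch_state=fetch_batch_state, sleep=time.sleep):
    """Poll until the batch reaches a terminal state and return that state.

    Raises TimeoutError if max_wait_seconds of sleeping elapses first.
    """
    waited = 0.0
    while True:
        state = fetch_state(livy_url, batch_id)
        if state in TERMINAL_STATES:
            return state
        if waited >= max_wait_seconds:
            raise TimeoutError(f"batch {batch_id} still {state!r} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds
```

Returning the terminal state rather than raising on dead or killed leaves it to the caller (for example an orchestrator task) to decide whether a failure should trigger a retry.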

Triggering from Prefect

Prefect workflows use spark.utils.py (located in apps/analytics/prefect/.prefect/workflows/) to submit Spark jobs as steps in larger pipelines. Because submission runs inside a Prefect flow, Spark jobs stay visible in the Prefect UI and inherit the same scheduling, monitoring, and retry semantics as all other HIC workflows.
