PySpark Data Sources

This repository showcases custom Spark data sources built with the Python Data Source API introduced in Apache Spark 4.0. For details on the API, refer to the API source code. This repository is a demo and is not intended for production use. Contributions and feedback that improve the examples are welcome.

Installation

pip install pyspark-data-sources

Usage

Make sure you have pyspark >= 4.0.0 installed.

pip install pyspark

Alternatively, use Databricks Runtime 15.4 LTS or later, or Databricks Serverless.

from pyspark_datasources.fake import FakeDataSource

# Register the data source
spark.dataSource.register(FakeDataSource)

spark.read.format("fake").load().show()

# For streaming data generation
spark.readStream.format("fake").load().writeStream.format("console").start()

Example Data Sources

| Data Source | Short Name | Type | Description | Dependencies | Install | Example |
|---|---|---|---|---|---|---|
| ArrowDataSource | arrow | Batch Read | Read Apache Arrow files (.arrow) | pyarrow | pip install pyspark-data-sources[arrow] | spark.read.format("arrow").load("/path/to/file.arrow") |
| FakeDataSource | fake | Batch/Streaming Read | Generate fake data using the Faker library | faker | pip install pyspark-data-sources[fake] | spark.read.format("fake").load() or spark.readStream.format("fake").load() |
| GithubDataSource | github | Batch Read | Read pull requests from a GitHub repository | None | pip install pyspark-data-sources | spark.read.format("github").load("apache/spark") |
| GoogleSheetsDataSource | googlesheets | Batch Read | Read a table from a public Google Sheets document | None | pip install pyspark-data-sources | spark.read.format("googlesheets").load("https://docs.google.com/spreadsheets/d/...") |
| HuggingFaceDatasets | huggingface | Batch Read | Read datasets from the Hugging Face Hub | datasets | pip install pyspark-data-sources[huggingface] | spark.read.format("huggingface").load("imdb") |
| KaggleDataSource | kaggle | Batch Read | Read datasets from Kaggle | kagglehub, pandas | pip install pyspark-data-sources[kaggle] | spark.read.format("kaggle").load("titanic") |
| StockDataSource | stock | Batch Read | Read stock data from Alpha Vantage | None | pip install pyspark-data-sources | spark.read.format("stock").option("symbols", "AAPL,GOOGL").option("api_key", "key").load() |
| LanceSink | lance | Batch Write | Write data in Lance format | lance | pip install pyspark-data-sources[lance] | df.write.format("lance").mode("append").save("/tmp/lance_data") |
| OpenSkyDataSource | opensky | Streaming Read | Read from the OpenSky Network | None | pip install pyspark-data-sources | spark.readStream.format("opensky").option("region", "EUROPE").load() |
| WeatherDataSource | weather | Streaming Read | Fetch weather data from tomorrow.io | None | pip install pyspark-data-sources | spark.readStream.format("weather").option("locations", "[(37.7749, -122.4194)]").option("apikey", "key").load() |
| SalesforceDataSource | pyspark.datasource.salesforce | Streaming Write | Streaming data source for writing data to Salesforce | simple-salesforce | pip install pyspark-data-sources[salesforce] | df.writeStream.format("pyspark.datasource.salesforce").option("username", "user").start() |
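
Option values such as `symbols` and `locations` in the examples above are passed to a data source as plain strings, so a reader has to parse them itself. The sketch below shows one way this could be done; the helper names `parse_symbols` and `parse_locations` are hypothetical and are not taken from this package's implementation.

```python
import ast


def parse_symbols(value):
    """Split a comma-separated option such as "AAPL,GOOGL" into a list."""
    return [s.strip() for s in value.split(",") if s.strip()]


def parse_locations(value):
    """Parse an option such as "[(37.7749, -122.4194)]" into (lat, lon) tuples."""
    # ast.literal_eval only evaluates Python literals, so it is safer than eval().
    locations = ast.literal_eval(value)
    return [(float(lat), float(lon)) for lat, lon in locations]
```

Parsing like this would typically happen once in the reader's constructor, so that malformed option values fail early rather than in the middle of a read.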

See more here: https://allisonwang-db.github.io/pyspark-data-sources/.

Official Data Sources

For production use, consider these official data source implementations built with the Python Data Source API:

| Data Source | Repository | Description | Features |
|---|---|---|---|
| HuggingFace Datasets | @huggingface/pyspark_huggingface | Production-ready Spark Data Source for 🤗 Hugging Face Datasets | Stream datasets as Spark DataFrames; select subsets/splits with filters; authentication support; save DataFrames to Hugging Face |

Data Source Naming Convention

When creating custom data sources using the Python Data Source API, follow these naming conventions for the short name (the value returned by the data source's name() method):

Recommended Approach

  • Use the system name directly, in lowercase: huggingface, opensky, googlesheets, etc.
  • This gives a clear, intuitive name that matches the service being integrated

Conflict Resolution

  • If there's a naming conflict, use the format pyspark.datasource.<system_name>
  • Example: pyspark.datasource.salesforce if "salesforce" conflicts with an existing name

Examples from this repository:

# Direct system naming (preferred)
spark.read.format("github").load()        # GithubDataSource
spark.read.format("googlesheets").load()  # GoogleSheetsDataSource
spark.readStream.format("opensky").load() # OpenSkyDataSource (streaming read)

# Namespaced format (when conflicts exist)
df.writeStream.format("pyspark.datasource.salesforce").start() # SalesforceDataSource (streaming write)
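
The convention above can be expressed as a small helper. This is only a sketch: `choose_short_name` and its `taken` argument (a set of short names already in use) are hypothetical, and Spark does not provide such a function. The returned string is what a data source's `name()` method would then return.

```python
def choose_short_name(system_name, taken):
    """Return the short name to use under the naming convention above."""
    name = system_name.lower()
    if name in taken:
        # Fall back to the namespaced form when the plain name is already used.
        return f"pyspark.datasource.{name}"
    return name


# No conflict: use the lowercase system name directly.
print(choose_short_name("OpenSky", set()))
# Conflict: use the namespaced format.
print(choose_short_name("salesforce", {"salesforce"}))
```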

Contributing

We welcome contributions that enhance and expand the custom data sources:

  • Add New Data Sources: Want to add a new data source using the Python Data Source API? Submit a pull request or open an issue.
  • Suggest Enhancements: If you have ideas to improve a data source or the API, we'd love to hear them!
  • Report Bugs: Found something that doesn't work as expected? Let us know by opening an issue.

Development

Environment Setup

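# Install dependencies into the Poetry-managed virtual environment
poetry install
# In Poetry 2.x, `poetry env activate` only prints the activation command, so evaluate it (bash/zsh)
eval $(poetry env activate)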

Build Docs

mkdocs serve

Code Formatting

This project uses Ruff for code formatting and linting.

# Format code
poetry run ruff format .

# Run linter
poetry run ruff check .

# Run linter with auto-fix
poetry run ruff check . --fix