
Commit 72885d1

Update Salesforce Data Source Name (#19)
* update name
* update doc
* fix tests and doc
* pyspark.datasource.salesforce
* rename
* update names
1 parent 7c4e881 commit 72885d1

5 files changed (+40, -35 lines)

README.md

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
 | [KaggleDataSource](pyspark_datasources/kaggle.py) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
 | [SimpleJsonDataSource](pyspark_datasources/simplejson.py) | `simplejson` | Write JSON data to Databricks DBFS | `databricks-sdk` |
 | [OpenSkyDataSource](pyspark_datasources/opensky.py) | `opensky` | Read from OpenSky Network. | None |
-| [SalesforceDataSource](pyspark_datasources/salesforce.py) | `salesforce` | Streaming sink for writing data to Salesforce | `simple-salesforce` |
+| [SalesforceDataSource](pyspark_datasources/salesforce.py) | `pyspark.datasource.salesforce` | Streaming datasource for writing data to Salesforce | `simple-salesforce` |
 
 See more here: https://allisonwang-db.github.io/pyspark-data-sources/.
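To make the rename concrete, here is a minimal sketch of a streaming write that uses the new format string. This is a sketch only: the credentials and checkpoint path are placeholders, the `salesforce_object` option name is assumed rather than taken from this diff, and the field mapping mirrors the docstring example updated later in this commit.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit
from pyspark_datasources.salesforce import SalesforceDataSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(SalesforceDataSource)  # format name is now "pyspark.datasource.salesforce"

# Shape a rate stream into Salesforce Account fields, following the docstring example in this commit
account_data = (
    spark.readStream.format("rate").option("rowsPerSecond", 1).load()
    .select(
        concat(lit("Account-"), col("value").cast("string")).alias("Name"),
        lit("Technology").alias("Industry"),
        (col("value") * 100000).cast("double").alias("AnnualRevenue"),
    )
)

query = (
    account_data.writeStream.format("pyspark.datasource.salesforce")  # new short name
    .option("username", "your-username@company.com")        # placeholder credentials
    .option("password", "your-password")
    .option("security_token", "your-security-token")
    .option("salesforce_object", "Account")                  # assumed option name for the target object
    .option("checkpointLocation", "/tmp/salesforce_checkpoint")
    .start()
)
```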

docs/index.md

Lines changed: 1 addition & 1 deletion
@@ -38,6 +38,6 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
 | [HuggingFaceDatasets](./datasources/huggingface.md) | `huggingface` | Read datasets from the HuggingFace Hub | `datasets` |
 | [StockDataSource](./datasources/stock.md) | `stock` | Read stock data from Alpha Vantage | None |
 | [SimpleJsonDataSource](./datasources/simplejson.md) | `simplejson` | Write JSON data to Databricks DBFS | `databricks-sdk` |
-| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
+| [SalesforceDataSource](./datasources/salesforce.md) | `pyspark.datasource.salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` |
 | [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None |
 | [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |

examples/salesforce_sink_example.py renamed to examples/salesforce_example.py

Lines changed: 15 additions & 15 deletions
@@ -1,9 +1,9 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 """
-Salesforce Sink Example
+Salesforce Datasource Example
 
-This example demonstrates how to use the SalesforceDataSource as a streaming sink
+This example demonstrates how to use the SalesforceDataSource as a streaming datasource
 to write data from various sources to Salesforce objects.
 
 Requirements:
@@ -64,11 +64,11 @@ def example_1_rate_source_to_accounts():
 )
 
 try:
-# Register Salesforce sink
+# Register Salesforce Datasource
 from pyspark_datasources.salesforce import SalesforceDataSource
 
 spark.dataSource.register(SalesforceDataSource)
-print("✅ Salesforce sink registered")
+print("✅ Salesforce datasource registered")
 
 # Create streaming data from rate source
 streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 2).load()
@@ -84,7 +84,7 @@ def example_1_rate_source_to_accounts():
 
 # Write to Salesforce
 query = (
-account_data.writeStream.format("salesforce")
+account_data.writeStream.format("pyspark.datasource.salesforce")
 .option("username", username)
 .option("password", password)
 .option("security_token", security_token)
@@ -135,7 +135,7 @@ def example_2_csv_to_contacts():
 )
 
 try:
-# Register Salesforce sink
+# Register Salesforce datasource
 from pyspark_datasources.salesforce import SalesforceDataSource
 
 spark.dataSource.register(SalesforceDataSource)
@@ -177,7 +177,7 @@ def example_2_csv_to_contacts():
 
 # Write to Salesforce with custom schema
 query = (
-streaming_df.writeStream.format("salesforce")
+streaming_df.writeStream.format("pyspark.datasource.salesforce")
 .option("username", username)
 .option("password", password)
 .option("security_token", security_token)
@@ -279,9 +279,9 @@ def example_3_checkpoint_demonstration():
 col("industry").alias("Industry"),
 col("revenue").alias("AnnualRevenue"),
 )
-
+
 query1 = (
-account_df1.writeStream.format("salesforce")
+account_df1.writeStream.format("pyspark.datasource.salesforce")
 .option("username", username)
 .option("password", password)
 .option("security_token", security_token)
@@ -328,7 +328,7 @@ def example_3_checkpoint_demonstration():
 )
 
 query2 = (
-account_df2.writeStream.format("salesforce")
+account_df2.writeStream.format("pyspark.datasource.salesforce")
 .option("username", username)
 .option("password", password)
 .option("security_token", security_token)
@@ -415,8 +415,8 @@ def example_4_custom_object():
 
 # Example code (commented out since custom object may not exist)
 print("""
-query = custom_data.writeStream \\
-.format("salesforce") \\
+query = custom_data.writeStream \\
+.format("pyspark.datasource.salesforce") \\
 .option("username", username) \\
 .option("password", password) \\
 .option("security_token", security_token) \\
@@ -438,8 +438,8 @@ def example_4_custom_object():
 
 def main():
 """Run all examples"""
-print("🚀 Salesforce Sink Examples")
-print("This demonstrates various ways to use the Salesforce streaming sink")
+print("🚀 Salesforce Datasource Examples")
+print("This demonstrates various ways to use the Salesforce streaming datasource")
 
 try:
 # Run examples
@@ -452,7 +452,7 @@ def main():
 print("✅ All examples completed!")
 print("=" * 60)
 print("\n💡 Key takeaways:")
-print(" - Salesforce sink supports various input sources (rate, CSV, etc.)")
+print(" - Salesforce datasource supports various input sources (rate, CSV, etc.)")
 print(" - Checkpoint functionality enables exactly-once processing")
 print(" - Custom schemas allow flexibility for different Salesforce objects")
 print(" - Batch processing optimizes Salesforce API usage")

pyspark_datasources/salesforce.py

Lines changed: 21 additions & 16 deletions
@@ -18,23 +18,28 @@ class SalesforceCommitMessage(WriterCommitMessage):
 
 class SalesforceDataSource(DataSource):
 """
-A Salesforce streaming sink for PySpark to write data to Salesforce objects.
+A Salesforce streaming datasource for PySpark to write data to Salesforce objects.
 
-This data sink enables writing streaming data from Spark to Salesforce using the
+This datasource enables writing streaming data from Spark to Salesforce using the
 Salesforce REST API. It supports common Salesforce objects like Account, Contact,
 Opportunity, and custom objects.
 
-Note: This is a write-only sink, not a full bidirectional data source.
+Note: This is a write-only datasource, not a full bidirectional data source.
 
 Name: `salesforce`
 
 Notes
 -----
 - Requires the `simple-salesforce` library for Salesforce API integration
-- **Write-only sink**: Only supports streaming write operations (no read operations)
+- **Write-only datasource**: Only supports streaming write operations (no read operations)
 - Uses Salesforce username/password/security token authentication
 - Supports batch writing with Salesforce Composite Tree API for efficient processing
 - Implements exactly-once semantics through Spark's checkpoint mechanism
+- If a streaming write job fails and is resumed from the checkpoint,
+  it will not overwrite records already written in Salesforce;
+  it resumes from the last committed offset.
+  However, if records were written to Salesforce but not yet committed at the time of failure,
+  duplicate records may occur after recovery.
 
 Parameters
 ----------
@@ -57,7 +62,7 @@ class SalesforceDataSource(DataSource):
 
 Examples
 --------
-Register the Salesforce sink:
+Register the Salesforce Datasource:
 
 >>> from pyspark_datasources import SalesforceDataSource
 >>> spark.dataSource.register(SalesforceDataSource)
@@ -78,9 +83,9 @@ class SalesforceDataSource(DataSource):
 ... (col("value") * 100000).cast("double").alias("AnnualRevenue")
 ... )
 >>>
->>> # Write to Salesforce using the sink
+>>> # Write to Salesforce using the datasource
 >>> query = account_data.writeStream \\
-... .format("salesforce") \\
+... .format("pyspark.datasource.salesforce") \\
 ... .option("username", "your-username@company.com") \\
 ... .option("password", "your-password") \\
 ... .option("security_token", "your-security-token") \\
@@ -98,7 +103,7 @@ class SalesforceDataSource(DataSource):
 ... )
 >>>
 >>> query = contact_data.writeStream \\
-... .format("salesforce") \\
+... .format("pyspark.datasource.salesforce") \\
 ... .option("username", "your-username@company.com") \\
 ... .option("password", "your-password") \\
 ... .option("security_token", "your-security-token") \\
@@ -114,7 +119,7 @@ class SalesforceDataSource(DataSource):
 ... )
 >>>
 >>> query = custom_data.writeStream \\
-... .format("salesforce") \\
+... .format("pyspark.datasource.salesforce") \\
 ... .option("username", "your-username@company.com") \\
 ... .option("password", "your-password") \\
 ... .option("security_token", "your-security-token") \\
@@ -128,7 +133,7 @@ class SalesforceDataSource(DataSource):
 >>> contact_schema = "FirstName STRING NOT NULL, LastName STRING NOT NULL, Email STRING, Phone STRING"
 >>>
 >>> query = contact_data.writeStream \\
-... .format("salesforce") \\
+... .format("pyspark.datasource.salesforce") \\
 ... .option("username", "your-username@company.com") \\
 ... .option("password", "your-password") \\
 ... .option("security_token", "your-security-token") \\
@@ -148,7 +153,7 @@ class SalesforceDataSource(DataSource):
 ... )
 >>>
 >>> query = opportunity_data.writeStream \\
-... .format("salesforce") \\
+... .format("pyspark.datasource.salesforce") \\
 ... .option("username", "your-username@company.com") \\
 ... .option("password", "your-password") \\
 ... .option("security_token", "your-security-token") \\
@@ -159,7 +164,7 @@ class SalesforceDataSource(DataSource):
 
 Key Features:
 
-- **Write-only sink**: Designed specifically for writing data to Salesforce
+- **Write-only datasource**: Designed specifically for writing data to Salesforce
 - **Batch processing**: Uses Salesforce Composite Tree API for efficient bulk writes
 - **Exactly-once semantics**: Integrates with Spark's checkpoint mechanism
 - **Error handling**: Graceful fallback to individual record creation if batch fails
@@ -168,8 +173,8 @@ class SalesforceDataSource(DataSource):
 
 @classmethod
 def name(cls) -> str:
-"""Return the short name for this Salesforce sink."""
-return "salesforce"
+"""Return the short name for this Salesforce datasource."""
+return "pyspark.datasource.salesforce"
 
 def schema(self) -> str:
 """
@@ -196,12 +201,12 @@ def schema(self) -> str:
 """
 
 def streamWriter(self, schema: StructType, overwrite: bool) -> "SalesforceStreamWriter":
-"""Create a stream writer for Salesforce sink integration."""
+"""Create a stream writer for Salesforce datasource integration."""
 return SalesforceStreamWriter(schema, self.options)
 
 
 class SalesforceStreamWriter(DataSourceStreamWriter):
-"""Stream writer implementation for Salesforce sink integration."""
+"""Stream writer implementation for Salesforce datasource integration."""
 
 def __init__(self, schema: StructType, options: Dict[str, str]):
 self.schema = schema
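The new docstring notes on failure recovery are easier to reason about with a concrete restart scenario. A minimal sketch under stated assumptions: the checkpoint path and credentials are placeholders, and `salesforce_object` is an assumed option name for the target object; the comments restate the behavior described in the docstring added above.

```python
from pyspark.sql import SparkSession
from pyspark_datasources.salesforce import SalesforceDataSource

spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(SalesforceDataSource)

checkpoint = "/tmp/salesforce_accounts_checkpoint"  # must stay stable across restarts

def start_query():
    source = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
    accounts = source.selectExpr("concat('Acct-', CAST(value AS STRING)) AS Name")
    return (
        accounts.writeStream.format("pyspark.datasource.salesforce")
        .option("username", "your-username@company.com")   # placeholder credentials
        .option("password", "your-password")
        .option("security_token", "your-security-token")
        .option("salesforce_object", "Account")            # assumed option name
        .option("checkpointLocation", checkpoint)
        .start()
    )

# First run: each microbatch that reaches commit is recorded in the checkpoint.
q = start_query()
q.awaitTermination(30)
q.stop()

# Restart with the same checkpoint: committed batches are not re-sent to Salesforce.
# Per the docstring note, only a batch that was written but not yet committed at the
# time of a failure can be retried, which may produce duplicate records.
q = start_query()
q.awaitTermination(30)
q.stop()
```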

tests/test_data_sources.py

Lines changed: 2 additions & 2 deletions
@@ -78,7 +78,7 @@ def test_salesforce_datasource_registration(spark):
 spark.dataSource.register(SalesforceDataSource)
 
 # Test that the datasource is registered with correct name
-assert SalesforceDataSource.name() == "salesforce"
+assert SalesforceDataSource.name() == "pyspark.datasource.salesforce"
 
 # Test that the data source is streaming-only (no batch writer)
 from pyspark.sql.functions import lit
@@ -91,7 +91,7 @@ def test_salesforce_datasource_registration(spark):
 lit(50000.0).alias("AnnualRevenue"),
 )
 
-df.write.format("salesforce").mode("append").save()
+df.write.format("pyspark.datasource.salesforce").mode("append").save()
 assert False, "Should have raised error - Salesforce DataSource only supports streaming"
 except Exception as e:
 # This is expected - Salesforce DataSource only supports streaming writes
