Skip to content

Commit 36559c8

Browse files
authored
Merge pull request #132 from treasure-data/copilot/fix-fe93b6aa-303e-4886-bf56-e2cd32aad076
Add timeout parameter to BulkImport.perform() method
2 parents c8e8639 + 21287f7 commit 36559c8

File tree

2 files changed

+29
-2
lines changed

2 files changed

+29
-2
lines changed

tdclient/bulk_import_model.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,15 @@ def unfreeze(self):
101101
self.update()
102102
return response
103103

104-
def perform(self, wait=False, wait_interval=5, wait_callback=None):
104+
def perform(self, wait=False, wait_interval=5, wait_callback=None, timeout=None):
105105
"""Perform bulk import
106106
107107
Args:
108108
wait (bool, optional): Flag for wait bulk import job. Default `False`
109109
wait_interval (int, optional): wait interval in second. Default `5`.
110110
wait_callback (callable, optional): A callable to be called on every tick of
111111
wait interval.
112+
timeout (int, optional): Timeout in seconds. No timeout by default.
112113
"""
113114
self.update()
114115
if not self.upload_frozen:
@@ -117,7 +118,7 @@ def perform(self, wait=False, wait_interval=5, wait_callback=None):
117118
)
118119
job = self._client.perform_bulk_import(self.name)
119120
if wait:
120-
job.wait(wait_interval=wait_interval, wait_callback=wait_callback)
121+
job.wait(timeout=timeout, wait_interval=wait_interval, wait_callback=wait_callback)
121122
self.update()
122123
return job
123124

tdclient/test/bulk_import_model_test.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,32 @@ def test_bulk_import_perfom():
120120
assert bulk_import.update.called
121121

122122

123+
def test_bulk_import_perform_with_timeout():
124+
client = mock.MagicMock()
125+
job_mock = mock.MagicMock()
126+
client.perform_bulk_import.return_value = job_mock
127+
128+
bulk_import = models.BulkImport(
129+
client,
130+
name="name",
131+
database="database",
132+
table="table",
133+
status="status",
134+
upload_frozen=True,
135+
job_id="job_id",
136+
valid_records="valid_records",
137+
error_records="error_records",
138+
valid_parts="valid_parts",
139+
error_parts="error_parts",
140+
)
141+
bulk_import.update = mock.MagicMock()
142+
bulk_import.perform(wait=True, timeout=300, wait_interval=10)
143+
144+
client.perform_bulk_import.assert_called_with("name")
145+
job_mock.wait.assert_called_with(timeout=300, wait_interval=10, wait_callback=None)
146+
assert bulk_import.update.called
147+
148+
123149
def test_bulk_import_commit():
124150
client = mock.MagicMock()
125151
bulk_import = models.BulkImport(

0 commit comments

Comments
 (0)