Description
When uploading large amounts of data, I run into a timeout exception.
I shortened the trace for brevity; the full version is at the end of the issue.
---------------------------------------------------------------------------
SocketTimeoutError Traceback (most recent call last)
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py:306, in AioHttpTransport.send(self, request, stream, proxies, **config)
305 socket_timeout = aiohttp.ClientTimeout(sock_connect=timeout, sock_read=read_timeout)
--> 306 result = await self.session.request( # type: ignore
307 request.method,
308 request.url,
309 headers=request.headers,
310 data=self._get_request_data(request),
311 timeout=socket_timeout,
312 allow_redirects=False,
313 proxy=proxy,
314 **config,
315 )
316 if _is_rest(request):
...
File /usr/local/lib/python3.12/site-packages/aiohttp/streams.py:672, in DataQueue.read(self)
671 try:
--> 672 await self._waiter
673 except (asyncio.CancelledError, asyncio.TimeoutError):
SocketTimeoutError: Timeout on reading data from socket
The above exception was the direct cause of the following exception:
ServiceResponseError Traceback (most recent call last)
File /usr/local/lib/python3.12/site-packages/adlfs/spec.py:2153, in AzureBlobFile._async_upload_chunk(self, final, **kwargs)
2150 async with self.container_client.get_blob_client(
2151 blob=self.blob
2152 ) as bc:
-> 2153 await bc.stage_block(
2154 block_id=block_id,
2155 data=chunk,
2156 length=len(chunk),
2157 )
2158 self._block_list.append(block_id)
...
ServiceResponseError: Timeout on reading data from socket
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
File /usr/local/lib/python3.12/site-packages/fsspec/spec.py:2227, in AbstractBufferedFile.__exit__(self, *args)
2226 def __exit__(self, *args):
-> 2227 self.close()
...
File /usr/local/lib/python3.12/site-packages/adlfs/spec.py:2200, in AzureBlobFile._async_upload_chunk(self, final, **kwargs)
2198 raise FileExistsError(self.path)
2199 else:
-> 2200 raise RuntimeError(f"Failed to upload block: {e}!") from e
2201 elif self.mode == "ab":
2202 async with self.container_client.get_blob_client(blob=self.blob) as bc:
RuntimeError: Failed to upload block: Timeout on reading data from socket!
The problem is that the chunk size is hard coded to 1 GB, cf. line 2122 in adb9c53:
def _get_chunks(self, data, chunk_size=1024**3): # Keeping the chunk size as 1 GB
Uploading a 1 GB chunk takes longer than the timeout allows.
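As a rough sanity check of that claim, the sketch below estimates how long one chunk takes to send. It assumes the service only answers after the complete chunk has arrived, so a single stage_block request lasts at least chunk_size / bandwidth. The helper names, the 50 Mbit/s uplink and the 60 s timeout are hypothetical example values, not measurements from this setup.

```python
def upload_seconds(chunk_bytes: int, bandwidth_mbit_s: float) -> float:
    """Lower bound on the time needed to send one chunk at a given uplink speed."""
    return chunk_bytes * 8 / (bandwidth_mbit_s * 1_000_000)


def largest_safe_chunk(timeout_s: float, bandwidth_mbit_s: float, margin: float = 0.5) -> int:
    """Largest chunk (in bytes) whose send time stays below margin * timeout_s."""
    return int(timeout_s * margin * bandwidth_mbit_s * 1_000_000 / 8)


ONE_GB = 1024**3  # the hard-coded default chunk_size in _get_chunks

# Hypothetical numbers: 50 Mbit/s uplink, 60 s timeout, 50 % headroom
needed = upload_seconds(ONE_GB, bandwidth_mbit_s=50)                # ~171.8 s
safe_chunk = largest_safe_chunk(timeout_s=60, bandwidth_mbit_s=50)  # 187_500_000 bytes
```

With those made-up numbers, a 1 GB chunk needs roughly 172 s on the wire, while a 60 s timeout with 50 % headroom only accommodates chunks of about 187.5 MB. Any fixed chunk size fails once the link is slow enough, which is why either the timeout or the chunk size has to be configurable.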
So this could be resolved by adjusting one of:
- the timeout settings,
- the block size, or
- the chunk size.
Below is what I tried for each.
timeout
In my case, none of the timeout, read_timeout or connection_timeout arguments of AzureBlobFileSystem(timeout=..., read_timeout=..., connection_timeout=...) (https://github.com/fsspec/adlfs/blob/main/adlfs/spec.py#L242) had any effect.
Since I call adlfs via fsspec, I tried:
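import fsspec

timeout = 20 * 60  # 20 minutes, in seconds
# storage_options holds the account name and credentials (omitted here)
fs = fsspec.filesystem('abfs', **storage_options,
                       timeout=timeout,
                       read_timeout=timeout,
                       connection_timeout=timeout)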
blocksize
If I understand correctly, the data is divided into blocks and each block is uploaded in chunks. Setting the block size via
blocksize = 5 * 1048576  # 5 MB
fs = fsspec.filesystem('abfs', **storage_options,
                       blocksize=blocksize)
or
with fs.open("...", block_size=blocksize) as f:
    ...
had no effect either. The latter should be supported because of https://github.com/fsspec/adlfs/blob/main/adlfs/spec.py#L1818
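To make the block/chunk relationship described above concrete, here is a simplified toy model. It assumes, without verifying against adlfs 2024.12.0, that fsspec's AbstractBufferedFile.write appends the data to an in-memory buffer first and only flushes once the buffer has reached blocksize, and that each flush is then split into requests of at most chunk_size bytes (as _get_chunks does). ToyBufferedWriter is a hypothetical class, not part of fsspec or adlfs, and the chunk size is scaled down to 8 MB so the example runs quickly.

```python
import io


class ToyBufferedWriter:
    """Hypothetical, simplified model of an fsspec-style buffered writer.

    Bytes accumulate in an in-memory buffer; a flush happens only after a
    write() leaves the buffer at or above `blocksize`, and every flush is
    split into requests of at most `chunk_size` bytes. Request sizes are
    recorded instead of being uploaded.
    """

    def __init__(self, blocksize: int, chunk_size: int):
        self.blocksize = blocksize
        self.chunk_size = chunk_size
        self.buffer = io.BytesIO()
        self.requests: list[int] = []  # size of every simulated stage_block call

    def write(self, data: bytes) -> int:
        n = self.buffer.write(data)
        # The size check happens only after the whole `data` was buffered
        if self.buffer.tell() >= self.blocksize:
            self.flush()
        return n

    def flush(self) -> None:
        data = self.buffer.getvalue()
        for start in range(0, len(data), self.chunk_size):
            self.requests.append(len(data[start:start + self.chunk_size]))
        self.buffer = io.BytesIO()

    def close(self) -> None:
        # Upload whatever is left in the buffer
        if self.buffer.tell():
            self.flush()


MB = 1024**2
payload = bytes(20 * MB)

# One big write: the whole payload ends up in a single flush
one_call = ToyBufferedWriter(blocksize=5 * MB, chunk_size=8 * MB)
one_call.write(payload)
one_call.close()

# Many 1 MB writes: a flush is triggered every time 5 MB have accumulated
sliced = ToyBufferedWriter(blocksize=5 * MB, chunk_size=8 * MB)
for start in range(0, len(payload), MB):
    sliced.write(payload[start:start + MB])
sliced.close()
```

In this model, one_call issues requests of 8, 8 and 4 MB, so blocksize never limits the request size, while sliced issues four 5 MB requests. If the real classes behave like this, blocksize would only help when the data reaches the file in pieces smaller than blocksize; that remains a hypothesis to check.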
chunksize
The only thing that worked was overriding the hard-coded chunk_size at https://github.com/fsspec/adlfs/blob/main/adlfs/spec.py#L2122
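The same override can be applied at runtime instead of editing the installed spec.py. This is a minimal sketch: patch_default_chunk_size is a hypothetical helper, and it relies on the private AzureBlobFile._get_chunks method keeping the (self, data, chunk_size=...) signature quoted above, so it may break with other adlfs versions.

```python
import functools


def patch_default_chunk_size(cls, chunk_size: int):
    """Replace cls._get_chunks with a wrapper whose default chunk_size is smaller.

    Returns the original function so the patch can be undone later.
    """
    original = cls._get_chunks

    @functools.wraps(original)
    def _get_chunks(self, data, chunk_size=chunk_size):
        # Delegate to the original implementation, only the default changes
        return original(self, data, chunk_size=chunk_size)

    cls._get_chunks = _get_chunks
    return original
```

Usage, assuming adlfs is installed: run from adlfs.spec import AzureBlobFile and then patch_default_chunk_size(AzureBlobFile, 64 * 1024**2) before opening any file; 64 MB is an arbitrary example value. Explicit chunk_size arguments still take precedence, because only the default is replaced.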
These are the package versions I'm using:
root@731be5858c26:/workspaces/# pip freeze | grep -e azure -e fsspec -e adlfs
adlfs==2024.12.0
azure-core==1.32.0
azure-datalake-store==0.0.53
azure-identity==1.19.0
azure-storage-blob==12.24.1
fsspec==2025.2.0
root@731be5858c26:/workspaces/# python --version
Python 3.12.8
Questions:
- Do I really need to change a hard-coded value to get the upload to work, or am I missing something? Maybe there is another logic mistake on my side.
- If the chunk size really is hard coded, we should find out why the timeout settings are ignored, or
- why block_size and blocksize make no difference.
The full stack trace:
---------------------------------------------------------------------------
SocketTimeoutError Traceback (most recent call last)
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py:306, in AioHttpTransport.send(self, request, stream, proxies, **config)
305 socket_timeout = aiohttp.ClientTimeout(sock_connect=timeout, sock_read=read_timeout)
--> 306 result = await self.session.request( # type: ignore
307 request.method,
308 request.url,
309 headers=request.headers,
310 data=self._get_request_data(request),
311 timeout=socket_timeout,
312 allow_redirects=False,
313 proxy=proxy,
314 **config,
315 )
316 if _is_rest(request):
File /usr/local/lib/python3.12/site-packages/aiohttp/client.py:730, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, server_hostname, proxy_headers, trace_request_ctx, read_bufsize, auto_decompress, max_line_size, max_field_size)
729 try:
--> 730 await resp.start(conn)
731 except BaseException:
File /usr/local/lib/python3.12/site-packages/aiohttp/client_reqrep.py:1059, in ClientResponse.start(self, connection)
1058 protocol = self._protocol
-> 1059 message, payload = await protocol.read() # type: ignore[union-attr]
1060 except http.HttpProcessingError as exc:
File /usr/local/lib/python3.12/site-packages/aiohttp/streams.py:672, in DataQueue.read(self)
671 try:
--> 672 await self._waiter
673 except (asyncio.CancelledError, asyncio.TimeoutError):
SocketTimeoutError: Timeout on reading data from socket
The above exception was the direct cause of the following exception:
ServiceResponseError Traceback (most recent call last)
File /usr/local/lib/python3.12/site-packages/adlfs/spec.py:2153, in AzureBlobFile._async_upload_chunk(self, final, **kwargs)
2150 async with self.container_client.get_blob_client(
2151 blob=self.blob
2152 ) as bc:
-> 2153 await bc.stage_block(
2154 block_id=block_id,
2155 data=chunk,
2156 length=len(chunk),
2157 )
2158 self._block_list.append(block_id)
File /usr/local/lib/python3.12/site-packages/azure/core/tracing/decorator_async.py:114, in distributed_trace_async.<locals>.decorator.<locals>.wrapper_use_tracer(*args, **kwargs)
113 if span_impl_type is None:
--> 114 return await func(*args, **kwargs)
116 # Merge span is parameter is set, but only if no explicit parent are passed
File /usr/local/lib/python3.12/site-packages/azure/storage/blob/aio/_blob_client_async.py:1894, in BlobClient.stage_block(self, block_id, data, length, **kwargs)
1893 try:
-> 1894 return cast(Dict[str, Any], await self._client.block_blob.stage_block(**options))
1895 except HttpResponseError as error:
File /usr/local/lib/python3.12/site-packages/azure/core/tracing/decorator_async.py:114, in distributed_trace_async.<locals>.decorator.<locals>.wrapper_use_tracer(*args, **kwargs)
113 if span_impl_type is None:
--> 114 return await func(*args, **kwargs)
116 # Merge span is parameter is set, but only if no explicit parent are passed
File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_generated/aio/operations/_block_blob_operations.py:640, in BlockBlobOperations.stage_block(self, block_id, content_length, body, transactional_content_md5, transactional_content_crc64, timeout, request_id_parameter, structured_body_type, structured_content_length, lease_access_conditions, cpk_info, cpk_scope_info, **kwargs)
639 _stream = False
--> 640 pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access
641 _request, stream=_stream, **kwargs
642 )
644 response = pipeline_response.http_response
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:229, in AsyncPipeline.run(self, request, **kwargs)
228 first_node = self._impl_policies[0] if self._impl_policies else _AsyncTransportRunner(self._transport)
--> 229 return await first_node.send(pipeline_request)
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
76 try:
---> 77 response = await self.next.send(request)
78 except Exception:
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
76 try:
---> 77 response = await self.next.send(request)
78 except Exception:
[... skipping similar frames: _SansIOAsyncHTTPPolicyRunner.send at line 77 (2 times)]
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
76 try:
---> 77 response = await self.next.send(request)
78 except Exception:
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/policies/_redirect_async.py:76, in AsyncRedirectPolicy.send(self, request)
75 while redirects_remaining:
---> 76 response = await self.next.send(request)
77 redirect_location = self.get_redirect_location(response)
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
76 try:
---> 77 response = await self.next.send(request)
78 except Exception:
File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_shared/policies_async.py:155, in AsyncStorageRetryPolicy.send(self, request)
154 continue
--> 155 raise err
156 if retry_settings['history']:
File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_shared/policies_async.py:127, in AsyncStorageRetryPolicy.send(self, request)
126 try:
--> 127 response = await self.next.send(request)
128 if is_retry(response, retry_settings['mode']) or await is_checksum_retry(response):
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
76 try:
---> 77 response = await self.next.send(request)
78 except Exception:
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
76 try:
---> 77 response = await self.next.send(request)
78 except Exception:
[... skipping similar frames: _SansIOAsyncHTTPPolicyRunner.send at line 77 (1 times)]
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
76 try:
---> 77 response = await self.next.send(request)
78 except Exception:
File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_shared/policies_async.py:80, in AsyncStorageResponseHook.send(self, request)
77 response_callback = request.context.get('response_callback') or \
78 request.context.options.pop('raw_response_hook', self._response_callback)
---> 80 response = await self.next.send(request)
81 will_retry = is_retry(response, request.context.options.get('mode')) or await is_checksum_retry(response)
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
76 try:
---> 77 response = await self.next.send(request)
78 except Exception:
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
76 try:
---> 77 response = await self.next.send(request)
78 except Exception:
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:111, in _AsyncTransportRunner.send(self, request)
108 cleanup_kwargs_for_transport(request.context.options)
109 return PipelineResponse(
110 request.http_request,
--> 111 await self._sender.send(request.http_request, **request.context.options),
112 request.context,
113 )
File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_shared/base_client_async.py:268, in AsyncTransportWrapper.send(self, request, **kwargs)
267 async def send(self, request, **kwargs):
--> 268 return await self._transport.send(request, **kwargs)
File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_shared/base_client_async.py:268, in AsyncTransportWrapper.send(self, request, **kwargs)
267 async def send(self, request, **kwargs):
--> 268 return await self._transport.send(request, **kwargs)
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py:349, in AioHttpTransport.send(self, request, stream, proxies, **config)
348 except asyncio.TimeoutError as err:
--> 349 raise ServiceResponseError(err, error=err) from err
350 except aiohttp.client_exceptions.ClientError as err:
ServiceResponseError: Timeout on reading data from socket
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
File /usr/local/lib/python3.12/site-packages/fsspec/spec.py:2227, in AbstractBufferedFile.__exit__(self, *args)
2226 def __exit__(self, *args):
-> 2227 self.close()
File /usr/local/lib/python3.12/site-packages/adlfs/spec.py:2033, in AzureBlobFile.close(self)
2031 """Close file and azure client."""
2032 asyncio.run_coroutine_threadsafe(close_container_client(self), loop=self.loop)
-> 2033 super().close()
File /usr/local/lib/python3.12/site-packages/fsspec/spec.py:2178, in AbstractBufferedFile.close(self)
2176 else:
2177 if not self.forced:
-> 2178 self.flush(force=True)
2180 if self.fs is not None:
2181 self.fs.invalidate_cache(self.path)
File /usr/local/lib/python3.12/site-packages/fsspec/spec.py:2041, in AbstractBufferedFile.flush(self, force)
2038 self.closed = True
2039 raise
-> 2041 if self._upload_chunk(final=force) is not False:
2042 self.offset += self.buffer.seek(0, 2)
2043 self.buffer = io.BytesIO()
File /usr/local/lib/python3.12/site-packages/fsspec/asyn.py:118, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
115 @functools.wraps(func)
116 def wrapper(*args, **kwargs):
117 self = obj or args[0]
--> 118 return sync(self.loop, func, *args, **kwargs)
File /usr/local/lib/python3.12/site-packages/fsspec/asyn.py:103, in sync(loop, func, timeout, *args, **kwargs)
101 raise FSTimeoutError from return_result
102 elif isinstance(return_result, BaseException):
--> 103 raise return_result
104 else:
105 return return_result
File /usr/local/lib/python3.12/site-packages/fsspec/asyn.py:56, in _runner(event, coro, result, timeout)
54 coro = asyncio.wait_for(coro, timeout=timeout)
55 try:
---> 56 result[0] = await coro
57 except Exception as ex:
58 result[0] = ex
File /usr/local/lib/python3.12/site-packages/adlfs/spec.py:2200, in AzureBlobFile._async_upload_chunk(self, final, **kwargs)
2198 raise FileExistsError(self.path)
2199 else:
-> 2200 raise RuntimeError(f"Failed to upload block: {e}!") from e
2201 elif self.mode == "ab":
2202 async with self.container_client.get_blob_client(blob=self.blob) as bc:
RuntimeError: Failed to upload block: Timeout on reading data from socket!