Data-Upload: Timeout problems and chunk size #494

@juergend7lytix

Description

When uploading large files, I run into a timeout exception.

I shortened the trace for brevity; the full version is at the end of the issue.

---------------------------------------------------------------------------
SocketTimeoutError                        Traceback (most recent call last)
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py:306, in AioHttpTransport.send(self, request, stream, proxies, **config)
    305 socket_timeout = aiohttp.ClientTimeout(sock_connect=timeout, sock_read=read_timeout)
--> 306 result = await self.session.request(  # type: ignore
    307     request.method,
    308     request.url,
    309     headers=request.headers,
    310     data=self._get_request_data(request),
    311     timeout=socket_timeout,
    312     allow_redirects=False,
    313     proxy=proxy,
    314     **config,
    315 )
    316 if _is_rest(request):
...

File /usr/local/lib/python3.12/site-packages/aiohttp/streams.py:672, in DataQueue.read(self)
    671 try:
--> 672     await self._waiter
    673 except (asyncio.CancelledError, asyncio.TimeoutError):

SocketTimeoutError: Timeout on reading data from socket

The above exception was the direct cause of the following exception:

ServiceResponseError                      Traceback (most recent call last)
File /usr/local/lib/python3.12/site-packages/adlfs/spec.py:2153, in AzureBlobFile._async_upload_chunk(self, final, **kwargs)
   2150 async with self.container_client.get_blob_client(
   2151     blob=self.blob
   2152 ) as bc:
-> 2153     await bc.stage_block(
   2154         block_id=block_id,
   2155         data=chunk,
   2156         length=len(chunk),
   2157     )
   2158     self._block_list.append(block_id)
...

ServiceResponseError: Timeout on reading data from socket

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)

File /usr/local/lib/python3.12/site-packages/fsspec/spec.py:2227, in AbstractBufferedFile.__exit__(self, *args)
   2226 def __exit__(self, *args):
-> 2227     self.close()
...

File /usr/local/lib/python3.12/site-packages/adlfs/spec.py:2200, in AzureBlobFile._async_upload_chunk(self, final, **kwargs)
   2198                     raise FileExistsError(self.path)
   2199         else:
-> 2200             raise RuntimeError(f"Failed to upload block: {e}!") from e
   2201 elif self.mode == "ab":
   2202     async with self.container_client.get_blob_client(blob=self.blob) as bc:

RuntimeError: Failed to upload block: Timeout on reading data from socket!

The problem seems to be that the chunk size is hard-coded to 1 GB, cf. here:

def _get_chunks(self, data, chunk_size=1024**3): # Keeping the chunk size as 1 GB

Uploading a 1 GB chunk takes longer than the timeout allows.
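
To put rough numbers on that, here is a back-of-the-envelope sketch. The bandwidth values and the 60 s budget are made-up illustrations, not measurements from my setup and not the SDK's defaults, and treating the timeout as a budget for the whole chunk is a simplification.

```python
# Ideal transfer time for one chunk at a given uplink bandwidth.
# All bandwidth and timeout figures below are illustrative assumptions.

GIB = 1024**3
MIB = 1024**2


def upload_seconds(chunk_bytes: int, megabits_per_second: float) -> float:
    """Return the ideal transfer time for one chunk, ignoring latency and retries."""
    bytes_per_second = megabits_per_second * 1_000_000 / 8
    return chunk_bytes / bytes_per_second


def largest_chunk_within(timeout_seconds: float, megabits_per_second: float) -> int:
    """Return the largest chunk size in bytes that fits into the time budget."""
    return int(timeout_seconds * megabits_per_second * 1_000_000 / 8)


for mbit in (50, 200, 1000):
    print(
        f"{mbit:>5} Mbit/s: 1 GiB chunk ~ {upload_seconds(GIB, mbit):.0f} s, "
        f"5 MiB chunk ~ {upload_seconds(5 * MIB, mbit):.2f} s"
    )
```

Under these assumptions, a 1 GiB chunk on a 50 Mbit/s uplink needs close to three minutes, while `largest_chunk_within(60, 50)` comes out well below 1 GiB.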

So this could be resolved by addressing one of:

  • the timeout settings,
  • the block size, or
  • the chunk size.

Below is what I tried.

timeout

In my case, setting timeout, read_timeout or connection_timeout in AzureBlobFileSystem(timeout=..., read_timeout=..., connection_timeout=...) https://github.com/fsspec/adlfs/blob/main/adlfs/spec.py#L242 had no effect.

Since I call adlfs via fsspec, I tried:

timeout = 20*60
fs = fsspec.filesystem('abfs', **storage_options,
        timeout=timeout,
        read_timeout=timeout,
        connection_timeout=timeout)
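
One way to narrow this down might be to check whether a constructor names these keywords explicitly or merely swallows them through `**kwargs`. The helper below is a minimal sketch using only `inspect`; `classify_kwargs` and `DemoFileSystem` are hypothetical names for illustration, and the demo signature is NOT the adlfs one.

```python
import inspect


def classify_kwargs(func, names):
    """Report how `func` would receive each keyword argument name."""
    params = inspect.signature(func).parameters
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    has_var_kw = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    report = {}
    for name in names:
        if name in params and params[name].kind in keyword_kinds:
            report[name] = "explicit parameter"
        elif has_var_kw:
            report[name] = "absorbed by **kwargs"
        else:
            report[name] = "not accepted"
    return report


class DemoFileSystem:
    """Hypothetical stand-in for a filesystem class (not the adlfs signature)."""

    def __init__(self, account_name=None, timeout=None, **kwargs):
        self.timeout = timeout
        self.extra = kwargs


report = classify_kwargs(
    DemoFileSystem.__init__, ["timeout", "read_timeout", "connection_timeout"]
)
print(report)
```

For a real check I would pass `adlfs.AzureBlobFileSystem.__init__` rather than the class itself, since a metaclass can hide the constructor signature. Either way, this only shows whether the names are accepted; it does not prove that the values are forwarded to the Azure SDK client.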

blocksize

If I understand correctly, the data is divided into blocks and each block is uploaded in chunks. Setting the block size via

blocksize=5*1048576  # 5MB
fs = fsspec.filesystem('abfs', **storage_options,
        blocksize=blocksize)

or

with fs.open("...", block_size=blocksize) as f:
    ...

did not help either. The latter should be possible because of https://github.com/fsspec/adlfs/blob/main/adlfs/spec.py#L1818
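
Whether the block size has any effect may depend on how the data is written. The toy model below is a sketch under two assumptions: the buffered writer flushes once its buffer reaches the block size, and each flush splits the whole buffer only by the chunk size. `ToyBufferedWriter` and `get_chunks` are hypothetical names, not the adlfs or fsspec implementation.

```python
import io

MIB = 1024**2


def get_chunks(data, chunk_size):
    """Split one flushed buffer into pieces of at most chunk_size bytes."""
    for start in range(0, len(data), chunk_size):
        yield data[start:start + chunk_size]


class ToyBufferedWriter:
    """Toy model: buffer writes, flush when the buffer reaches blocksize."""

    def __init__(self, blocksize, chunk_size):
        self.blocksize = blocksize
        self.chunk_size = chunk_size
        self.buffer = io.BytesIO()
        self.requests = []  # size in bytes of each simulated upload request

    def write(self, data):
        self.buffer.write(data)
        if self.buffer.tell() >= self.blocksize:
            self.flush()

    def flush(self):
        payload = self.buffer.getvalue()
        self.requests.extend(len(c) for c in get_chunks(payload, self.chunk_size))
        self.buffer = io.BytesIO()


payload = bytes(20 * MIB)

# Case 1: the whole payload arrives in a single write() call.
single = ToyBufferedWriter(blocksize=5 * MIB, chunk_size=1024 * MIB)
single.write(payload)
single.flush()

# Case 2: the same payload arrives in 1 MiB pieces.
pieces = ToyBufferedWriter(blocksize=5 * MIB, chunk_size=1024 * MIB)
for start in range(0, len(payload), MIB):
    pieces.write(payload[start:start + MIB])
pieces.flush()

print([n // MIB for n in single.requests])  # [20]
print([n // MIB for n in pieces.requests])  # [5, 5, 5, 5]
```

In this model a single large `write()` produces one oversized request regardless of the block size, whereas many small writes produce requests of roughly one block each. If the real code behaves similarly, writing the data in smaller pieces might be a workaround that needs no patching.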

chunksize

The only thing that worked was overwriting the hard-coded chunk_size default at https://github.com/fsspec/adlfs/blob/main/adlfs/spec.py#L2122
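
Instead of editing the installed file, the default could in principle be overridden at runtime. The sketch below shows the pattern on a stand-in class; `DemoBlobFile` and `override_chunk_size` are hypothetical names, and the stand-in only mimics a method with a hard-coded default that the caller never overrides.

```python
import functools


class DemoBlobFile:
    """Hypothetical stand-in with a hard-coded default chunk size."""

    def _get_chunks(self, data, chunk_size=1024**3):
        start = 0
        while start < len(data):
            yield data[start:start + chunk_size]
            start += chunk_size

    def upload(self, data):
        # The caller never passes chunk_size, so the default always applies.
        return [len(chunk) for chunk in self._get_chunks(data)]


def override_chunk_size(cls, chunk_size):
    """Rebind cls._get_chunks so that chunk_size is preset to a new value."""
    original = cls._get_chunks
    cls._get_chunks = functools.partialmethod(original, chunk_size=chunk_size)


before = DemoBlobFile().upload(bytes(10))
override_chunk_size(DemoBlobFile, 4)
after = DemoBlobFile().upload(bytes(10))

print(before)  # [10]
print(after)   # [4, 4, 2]
```

Applied to adlfs, the equivalent call would presumably be `override_chunk_size(adlfs.spec.AzureBlobFile, 4 * 1024**2)`. Since `_get_chunks` is a private method, such a patch could break with any release, so I see it as a stopgap rather than a fix.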

These are the package versions I'm using:

root@731be5858c26:/workspaces/# pip freeze | grep -e azure -e fsspec -e adlfs 
adlfs==2024.12.0
azure-core==1.32.0
azure-datalake-store==0.0.53
azure-identity==1.19.0
azure-storage-blob==12.24.1
fsspec==2025.2.0

root@731be5858c26:/workspaces/# python --version
Python 3.12.8

Questions:

  • Do I really need to change a hard-coded value to get the upload to work, or am I missing something? Maybe there is a logic mistake on my side.
  • If the chunk size really is hard-coded, we should find out why the timeout settings are ignored, and
  • why block_size and blocksize make no difference.

The full stack trace:

---------------------------------------------------------------------------
SocketTimeoutError                        Traceback (most recent call last)
File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py:306, in AioHttpTransport.send(self, request, stream, proxies, **config)
    305 socket_timeout = aiohttp.ClientTimeout(sock_connect=timeout, sock_read=read_timeout)
--> 306 result = await self.session.request(  # type: ignore
    307     request.method,
    308     request.url,
    309     headers=request.headers,
    310     data=self._get_request_data(request),
    311     timeout=socket_timeout,
    312     allow_redirects=False,
    313     proxy=proxy,
    314     **config,
    315 )
    316 if _is_rest(request):

File /usr/local/lib/python3.12/site-packages/aiohttp/client.py:730, in ClientSession._request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, server_hostname, proxy_headers, trace_request_ctx, read_bufsize, auto_decompress, max_line_size, max_field_size)
    729 try:
--> 730     await resp.start(conn)
    731 except BaseException:

File /usr/local/lib/python3.12/site-packages/aiohttp/client_reqrep.py:1059, in ClientResponse.start(self, connection)
   1058     protocol = self._protocol
-> 1059     message, payload = await protocol.read()  # type: ignore[union-attr]
   1060 except http.HttpProcessingError as exc:

File /usr/local/lib/python3.12/site-packages/aiohttp/streams.py:672, in DataQueue.read(self)
    671 try:
--> 672     await self._waiter
    673 except (asyncio.CancelledError, asyncio.TimeoutError):

SocketTimeoutError: Timeout on reading data from socket

The above exception was the direct cause of the following exception:

ServiceResponseError                      Traceback (most recent call last)
File /usr/local/lib/python3.12/site-packages/adlfs/spec.py:2153, in AzureBlobFile._async_upload_chunk(self, final, **kwargs)
   2150 async with self.container_client.get_blob_client(
   2151     blob=self.blob
   2152 ) as bc:
-> 2153     await bc.stage_block(
   2154         block_id=block_id,
   2155         data=chunk,
   2156         length=len(chunk),
   2157     )
   2158     self._block_list.append(block_id)

File /usr/local/lib/python3.12/site-packages/azure/core/tracing/decorator_async.py:114, in distributed_trace_async.<locals>.decorator.<locals>.wrapper_use_tracer(*args, **kwargs)
    113 if span_impl_type is None:
--> 114     return await func(*args, **kwargs)
    116 # Merge span is parameter is set, but only if no explicit parent are passed

File /usr/local/lib/python3.12/site-packages/azure/storage/blob/aio/_blob_client_async.py:1894, in BlobClient.stage_block(self, block_id, data, length, **kwargs)
   1893 try:
-> 1894     return cast(Dict[str, Any], await self._client.block_blob.stage_block(**options))
   1895 except HttpResponseError as error:

File /usr/local/lib/python3.12/site-packages/azure/core/tracing/decorator_async.py:114, in distributed_trace_async.<locals>.decorator.<locals>.wrapper_use_tracer(*args, **kwargs)
    113 if span_impl_type is None:
--> 114     return await func(*args, **kwargs)
    116 # Merge span is parameter is set, but only if no explicit parent are passed

File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_generated/aio/operations/_block_blob_operations.py:640, in BlockBlobOperations.stage_block(self, block_id, content_length, body, transactional_content_md5, transactional_content_crc64, timeout, request_id_parameter, structured_body_type, structured_content_length, lease_access_conditions, cpk_info, cpk_scope_info, **kwargs)
    639 _stream = False
--> 640 pipeline_response: PipelineResponse = await self._client._pipeline.run(  # pylint: disable=protected-access
    641     _request, stream=_stream, **kwargs
    642 )
    644 response = pipeline_response.http_response

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:229, in AsyncPipeline.run(self, request, **kwargs)
    228 first_node = self._impl_policies[0] if self._impl_policies else _AsyncTransportRunner(self._transport)
--> 229 return await first_node.send(pipeline_request)

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
     76 try:
---> 77     response = await self.next.send(request)
     78 except Exception:

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
     76 try:
---> 77     response = await self.next.send(request)
     78 except Exception:

    [... skipping similar frames: _SansIOAsyncHTTPPolicyRunner.send at line 77 (2 times)]

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
     76 try:
---> 77     response = await self.next.send(request)
     78 except Exception:

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/policies/_redirect_async.py:76, in AsyncRedirectPolicy.send(self, request)
     75 while redirects_remaining:
---> 76     response = await self.next.send(request)
     77     redirect_location = self.get_redirect_location(response)

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
     76 try:
---> 77     response = await self.next.send(request)
     78 except Exception:

File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_shared/policies_async.py:155, in AsyncStorageRetryPolicy.send(self, request)
    154             continue
--> 155         raise err
    156 if retry_settings['history']:

File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_shared/policies_async.py:127, in AsyncStorageRetryPolicy.send(self, request)
    126 try:
--> 127     response = await self.next.send(request)
    128     if is_retry(response, retry_settings['mode']) or await is_checksum_retry(response):

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
     76 try:
---> 77     response = await self.next.send(request)
     78 except Exception:

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
     76 try:
---> 77     response = await self.next.send(request)
     78 except Exception:

    [... skipping similar frames: _SansIOAsyncHTTPPolicyRunner.send at line 77 (1 times)]

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
     76 try:
---> 77     response = await self.next.send(request)
     78 except Exception:

File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_shared/policies_async.py:80, in AsyncStorageResponseHook.send(self, request)
     77 response_callback = request.context.get('response_callback') or \
     78     request.context.options.pop('raw_response_hook', self._response_callback)
---> 80 response = await self.next.send(request)
     81 will_retry = is_retry(response, request.context.options.get('mode')) or await is_checksum_retry(response)

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
     76 try:
---> 77     response = await self.next.send(request)
     78 except Exception:

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:77, in _SansIOAsyncHTTPPolicyRunner.send(self, request)
     76 try:
---> 77     response = await self.next.send(request)
     78 except Exception:

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/_base_async.py:111, in _AsyncTransportRunner.send(self, request)
    108 cleanup_kwargs_for_transport(request.context.options)
    109 return PipelineResponse(
    110     request.http_request,
--> 111     await self._sender.send(request.http_request, **request.context.options),
    112     request.context,
    113 )

File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_shared/base_client_async.py:268, in AsyncTransportWrapper.send(self, request, **kwargs)
    267 async def send(self, request, **kwargs):
--> 268     return await self._transport.send(request, **kwargs)

File /usr/local/lib/python3.12/site-packages/azure/storage/blob/_shared/base_client_async.py:268, in AsyncTransportWrapper.send(self, request, **kwargs)
    267 async def send(self, request, **kwargs):
--> 268     return await self._transport.send(request, **kwargs)

File /usr/local/lib/python3.12/site-packages/azure/core/pipeline/transport/_aiohttp.py:349, in AioHttpTransport.send(self, request, stream, proxies, **config)
    348 except asyncio.TimeoutError as err:
--> 349     raise ServiceResponseError(err, error=err) from err
    350 except aiohttp.client_exceptions.ClientError as err:

ServiceResponseError: Timeout on reading data from socket

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)

File /usr/local/lib/python3.12/site-packages/fsspec/spec.py:2227, in AbstractBufferedFile.__exit__(self, *args)
   2226 def __exit__(self, *args):
-> 2227     self.close()

File /usr/local/lib/python3.12/site-packages/adlfs/spec.py:2033, in AzureBlobFile.close(self)
   2031 """Close file and azure client."""
   2032 asyncio.run_coroutine_threadsafe(close_container_client(self), loop=self.loop)
-> 2033 super().close()

File /usr/local/lib/python3.12/site-packages/fsspec/spec.py:2178, in AbstractBufferedFile.close(self)
   2176 else:
   2177     if not self.forced:
-> 2178         self.flush(force=True)
   2180     if self.fs is not None:
   2181         self.fs.invalidate_cache(self.path)

File /usr/local/lib/python3.12/site-packages/fsspec/spec.py:2041, in AbstractBufferedFile.flush(self, force)
   2038         self.closed = True
   2039         raise
-> 2041 if self._upload_chunk(final=force) is not False:
   2042     self.offset += self.buffer.seek(0, 2)
   2043     self.buffer = io.BytesIO()

File /usr/local/lib/python3.12/site-packages/fsspec/asyn.py:118, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
    115 @functools.wraps(func)
    116 def wrapper(*args, **kwargs):
    117     self = obj or args[0]
--> 118     return sync(self.loop, func, *args, **kwargs)

File /usr/local/lib/python3.12/site-packages/fsspec/asyn.py:103, in sync(loop, func, timeout, *args, **kwargs)
    101     raise FSTimeoutError from return_result
    102 elif isinstance(return_result, BaseException):
--> 103     raise return_result
    104 else:
    105     return return_result

File /usr/local/lib/python3.12/site-packages/fsspec/asyn.py:56, in _runner(event, coro, result, timeout)
     54     coro = asyncio.wait_for(coro, timeout=timeout)
     55 try:
---> 56     result[0] = await coro
     57 except Exception as ex:
     58     result[0] = ex

File /usr/local/lib/python3.12/site-packages/adlfs/spec.py:2200, in AzureBlobFile._async_upload_chunk(self, final, **kwargs)
   2198                     raise FileExistsError(self.path)
   2199         else:
-> 2200             raise RuntimeError(f"Failed to upload block: {e}!") from e
   2201 elif self.mode == "ab":
   2202     async with self.container_client.get_blob_client(blob=self.blob) as bc:

RuntimeError: Failed to upload block: Timeout on reading data from socket!
