Source SFTP Bulk: Timeout while reading large file #26021

Open
@jurgispods

Description

Connector Name

source-sftp-bulk

Connector Version

0.1.2

At what step did the error happen?

During the sync

Relevant information

When I use the SFTP bulk source to read a fairly large CSV file (close to 1 GB), the read times out. The timeout then triggers a second error (TypeError: 'SFTPFile' object is not subscriptable), because the exception handler in fetch_file does not seem to handle the failure correctly.

It would be nice if this could be fixed. I would also be very interested in a workaround, for example a way to configure the timeout to be higher than the default.
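To illustrate the kind of workaround I have in mind, here is a minimal sketch. It relies on paramiko's SFTPClient.get_channel() and Channel.settimeout(), which do exist. Everything else is an assumption: the helper name apply_read_timeout is hypothetical, and I have not checked whether the connector exposes its SFTP client or a timeout setting in a way that would make this possible.

```python
def apply_read_timeout(sftp_client, timeout_seconds):
    """Set the read timeout on the channel behind an SFTP client.

    Hypothetical helper, not part of the connector. `sftp_client` is
    expected to behave like paramiko.SFTPClient, i.e. to offer
    get_channel() returning an object with settimeout().
    Passing None disables the timeout (blocking reads).
    """
    if timeout_seconds is not None and timeout_seconds <= 0:
        raise ValueError("timeout_seconds must be positive or None")
    channel = sftp_client.get_channel()
    # Subsequent recv() calls on this channel wait up to this many seconds.
    channel.settimeout(timeout_seconds)
    return channel
```

With a real connection this would be called once after the SFTP session is opened, e.g. apply_read_timeout(sftp, 300.0), before the large file is read.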

Relevant log output

'SFTPFile' object is not subscriptable", "stack_trace": "Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/paramiko/channel.py", line 699, in recv
    out = self.in_buffer.read(nbytes, self.timeout)
  File "/usr/local/lib/python3.9/site-packages/paramiko/buffered_pipe.py", line 164, in read
    raise PipeTimeout()
paramiko.buffered_pipe.PipeTimeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/airbyte/integration_code/source_sftp_bulk/client.py", line 190, in fetch_file
    df = pd.read_csv(f, engine="python", sep=separator)
  File "/usr/local/lib/python3.9/site-packages/pandas/util/_decorators.py", line 211, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pandas/util/_decorators.py", line 317, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 950, in read_csv
    return _read(filepath_or_buffer, kwds)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 611, in _read
    return parser.read(nrows)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/readers.py", line 1772, in read
    ) = self._engine.read(  # type: ignore[attr-defined]
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/python_parser.py", line 251, in read
    content = self._get_lines(rows)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/python_parser.py", line 1124, in _get_lines
    new_row = self._next_iter_line(row_num=self.pos + rows + 1)
  File "/usr/local/lib/python3.9/site-packages/pandas/io/parsers/python_parser.py", line 787, in _next_iter_line
    line = next(self.data)
  File "/usr/local/lib/python3.9/site-packages/paramiko/file.py", line 125, in __next__
    line = self.readline()
  File "/usr/local/lib/python3.9/site-packages/paramiko/file.py", line 291, in readline
    new_data = self._read(n)
  File "/usr/local/lib/python3.9/site-packages/paramiko/sftp_file.py", line 185, in _read
    t, msg = self.sftp._request(
  File "/usr/local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 822, in _request
    return self._read_response(num)
  File "/usr/local/lib/python3.9/site-packages/paramiko/sftp_client.py", line 852, in _read_response
    t, data = self._read_packet()
  File "/usr/local/lib/python3.9/site-packages/paramiko/sftp.py", line 201, in _read_packet
    x = self._read_all(4)
  File "/usr/local/lib/python3.9/site-packages/paramiko/sftp.py", line 185, in _read_all
    x = self.sock.recv(n)
  File "/usr/local/lib/python3.9/site-packages/paramiko/channel.py", line 701, in recv
    raise socket.timeout()
socket.timeout

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/airbyte/integration_code/main.py", line 13, in <module>
    launch(source, sys.argv[1:])
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 131, in launch
    for message in source_entrypoint.run(parsed_args):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py", line 122, in run
    for message in generator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 99, in read
    stream_instances = {s.name: s for s in self.streams(config)}
  File "/airbyte/integration_code/source_sftp_bulk/source.py", line 133, in streams
    json_schema = self._infer_json_schema(config, conn)
  File "/airbyte/integration_code/source_sftp_bulk/source.py", line 58, in _infer_json_schema
    df = connection.fetch_file(fn=files[-1], file_type=config["file_type"], separator=config.get("separator"))
  File "/usr/local/lib/python3.9/site-packages/backoff/_sync.py", line 94, in retry
    ret = target(*args, **kwargs)
  File "/airbyte/integration_code/source_sftp_bulk/client.py", line 206, in fetch_file
    logger.warning("Skipping %s file because it is unable to be read.", f["filepath"])
TypeError: 'SFTPFile' object is not subscriptable
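Reading the last traceback, my guess is that the warning in the except branch indexes the open file handle (f) instead of the file metadata mapping passed in as fn. The sketch below reproduces that pattern without paramiko or pandas. FakeRemoteFile and both function names are stand-ins I made up, and the actual connector code may differ from this.

```python
import socket


class FakeRemoteFile:
    """Stand-in for paramiko's SFTPFile: a context manager that is not subscriptable."""

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False  # do not swallow exceptions

    def read(self):
        # Simulate the timeout seen while reading the large file.
        raise socket.timeout()


def fetch_file_buggy(fn):
    try:
        with FakeRemoteFile() as f:
            return f.read()
    except OSError:  # socket.timeout is a subclass of OSError
        # Suspected bug: f is the file handle, so f["filepath"] raises TypeError.
        return "Skipping %s file because it is unable to be read." % f["filepath"]


def fetch_file_fixed(fn):
    try:
        with FakeRemoteFile() as f:
            return f.read()
    except OSError:
        # Use the metadata mapping that was passed in instead.
        return "Skipping %s file because it is unable to be read." % fn["filepath"]
```

Calling fetch_file_buggy({"filepath": "/data/big.csv"}) raises a TypeError from inside the handler, which matches the final error in the log, while fetch_file_fixed returns the intended warning text.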

Contribute

  • Yes, I want to contribute