This project is archived. Its data is read-only.
Meltano elt run fails with a “pipe closed” exception
### What is the current *bug* behavior?

I'm stress testing my Meltano installation with an `elt` run using `tap-spreadsheets-anywhere` and a custom `loader-kafka` plugin. After running an ELT job at full throttle for about 30 to 45 minutes, Meltano consistently fails with a "pipe closed" exception. This is the message Meltano repeats when the error occurs:

`meltano | pipe closed by peer or os.write(pipe, data) raised exception.`

### What is the expected *correct* behavior?

An ELT run that begins loading data should finish doing so. At minimum, I would expect a better error message indicating the root cause of the failure.

### Steps to reproduce

I have Meltano configured for extremely high throughput on the tap side and a slightly slower loader. My guess is that, without loader back-pressure to slow the rate at which the tap produces records, this will eventually happen to any sufficiently long-running or large ELT run where the tap is faster than the loader.

### Relevant logs and/or screenshots

I captured this log with the `PYTHONASYNCIODEBUG` environment variable enabled.

```text
tap-spreadsheets-anywhere | INFO Found credentials in environment variables.
meltano | Executing <Handle <TaskWakeupMethWrapper object at 0x7fb78aa93378>(<Future finis...events.py:299>) created at /usr/local/lib/python3.6/asyncio/streams.py:399> took 4.405 seconds
meltano | Executing <Handle <TaskWakeupMethWrapper object at 0x7fb78aa93378>(<Future finis...events.py:299>) created at /usr/local/lib/python3.6/asyncio/streams.py:399> took 6.464 seconds
meltano | <_UnixWritePipeTransport fd=17 polling bufsize=1786482688> was closed by peer
meltano | Executing <Handle cancelled _UnixWritePipeTransport._read_ready() created at /usr/local/lib/python3.6/asyncio/selector_events.py:251> took 0.115 seconds
meltano | <_UnixReadPipeTransport fd=20 polling> was closed by peer
meltano | <_UnixReadPipeTransport fd=18 polling> was closed by peer
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | <_UnixSubprocessTransport pid=166 running stdin=<_UnixWritePipeTransport closed fd=17 closed> stdout=<_UnixReadPipeTransport closed fd=18 closed> stderr=<_UnixReadPipeTransport closed fd=20 closed>> exited with return code -9
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
tap-spreadsheets-anywhere | ERROR Pipe to loader broke after 134629 records were written
tap-spreadsheets-anywhere | BrokenPipeError: [Errno 32] Broken pipe
tap-spreadsheets-anywhere | Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>
tap-spreadsheets-anywhere | BrokenPipeError: [Errno 32] Broken pipe
meltano | <_UnixReadPipeTransport fd=16 polling> was closed by peer
meltano | <_UnixSubprocessTransport pid=165 running stdout=<_UnixReadPipeTransport closed fd=14 closed> stderr=<_UnixReadPipeTransport closed fd=16 closed>> exited with return code 120
meltano | Extraction failed (120): BrokenPipeError: [Errno 32] Broken pipe
meltano | Loading failed (-9): INFO in target
meltano | <_UnixReadPipeTransport fd=12 polling> was closed by peer
meltano | ELT could not be completed: Tap and target failed
meltano | <_UnixReadPipeTransport fd=10 polling> was closed by peer
```

### Possible fixes

I couldn't reproduce this issue with debug logging enabled, so that is an unfriendly workaround: writing the debug output to disk apparently slowed the tap enough for the loader to keep up. Realistically, some form of back-pressure that throttles the tap based on how much data is buffered for the loader is likely the only real solution.

### Further regression test

Load test religiously?
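The back-pressure idea from "Possible fixes" can be illustrated with a minimal asyncio sketch, assuming Python 3.7+ (for `asyncio.run`). This is not Meltano's code: `MAX_BUFFERED`, `fast_tap`, `slow_target`, and `run_pipeline` are hypothetical names, and a bounded `asyncio.Queue` stands in for the pipe between tap and target. Because `put()` suspends while the queue is full, the fast tap can never get more than `MAX_BUFFERED` records ahead of the slow target.

```python
import asyncio
from typing import Tuple

# Hypothetical sketch: these names are illustrative, not part of Meltano's API.
MAX_BUFFERED = 100  # assumed cap on records held between tap and target


async def fast_tap(queue: asyncio.Queue, n_records: int) -> int:
    """Produce records as fast as possible; return the peak buffer depth seen."""
    peak = 0
    for i in range(n_records):
        # put() suspends the tap whenever the queue is full: this is the back-pressure.
        await queue.put('{"type": "RECORD", "record": {"id": %d}}' % i)
        peak = max(peak, queue.qsize())
    await queue.put(None)  # sentinel marking the end of the stream
    return peak


async def slow_target(queue: asyncio.Queue, delay: float) -> int:
    """Consume records more slowly than the tap emits them; return the count."""
    loaded = 0
    while True:
        record = await queue.get()
        if record is None:  # end-of-stream sentinel from the tap
            return loaded
        loaded += 1
        await asyncio.sleep(delay)  # stand-in for a slower loader


async def run_pipeline(n_records: int, delay: float) -> Tuple[int, int]:
    """Run tap and target concurrently; return (records loaded, peak buffered)."""
    queue = asyncio.Queue(maxsize=MAX_BUFFERED)
    peak, loaded = await asyncio.gather(
        fast_tap(queue, n_records), slow_target(queue, delay)
    )
    return loaded, peak


if __name__ == "__main__":
    loaded, peak = asyncio.run(run_pipeline(n_records=500, delay=0.001))
    print(f"loaded={loaded} peak_buffered={peak}")
```

With real subprocesses, the comparable stdlib mechanism is awaiting `StreamWriter.drain()` after each write to the target's stdin, which pauses the writer once the transport's buffer passes its high-water mark. An unbounded write buffer would be consistent with the `bufsize=1786482688` reported above for fd 17, the target's stdin pipe, but that is an inference from the log, not a confirmed diagnosis.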