Meltano elt run fails with a “pipe closed” exception
What is the current bug behavior?
I'm stress-testing my Meltano installation with an elt run using tap-spreadsheets-anywhere and a custom loader-kafka plugin. After running an elt job at full throttle for about 30-45 minutes, Meltano consistently fails with a "pipe closed" exception. This is the message Meltano repeats when the error occurs: "meltano | pipe closed by peer or os.write(pipe, data) raised exception."
What is the expected correct behavior?
An ELT run that begins to load data should finish doing so. At a minimum, I would expect a clearer error message indicating the root cause of the failure.
Steps to reproduce
I have Meltano configured for extremely high throughput on the tap side, with a slightly slower loader. My guess is that, without loader backpressure to slow the tap's rate of record production, this will eventually happen to any sufficiently long-running/large elt run where the tap is faster than the loader.
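The fast-tap/slow-loader situation can be simulated outside of Meltano. The sketch below is my own illustration, not Meltano code: a fast producer writes records into a deliberately slow subprocess without ever awaiting `drain()`, so everything the consumer can't keep up with accumulates in the asyncio write transport's in-memory buffer.

```python
import asyncio
import sys

async def pipe_without_backpressure() -> int:
    """Fast 'tap' writing to a slow 'loader' without ever draining."""
    # Stand-in for a slow loader: reads stdin one line at a time, slowly.
    loader = await asyncio.create_subprocess_exec(
        sys.executable, "-u", "-c",
        "import sys, time\n"
        "for line in sys.stdin:\n"
        "    time.sleep(0.01)\n",
        stdin=asyncio.subprocess.PIPE,
    )
    record = b"x" * 1024 + b"\n"
    # Stand-in for a fast tap: emit ~1 MB of records as fast as possible.
    # Nothing ever awaits loader.stdin.drain(), so whatever doesn't fit in
    # the OS pipe accumulates in the write transport's buffer.
    for _ in range(1000):
        loader.stdin.write(record)
    buffered = loader.stdin.transport.get_write_buffer_size()
    loader.kill()
    await loader.wait()
    return buffered

buffered = asyncio.run(pipe_without_backpressure())
print(f"bytes buffered in the write transport: {buffered}")
```

Scale this up over 30-45 minutes and the buffer presumably grows until something gives, which matches the huge `bufsize` in the debug log below.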
Relevant logs and/or screenshots
I captured this log with the PYTHONASYNCIODEBUG env var enabled.
tap-spreadsheets-anywhere | INFO Found credentials in environment variables.
meltano | Executing <Handle <TaskWakeupMethWrapper object at 0x7fb78aa93378>(<Future finis...events.py:299>) created at /usr/local/lib/python3.6/asyncio/streams.py:399> took 4.405 seconds
meltano | Executing <Handle <TaskWakeupMethWrapper object at 0x7fb78aa93378>(<Future finis...events.py:299>) created at /usr/local/lib/python3.6/asyncio/streams.py:399> took 6.464 seconds
meltano | <_UnixWritePipeTransport fd=17 polling bufsize=1786482688> was closed by peer
meltano | Executing <Handle cancelled _UnixWritePipeTransport._read_ready() created at /usr/local/lib/python3.6/asyncio/selector_events.py:251> took 0.115 seconds
meltano | <_UnixReadPipeTransport fd=20 polling> was closed by peer
meltano | <_UnixReadPipeTransport fd=18 polling> was closed by peer
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | [previous line repeated 11 more times]
meltano | <_UnixSubprocessTransport pid=166 running stdin=<_UnixWritePipeTransport closed fd=17 closed> stdout=<_UnixReadPipeTransport closed fd=18 closed> stderr=<_UnixReadPipeTransport closed fd=20 closed>> exited with return code -9
meltano | pipe closed by peer or os.write(pipe, data) raised exception.
meltano | [previous line repeated 21 more times]
tap-spreadsheets-anywhere | ERROR Pipe to loader broke after 134629 records were written
tap-spreadsheets-anywhere | BrokenPipeError: [Errno 32] Broken pipe
tap-spreadsheets-anywhere | Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>
tap-spreadsheets-anywhere | BrokenPipeError: [Errno 32] Broken pipe
meltano | <_UnixReadPipeTransport fd=16 polling> was closed by peer
meltano | <_UnixSubprocessTransport pid=165 running stdout=<_UnixReadPipeTransport closed fd=14 closed> stderr=<_UnixReadPipeTransport closed fd=16 closed>> exited with return code 120
meltano | Extraction failed (120): BrokenPipeError: [Errno 32] Broken pipe
meltano | Loading failed (-9): INFO in target
meltano | <_UnixReadPipeTransport fd=12 polling> was closed by peer
meltano | ELT could not be completed: Tap and target failed
meltano | <_UnixReadPipeTransport fd=10 polling> was closed by peer
Possible fixes
I couldn't reproduce this issue with debug logging enabled, so that's an (unfriendly) workaround: the write to disk apparently slowed the tap enough for the loader to stay current. Note that the log above shows the loader's stdin transport polling with bufsize=1786482688, i.e. roughly 1.7 GB buffered in Meltano's memory, right before the loader exited with return code -9 (SIGKILL, plausibly the OOM killer). That points to unbounded buffering of tap output rather than a problem in either plugin. Realistically, some form of backpressure that throttles the tap based on how much data is buffered for the loader is likely the only real solution.
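For what it's worth, here is what such backpressure typically looks like in asyncio. This is an illustrative sketch under my assumptions, not Meltano's actual plumbing; `forward_with_backpressure` and its parameter names are mine. The key is awaiting `StreamWriter.drain()` after each write, which suspends the producer side whenever the transport's buffer is above its high-water mark:

```python
import asyncio

async def forward_with_backpressure(reader: asyncio.StreamReader,
                                    writer: asyncio.StreamWriter) -> None:
    """Copy tap stdout into loader stdin, pausing whenever the loader lags."""
    while True:
        chunk = await reader.read(64 * 1024)
        if not chunk:  # tap closed its end; we're done
            break
        writer.write(chunk)
        # drain() suspends this coroutine while the transport's buffer is
        # above its high-water mark, so a slow loader throttles the tap
        # instead of records piling up in memory without bound.
        await writer.drain()
    writer.close()
    await writer.wait_closed()
```

Because reads only resume once `drain()` returns, the in-memory buffer stays bounded by the transport's water marks (tunable via `transport.set_write_buffer_limits()`), and a slow loader naturally slows the tap through the full pipe.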
Further regression test
Load test religiously?