Add support for non-sorted streams
By default, the Singer SDK assumes that a stream's records arrive sorted by replication key in ascending order, so the replication key value of the last record received can be saved as the state. In practice, the stream state is updated after every record received, without checking whether the data is actually sorted.
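To make the problem concrete, here is a minimal sketch of what happens when the bookmark simply follows the last record received. This is not SDK code: the `updated_at` key, the sample records, and the `last_record_bookmark` helper are all made up for illustration.

```python
# Hypothetical records, deliberately NOT sorted by the replication key.
records = [
    {"id": 1, "updated_at": "2021-03-01T00:00:00+00:00"},
    {"id": 2, "updated_at": "2021-01-15T00:00:00+00:00"},
]


def last_record_bookmark(records, replication_key):
    """Mimic 'state = replication key value of the last record received'."""
    state = {}
    for record in records:
        # The state is overwritten on every record, sorted or not.
        state[replication_key] = record[replication_key]
    return state


bookmark = last_record_bookmark(records, "updated_at")
```

With unsorted input like this, the bookmark ends up at the January value even though a March record was already emitted, so the saved state is older than the newest record seen.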
That said, I believe we should add a way to save a stream's state by taking the maximum replication key value across all the records received in a page.
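As a rough sketch of the idea (not the attached patch and not an existing SDK API), the per-page maximum could be computed like this. The function name `max_replication_value` and its parameters are hypothetical, and it assumes every replication key value is an ISO 8601 string in the same format, all with or all without a UTC offset.

```python
from datetime import datetime
from typing import Iterable, Optional


def max_replication_value(
    page: Iterable[dict],
    replication_key: str,
    current: Optional[str] = None,
) -> Optional[str]:
    """Return the latest replication key value seen in `page`.

    `current` is the previously saved state value, if any. Values are
    compared as datetimes, not as raw strings.
    """
    best = current
    for record in page:
        value = record.get(replication_key)
        if value is None:
            # Records without the key cannot move the bookmark.
            continue
        if best is None or datetime.fromisoformat(value) > datetime.fromisoformat(best):
            best = value
    return best


# Hypothetical unsorted page of records.
page = [
    {"id": 1, "updated_at": "2021-03-01T00:00:00+00:00"},
    {"id": 2, "updated_at": "2021-01-15T00:00:00+00:00"},
    {"id": 3, "updated_at": "2021-02-10T00:00:00+00:00"},
]
new_state = max_replication_value(page, "updated_at")
```

Passing the previously saved value as `current` keeps the bookmark from moving backwards when a later page only contains older records.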
Attached is a patch with a simple implementation I wrote to handle this, although it does not take a tap failure/restart into account: 0001-add-support-for-non-sorted-streams.patch. The implementation adds a method to the Stream class that can be overridden to compare the current state value with the record currently being pushed and return the new state value. Here is how I used it in my case:
    import pendulum

    def maximum(self, state: dict, last_record: dict):
        # Return whichever replication key value is later: the stored one or the new record's.
        current_value = state.get(self.replication_key, None)
        new_value = last_record.get(self.replication_key, None)
        if current_value and new_value:
            # Compare as datetimes rather than as raw strings.
            current_datetime = pendulum.parse(current_value)
            new_datetime = pendulum.parse(new_value)
            return current_value if current_datetime >= new_datetime else new_value
        # If only one value is present, keep that one.
        return new_value if new_value else current_value
I hope this helps. :)