Commits (3)
......@@ -61,9 +61,9 @@ author = "Minty_infra_amqp Team"
# built documents.
#
# The short X.Y version.
version = "0.0.3"
version = "0.0.4"
# The full version, including alpha/beta/rc tags.
release = "0.0.3"
release = "0.0.4"
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
......
__version__ = "0.0.3"
__version__ = "0.0.4"
from minty import Base
import amqpstorm
from amqpstorm import AMQPChannelError, AMQPConnectionError
import threading
class AMQPInfrastructure(Base):
__slots__ = ["cache_lock", "connections"]
__slots__ = ["cache_lock", "connection", "channels"]
def __init__(self):
"""Initialize the AMQP infrastructure"""
self.cache_lock = threading.Lock()
self.connections = {}
self.channels = {}
self.connection = None
def __call__(self, config: dict):
"""Create a new AMQP connection using the specified configuration
......@@ -26,13 +29,28 @@ class AMQPInfrastructure(Base):
with self.cache_lock:
try:
connection = self.connections[rmq_url]
channel = self.channels[rmq_url]
channel.check_for_errors()
except KeyError:
self.connections[rmq_url] = amqpstorm.UriConnection(rmq_url)
connection = self.connections[rmq_url]
return connection.channel()
def clean_up(self, channel):
"""Close the specified AMQP channel"""
channel.close()
channel = self._create_connection_and_channel(rmq_url)
except AMQPConnectionError:
channel = self._create_connection_and_channel(rmq_url)
except AMQPChannelError:
channel = self.connection.channel()
self.channels[rmq_url] = channel
return channel
def _create_connection_and_channel(
self, rmq_url: str
) -> amqpstorm.Channel:
"""Create connection and channel.
:param rmq_url: amqp:// URL
:type rmq_url: str
:return: channel
:rtype: amqpstorm.Channel
"""
self.connection = amqpstorm.UriConnection(rmq_url)
self.channels[rmq_url] = self.connection.channel()
return self.channels[rmq_url]
[bumpversion]
current_version = 0.0.3
current_version = 0.0.4
commit = True
tag = True
message = "Tagged new version, from {current_version} to {new_version}"
......
......@@ -47,6 +47,6 @@ setup(
test_suite="tests",
tests_require=test_requirements,
url="https://gitlab.com/minty-python/minty-infra-amqp",
version="0.0.3",
version="0.0.4",
zip_safe=False,
)
from unittest import mock
from amqpstorm import AMQPChannelError, AMQPConnectionError
from minty_infra_amqp import AMQPInfrastructure
......@@ -7,41 +9,35 @@ class TestAMQPInfrastructure:
"""Test the AMQP infrastructure"""
@mock.patch("amqpstorm.UriConnection")
def test_amqp_infrastructure_basic(self, amqp_connection: mock.patch):
"""AMQP infrastructure tests"""
connection = mock.MagicMock()
connection.channel = mock.MagicMock(return_value=3)
amqp_connection.return_value = connection
infra_instance = AMQPInfrastructure()
assert infra_instance.connections == {}
config = {"amqp": {"url": "amqp://url/here"}}
infra = infra_instance(config)
assert list(infra_instance.connections.keys()) == ["amqp://url/here"]
assert infra == 3
amqp_connection.assert_called_once_with("amqp://url/here")
connection.channel.assert_called_once_with()
def setup(self, mock_amqp):
amqp_infra = AMQPInfrastructure()
assert amqp_infra.channels == {}
assert amqp_infra.connection is None
self.amqp_infra = amqp_infra
self.config = {"amqp": {"url": "amqp://url/here"}}
self.mock_channel = mock.MagicMock()
mock_amqp().channel.return_value = self.mock_channel
channel = self.amqp_infra(self.config)
assert channel == self.mock_channel
assert list(self.amqp_infra.channels.keys()) == ["amqp://url/here"]
def test_amqp_infrastructure_cached_connection(self):
self.mock_channel.check_for_errors.return_value = True
channel2 = self.amqp_infra(self.config)
assert channel2 == self.mock_channel
def test_amqp_infrastructure_channel_error(self):
self.mock_channel.check_for_errors.side_effect = AMQPChannelError
self.amqp_infra.connection.channel.return_value = (
"channel_called_again"
)
channel = self.amqp_infra(self.config)
assert channel == "channel_called_again"
# Reset for a second call (this should use the cache for the connection,
# but return a new channel)
amqp_connection.reset_mock()
connection.reset_mock()
infra2 = infra_instance(config)
assert infra == infra2
amqp_connection.assert_not_called()
connection.channel.assert_called_once_with()
def test_cleanup(self):
mock_channel = mock.MagicMock()
infra_instance = AMQPInfrastructure()
assert infra_instance.connections == {}
@mock.patch("amqpstorm.UriConnection")
def test_amqp_infrastructure_connection_error(self, mock_amqp):
self.mock_channel.check_for_errors.side_effect = AMQPConnectionError
mock_amqp().channel.return_value = "channel"
infra_instance.clean_up(mock_channel)
mock_channel.close.assert_called_once()
channel = self.amqp_infra(self.config)
assert channel == "channel"