Commit d84ce2bd authored by aikchar's avatar aikchar

Initial version of code

parent 54ca8b04
import logging
import sys
FORMAT = "%(asctime)s : %(pathname)s : %(name)s : %(module)s : %(funcName)s : %(lineno)d : %(levelname)s : %(message)s"
date_format = "%Y%m%d-%H%M%S-%f"
logging.basicConfig(format=FORMAT, level=logging.DEBUG, stream=sys.stdout)
logger = logging.getLogger("brokerlso")
"""Command messages for Qpid Management Framework (QMF) version 2"""
from brokerlso import logger
class RequestCmd:
"""Craft QMFv2 command messages"""
def __init__(self):
self.object_name = "org.apache.qpid.broker:broker:amqp-broker"
logger.debug("object name -> {0}".format(self.object_name))
self.properties = {"x-amqp-0-10.app-id": "qmf2", "qmf.opcode": "_query_request", "method": "request"}
logger.debug("Message properties -> {0}".format(self.properties))
def create_queue(self, name, strict=True, auto_delete=False, auto_delete_timeout=10):
"""Create message content and properties to create queue with QMFv2
:param name: Name of queue to create
:type name: str
:param strict: Whether command should fail when unrecognized properties are provided
Not used by QMFv2
Default: True
:type strict: bool
:param auto_delete: Whether queue should be auto deleted
Default: False
:type auto_delete: bool
:param auto_delete_timeout: Timeout in seconds for auto deleting queue
Default: 10
:type auto_delete_timeout: int
:returns: Tuple containing content and properties
"""
content = {"_object_id": {"_object_name": self.object_name},
"_method_name": "create",
"_arguments": {"type": "queue",
"name": name,
"strict": strict,
"properties": {"auto-delete": auto_delete,
"qpid.auto_delete_timeout": auto_delete_timeout}}}
logger.debug("Message content -> {0}".format(content))
return content, self.properties
def create_exchange(self, name, type_="fanout", strict=True, auto_delete=False, auto_delete_timeout=10):
"""Create message content and properties to create exchange with QMFv2
:param name: Name of exchange to create
:type name: str
:param type_: Type of exchange to create
Possible values are fanout, ...?
:type type_: str
:param strict: Whether command should fail when unrecognized properties are provided
Not used by QMFv2
Default: True
:type strict: bool
:param auto_delete: Whether exchange should be auto deleted
Default: False
:type auto_delete: bool
:param auto_delete_timeout: Timeout in seconds for auto deleting exchange
Default: 10
:type auto_delete_timeout: int
:returns: Tuple containing content and properties
"""
content = {"_object_id": {"_object_name": self.object_name},
"_method_name": "create",
"_arguments": {"type": "exchange",
"name": name,
"strict": strict,
"properties": {"auto-delete": auto_delete,
"qpid.auto_delete_timeout": auto_delete_timeout,
"exchange-type": type_}}}
logger.debug("Message content -> {0}".format(content))
return content, self.properties
def create_binding(self, name, strict=True, auto_delete=False, auto_delete_timeout=10):
"""Create message content and properties to create binding with QMFv2
:param name: Name of binding to create in format "exchange/queue/key"
:type name: str
:param strict: Whether command should fail when unrecognized properties are provided
Not used by QMFv2
Default: True
:type strict: bool
:param auto_delete: Whether exchange should be auto deleted
Default: False
:type auto_delete: bool
:param auto_delete_timeout: Timeout in seconds for auto deleting exchange
Default: 10
:type auto_delete_timeout: int
:returns: Tuple containing content and properties
"""
content = {"_object_id": {"_object_name": self.object_name},
"_method_name": "create",
"_arguments": {"type": "binding",
"name": name,
"strict": strict,
"properties": {"auto-delete": auto_delete,
"qpid.auto_delete_timeout": auto_delete_timeout}}}
logger.debug("Message content -> {0}".format(content))
return content, self.properties
def delete_queue(self, name):
"""Create message content and properties to delete queue with QMFv2
:param name: Name of queue to delete
:type name: str
"""
content = {"_object_id": {"_object_name": self.object_name},
"_method_name": "delete",
"options": {"type": "queue", "name": name, "options": dict()}}
logger.debug("Message content -> {0}".format(content))
return content, self.properties
def delete_exchange(self, name):
"""Create message content and properties to delete exchange with QMFv2
:param name: Name of exchange to delete
:type name: str
"""
content = {"_object_id": {"_object_name": self.object_name},
"_method_name": "delete",
"options": {"type": "exchange", "name": name, "options": dict()}}
logger.debug("Message content -> {0}".format(content))
return content, self.properties
def delete_binding(self, name):
"""Create message content and properties to delete exchange with QMFv2
:param name: Name of exchange to delete in format "exchange/queue/key"
:type name: str
"""
content = {"_object_id": {"_object_name": self.object_name},
"_method_name": "delete",
"options": {"type": "binding", "name": name, "options": dict()}}
logger.debug("Message content -> {0}".format(content))
return content, self.properties
def list_queues(self):
"""Create message content and properties to list all queues with QMFv2
:returns: Tuple containing content and properties
"""
content = {"_what": "OBJECT",
"_schema_id": {"_class_name": "queue"}}
logger.debug("Message content -> {0}".format(content))
return content, self.properties
def list_exchanges(self):
"""Create message content and properties to list all exchanges with QMFv2
:returns: Tuple containing content and properties
"""
content = {"_what": "OBJECT",
"_schema_id": {"_class_name": "exchange"}}
logger.debug("Message content -> {0}".format(content))
return content, self.properties
[bdist_wheel]
python-tag = py35
[aliases]
test=pytest
[tool:pytest]
addopts = -lsvvvv --capture=fd -rxs --junit-xml=pytest-report.xml --instafail --cov=brokerlso
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import glob
from setuptools import setup, find_packages
with open("README.rst") as readme_file:
readme = readme_file.read()
# Lists of docs to include in package (tarball & wheel)
html_docs_root = glob.glob("docs/build/html/*.*")
html_docs_sources = glob.glob("docs/build/html/_sources/*.*")
html_docs_static = glob.glob("docs/build/html/_static/*.*")
setup(
name="brokerlso",
version="0.0.1",
description="A small Python library to craft command messages for Apache Qpid Broker.",
long_description=readme,
author="Hamza Sheikh",
author_email="code@codeghar.com",
url="https://github.com/codeghar/brokerlso",
license="MIT",
packages=find_packages(),
package_dir={"brokerlso": "brokerlso"},
package_data=dict(),
include_package_data=True,
data_files=[("usr/share/doc/brokerlso/", html_docs_root),
("usr/share/doc/brokerlso/_sources/", html_docs_sources),
("usr/share/doc/brokerlso/_static/", html_docs_static)],
zip_safe=False,
keywords="broker messaging amqp",
classifiers=[
"Development Status :: 2 - Pre-Alpha",
"Intended Audience :: Developers",
"Natural Language :: English",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.5",
],
install_requires=[],
setup_requires=['pytest-runner'],
test_suite="tests",
tests_require=["pytest"],
extras_require={"docs": ["sphinx"]},
entry_points=dict()
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment