dcmc.py 5.84 KB
Newer Older
1 2 3 4
#!/usr/bin/env python3

import json.decoder
import logging as log
5
import random
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
import urllib.parse

from abc import ABC, abstractmethod
from aiohttp import web, ClientSession
from aiohttp_swagger import setup_swagger
from datetime import datetime
from http import HTTPStatus
from marshmallow import Schema as MSchema
from typing import ClassVar, List, Optional, Type, TYPE_CHECKING

from ..common.utils import WithSchema

if TYPE_CHECKING:
    from dataclasses import dataclass
else:
    from marshmallow_dataclass import dataclass


@dataclass
class LoadValue(WithSchema):
    ''' A generic key - value - unit of measurement container

    Used as the element type of ``MigrationRequestItem.load_values``.
    '''
    # Name of the quantity being described (the sample payload at the end of
    # this file uses "cpu", "ram" and "disk").
    parameter: str
    # Amount of that quantity, expressed in ``uom``.
    value: int
    # Unit of measurement for ``value`` (e.g. "MB", "GB" in the sample payload).
    uom: str
    # Declares a ``Schema`` attribute so that static type checkers accept
    # ``LoadValue.Schema``; presumably replaced by the generated marshmallow
    # schema when the marshmallow_dataclass decorator runs -- TODO confirm.
    Schema: ClassVar[Type[MSchema]] = MSchema  # for mypy


@dataclass
class MigrationRequestItem(WithSchema):
    ''' Item of a clearing request: either an offer or a bid

    A list of these items is what ``DCMCClient.send_migration_request``
    receives.
    '''
    # NOTE(review): the exact meaning of ``date`` versus ``starttime`` /
    # ``endtime`` is not visible in this file -- confirm with the DCMC API spec.
    date: datetime
    # Tag of the item; may be None. ``MigrationStatus`` carries a field of the
    # same name -- presumably the same identifier, TODO confirm.
    vc_tag: Optional[str]
    starttime: datetime
    endtime: datetime
    # Resource quantities attached to this item, one LoadValue per quantity.
    load_values: List[LoadValue]
    price: int
    # Kind of item; the sample payload at the end of this file uses "offer".
    action_type: str
    # Declares a ``Schema`` attribute so that static type checkers accept
    # ``MigrationRequestItem.Schema``; presumably replaced by the generated
    # marshmallow schema at runtime -- TODO confirm.
    Schema: ClassVar[Type[MSchema]] = MSchema  # for mypy


@dataclass
class MigrationStatus(WithSchema):
    ''' Posted by DCMC to inform of the status of a migration

    Instances are built from the JSON body of ``POST /migration`` through
    ``MigrationStatus.Schema().load(...)``.
    '''
    # Tag of the migration the status refers to.
    vc_tag: str
    # Status text as reported by DCMC; the set of allowed values is not
    # defined in this file.
    status: str
    # Declares a ``Schema`` attribute so that static type checkers accept
    # ``MigrationStatus.Schema``; presumably replaced by the generated
    # marshmallow schema at runtime -- TODO confirm.
    Schema: ClassVar[Type[MSchema]] = MSchema  # for mypy


class DCMCEndpointApp:
    """aiohttp application receiving callbacks from DCMC.

    Routes registered by ``__init__``:

    * ``GET /hello`` -- liveness check; puts a ``Hello`` marker on the
      message queue and answers with the text "Hello".
    * ``POST /migration`` -- accepts one migration status object or a list of
      them; the decoded ``MigrationStatus`` objects are put on the message
      queue as a single list.

    Client errors (undecodable JSON, data rejected by the schema) are
    answered with ``400 Bad Request`` and a JSON body holding a ``message``.
    """

    class Hello:
        """Marker object queued each time ``GET /hello`` is served."""

    def __init__(self, *, message_queue, listen_address):
        """Build the application; nothing listens until ``start`` is awaited.

        message_queue: object with an awaitable ``put`` method (presumably an
            ``asyncio.Queue`` -- confirm against the caller).
        listen_address: ``"host:port"`` string without a scheme.
        """
        self._message_queue = message_queue
        # A scheme is prepended only so that urlparse splits host and port.
        self._listen_address = urllib.parse.urlparse("http://" + listen_address)
        self._listen_address_raw = listen_address

        self.app = web.Application(
            middlewares=[
                DCMCEndpointApp._catch_json_decode_error,
            ],
        )

        self.app.add_routes([
            web.get("/hello", self._hello),
            web.post("/migration", self._handle_migration_status),
        ])

        # Publishes the API description; the handlers' docstrings hold the
        # per-route YAML, so they must stay machine-readable.
        setup_swagger(self.app)

    async def start(self):
        """Set up the runner and start listening on the configured address."""
        log.warning("Starting DCMC api server, listening on %s", self._listen_address_raw)
        runner = web.AppRunner(self.app)
        await runner.setup()
        site = web.TCPSite(
            runner,
            host=self._listen_address.hostname,
            port=self._listen_address.port,
        )
        await site.start()

    @staticmethod
    @web.middleware
    async def _catch_json_decode_error(request, handler):
        """Turn a JSON decoding failure in any handler into a 400 response."""
        try:
            return await handler(request)
        except json.decoder.JSONDecodeError as err:
            # ``text`` + ``content_type`` so the client receives the payload
            # labelled as JSON instead of as plain text.
            raise web.HTTPBadRequest(
                text=json.dumps({'message': "Could not decode json body"}),
                content_type="application/json",
            ) from err

    # NOTE: the docstring below is the route's swagger description, not
    # free-form documentation; keep its YAML intact.
    async def _hello(self, request):  # pylint: disable=unused-argument
        '''
        ----
        description: Check it's running
        responses:
            "200":
                description: it's running
            "500":
                description: it's running but apparently it doesn't work very well
        '''
        await self._message_queue.put(self.Hello())
        return web.Response(text="Hello")

    # NOTE: the docstring below is the route's swagger description, not
    # free-form documentation; keep its YAML intact.
    async def _handle_migration_status(self, request):
        '''
        ----
        description: Report migration status update
        parameters:
        - in: body
          name: body
          description: migration status
          schema:
            type: object
            properties:
              vc_tag:
                type: string
              status:
                type: string
        responses:
            "202":
                description: the updated was queued for treatment
            "400":
                description: the service doesn't like your data
            "500":
                description: the service is grumpy or sick
        '''
        # Imported here to keep this handler self-contained; marshmallow is
        # already a dependency of this module.
        from marshmallow import ValidationError

        body = await request.json()
        # A single status object is handled as a one-element list.
        payloads = body if isinstance(body, list) else [body]
        schema = MigrationStatus.Schema()
        try:
            statuses = [schema.load(payload) for payload in payloads]
        except ValidationError as err:
            # Data rejected by the schema is the client's fault: answer with
            # the documented 400 rather than letting it surface as a 500.
            raise web.HTTPBadRequest(
                text=json.dumps({
                    'message': "Invalid migration status",
                    'errors': err.messages,
                }),
                content_type="application/json",
            ) from err
        await self._message_queue.put(statuses)

        return web.Response(status=HTTPStatus.ACCEPTED)



class DCMCClient(ABC):
    """Interface of the clients used to submit migration requests to DCMC."""

    @abstractmethod
    async def send_migration_request(self, migration_request: List[MigrationRequestItem]):
        """Submit ``migration_request``; concrete clients define how."""


class DCMCClientFake(DCMCClient):
    """Stand-in client: logs the migration request instead of sending it."""

    async def send_migration_request(self, migration_request: List[MigrationRequestItem]):
        """Log ``migration_request`` at INFO level; nothing is transmitted."""
        log.info(
            "Would send to DCMC the migration request: %s",
            migration_request,
        )


class DCMCClientHttp(DCMCClient):
    """DCMC client that POSTs migration requests as JSON over HTTP."""

    def __init__(self, remote_uri, authorization_header):
        """Remember where to post and which ``Authorization`` value to send.

        remote_uri: URL the migration requests are posted to.
        authorization_header: value sent as the ``Authorization`` header.
        """
        self._remote_uri = remote_uri
        self._authorization_header = authorization_header

    async def send_migration_request(self, migration_request: List[MigrationRequestItem]):
        """POST ``migration_request`` to the remote URI as a JSON array.

        Best effort: any failure (serialization, connection, HTTP error
        status) is logged with its traceback and not propagated.
        """
        log.info("Will send to DCMC the migration request: %s, at %s", migration_request, self._remote_uri)
        try:
            headers = {'Authorization': self._authorization_header}
            async with ClientSession(raise_for_status=True, headers=headers) as session:
                json_data = [item.json_obj() for item in migration_request]
                raw_json = json.dumps(json_data, indent=4)

                # Random number used only to correlate the two log lines of
                # one request; it is not sent to the server.
                cookie = random.randint(1, 100000000)
                log.debug("Posting request %s to DCMC client at %s: %s", cookie, self._remote_uri, raw_json)

                # Entering the response context ensures the response is
                # released; raise_for_status=True makes error statuses raise.
                async with session.post(url=self._remote_uri, json=json_data):
                    pass

                log.info("Migration request %s properly posted to DCMC", cookie)
        except Exception:
            log.exception("Failed to submit migration request: ")





# For documentation: example JSON representation of one MigrationRequestItem
#
# {
#     "date": "2019-05-14T12:00:00Z",
#     "vc_tag": "vdfkbjafhvlafjvendflvkdsfnvjndsflvkshdl",
#     "starttime": "2019-05-14T12:00:00Z",
#     "endtime": "2019-05-14T12:00:00Z",
#     "load_values": [
#         {
#             "parameter": "cpu",
#             "value": 2,
#             "uom": "cpu"},
#         {
#             "parameter": "ram",
#             "value": 2048,
#             "uom": "MB"},
#         {
#             "parameter": "disk",
#             "value": 20,
#             "uom": "GB"}
#         ],
#     "price": 10,
#     "action_type": "offer"
# }