Commit 0a5dd022 authored by Maribeth Pierce's avatar Maribeth Pierce
Browse files

Remove many redundant API tests, refactor UI account_details tests to use seed...

Remove many redundant API tests, refactor UI account_details tests to use seed data. Remove all references to injector.py. Remove UI tests that rely on injector with next steps = continue refactoring/replacing UI tests to use seed data.
parent ddc4d385
Loading
Loading
Loading
Loading
+2 −21
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@ import urllib3

import yaml

from integrade import exceptions, injector, utils
from integrade import exceptions


# Suppress HTTPS warnings against our test server without a cert
@@ -21,7 +21,7 @@ _CONFIG = None
_AWS_CONFIG = None


def get_config(create_superuser=True, need_base_url=True):
def get_config(need_base_url=True):
    """Return a copy of the global config dictionary.

    This method makes use of a cache. If the cache is empty, the configuration
@@ -113,25 +113,6 @@ def get_config(create_superuser=True, need_base_url=True):
            raise exceptions.MissingConfigurationError(
                '\n'.join(missing_config_errors)
            )
        super_username = os.environ.get(
            'CLOUDIGRADE_USER', utils.uuid4()
        )
        _CONFIG['super_user_name'] = super_username
        super_password = os.environ.get(
            'CLOUDIGRADE_PASSWORD', utils.gen_password()
        )
        _CONFIG['super_user_password'] = super_password
        token = os.environ.get('CLOUDIGRADE_TOKEN', False)
        if not token and create_superuser:
            try:
                token = injector.make_super_user(
                    super_username, super_password)
            except RuntimeError as e:
                raise exceptions.MissingConfigurationError(
                    'Could not create a super user or token, error:\n'
                    f'{repr(e)}'
                )
        _CONFIG['superuser_token'] = token
    return deepcopy(_CONFIG)


integrade/injector.py

deleted100755 → 0
+0 −295
Original line number Diff line number Diff line
"""Utilities to help interact with the remote environment."""
import pickle
import subprocess
from random import randint
from shutil import which
from textwrap import dedent, indent

from integrade import config
from integrade.constants import (
    CLOUD_ACCESS_AMI_NAME,
    MARKETPLACE_AMI_NAME,
)


def run_remote_python(script, **kwargs):
    """Run Python code inside the remote OpenShift pod."""
    script = dedent(script).strip()

    openshift_prefix = config.get_config()['openshift_prefix']
    if openshift_prefix:
        container_name = f'{openshift_prefix}a'
    else:
        raise RuntimeError('Unable to determine openshift prefix!')

    data = pickle.dumps(kwargs)
    wrap_start = 'import pickle as _pickle;import sys as _sys;\n' \
        f'globals().update(_pickle.loads({repr(data)}))\n' \
        'def _codewrapper():\n'
    wrap_end = '\n_retval = _codewrapper()\n' \
        '_sys.stdout.buffer.write(_pickle.dumps(_retval))\n'
    wrapped_script = wrap_start + indent(script, '  ') + wrap_end
    wrapped_script = wrapped_script.encode('utf8')

    if which('oc'):
        result = subprocess.run(['sh',
                                 '-c',
                                 f'oc rsh -c {container_name} $(oc get pods'
                                 ' -o jsonpath="{.items[*].metadata.name}" -l'
                                 f' name={container_name})'
                                 ' scl enable rh-python36'
                                 ' -- python -W ignore manage.py shell'],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                input=wrapped_script,
                                timeout=60
                                )
        if result.returncode != 0:
            raise RuntimeError(
                f'Remote script failed\n'
                f'container_name = {container_name}\n'
                f'stdout = {result.stdout}\n'
                f'stderr = {result.stderr}\n'
                f'data = {kwargs}\n'
                f'script = {script}\n'
            )
        elif result.stdout:
            return pickle.loads(result.stdout)
    else:
        raise EnvironmentError(
            'Must have access to the cloudigrade openshift pod via the "oc"'
            'client to run remote commands in the Django manage.py shell. Make'
            'sure the "oc" client is in your path and the $OPENSHIFT_PREFIX'
            'used in the deploy is in your environment.'
        )


def direct_count_images(acct_id=None):
    """Count the number of images in an account directly."""
    return run_remote_python("""
        from datetime import date, timedelta
        from account.models import Account, AwsInstance, AwsInstanceEvent
        from account.models import AwsMachineImage

        if acct_id:
            return AwsMachineImage.objects.filter(
                instanceevent__instance__account__user_id=acct_id).count()
        else:
            return AwsMachineImage.objects.all().count()
        """, **locals())


def clear_images(acct_id=None):
    """Count the number of images in an account directly."""
    return run_remote_python("""
        from datetime import date, timedelta
        from account.models import Account, AwsInstance, AwsInstanceEvent
        from account.models import AwsMachineImage

        if acct_id:
            images = AwsMachineImage.objects.filter(
                instanceevent__instance__account__user_id=acct_id)
        else:
            images = AwsMachineImage.objects.all()
        images.delete()
        """, **locals())


def inject_aws_cloud_account(user_id,
                             name=None,
                             aws_account_number=None,
                             acct_age=100,
                             ):
    """Mock an aws account for the user specified.

    Mocked AWS accounts dont point to a real AWS account! No cloudtrail
    will exist, no real events will arrive, and no images will actually be
    inspected.

    :returns: (int) The account id, needed by inject_instance_data.
    """
    if aws_account_number is None:
        aws_account_number = str(randint(100000000000, 999999999999))
    arn = f'arn:aws:iam::{aws_account_number}:role/mock-arn'
    if name is None:
        name = aws_account_number
    return run_remote_python("""
    import datetime
    from account.models import AwsAccount

    kwargs = {
        'aws_account_id': aws_account_number,
        'user_id': user_id,
        'account_arn': arn,
        'name': name,
    }

    acct, new = AwsAccount.objects.get_or_create(**kwargs)
    days_ago = datetime.timedelta(days=acct_age)
    created_date = datetime.datetime.now() - days_ago
    timetuple = list(created_date.timetuple()[:-2])
    # make it noon, this sets the hour
    timetuple[3] = 12
    # this sets the min
    timetuple[4] = 0
    # this sets the sec
    timetuple[5] = 0
    acct.created_at = datetime.datetime(*timetuple)
    acct.save()

    return {
        'id' : acct.id,
        'aws_account_id' : aws_account_number,
        'user_id' : user_id,
        'account_arn' : arn,
        'name' : name
    }
    """, **locals())


def inject_instance_data(
    acct_id, image_type, events,
    instance_id=None,
    ec2_ami_id=None,
    owner_aws_account_id=None,
    challenged=False,
    vcpu=1,
    memory=1,
    rhel_enabled_repos_found=False,
    rhel_product_certs_found=False,
    rhel_release_files_found=False,
    rhel_signed_packages_found=False,
    is_marketplace=False,
    is_cloud_access=False,
):
    """Inject instance and image data for tests.

    Creates a new instance object directly in the Cloudigrade system and an
    image matching the given ID or creates one. The instance will be given tags
    based on a list of tags in `image_type`.

    The `events` parameter is a list of day offsets from today. For each offset
    an event will be created to power on, then power off, then repeat. As an
    example, the event list [10, 5, 3] will create events showing the instance
    was powered on 10 days ago, powered off 5 days ago, then powered on and
    left on 3 days ago.
    """
    ami_name = ''
    owner_id = 841258680906
    if instance_id is None:
        instance_id = str(randint(100000, 999999999999))
    if ec2_ami_id is None:
        ec2_ami_id = str(randint(100000, 999999999999))
    if is_marketplace:
        owner_aws_account_id = owner_id
        ami_name = MARKETPLACE_AMI_NAME
    if is_cloud_access:
        owner_aws_account_id = owner_id
        ami_name = CLOUD_ACCESS_AMI_NAME
    if is_marketplace and is_cloud_access:
        raise ValueError('Both is_marketplace and is_cloud_access are True.'
                         'Only zero or one can be True at a time.')
    return run_remote_python("""
    from datetime import date, timedelta
    import json

    from account.models import Account, AwsInstance, AwsInstanceEvent
    from account.models import AwsMachineImage, AwsEC2InstanceDefinitions
    from account.util import recalculate_runs

    instance_type = 'xx.fake-' + str(vcpu) + '-' + str(memory)

    AwsEC2InstanceDefinitions.objects.get_or_create(
        instance_type=instance_type,
        defaults=dict(
            memory=memory,
            vcpu=vcpu,
        ),
    )

    acct = Account.objects.get_or_create(id=acct_id)[0]
    rhel_detected = True if 'rhel' in image_type else False
    rhel_detected = rhel_detected or rhel_release_files_found
    image1 = AwsMachineImage.objects.get_or_create(
        ec2_ami_id=ec2_ami_id,

        defaults=dict(
            owner_aws_account_id=owner_aws_account_id or acct.aws_account_id,
            name=ami_name,
            status=AwsMachineImage.INSPECTED,
            inspection_json=json.dumps({
                'rhel_release_files_found': rhel_detected,
                'rhel_enabled_repos_found': rhel_enabled_repos_found,
                'rhel_product_certs_found': rhel_product_certs_found,
                'rhel_signed_packages_found': rhel_signed_packages_found,
            }),
            openshift_detected=True if 'openshift' in image_type else False,

            rhel_challenged=(challenged and 'rhel' in image_type),
            openshift_challenged=(challenged and 'openshift' in image_type),

            platform='none',
        )
    )[0]
    instance1 = AwsInstance.objects.get_or_create(
        ec2_instance_id=instance_id,
        machineimage=image1,
        defaults=dict(
            account=acct,
            region='us-east1',
        )
    )[0]

    on = False
    on_off_when = []
    for event in events:
        on = not on
        if isinstance(event, int):
            when = date.today() - timedelta(days=event)
        else:
            when = event
        on_off_when.append(when)
        instanceevent = AwsInstanceEvent.objects.create(
            event_type='power_on' if on else 'power_off',
            machineimage=image1 if on else None,
            instance=instance1,
            instance_type=instance_type,
            occurred_at=when,
            created_at=when,
        )
        # Need to reload event from DB, otherwise occurred_at is passed
        # as a date object instead of a datetime object.
        instanceevent.refresh_from_db()
        recalculate_runs(instanceevent)
    return {
        'image_id': image1.id,
        'instance_id': instance1.id,
        'on_off_when': on_off_when,
        'rhel_enabled_repos_found': rhel_enabled_repos_found,
        'rhel_product_certs_found': rhel_product_certs_found,
        'rhel_release_files_found': rhel_release_files_found,
        'rhel_signed_packages_found': rhel_signed_packages_found,
    }
    """, **locals())


def make_super_user(username, password):
    """Use manange.py to create a superuser and return an auth token."""
    return run_remote_python("""
        from django.contrib.auth import get_user_model;
        from rest_framework.authtoken.models import Token
        User = get_user_model();
        email = '{0}@example.com'.format(username)
        if User.objects.filter(username=username).exists():
            super_user = User.objects.get_by_natural_key(username)
            super_user.set_password(password)
            super_user.save()
        else:
            super_user = User.objects.create_superuser(
                            username,
                            email,
                            password
                            )
        token = Token.objects.get_or_create(user=super_user)
        return str(token[0])
            """, **locals())
+0 −211
Original line number Diff line number Diff line
"""Tests for accounts reports.

:caseautomation: automated
:casecomponent: api
:caseimportance: high
:caselevel: integration
:requirement: Cloud Meter
:testtype: functional
:upstream: yes
"""
import operator

import pytest

from integrade import api
from integrade.injector import inject_aws_cloud_account, inject_instance_data
from integrade.tests import urls, utils


def usertype(superuser):
    """Generate a test id based on the user type."""
    return 'superuser' if superuser else 'regularuser'


@pytest.fixture(scope='module')
def accounts_report_data():
    """Create cloud account data for the accounts report tests.

    Create three cloud accounts and create some instance data.
    """
    user = utils.create_user_account()
    auth = utils.get_auth(user)
    first_account = inject_aws_cloud_account(
        user['id'],
        name='a greatest account ever',
    )
    second_account = inject_aws_cloud_account(
        user['id'],
        name='b just another account',
    )
    third_account = inject_aws_cloud_account(
        user['id'],
        name='c my awesome account',
    )

    # Make first account have one RHEL instance and image that was running for
    # 3 days
    inject_instance_data(first_account['id'], 'rhel', [5, 2])

    # Make second account have one RHEL and one OpenShift instance and image
    # that was running for 5 days
    inject_instance_data(second_account['id'], 'rhel,openshift', [12, 7])

    # Make third account have one OpenShift instance and image that was running
    # for 10 days
    inject_instance_data(third_account['id'], 'openshift', [13, 3])

    return auth, first_account, second_account, third_account


@pytest.mark.parametrize('superuser', (False, True), ids=usertype)
def test_filter_by_account_id(accounts_report_data, superuser):
    """Test that cloud accounts report can be filtered by account ID.

    :id: 8de488c4-2550-4d51-9d25-a7c4355fe6f4
    :description: Test that regular users and superusers can filter cloud
        accounts by account ID.
    :steps:
        1) Add three cloud accounts
        2) Insert some instance events for all accounts.
        3) Filter the cloud accounts by providing the account_id of one of the
           three accounts ID
        4) Ensure a single account is returned and assert that their instance
           events are correct.
    :expectedresults:
        One account matched by its account ID is returned. All instance events
        should match.
    """
    auth, first_account, second_account, third_account = accounts_report_data
    client = api.Client(authenticate=superuser)
    start, end = utils.get_time_range()
    params = {
        'account_id': second_account['id'],
        'end': end,
        'start': start,
    }
    if superuser:
        params['user_id'] = first_account['user_id']
    response = client.get(urls.REPORT_ACCOUNTS, params=params, auth=auth)

    cloud_account_overviews = response.json()['cloud_account_overviews']
    assert len(cloud_account_overviews) == 1, cloud_account_overviews
    account = cloud_account_overviews[0]
    second_account_id = second_account['aws_account_id']
    cloud_account_id = account['cloud_account_id']
    assert second_account_id == cloud_account_id
    assert account['images'] == 1, repr(account)
    assert account['instances'] == 1, repr(account)
    assert account['rhel_instances'] == 1, repr(account)
    assert account['openshift_instances'] == 1, repr(account)


@pytest.mark.parametrize('superuser', (False, True), ids=usertype)
def test_filter_by_name(accounts_report_data, superuser):
    """Test that cloud accounts can be filtered by name.

    :id: 5abbfb8d-c447-464a-a980-4c7e8d2fcc80
    :description: Test that regular users and superusers can filter cloud
        accounts by name. Cloudigrade takes the search pattern, split its words
        and then for each word is matched as a substring in the name, any
        account that is matched is returned.
    :steps:
        1) Add three cloud accounts
        2) Insert some instance events for all accounts.
        3) Filter the cloud accounts using a two words pattern. Each pattern's
           word should match a single account.
        4) Ensure two accounts are returned and assert that their instance
           events are correct.
    :expectedresults:
        Two accounts are returned, one matched by the first word and the other
        by the second word. All instance events should match.
    """
    auth, first_account, second_account, third_account = accounts_report_data
    client = api.Client(authenticate=superuser)
    start, end = utils.get_time_range()
    params = {
        'end': end,
        'name_pattern': 'EaT sOme ToFu',
        'start': start,
    }
    if superuser:
        params['user_id'] = first_account['user_id']
    response = client.get(urls.REPORT_ACCOUNTS, params=params, auth=auth)

    results = sorted(
        response.json()['cloud_account_overviews'],
        key=operator.itemgetter('name')
    )
    assert len(results) == 2, results
    for expected, account in zip((first_account, third_account), results):
        expected_id = expected['aws_account_id']
        cloud_account_id = account['cloud_account_id']
        assert expected_id == cloud_account_id
        assert account['images'] == 1, repr(account)
        assert account['instances'] == 1, repr(account)
        if expected is first_account:
            assert account['rhel_instances'] == 1, repr(account)
            assert account['openshift_instances'] == 0, repr(account)
        else:
            assert account['rhel_instances'] == 0, repr(account)
            assert account['openshift_instances'] == 1, repr(account)


@pytest.mark.parametrize('superuser', (False, True), ids=usertype)
def test_filter_by_account_id_and_name(accounts_report_data, superuser):
    """Test that cloud accounts report can be filtered by account ID and name.

    :id: edacb611-dfec-4d8b-b480-b0c0d901c08e
    :description: Test that regular users and superusers can filter cloud
        accounts by account ID and name. This is not useful since the account
        ID will restrict the list to a single account but, since this is a
        possibility, ensure that an and operation will be done to match both
        account ID and name.
    :steps:
        1) Add three cloud accounts
        2) Insert some instance events for all accounts.
        3) Filter the cloud accounts by providing the account_id and a name
           pattern that does not match the account's name.
        4) Ensure an emptly list is returned since it won't match anything.
        5) Now update the account ID and the name pattern to match both the
           account ID and the name.
        4) Ensure a single account is returned and assert that their instance
           events are correct.
    :expectedresults:
        No result should be returned if both account ID and name pattern don't
        match any account. One result is returned when both account ID and name
        pattern match an account. All instance events should match for the
        matched account.
    """
    auth, first_account, second_account, third_account = accounts_report_data
    client = api.Client(authenticate=superuser)
    start, end = utils.get_time_range()
    params = {
        'end': end,
        'account_id': second_account['id'],
        'name_pattern': 'EaT sOme ToFu',
        'start': start,
    }
    if superuser:
        params['user_id'] = first_account['user_id']

    # No result will be returned since it will try to mach the account ID and
    # the name pattern
    response = client.get(urls.REPORT_ACCOUNTS, params=params, auth=auth)
    cloud_account_overviews = response.json()['cloud_account_overviews']
    assert len(cloud_account_overviews) == 0, cloud_account_overviews

    # Now update the account ID to point to an account that the name pattern
    # will match, it should return a single result.
    params['account_id'] = first_account['id']
    response = client.get(urls.REPORT_ACCOUNTS, params=params, auth=auth)
    cloud_account_overviews = response.json()['cloud_account_overviews']
    assert len(cloud_account_overviews) == 1, cloud_account_overviews
    account = cloud_account_overviews[0]
    first_account_id = first_account['aws_account_id']
    cloud_account_id = account['cloud_account_id']
    assert first_account_id == cloud_account_id
    assert account['images'] == 1, repr(account)
    assert account['instances'] == 1, repr(account)
    assert account['rhel_instances'] == 1, repr(account)
    assert account['openshift_instances'] == 0, repr(account)
+2 −365

File changed.

Preview size limit exceeded, changes collapsed.

+0 −281

File deleted.

Preview size limit exceeded, changes collapsed.

Loading