Source code for aeriscloud.cli.helpers

from __future__ import print_function

import click
import os
import sys

from aeriscloud import __version__ as ac_version
from click._compat import strip_ansi
from functools import update_wrapper
from requests.exceptions import HTTPError

from ..box import BoxList
from ..config import config, verbosity
from ..expose import ExposeConnectionError, ExposeTimeout
from ..log import set_log_level, set_log_file, get_logger
from ..project import get, from_cwd, all as all_projects
from ..utils import jinja_env, timestamp
from ..virtualbox import list_vms

logger = get_logger('cli.helpers')


# Have both -h and --help
class Command(click.Command):
    """
    Command class used by the CLI, it makes both ``-h`` and ``--help``
    display the help text.
    """
    # This is inherited by the Context to switch between pure POSIX and
    # argparse-like argument parsing. Setting it to false will push any
    # unknown flag to the argument located where the flag was declared
    allow_interspersed_args = config.get('config', 'posix', default=False)

    def __init__(self, *args, **kwargs):
        """
        Accepts the same arguments as :class:`click.Command`.

        A ``context_settings`` keyword given by the caller is merged on top
        of the defaults set here; it used to be passed twice to the parent
        constructor, which raised a TypeError.
        """
        cs = dict(help_option_names=['-h', '--help'])
        # caller-provided settings take precedence over our defaults
        cs.update(kwargs.pop('context_settings', None) or {})
        super(Command, self).__init__(context_settings=cs,
                                      *args, **kwargs)


# Automatically load commands from the cli folder
class AerisCLI(click.MultiCommand):
    """
    Root command of the CLI.

    Every ``*.py`` file found in ``command_dir`` is exposed as a subcommand
    (underscores in the file name become dashes) and must define a ``cli``
    object. The class also declares the global ``-v/--verbose`` and
    ``--log-file`` options and turns uncaught exceptions into user-facing
    error messages.
    """

    def __init__(self, command_dir, *args, **kwargs):
        """
        :param command_dir: Folder containing one python file per subcommand
        """
        cs = dict(help_option_names=['-h', '--help'])
        params = [
            click.Option(
                param_decls=['-v', '--verbose'],
                count=True,
                help='Make commands verbose, use '
                     'multiple time for higher verbosity'
            ),
            click.Option(
                param_decls=['--log-file'],
                help='When using the verbose flag, redirects '
                     'output to this file'
            )
        ]
        super(AerisCLI, self).__init__(context_settings=cs, params=params,
                                       *args, **kwargs)
        self.command_dir = command_dir

        # The assistant is only offered on an interactive terminal, when the
        # configuration is incomplete and AC_NO_ASSISTANT is not set to "1"
        no_setup = os.environ.get('AC_NO_ASSISTANT') == '1'
        if not no_setup and sys.stdout.isatty() and not config.complete():
            click.echo('''
It seems it is the first time you are launching AerisCloud, or new
configuration options were added since the last update. We can guide
you through a series of questions to setup your configuration.
''', err=True)
            if not click.confirm('Run configuration assistant', default=True):
                return
            from .config import assistant

            assistant()

    def list_commands(self, ctx):
        """
        List the available subcommands, sorted alphabetically.

        :param ctx: The click context (unused)
        :return: list of command names
        """
        # these modules exist in the command folder but are not advertised
        excluded = ['complete', 'test']

        rv = [filename[:-3].replace('_', '-') for filename in
              os.listdir(self.command_dir)
              if filename.endswith('.py') and
              filename[:-3] not in excluded and
              not filename.startswith('__')]
        rv.sort()
        return rv

    def get_command(self, ctx, name):
        """
        Load the subcommand ``name`` from its python file.

        Exits the process with status 1 if no matching file exists.

        :param ctx: The click context (unused)
        :param name: Name of the command, as typed by the user
        :return: The ``cli`` object defined by the command file
        """
        ns = {}
        fn = os.path.join(self.command_dir, name.replace('-', '_') + '.py')

        if not os.path.exists(fn):
            # errors go to stderr, like every other error of the CLI
            click.echo('error: unknown command: %s' % name, err=True)
            sys.exit(1)

        with open(fn) as f:
            code = compile(f.read(), fn, 'exec')
        # evaluating a code object compiled in 'exec' mode runs the file,
        # its globals end up in ns
        eval(code, ns, ns)
        return ns['cli']

    def invoke(self, ctx):  # noqa
        """
        Configure logging from the global options, then run the subcommand
        and report any failure to the user.

        :param ctx: The click context
        """
        # setup logging
        verbose = ctx.params.get('verbose')
        if verbose:
            # each -v lowers the level by 10, never going below 10
            level = max(10, 40 - verbose * 10)
            set_log_level(level)
            verbosity(verbose)
        log_file = ctx.params.get('log_file')
        if log_file:
            set_log_file(log_file)

        # try running the command
        try:
            super(AerisCLI, self).invoke(ctx)
        except SystemExit:
            raise
        except click.UsageError as e:
            click.secho('error: %s' % e.message, err=True, fg='red')
            # a UsageError is not always bound to a context
            if e.ctx is not None:
                click.echo(e.ctx.command.get_help(e.ctx), err=True)
            sys.exit(e.exit_code)
        except (ExposeTimeout, ExposeConnectionError):
            warning('warning: expose is not available at the moment')
        except KeyboardInterrupt:
            logger.error('keyboard interrupt while running subcommand "%s"',
                         ctx.invoked_subcommand, exc_info=sys.exc_info())
            warning('\nwarning: ctrl+c pressed, aborting')
        except Exception:
            log_id = None
            if config.has('config', 'raven'):
                from raven import Client
                client = Client('requests+' + config.get('config', 'raven'))
                log_id = client.get_ident(client.captureException(
                    tags={
                        'python': sys.version,
                        'aeriscloud': ac_version,
                        'platform': sys.platform
                    },
                    extra={
                        # USER is not guaranteed to be set; a KeyError here
                        # would hide the exception being reported
                        'user': os.environ.get('USER')
                    }
                ))

            logger.error('uncaught exception while running subcommand "%s"',
                         ctx.invoked_subcommand, exc_info=sys.exc_info())

            message = ('error: an internal exception caused "%s" '
                       'to exit unexpectedly' % ctx.info_name)
            if log_id:
                message += ' (log id: %s)' % log_id
            fatal(message)


def start_box(box, provision_with=None):
    """
    Start or resume a box, provisioning it when needed, then print the
    provisioning result.

    :param box: The box to start
    :param provision_with: Optional list of provisioners to run, it is not
                           modified
    :return: The result of ``box.up`` or of the provisioning triggered for
             an already running box, 0 if neither returned anything else
    """
    res = 0

    # Copy the list, 'shell' may be appended below and that must not leak
    # into the list owned by the caller
    provision = list(provision_with or [])

    if 'shell' not in provision and box.status() != 'not created':
        provision.append('shell')

    extra_args = []
    if provision:
        extra_args = ['--provision-with', ','.join(provision)]

    # if the vm is suspended, just resume it and provision it afterwards
    manual_provision = False
    if box.status() == 'saved':
        manual_provision = True
        click.echo('Resuming box %s' % box.name())

    if not box.is_running():
        try:
            up_args = extra_args[:]
            # --provision-with is sent together with --provision
            if '--provision-with' in up_args:
                up_args.insert(0, '--provision')
            res = box.up(*up_args)
        except (ExposeTimeout, ExposeConnectionError):
            warning('warning: expose is not available at the moment')
    else:
        hist = box.history()
        if not hist or (
            'failed_at' in hist[-1] and
                hist[-1]['failed_at']
        ) or hist[-1]['stats'][box.name()]['unreachable'] > 0:
            # run provisioning if last one failed
            res = box.vagrant('provision')

        box.expose()  # just in case

    if manual_provision:
        # NOTE(review): the result of this run is not stored in res, so it
        # does not influence the message printed below
        box.vagrant('provision', *extra_args)

    if res == 0 or res is True:
        timestamp(render_cli('provision-success', box=box))
    else:
        timestamp(render_cli('provision-failure'))

    # refresh cache
    list_vms(clear_cache_only=True)

    return res


# used by standard options, resolve a single box
def _single_box_decorator(start_prompt=True):  # noqa
    """
    Build a decorator adding the --project and --box options to a command
    and resolving them to a single box, passed to the command as ``box``.

    :param start_prompt: Offer to start the box when it is not running
    """
    def _wrap(func):
        @click.option('-p', '--project', 'project_name',
                      metavar='[project]',
                      help='Which project to use, by default '
                           'use the current folder.')
        @click.option('-b', '--box', 'box_name',
                      metavar='[box]',
                      help='Which box to use, by default uses '
                           'the first available.')
        @click.pass_context
        def _deco(ctx, project_name, box_name, *args, **kwargs):
            if not project_name:
                project = from_cwd()
                if not project:
                    fatal('error: you must be in a project directory '
                          'or specify the --project option')
            else:
                project = get(project_name)
                if not project:
                    fatal('error: invalid project: %s' % (project_name))

            if not box_name:
                boxes = project.boxes()
                if not boxes:
                    # project_name is empty when the project was found from
                    # the current folder, do not print "project None"
                    fatal('error: no infra for project %s' %
                          (project_name or 'in the current folder'))
                box = boxes[0]
            else:
                box = project.box(box_name)
                if not box:
                    fatal('error: invalid box: %s' % (box_name))

            if not box.is_running() and start_prompt:
                if click.confirm('warning: the box you specified is not '
                                 'running, do you want to start it?',
                                 default=True):
                    try:
                        start_box(box)
                    except HTTPError as e:
                        # .message only exists on python 2 exceptions
                        fatal(getattr(e, 'message', None) or str(e))

            return ctx.invoke(func, box=box, *args, **kwargs)

        return update_wrapper(_deco, func)
    return _wrap


def _multi_project_parser(project_names):
    """
    Resolve a list of project names to project objects.

    Without any name, the project of the current folder is used. Any
    project that cannot be resolved ends the program through ``fatal``.

    :param project_names: Names given through the --project option
    :return: list of projects
    """
    if project_names:
        resolved = []
        for name in project_names:
            found = get(name)

            if not found:
                fatal('error: invalid project: %s' % name)

            resolved.append(found)
        return resolved

    current = from_cwd()

    if not current:
        fatal('error: you must be in a project directory '
              'or specify the --project option')

    return [current]


# used by standard options, resolves multiple boxes and supports --all
def _multi_box_decorator(func):
    """
    Add the --project, --box and --all options to a command and resolve
    them to boxes, passed to the command as ``boxes``: a dict mapping each
    project to its selected boxes.

    :param func: The command callback to wrap
    """
    @click.option('-p', '--project', 'project_names',
                  multiple=True,
                  metavar='[project]',
                  help='Which project to use, by default use '
                       'the current folder.')
    @click.option('-b', '--box', 'box_names', multiple=True,
                  metavar='[box]',
                  help='Which box to use, by default uses '
                       'all the boxes from the project.')
    @click.option('-a', '--all', is_flag=True,
                  help='Target all boxes from all projects')
    @click.pass_context
    # "all" shadows the builtin, but the name is imposed by the --all option
    def _deco(ctx, project_names, box_names, all, *args, **kwargs):
        boxes = {}

        if all:
            # --all takes over --project and --box
            projects = all_projects()
            for project in projects:
                boxes[project] = project.boxes()
        else:
            projects = _multi_project_parser(project_names)

            # if we matched more than one project, it does not make sense
            # to parse boxes
            if len(projects) > 1 or not box_names:
                for project in projects:
                    boxes[project] = project.boxes()
            else:
                project = projects[0]
                for box_name in box_names:
                    box = project.box(box_name)

                    if not box:
                        fatal('error: invalid box: %s' % box_name)

                    if project not in boxes:
                        boxes[project] = BoxList()

                    boxes[project].append(box)

        return ctx.invoke(func, boxes=boxes, *args, **kwargs)

    return update_wrapper(_deco, func)


# allow a CLI command to support the --project, --box, --all options
def standard_options(multiple=False, start_prompt=True):
    """
    Add the standard --project and --box options to a command

    :param multiple: Whether to find a single or multiple boxes
    :param start_prompt: When using single boxes, ask the user to start it if
                         offline
    """
    if not multiple:
        return _single_box_decorator(start_prompt)
    return _multi_box_decorator


# small cli helpers
def get_input(prompt):
    """
    Ask the user for input through prompt_toolkit

    :param prompt: Text displayed before the input
    """
    # imported lazily, only the commands asking for input need it
    from prompt_toolkit.contrib.shortcuts import get_input as pt_get_input

    return pt_get_input(prompt)


def bold(text, **kwargs):
    """
    Return ``text`` styled in bold, extra arguments go to click.style
    """
    return click.style(text, bold=True, **kwargs)


def success(text, **kwargs):
    """
    Print ``text`` in green
    """
    click.secho(text, fg='green', **kwargs)


def info(text, **kwargs):
    """
    Print ``text`` in cyan
    """
    click.secho(text, fg='cyan', **kwargs)


def warning(text, **kwargs):
    """
    Print ``text`` in yellow on stderr
    """
    click.secho(text, fg='yellow', err=True, **kwargs)


def error(text, **kwargs):
    """
    Print ``text`` in red on stderr
    """
    click.secho(text, fg='red', err=True, **kwargs)


def fatal(text, code=1, **kwargs):
    """
    Print ``text`` as an error, then exit the program

    :param text: The error message
    :param code: Exit status of the process
    """
    error(text, **kwargs)
    sys.exit(code)


class CLITable(object):
    """
    Helps displaying a dynamically sized table a la docker ps

    Columns are declared once, rows are dicts keyed by column name. The
    header is printed by the first call to :meth:`echo` only.
    """
    # number of spaces added after the longest value of each column
    COL_PADDING = 2

    def __init__(self, *cols):
        """
        :param cols: Names of the columns, in display order
        """
        self._cols = cols
        self._header_out = False
        # column name -> width, computed by _compute_col_sizes
        self._sizes = {}

    def _str(self, data, size):
        """
        Right-pad ``data`` with spaces up to ``size`` visible characters.
        """
        # ANSI sequences are stripped before measuring the text
        str_real_len = len(strip_ansi(data))
        return data + (' ' * (size - str_real_len))

    def _compute_col_sizes(self, data):
        """
        Compute the width of every column from the header and ``data``.

        :param data: list of rows
        """
        sizes = {}
        # prepend data with header
        data = [dict(zip(self._cols, self._cols))] + data
        for row in data:
            # items() works on both python 2 and 3, iteritems() does not
            for name, row_data in row.items():
                real_len = len(strip_ansi(row_data))
                if name not in sizes or real_len > sizes[name]:
                    sizes[name] = real_len
        # filter unknown values
        self._sizes = {
            key: length + self.COL_PADDING
            for key, length in sizes.items()
            if key in self._cols
        }

    def _header(self):
        """
        Print the header line, only the first time it is called.
        """
        if self._header_out:
            return

        self._header_out = True
        for name in self._cols:
            click.echo(self._str(name.upper(), self._sizes[name]), nl=False)
        click.echo('')

    def echo(self, data):
        """
        Print one or several rows; missing columns are left blank.

        :param data: A row (dict) or a list of rows
        """
        if not isinstance(data, list):
            data = [data]

        self._compute_col_sizes(data)
        self._header()

        for row in data:
            for name in self._cols:
                row_data = ''
                if name in row:
                    row_data = row[name]
                click.echo(self._str(row_data, self._sizes[name]), nl=False)
            click.echo('')


# Both following functions were shamelessly taken and adapted from
# http://stackoverflow.com/questions/566746
def _ioctl_gwinsz(fd):
    """
    Ask the terminal behind a file descriptor for its window size.

    :param fd: File descriptor, or object with a fileno() method
    :return: The two shorts read from the TIOCGWINSZ ioctl, (rows, columns),
             or None if the size could not be read
    """
    try:
        # imported here as these modules do not exist on every platform
        import fcntl
        import termios
        import struct
        rc = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
    except Exception:
        # Best effort only; KeyboardInterrupt and SystemExit are not
        # swallowed anymore
        return None
    return rc


def move_shell_to(path):
    """
    If the process is called through the function set in scripts/env.sh,
    will move the user to the given path on successful exit

    :param path: str
    """
    # the file to write to is given through AERIS_CD_TMP_FILE; without it
    # there is nothing to do
    tmp_file = os.environ.get('AERIS_CD_TMP_FILE')
    if tmp_file is None:
        return

    with open(tmp_file, 'w') as handle:
        handle.write(path)


class JinjaColors(object):
    """
    Template helper: any attribute read on it is used as a foreground color
    name and returns the matching click style prefix, without reset.
    """

    def __getattr__(self, item):
        return click.style('', fg=item, reset=False)


def render_cli(template, **kwargs):
    """
    Render a CLI template and return the resulting text.

    On top of the given variables, templates receive ``fg`` (colors),
    ``bold`` and ``reset`` to style their output.

    :param template: Name of the template, without the ``.j2`` extension
    :param kwargs: Variables made available to the template
    :return: The rendered template
    """
    environment = jinja_env('aeriscloud.cli', 'templates')
    compiled = environment.get_template(template + '.j2')

    # styling helpers, they replace any variable of the same name
    kwargs.update(
        fg=JinjaColors(),
        bold=click.style('', bold=True, reset=False),
        reset=click.style(''),
    )
    return compiled.render(**kwargs)