# Copyright (c) 2020 Jiří Altman <jiri.altman@konicaminolta.cz>
# Copyright (c) 2005-2013 Canonical Limited. All rights reserved.
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
"""Base module for Landscape API (Python 3)."""
__all__ = ["run_query", "API", "errors"]
import argparse
import copy
import ctypes
import ctypes.util
import hmac
import inspect
import json
import os
import re
import sys
import textwrap
import time
import types
from base64 import b64encode
from collections import namedtuple
from datetime import date, datetime
from functools import partial
from hashlib import sha256
from io import StringIO
from pprint import pprint
from urllib.parse import quote, urlparse, urlunparse
import requests
from landscape_api import __version__
# API version used by default by run_query(), api_factory() and the command
# line when no other version is requested.
LATEST_VERSION = "2011-08-01"
# API version used by the APIv2 development class.
FUTURE_VERSION = "2013-11-04"
# The list of API actions that require a raw output (their result is written
# as a plain string instead of going through pprint). This is useful for
# actions that return files, so that you can pipe the output to a file.
RAW_ACTIONS_LIST = ("get-script-code",)
class _ErrorsContainer(object):
    """
    Namespace object used as a fake module: exception classes are stored on
    it as plain attributes.
    """

    def add_error(self, error_name, error):
        """
        Store C{error} on this container as attribute C{error_name}.

        The class' C{__module__} is rewritten to "<this module>.errors".
        """
        setattr(error, "__module__", "%s.errors" % (__name__,))
        setattr(self, error_name, error)

    def lookup_error(self, error_name):
        """
        Return the exception stored as C{error_name}, or C{None} when there
        is no such attribute.
        """
        try:
            return getattr(self, error_name)
        except AttributeError:
            return None
class HTTPError(Exception):
    """Exception raised when an unsuccessful HTTP status is received.

    @ivar code: The HTTP status code.
    @ivar message: The HTTP response body.
    @ivar message_data: A data structure extracted by parsing the response
        body as JSON, if possible. Otherwise None. Can be overridden by
        passing the C{message_data} parameter.
    @ivar error_code: The value of the "error" key from the message data, or
        None when there is no message data or no such key.
    @ivar error_message: The value of the "message" key from the message
        data, or None when there is no message data or no such key.
    """

    def __init__(self, code, message=None, message_data=None):
        self.code = code
        self.message = message
        self.message_data = None
        self.error_code = None
        self.error_message = None
        if message is not None and message.startswith("{"):
            # The body only *looks* like JSON; a truncated or malformed body
            # must not turn the HTTP error into an unrelated decoding error.
            try:
                self.message_data = json.loads(message)
            except ValueError:
                self.message_data = None
        if message_data:
            self.message_data = message_data
        if self.message_data:
            # Sub-errors of a multi-error response are not guaranteed to
            # carry both keys, so missing ones simply stay None.
            self.error_code = self.message_data.get("error")
            self.error_message = self.message_data.get("message")

    def __str__(self):
        s = "<%s code=%s" % (type(self).__name__, self.code)
        if self.error_code is not None:
            s += " error_code=%s error_message=%s" % (
                self.error_code,
                self.error_message,
            )
        else:
            s += " message=%s" % (self.message)
        return s + ">"
class APIError(HTTPError):
    """An L{HTTPError} that corresponds to a known API error."""
# Record describing one command-line action: its name, the API method that
# implements it, its documentation and its required/optional arguments.
_Action = namedtuple(
    "action",
    ["name", "method_name", "doc", "required_args", "optional_args"],
)
def fetch(url, post_body, headers, connect_timeout=30, total_timeout=600, cainfo=True):
    """
    POST C{post_body} to C{url} through a C{requests} session, setting up the
    proper options and timeouts.

    @param url: The URL to post to.
    @param post_body: The already form-encoded request body, as a string.
    @param headers: Extra HTTP headers to send, or C{None}. The mapping is
        not modified.
    @param connect_timeout: First element of the C{requests} timeout tuple.
    @param total_timeout: Second element of the C{requests} timeout tuple.
    @param cainfo: Passed to C{requests} as C{verify}.
    @return: The body of the response.
    @raise HTTPError: If the response status is not successful.
    """
    # Copy the headers so the caller's dictionary is left untouched, and so
    # that passing None works.
    request_headers = dict(headers) if headers else {}
    request_headers["Content-type"] = "application/x-www-form-urlencoded"
    # The context manager releases the session's connections when done.
    with requests.session() as session:
        session.headers.update(request_headers)
        response = session.post(
            url,
            data=post_body.encode("utf-8"),
            allow_redirects=True,
            timeout=(connect_timeout, total_timeout),
            verify=cainfo,
        )
        if not response.ok:
            raise HTTPError(response.status_code, response.text)
        return response.text
def parse(url):
    """
    Split the given URL into the host, port, and path.

    Surrounding whitespace is ignored. The returned path also carries any
    parameters, query string and fragment that were present in the URL.

    @type url: C{str}
    @param url: An URL to parse.
    @return: A C{(host, port, path)} tuple; C{port} is C{None} when the URL
        has no port or the port is not an integer.
    @raise SyntaxError: If the URL does not start with "http://" or
        "https://".
    """
    # Strip first, so leading whitespace is tolerated just like trailing
    # whitespace is.
    url = url.strip()
    if not url.lower().startswith(("http://", "https://")):
        raise SyntaxError("URL must start with 'http://' or 'https://': %s" % (url,))
    parsed = urlparse(url)
    path = urlunparse(("", "") + parsed[2:])
    host = parsed[1]
    port = None
    # Split on the LAST colon only: IPv6 literals ("[::1]:8080") and userinfo
    # contain colons too. A netloc ending in "]" is a bare IPv6 literal.
    if ":" in host and not host.endswith("]"):
        host, _, port_text = host.rpartition(":")
        try:
            port = int(port_text)
        except ValueError:
            port = None
    return str(host), port, str(path)
def run_query(
    access_key,
    secret_key,
    action,
    params,
    uri,
    ssl_ca_file=True,
    version=LATEST_VERSION,
):
    """Make a low-level, signed query against the Landscape API.

    @param access_key: The user access key.
    @param secret_key: The user secret key.
    @param action: The type of methods to call. For example, "GetComputers".
    @param params: A dictionary of the parameters to pass to the action. It
        is not modified.
    @param uri: The root URI of the API service. For example,
        "https://landscape.canonical.com/".
    @param ssl_ca_file: Path to the server's SSL Certificate Authority
        certificate. For example, "~/landscape_server_ca.crt".
    @param version: The API version sent with the request.
    @return: The body of the response, as returned by L{fetch}.
    @raise HTTPError: If the request fails. When the error code in the
        response maps to a class known to the C{errors} container, that
        class is raised instead.
    """
    # Work on a copy so the caller's dictionary is not polluted with the
    # signing parameters added below.
    params = dict(params)
    timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    params.update(
        {
            "access_key_id": access_key,
            "action": action,
            "signature_version": "2",
            "signature_method": "HmacSHA256",
            "timestamp": timestamp,
            "version": version,
        }
    )
    method = "POST"
    host, port, path = parse(uri)
    signed_host = "%s:%d" % (host, port) if port is not None else host
    if not path:
        path = "/"
        uri = "%s/" % uri
    # The signature covers the parameters sorted by name, each part quoted
    # with only "~" left unescaped.
    signed_params = "&".join(
        "%s=%s" % (quote(key, safe="~"), quote(value, safe="~"))
        for key, value in sorted(params.items())
    )
    to_sign = "%s\n%s\n%s\n%s" % (method, signed_host, path, signed_params)
    digest = hmac.new(
        secret_key.encode("utf-8"), to_sign.encode("utf-8"), sha256
    ).digest()
    signature = b64encode(digest)
    signed_params += "&signature=%s" % quote(signature)
    try:
        return fetch(uri, signed_params, {"Host": signed_host}, cainfo=ssl_ca_file)
    except HTTPError as e:
        if e.error_code is not None:
            error_class = errors.lookup_error(_get_error_code_name(e.error_code))
            if error_class:
                raise error_class(e.code, e.message)
        raise
def _get_error_code_name(error_code):
    """
    Map an error code to the name of its Python exception: the code itself
    when it already ends in "Error", otherwise the code with "Error"
    appended.
    """
    suffix = "Error"
    return error_code if error_code.endswith(suffix) else error_code + suffix
def _lowercase_api_name(name):
    """Convert a CamelCase API action name into a snake_case name."""
    # First pass: break before a capital letter followed by lowercase
    # letters; second pass: break between a lowercase letter or digit and a
    # capital letter.
    with_word_breaks = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
    with_all_breaks = re.sub("([a-z0-9])([A-Z])", r"\1_\2", with_word_breaks)
    return with_all_breaks.lower()
def load_schema():
    """
    Load the schema from the C{schemas.json} file that lives in the same
    directory as this module.

    @return: The decoded JSON content of the file.
    """
    this_directory = os.path.dirname(os.path.abspath(__file__))
    schema_filename = os.path.join(this_directory, "schemas.json")
    # Read with an explicit encoding so the result does not depend on the
    # locale of the machine importing this module.
    with open(schema_filename, "rt", encoding="utf-8") as schema_file:
        return json.load(schema_file)
def _build_exception(name):
    """Create a new L{APIError} subclass whose C{__name__} is C{name}."""

    # TODO: Put __doc__ on the generated errors (must be included in the
    # schema)
    class _APIError(APIError):
        pass

    generated = _APIError
    generated.__name__ = str(name)
    return generated
def _build_exceptions(schema):
    """
    Build an L{_ErrorsContainer} holding one exception class for every error
    code listed in the schema, whatever the action or API version that
    declares it. An error code seen more than once keeps the class created
    for its first occurrence.
    """
    container = _ErrorsContainer()
    for version_handlers in schema.values():
        for handler in version_handlers.values():
            for error in handler["errors"]:
                exception_name = _get_error_code_name(error["code"])
                if container.lookup_error(exception_name):
                    continue
                container.add_error(exception_name, _build_exception(exception_name))
    return container
# Load the schema once, at import time. _API instances use it when they are
# not given an explicit schema.
_schema = load_schema()
# Container holding one exception class per error code found in the schema.
errors = _build_exceptions(_schema)
# A hack to make "from landscape_api.base.errors import UnknownComputer"
# work: the container is registered as if it were a real submodule.
sys.modules[__name__ + ".errors"] = errors
class MultiError(APIError):
    """
    An exception that represents multiple sub-exceptions.

    @ivar errors: A list of instances of L{APIError} or its subclasses, one
        per entry of the "errors" list of the response. Empty when the
        response body carries no such list.
    """

    def __init__(self, http_code, message):
        # Subclass from APIError just for convenience in catching; we're not
        # using its functionality
        APIError.__init__(self, http_code, message)
        self.errors = []
        # A body that could not be parsed, or that lacks the "errors" list,
        # yields no sub-errors instead of failing inside the constructor.
        sub_errors = (self.message_data or {}).get("errors") or []
        for sub_error in sub_errors:
            error_class = None
            if sub_error.get("error") is not None:
                error_class = errors.lookup_error(
                    _get_error_code_name(sub_error["error"])
                )
            if not error_class:
                # Unknown or missing error code: use the generic class.
                error_class = APIError
            self.errors.append(error_class(self.code, message_data=sub_error))

    def __str__(self):
        return "<%s errors=%s>" % (type(self).__name__, self.errors)
class UnauthorisedError(APIError):
    """L{APIError} subclass made available as C{errors.Unauthorised}."""
class SignatureDoesNotMatchError(APIError):
    """L{APIError} subclass made available as
    C{errors.SignatureDoesNotMatchError}."""
class AuthFailureError(APIError):
    """L{APIError} subclass made available as C{errors.AuthFailureError}."""
class InvalidCredentialsError(APIError):
    """L{APIError} subclass made available as
    C{errors.InvalidCredentialsError}."""
# Register the hand-written error classes on the errors container.
errors.add_error("MultiError", MultiError)
# "Unauthorised" is kept for code that already imports it under that name.
# Lookups go through _get_error_code_name(), which always produces a name
# ending in "Error", so the class must also be registered as
# "UnauthorisedError" to ever be found by lookup_error().
errors.add_error("Unauthorised", UnauthorisedError)
errors.add_error("UnauthorisedError", UnauthorisedError)
errors.add_error("SignatureDoesNotMatchError", SignatureDoesNotMatchError)
errors.add_error("AuthFailureError", AuthFailureError)
errors.add_error("InvalidCredentialsError", InvalidCredentialsError)
class _API(object):
    """Provide an object-oriented interface to the Landscape API.

    @param uri: The URI endpoint of the API.
    @param access_key: The 20 characters access key.
    @param secret_key: The 40 characters secret key.
    @param ssl_ca_file: Path to an alternative CA certificate file.
    @param json: Return plain JSON response instead of a python object.
    @param schema: The schema data to use. If none is specified, it will be
        read from 'schemas.json' in the same directory as this module.

    Usage::

        api = API("https://landscape.canonical.com/api", "access_key",
                  "secret_key")
        computers = api.get_computers()
    """

    # TODO: accept an api_version parameter, use it instead of LATEST_VERSION
    _run_query = staticmethod(run_query)

    # 'overridden_apis' contains information about command-line API actions
    # that we want to override to (locally) take different arguments and invoke
    # a hand-coded method. This is used for situations where we want to provide
    # some extra layer of convenience to the user of this module or the command
    # line, like accepting a filename containing large data instead of
    # requiring it to be passed as a string.
    # Any documentation that isn't specified in overridden_apis will be
    # looked up in the original schema.
    # Right now it only supports replacing arguments one-for-one, but it
    # could be extended if we need to.
    overridden_apis = {}  # type: ignore

    def __init__(
        self, uri, access_key, secret_key, ssl_ca_file=None, json=False, schema=None
    ):
        self._uri = uri
        self._access_key = access_key
        self._secret_key = secret_key
        self._ssl_ca_file = ssl_ca_file
        self._json = json
        self._schema = schema if schema is not None else _schema

    def run_query(self, action_name, arguments):
        """
        Make a low-level query against the Landscape API, using details
        provided in the L{API} constructor.

        @return: The raw response text when the C{json} constructor argument
            was true, otherwise the response decoded from JSON.
        """
        result = self._run_query(
            self._access_key,
            self._secret_key,
            str(action_name),
            arguments,
            self._uri,
            self._ssl_ca_file,
        )
        if not self._json:
            result = json.loads(result)
        return result

    def call(self, method, **kwargs):
        """
        Invoke an API method, automatically encoding the arguments as defined
        in the schema entry for C{self.version}.
        """
        action = self._schema[method][self.version]
        parameters = action["parameters"]
        fields = [(x["name"], x) for x in parameters]
        arguments = self._encode_struct_fields(fields, kwargs)
        return self.run_query(method, arguments)

    def _encode_struct_fields(self, fields, arguments, prefix=""):
        """
        Encode multiple named fields. This is used for both base argument
        processing and struct fields.

        @param fields: An associative list of field names to field parameter
            descriptions.
        @param arguments: A mapping of field names to actual values to encode.
            Entries are removed from it as they are consumed.
        @param prefix: The prefix to put on all named parameters encoded.
        @raise TypeError: If a non-optional field is missing, or if
            C{arguments} holds names that are not in C{fields}.
        """
        result = {}
        for parameter_name, parameter_description in fields:
            # Figure out the type of the parameter and how to encode it.
            if parameter_name not in arguments:
                if not parameter_description.get("optional"):
                    raise TypeError("Missing parameter %s" % (parameter_name,))
            else:
                value = arguments.pop(parameter_name)
                encoded_item = self._encode_argument(
                    parameter_description, prefix + parameter_name, value
                )
                result.update(encoded_item)
        if arguments:
            raise TypeError("Extra arguments: %r" % (arguments,))
        return result

    def _encode_argument(self, parameter, name, value):
        """
        Encode a piece of data based on a parameter description.

        Returns a dictionary of parameters that should be included in the
        request. An optional parameter whose value equals its default is
        left out.
        """
        if parameter.get("optional") and value == parameter.get("default"):
            return {}
        # The schema type selects the handler: "unicode line" is handled by
        # _encode_unicode_line, and so on.
        kind = parameter["type"].replace(" ", "_")
        handler = getattr(self, "_encode_%s" % (kind,))
        return handler(parameter, str(name), value)

    def _encode_integer(self, parameter, name, arg):
        return {name: str(arg)}

    def _encode_float(self, parameter, name, arg):
        return {name: str(arg)}

    def _encode_raw_string(self, parameter, name, value):
        return {name: str(value)}

    def _encode_enum(self, parameter, name, value):
        return {name: str(value)}

    def _encode_unicode(self, parameter, name, value):
        """
        Encode a python unicode object OR, for historical reasons, a datetime
        object, into an HTTP argument.
        """
        if isinstance(value, (datetime, date)):
            # This is really dumb compatibility stuff for APIs that aren't
            # properly specifying their type.
            return self._encode_date(parameter, name, value)
        return {name: str(value)}

    # These are Unicode types with specific validation.
    _encode_unicode_line = _encode_unicode
    _encode_unicode_title = _encode_unicode

    def _encode_file(self, parameter, name, value):
        """
        Encode the file named C{value} as "<basename>$$<base64 contents>".
        """
        with open(value, "rb") as the_file:
            contents = the_file.read()
        encoded_contents = b64encode(contents).decode("utf-8")
        # We send the filename along with the contents of the file.
        filename = os.path.basename(value)
        payload = filename + "$$" + encoded_contents
        return {name: str(payload)}

    def _encode_boolean(self, parameter, name, value):
        return {name: "true" if value else "false"}

    def _encode_date(self, parameter, name, value):
        if isinstance(value, str):
            # allow people to pass strings, since the server has really good
            # date parsing and can handle lots of different formats.
            return {name: str(value)}
        return {name: str(value.strftime("%Y-%m-%dT%H:%M:%SZ"))}

    def _encode_list(self, parameter, name, sequence):
        """
        Encode a python list OR a comma-separated string into individual
        "foo.N" arguments, numbered from 1.
        """
        result = {}
        if isinstance(sequence, str):
            sequence = [item.strip() for item in sequence.split(",")]
        for i, item in enumerate(sequence):
            encoded_item = self._encode_argument(
                parameter["item"], "%s.%s" % (name, i + 1), item
            )
            result.update(encoded_item)
        return result

    def _encode_mapping(self, parameter, name, items):
        """Encode a mapping into individual "foo.KEY=VALUE" arguments.

        Mappings and comma-separated strings of KEY=VALUE pairs are
        supported.
        """
        # Normalise every supported input to a sequence of (key, value)
        # pairs; iterating a dict directly would only yield its keys.
        if isinstance(items, str):
            pairs = [
                (k.strip(), v.strip()) for k, v in _parse_csv_mapping_safely(items)
            ]
        elif hasattr(items, "items"):
            pairs = list(items.items())
        else:
            pairs = items
        keyparam = parameter["key"]
        valueparam = parameter["value"]
        result = {}
        for key, value in pairs:
            key = self._encode_argument(keyparam, "<key>", key)["<key>"]
            subname = "{}.{}".format(name, key)
            result.update(self._encode_argument(valueparam, subname, value))
        return result

    def _encode_data(self, parameter, name, value):
        """
        Encode the contents of the file named C{value} as base64 text.
        """
        with open(value, "rb") as the_file:
            contents = the_file.read()
        # Decode to text so every encoder returns str values, like
        # _encode_file does.
        encoded_contents = b64encode(contents).decode("ascii")
        return {name: encoded_contents}

    def _encode_structure(self, parameter, name, dictionary):
        # Work on a copy: _encode_struct_fields consumes the mapping it is
        # given.
        return self._encode_struct_fields(
            parameter["fields"].items(),
            dictionary.copy(),
            prefix=name + ".",
        )

    def call_arbitrary(self, method, arguments):
        """
        Invoke an API method in a raw form, without encoding any parameters.

        @returns: The result as returned by the API method. If the C{json}
            parameter to L{API} was passed as C{True}, then the raw result will
            be returned. Otherwise it will be decoded as json and returned as a
            Python object.
        """
        return self.run_query(method, arguments)
def api_factory(schema, version=LATEST_VERSION):
    """
    A creator of L{API} classes. It will read a schema and create the methods
    on an L{API} to be available statically.

    @param schema: Mapping of action name to a mapping of API version to the
        description of the action for that version.
    @param version: The API version whose actions become methods. Actions
        without an entry for this version are skipped.
    @return: A new subclass of L{_API} named "API", with a C{version}
        attribute and one generated method per action.
    @raise RuntimeError: If a generated method name already exists on the
        class.
    """

    def _get_action_callers():
        """
        Build callable methods for all actions published through the schema
        that will invoke L{API.call}.
        """
        actions = {}
        for action_name in schema:
            action = schema[action_name].get(version)
            if action is None:
                # This API version doesn't support this action
                continue
            python_action_name = _lowercase_api_name(action_name)
            caller = _make_api_caller(action_name, action)
            actions[python_action_name] = caller
        return actions

    def _make_api_caller(action_name, action):
        """
        Build the method for one action: required parameters come first,
        optional parameters follow with their schema defaults.
        """
        method_name = _lowercase_api_name(action_name)
        positional_parameters = []
        optional_parameters = []
        defaults = []
        for parameter in action["parameters"]:
            if parameter.get("optional"):
                optional_parameters.append(parameter["name"])
                defaults.append(parameter["default"])
            else:
                positional_parameters.append(parameter["name"])
        positional_parameters.extend(optional_parameters)
        caller = _change_function(
            _caller,
            str(method_name),
            positional_parameters,
            defaults,
            action_name,
        )
        caller.__doc__ = _generate_doc(action)
        return caller

    def _generate_doc(action):
        """
        Generate a python docstring vaguely using pydoc syntax.
        """
        doc = inspect.cleandoc(action["doc"]) + "\n"
        for parameter in action["parameters"]:
            pdoc = parameter.get("doc", "Undocumented")
            param_doc = "@param %s: %s" % (parameter["name"], pdoc)
            doc += "\n" + textwrap.fill(param_doc, subsequent_indent=" ")
            doc += "\n@type %s: %s" % (parameter["name"], _describe_type(parameter))
        return doc

    def _describe_type(parameter):
        """
        Return the type name of a parameter; for lists the item type is
        appended, recursively.
        """
        type_doc = parameter["type"]
        if type_doc == "list":
            type_doc += " (of %s)" % (_describe_type(parameter["item"]),)
        return type_doc

    def _change_function(func, newname, positional_parameters, defaults, action_name):
        """
        Return a new function with the provided name C{newname}, and changing
        the signature corresponding to C{positional_parameters} and
        C{defaults}.
        """
        # One extra argument for "self".
        argcount = len(positional_parameters) + 1
        code = func.__code__
        params = positional_parameters[:]
        params.insert(0, "self")
        varnames = [str(param) for param in params]
        # See _caller for the defined variable _args
        varnames.append("_args")
        varnames = tuple(varnames)
        co_nlocals = len(varnames)
        func_defaults = tuple(defaults) if defaults else None
        try:
            newcode = code.replace(
                co_argcount=argcount,
                co_nlocals=co_nlocals,
                co_name=str(newname),
                co_varnames=varnames,
            )
        except Exception:
            # Fallback used when code.replace() fails: build the code object
            # by hand from the fields of the original one.
            newcode = types.CodeType(
                argcount,
                code.co_kwonlyargcount,
                co_nlocals,
                code.co_stacksize,
                code.co_flags,
                code.co_code,
                code.co_consts,
                code.co_names,
                varnames,
                code.co_filename,
                str(newname),
                code.co_firstlineno,
                code.co_lnotab,
                code.co_freevars,
                code.co_cellvars,
            )
        # Make locals and action_name available to the method. Each generated
        # function gets its own copy of the globals, so "action_name" differs
        # per method.
        func_globals = func.__globals__.copy()
        func_globals["action_name"] = action_name
        return types.FunctionType(
            newcode, func_globals, str(newname), func_defaults, func.__closure__
        )

    def _caller(self):
        """Wrapper calling C{API.call} with the proper action name."""
        # TODO: Improve this
        global action_name
        # The locals of this function aren't obvious, because _change_function
        # modifies the parameters, and we have to access them with locals().
        _args = locals().copy()
        _args.pop("self")
        return self.call(action_name, **_args)  # noqa

    api_class = type("API", (_API,), {})
    api_class.version = version
    actions = _get_action_callers()
    for k, v in list(actions.items()):
        # Refuse to shadow anything the class already defines or inherits.
        if not getattr(api_class, k, None):
            setattr(api_class, k, v)
        else:
            raise RuntimeError(
                "Tried setting '%s' from schema but that "
                "method already exists" % (k,)
            )
    return api_class
class API(api_factory(_schema)):  # type: ignore
    """
    The API class generated from the module schema for the default version,
    extended with a few hand-written convenience methods.
    """

    # "ImportGPGKey" is exposed as import_gpg_key_from_file, which takes a
    # "filename" argument in place of the "material" one.
    overridden_apis = {
        "ImportGPGKey": {
            "method": "import_gpg_key_from_file",
            "doc": None,
            "replace_args": {
                "material": {
                    "name": "filename",
                    "type": "unicode",
                    "doc": "The filename of the GPG file.",
                }
            },
        }
    }

    # Actions that are not part of the schema and are implemented by methods
    # of this class.
    extra_actions = [
        _Action(
            "ssh",
            "ssh",
            "Try to ssh to a landscape computer",
            [
                {
                    "name": "query",
                    "type": "unicode",
                    "doc": "A query string which should return one computer",
                }
            ],
            [
                {
                    "name": "user",
                    "type": "unicode",
                    "default": None,
                    "doc": "If specified, the user to pass to the ssh command",
                }
            ],
        )
    ]

    def import_gpg_key_from_file(self, name, filename):
        """
        Import a GPG key with contents from the given filename.

        @param name: Passed as the C{name} argument of "ImportGPGKey".
        @param filename: Path of the file whose text becomes the C{material}
            argument.
        """
        with open(filename, "rt") as _file:
            material = _file.read()
        return self.call("ImportGPGKey", name=name, material=material)

    def ssh(self, query, user=None):
        """
        Calls C{get_computers}, and then the ssh command with the given result.

        The current process is replaced by C{ssh}, so this method does not
        return on success.

        @param query: A query that must match exactly one computer.
        @param user: If not C{None}, passed to C{ssh} with C{-l}.
        @raise ValueError: If the query does not match exactly one computer,
            or if that computer has no network device.
        """
        data = self.get_computers(query, with_network=True)
        if len(data) != 1:
            raise ValueError("Expected one computer as result, got %d" % len(data))
        computer = data[0]
        if not computer.get("network_devices", []):
            raise ValueError("Couldn't find a network device")
        # The address of the first listed network device is used.
        address = computer["network_devices"][0]["ip_address"]
        args = ["ssh"]
        if user is not None:
            args.extend(["-l", user])
        args.append(address)
        os.execvp("ssh", args)
class APIv2(api_factory(_schema, version=FUTURE_VERSION)):  # type: ignore
    """Development version of the API, generated for C{FUTURE_VERSION}."""

    # Low-level queries are pinned to FUTURE_VERSION rather than run_query's
    # default version.
    _run_query = staticmethod(partial(run_query, version=FUTURE_VERSION))
class ParseActionsError(Exception):
    """Raised for errors parsing the API class."""
class UsageError(Exception):
    """Raised when help should be printed.

    @ivar stdout: Text meant for standard output, or C{None}.
    @ivar stderr: Text meant for standard error, or C{None}.
    @ivar error_code: The exit code to use, or C{None}.
    """

    def __init__(self, stdout=None, stderr=None, error_code=None):
        super().__init__("", stdout, stderr)
        self.error_code = error_code
        self.stderr = stderr
        self.stdout = stdout
class SchemaParameterAction(argparse.Action):
    """
    An L{argparse.Action} that knows how to parse command-line schema
    parameters and convert them to Python objects.

    The C{schema_parameter} keyword argument (required) is the schema
    description of the parameter this action handles.
    """

    def __init__(self, *args, **kwargs):
        self.schema_parameter = kwargs.pop("schema_parameter")
        argparse.Action.__init__(self, *args, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        value = self.parse_argument(self.schema_parameter, values)
        setattr(namespace, self.dest, value)

    def parse_argument(self, parameter, value):
        """
        Convert C{value} using the C{parse_<type>} method that matches the
        type of C{parameter}.

        @raise UsageError: If the value cannot be converted.
        """
        suffix = parameter["type"].replace(" ", "_")
        parser = getattr(self, "parse_%s" % (suffix,))
        try:
            return parser(parameter, value)
        except UsageError:
            raise
        except Exception:
            # Only real conversion failures are reported as usage errors;
            # KeyboardInterrupt and SystemExit are left alone.
            raise UsageError(
                stderr="Couldn't parse value %r as %s\n" % (value, parameter["type"]),
                error_code=1,
            )

    def parse_integer(self, parameter, value):
        return int(value)

    def parse_float(self, parameter, value):
        return float(value)

    def parse_raw_string(self, parameter, value):
        return value

    def parse_enum(self, parameter, value):
        return value

    def parse_unicode(self, parameter, value):
        return value

    # These are Unicode types with specific validation.
    parse_unicode_line = parse_unicode
    parse_unicode_title = parse_unicode

    def parse_file(self, parameter, value):
        return str(value)

    def parse_date(self, parameter, value):
        # the server already has a good date parser, and to parse it well
        # ourselves we'd have to depend on the "dateutil" package...
        return value

    def parse_boolean(self, parameter, value):
        # This is only used for required arguments
        return value == "true"

    def parse_list(self, parameter, value):
        """Parse a comma-separated list of values converting it to a C{list}.

        Items can contain escaped commas as "\\," and they will be unescaped
        by this method. Empty items are dropped.
        """
        items = _parse_csv_list_safely(value)
        return [
            self.parse_argument(parameter["item"], list_item)
            # TODO: check if list(...) should be used
            # list(self.parse_argument(parameter["item"], list_item))
            for list_item in items
            if list_item != ""
        ]

    def parse_mapping(self, parameter, value):
        """Parse a comma-separated list of key/value pairs into a dict.

        Keys and values are separated by "=".
        """
        keyparam = parameter["key"]
        valueparam = parameter["value"]
        result = {}
        for raw_key, raw_value in _parse_csv_mapping_safely(value):
            parsed_key = self.parse_argument(keyparam, raw_key)
            result[parsed_key] = self.parse_argument(valueparam, raw_value)
        return result

    def parse_data(self, parameter, value):
        """Return the value as text.

        Command-line values are already C{str} (which has no C{decode}
        method); byte strings are still decoded as UTF-8.
        """
        if isinstance(value, bytes):
            return value.decode("utf-8")
        return str(value)
def _parse_csv_list_safely(value):
    """Yield each substring separated by commas.

    Substrings can contain escaped commas as "\\," and they will be
    unescaped by this function. A backslash that is not followed by a comma
    is kept as a literal backslash. A trailing empty substring is not
    yielded.
    """
    item = ""
    escaped = False
    for c in value:
        if c == ",":
            if escaped:
                # "\," is a literal comma inside the current item.
                item += c
                escaped = False
            else:
                yield item
                item = ""
        elif c == "\\":
            escaped = True
        else:
            if escaped:
                # The pending backslash did not escape a comma: keep it, and
                # clear the flag so it cannot affect later characters.
                item += "\\"
                escaped = False
            item += c
    if escaped:
        # A backslash at the very end is literal as well.
        item += "\\"
    if item:
        yield item
def _parse_csv_mapping_safely(value):
    """Yield a C{(key, value)} tuple for each comma-separated "key=value" item.

    Items are split with L{_parse_csv_list_safely}, so they can contain
    escaped commas as "\\,". Only the first "=" of an item separates the key
    from the value.

    @raise ValueError: If an item contains no "=".
    """
    for pair in _parse_csv_list_safely(value):
        key, separator, pair_value = pair.partition("=")
        if separator == "":
            raise ValueError("invalid key/value pair {}".format(pair))
        yield key, pair_value
class CommandLine(object):
"""
Implementation of the command-line logic.
"""
# TODO: Accept an --api-version parameter.
def __init__(self, stdout, stderr, exit, environ):
self.stdout = stdout
self.stderr = stderr
self.exit = exit
self.environ = environ
def main(self, argv, schema):  # noqa
    """
    Parse the command line, run the requested action and write its result.

    @param argv: The list of command line arguments, usually from
        C{sys.argv}.
    @param schema: The schema used to build the list of actions and the
        API object.
    @return: The result of C{self.exit(code)} when the run ends with usage
        output or an error; C{None} after a result was written.
    """
    version = self.environ.get("LANDSCAPE_API_VERSION", LATEST_VERSION)
    actions = self.get_actions(schema, version)
    # Stays None for the generic "call" action, which has no entry in the
    # action map.
    action = None
    try:
        # Build main parser
        parser = self.get_main_parser()
        # Special case for empty command line
        if len(argv) == 0:
            raise UsageError(
                stdout=self.format_main_usage(parser, actions), error_code=0
            )
        action_map = {known.name: known for known in actions}
        (args, argv) = self.wrap_parse_args(parser.parse_known_args, argv)
        print_help_only = False
        if (args.action == "help" and len(argv) == 0) or (
            args.help and not args.action
        ):
            raise UsageError(
                stdout=self.format_main_usage(parser, actions), error_code=0
            )
        if args.action == "help":
            # "help <action>": show the help of the action named next.
            print_help_only = True
            args.action = argv[0]
        if args.help:
            print_help_only = True
        if args.action != "call" and args.action not in action_map:
            if args.action is None:
                raise UsageError(stderr="Please specify an action.\n")
            raise UsageError(stderr="Unknown action: %s\n" % args.action)
        if args.action == "call":
            action_parser = self.get_call_parser(parser)
        else:
            action = action_map[args.action]
            action_parser = self.get_action_parser(parser, action)
        if print_help_only:
            raise UsageError(stdout=action_parser.format_help(), error_code=0)
        api = self.get_api(args, schema, version)
        action_args = self.wrap_parse_args(action_parser.parse_args, argv)
        try:
            if args.action != "call":
                result = self.call_known_action(
                    api, action, action_parser, action_args
                )
            else:
                result = self.call_arbitrary_action(api, action_args)
        except HTTPError as e:
            if e.error_code is not None:
                self.stderr.write("\nGot server error:\nStatus: %s\n" % (e.code,))
                self._format_api_error(e)
            else:
                self.stderr.write(
                    "\nGot unexpected server error:\nStatus: %d\n" % e.code
                )
                self.stderr.write("Error message: %s\n" % e.message)
            return self.exit(2)
    except UsageError as e:
        if e.stdout is not None:
            self.stdout.write(e.stdout)
        if e.stderr is not None:
            self.stderr.write(e.stderr)
        if e.error_code is not None:
            return self.exit(e.error_code)
        else:
            return self.exit(1)
    except Exception as e:
        self.stderr.write(str(e) + "\n")
        return self.exit(1)
    # Some of the methods require raw output, for instance the code part of
    # scripts. The "call" action has no action object, so it is never raw
    # unless JSON output was requested.
    raw_output = action is not None and action.name in RAW_ACTIONS_LIST
    if args.json_output or raw_output:
        self.stdout.write(str(result) + "\n")
    else:
        pprint(result, stream=self.stdout)
def _format_api_error(self, error):
    """
    Write the error code and message of an API error to C{self.stderr}.

    For a L{MultiError}, every sub-error is written as well, recursively.
    """
    # The values are formatted as they are: encoding them to bytes first
    # would print their b'...' representation instead of the text.
    self.stderr.write(
        "Error code: %s\nError message: %s\n"
        % (error.error_code, error.error_message)
    )
    if isinstance(error, MultiError):
        for sub_error in error.errors:
            self._format_api_error(sub_error)
def call_known_action(self, api, action, action_parser, args):
    """
    Call a known, supported API action, using methods on L{API}.

    Required arguments are passed positionally, in schema order. Optional
    arguments are passed by keyword, and only when they were given a value
    that differs from the parser default.
    """
    positional_args = []
    for req_arg in action.required_args:
        dashed_name = req_arg["name"].replace("_", "-")
        if dashed_name == "query":
            # Special case to allow query to be multiple space-separated
            # tokens without having to be quoted on the command line.
            positional_args.append(" ".join(args.query))
        else:
            positional_args.append(getattr(args, dashed_name))
    keyword_args = {}
    for opt_arg in action.optional_args:
        parameter_name = opt_arg["name"]
        dashed_name = parameter_name.replace("_", "-")
        supplied = getattr(args, dashed_name, None)
        if supplied is None:
            continue
        if supplied == action_parser.get_default(dashed_name):
            continue
        keyword_args[parameter_name] = supplied
    return getattr(api, action.method_name)(*positional_args, **keyword_args)
def call_arbitrary_action(self, api, args):
    """
    Call an arbitrary action specified as raw HTTP arguments, using
    L{API.call_arbitrary}.

    Each entry of C{args.argument} is split on its first "=" into a name
    and a value.
    """
    action_name = args.action_name
    arguments = {
        key: value
        for key, value in (raw.split("=", 1) for raw in args.argument)
    }
    return api.call_arbitrary(action_name, arguments)
def get_api(self, args, schema, version):
    """
    Get an L{API} instance with parameters based on command line arguments
    or environment variables.

    A command line value takes precedence over the matching environment
    variable.

    @raise UsageError: If the access key, the secret key or the URI is
        given neither way.
    """

    def resolve(cli_value, env_name, missing_message):
        # Command line first, then environment, otherwise a usage error.
        if cli_value is not None:
            return cli_value
        if env_name in self.environ:
            return self.environ[env_name]
        raise UsageError(stderr=missing_message)

    access_key = resolve(args.key, "LANDSCAPE_API_KEY", "Access key not specified.\n")
    secret_key = resolve(
        args.secret, "LANDSCAPE_API_SECRET", "Secret key not specified.\n"
    )
    uri = resolve(args.uri, "LANDSCAPE_API_URI", "URI not specified.\n")
    # The CA file is optional: it is None when given neither way.
    ssl_ca_file = args.ssl_ca_file
    if ssl_ca_file is None:
        ssl_ca_file = self.environ.get("LANDSCAPE_API_SSL_CA_FILE")
    if schema is not _schema:
        # A custom schema needs its own generated class.
        api_class = api_factory(schema, version=version)
    elif version == FUTURE_VERSION:
        api_class = APIv2
    else:
        api_class = API
    return api_class(
        uri, access_key, secret_key, ssl_ca_file, args.json_output, schema=schema
    )
def get_action_parser(self, parser, action):
    """
    Build an L{argparse.ArgumentParser} for a particular action.

    Required arguments become positionals. Optional arguments become
    "--name" options, except that a default of C{True} yields a "--no-name"
    switch and a default of C{False} yields a "--name" switch.
    """
    action_parser = argparse.ArgumentParser(
        prog="%s %s" % (parser.prog, action.name),
        description=action.doc,
        add_help=False,
    )
    for req_arg in action.required_args:
        argname = req_arg["name"].replace("_", "-")
        argdoc = self.get_parameter_doc(req_arg)
        if argname == "query":
            # Special case to allow query to be multiple space-separated
            # tokens without having to be quoted on the command line.
            extra = {"nargs": "+"}
        else:
            extra = {"action": SchemaParameterAction, "schema_parameter": req_arg}
        action_parser.add_argument(argname, help=argdoc, **extra)
    for opt_arg in action.optional_args:
        argname = opt_arg["name"].replace("_", "-")
        argdoc = self.get_parameter_doc(opt_arg)
        default = opt_arg["default"]
        if default is False:
            flag = "--%s" % argname
            extra = {"action": "store_true"}
        elif default is True:
            flag = "--no-%s" % argname
            extra = {"action": "store_false"}
        else:
            flag = "--%s" % argname
            extra = {"action": SchemaParameterAction, "schema_parameter": opt_arg}
        action_parser.add_argument(flag, dest=argname, help=argdoc, **extra)
    return action_parser
def get_call_parser(self, parser):
    """
    Build the L{argparse.ArgumentParser} for the "call" action: one action
    name followed by any number of key=value arguments.
    """
    call_parser = argparse.ArgumentParser(
        prog="%s call" % (parser.prog,),
        description="Call an arbitrary Landscape API action.",
        add_help=False,
    )
    positionals = (
        ("action_name", {"help": "The name of the Landscape API action."}),
        ("argument", {"help": "An argument in key=value format", "nargs": "*"}),
    )
    for positional_name, options in positionals:
        call_parser.add_argument(positional_name, **options)
    return call_parser
def get_main_parser(self):
    """
    Build the L{argparse.ArgumentParser} for the toplevel command line
    options.

    The parser is named after C{sys.argv[0]} and is created without
    argparse's automatic help; C{-h/--help} is registered here as a plain
    flag that stays C{None} unless given. Besides the global options, a
    single optional C{action} positional is accepted.

    @returns: The configured L{argparse.ArgumentParser}.
    """
    # Not using argparse subgroups here because the help output gets very
    # messy when you have many subgroups.
    parser = argparse.ArgumentParser(prog=sys.argv[0], add_help=False)

    # (option strings, keyword arguments) in the order they should appear
    # in the help output.
    global_options = (
        (
            ("-h", "--help"),
            {
                "help": "show this help message and exit",
                "action": "store_true",
                "default": None,
            },
        ),
        (
            ("--key",),
            {
                "help": "The Landscape access key to use when making the API "
                "request. It defaults to the environment variable "
                "LANDSCAPE_API_KEY if not provided.",
            },
        ),
        (
            ("--secret",),
            {
                "help": "The Landscape secret key to use when making the API "
                "request. It defaults to the environment variable "
                "LANDSCAPE_API_SECRET if not provided.",
            },
        ),
        (
            ("--uri",),
            {
                "help": "The URI of your Landscape endpoint. It defaults to "
                "the environment variable LANDSCAPE_API_URI if not provided.",
            },
        ),
        (
            ("--json",),
            {
                "dest": "json_output",
                "action": "store_true",
                "default": False,
                "help": "Output directly the JSON structure instead of the "
                "Python representation.",
            },
        ),
        (
            ("--ssl-ca-file",),
            {
                "help": "SSL CA certificate to validate server. If not "
                "provided, the SSL certificate provided by the server will "
                "be verified with the system CAs. It defaults to the "
                "environment variable LANDSCAPE_API_SSL_CA_FILE if not "
                "provided",
            },
        ),
    )
    global_group = parser.add_argument_group("Global Arguments")
    for option_strings, options in global_options:
        global_group.add_argument(*option_strings, **options)

    actions_group = parser.add_argument_group("Actions")
    actions_group.add_argument("action", default=None, nargs="?")
    return parser
def wrap_parse_args(self, parse_args, *args, **kwargs):
    """
    Run an argparse C{parse_args} call with C{sys.stdout} and C{sys.stderr}
    temporarily replaced by in-memory buffers, turning any C{SystemExit}
    into a L{UsageError} carrying the captured output and the exit code.

    The original streams are always restored, whether the call returns or
    raises.

    @param parse_args: The C{parse_args} method of an C{ArgumentParser} to
        execute.
    @param args: Positional args for the C{parse_args} call.
    @param kwargs: Keyword args for the C{parse_args} call.
    @returns: Whatever C{parse_args} returns.
    @raises UsageError: If C{parse_args} raises C{SystemExit}.
    """
    saved_streams = (sys.stdout, sys.stderr)
    sys.stdout, sys.stderr = StringIO(), StringIO()
    try:
        return parse_args(*args, **kwargs)
    except SystemExit as exit_request:
        exit_code = exit_request.code
        # Read the buffers before the finally clause swaps the real
        # streams back in.
        raise UsageError(sys.stdout.getvalue(), sys.stderr.getvalue(), exit_code)
    finally:
        sys.stdout, sys.stderr = saved_streams
def format_main_usage(self, parser, actions):
    """
    Format the toplevel help display for the command line.

    The text consists of a version banner, the parser's own help with its
    last line dropped, one line per action name and a closing hint on how
    to get help for a specific action.

    @param parser: The main argparse object for the program.
    @param actions: The actions available; only their C{name} is used.
    @returns: A formatted help string.
    """
    # Use argparse's help except the last line.
    argparse_help = "\n".join(parser.format_help().splitlines()[:-1])

    sections = [
        "Landscape API client (Python 3) - version " + __version__,
        argparse_help,
    ]
    sections.extend(" %s" % action.name for action in actions)
    sections.append(
        "\nType '%(prog)s help ACTION' for help on a specific action.\n"
        % {"prog": parser.prog}
    )
    return "\n".join(sections)
def get_parameter_doc(self, parameter):
    """
    Return the help text for a schema parameter.

    The text is the parameter's C{"doc"} entry, followed by a short format
    hint when its C{"type"} is one of list, mapping, boolean, date or file.

    @param parameter: A mapping with at least C{"doc"} and C{"type"} keys.
    @returns: The documentation string, possibly with a hint appended.
    """
    description = parameter["doc"]
    format_hints = {
        "list": "(comma-delimited list)",
        "mapping": "(comma-delimited KEY=VALUE pairs)",
        "boolean": "(true or false)",
        "date": "(time in YYYY-MM-DDTHH:MM:SS format)",
        "file": "filename",
    }
    hint = format_hints.get(parameter["type"])
    if not hint:
        return description
    return description + " " + hint
def get_actions(self, schema, version):
    """
    Return a list of data structures representing callable actions provided
    by the API, based on the schema.

    Three sources are combined: schema actions that are not overridden,
    schema actions overridden through C{API.overridden_apis} (with their
    replaced parameters and, if given, replaced documentation), and
    C{API.extra_actions}. Actions that the schema does not define for
    C{version} are left out, whether they are overridden or not.

    @param schema: The schema, as returned from L{load_schema}.
    @param version: The API version to use.
    @returns: The sorted list of actions.
    """
    overridden_apis = API.overridden_apis
    actions = []

    for name, version_handlers in schema.items():
        if name in overridden_apis:
            # Don't add the base schema if it's been overridden; we don't
            # want duplicate actions.
            continue
        schema_action = version_handlers.get(version)
        if schema_action is None:
            # This action is not supported by this API version
            continue
        actions.append(self._get_action_from_schema(name, schema_action))

    for action_name, override_data in overridden_apis.items():
        if action_name not in schema:
            # We ignore overridden APIs that aren't in the schema because
            # tests override the schema without necessarily providing all
            # the APIs that we override by default.
            continue
        schema_action = schema[action_name].get(version)
        if schema_action is None:
            # Same rule as for plain schema actions: an overridden action
            # that this API version doesn't support is skipped instead of
            # failing with a KeyError.
            continue
        # Work on a copy so that the caller's schema is never modified.
        overridden_schema = copy.deepcopy(schema_action)
        for parameter in overridden_schema["parameters"]:
            if parameter["name"] in override_data["replace_args"]:
                replacement = override_data["replace_args"][parameter["name"]]
                # Replace the parameter definition in place, keeping its
                # position in the parameter list.
                parameter.clear()
                parameter.update(replacement)
        overridden_doc = override_data.get("doc")
        if overridden_doc:
            overridden_schema["doc"] = overridden_doc
        actions.append(
            self._get_action_from_schema(
                action_name,
                overridden_schema,
                overridden_method_name=override_data["method"],
            )
        )

    actions.extend(API.extra_actions)
    return sorted(actions)
def _get_action_from_schema(self, name, schema_action, overridden_method_name=None):
    """
    Build the L{_Action} instance describing one API action of the schema.

    @param name: The action name as it appears in the schema; it is passed
        through L{_lowercase_api_name} to get the default method name.
    @param schema_action: The schema entry of the action. Its C{"doc"} and
        C{"parameters"} keys are read, as well as the optional
        C{"cli_name"} key.
    @param overridden_method_name: If set, the method name to use instead
        of the default one. The command line name is not affected.
    @returns: An L{_Action} built from the command line name, the method
        name, the documentation and the required and optional parameters.
    """
    default_method_name = _lowercase_api_name(name)

    # An explicit "cli_name" wins; otherwise derive the command line name
    # from the default method name.
    cmdline_name = schema_action.get("cli_name")
    if cmdline_name is None:
        cmdline_name = default_method_name.replace("_", "-")

    description = schema_action["doc"]

    required, optional = [], []
    for parameter in schema_action["parameters"]:
        bucket = optional if parameter.get("optional") else required
        bucket.append(parameter)

    return _Action(
        cmdline_name,
        overridden_method_name or default_method_name,
        description,
        required,
        optional,
    )
def main(argv, stdout, stderr, exit, environ, schema=_schema):
    """
    Create a L{CommandLine} from the given streams, exit callable and
    environment, and run its C{main} method.

    @param argv: The command line arguments, handed to C{CommandLine.main}.
    @param stdout: Passed to the L{CommandLine} constructor.
    @param stderr: Passed to the L{CommandLine} constructor.
    @param exit: Passed to the L{CommandLine} constructor.
    @param environ: Passed to the L{CommandLine} constructor.
    @param schema: The schema handed to C{CommandLine.main}; defaults to the
        module-level C{_schema}.
    @returns: Whatever C{CommandLine.main} returns.
    """
    command_line = CommandLine(stdout, stderr, exit, environ)
    return command_line.main(argv, schema)
if __name__ == "__main__":
    # Script entry point: run the command line client against the real
    # process arguments (without the program name), streams, exit function
    # and environment.
    main(
        argv=sys.argv[1:],
        stdout=sys.stdout,
        stderr=sys.stderr,
        exit=sys.exit,
        environ=os.environ,
    )