# -*- coding: utf-8 -*-
# vim:fenc=utf-8
# Copyright (C) 2012-2019 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# X2Go Session Broker is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# X2Go Session Broker is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
import io
import logging
import os
import os.path
import shlex
import socket
import subprocess

import paramiko

paramiko_logger = paramiko.util.logging.getLogger()
paramiko_logger.setLevel(logging.ERROR)
import x2gobroker._paramiko
x2gobroker._paramiko.monkey_patch_paramiko()

# X2Go Broker modules
import x2gobroker.defaults
import x2gobroker.x2gobroker_exceptions
import x2gobroker.utils
from x2gobroker.loggers import logger_broker, logger_error
from x2gobroker.utils import delayed_execution
# Registry of broker agent tasks: maps a task name (e.g. 'ping') to the
# function in this module that implements it. Each task function adds itself
# to this dict right after its definition.
tasks = {}
def has_remote_broker_agent_setup():
    """\
    Perform some integrity checks that may indicate that a remote
    broker agent setup is available.

    - Check for available SSH private keys (``id_rsa``, ``id_dsa``,
      ``id_ecdsa`` or ``id_ed25519``) in the ``.ssh`` directory below the
      current user's home directory.
    - Nothing else, so far...

    :returns: ``True``, if the broker supports remote broker agent calls
    :rtype: ``bool``

    """
    ssh_dir = os.path.join(os.path.expanduser("~"), '.ssh')
    # Ed25519 keys are checked too: a broker account that only owns an
    # id_ed25519 key would otherwise be reported as having no SSH setup.
    # NOTE(review): assumes the installed paramiko can authenticate with
    # Ed25519 keys -- confirm for the oldest supported paramiko version.
    private_key_names = ('id_rsa', 'id_dsa', 'id_ecdsa', 'id_ed25519')
    return any(
        os.path.exists(os.path.join(ssh_dir, key_name))
        for key_name in private_key_names
    )
def call_broker_agent(username, task, cmdline_args=[], remote_agent=None, logger=None, **kwargs):
    """\
    Run a task via the X2Go Broker Agent and hand back its processed answer.

    The call is dispatched to the local broker agent if ``remote_agent`` is
    ``None`` or the string ``'LOCAL'``, and to a remote broker agent (via SSH)
    otherwise. Additional keyword arguments are accepted but not used.

    :param username: run the broker agent for this user
    :type username: ``str``
    :param task: task name to execute via the broker agent (listsessions, getservers, etc.)
    :type task: ``str``
    :param cmdline_args: additional command line parameters for the broker agent
    :type cmdline_args: ``list``
    :param remote_agent: if not ``None`` (and not ``'LOCAL'``) call a remote broker agent via SSH
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to
    :type logger: :class:`logging.<Some>Logger`

    :raises X2GoBrokerAgentException: if the call to the remote broker agents fails.

    :returns: ``(<success>, <data>)``, a tuple with the <success> flag as first item
        and the data retrieved from the broker agent as second item
    :rtype: ``tuple``

    """
    if remote_agent not in ('LOCAL', None):
        return _call_remote_broker_agent(username=username, task=task, cmdline_args=cmdline_args, remote_agent=remote_agent, logger=logger)
    return _call_local_broker_agent(username=username, task=task, cmdline_args=cmdline_args, logger=logger)
def _call_local_broker_agent(username, task, cmdline_args=[], logger=None):
    """\
    Launch X2Go Broker Agent locally and process its output.

    The agent's stdout and stderr are read as one stream. The first output
    line decides about success (it has to start with ``OK``), all further
    non-empty lines are returned as data.

    If the agent command cannot be executed at all, this is logged as a
    warning and reported as ``(False, [])``; no exception is raised for it.

    :param username: run the broker agent for this user
    :type username: ``str``
    :param task: task name to execute via the broker agent (listsessions, getservers, etc.)
    :type task: ``str``
    :param cmdline_args: additional command line parameters for the broker agent
        (the default list is never modified)
    :type cmdline_args: ``list``
    :param logger: logger instance to report log messages to
    :type logger: :class:`logging.<Some>Logger`

    :returns: ``(<success>, <data>)``, a tuple with the <success> flag as first item
        and the data retrieved from the broker agent as second item
    :rtype: ``tuple``

    """
    if logger is None:
        logger = logger_broker

    # Prefix the agent call with "sudo -g <daemon group>" if this process is a
    # member of the group that owns x2gobroker-ssh. The /usr/bin location is
    # only looked at if the /usr/local/bin one cannot be stat'ed.
    cmd_line = []
    for ssh_wrapper in ("/usr/local/bin/x2gobroker-ssh", "/usr/bin/x2gobroker-ssh"):
        try:
            if os.stat(ssh_wrapper).st_gid in os.getgroups():
                # extend(), not append(): cmd_line has to stay a flat list of
                # strings, otherwise joining and executing it fails
                cmd_line.extend(["sudo", "-g", x2gobroker.defaults.X2GOBROKER_DAEMON_GROUP])
        except OSError:
            continue
        break

    cmd_line.extend([
        '{x2gobroker_agent_binary}'.format(x2gobroker_agent_binary=x2gobroker.defaults.X2GOBROKER_AGENT_CMD),
        '{username}'.format(username=username),
        '{task}'.format(task=task),
    ])
    cmd_line.extend('{arg}'.format(arg=cmdline_arg) for cmdline_arg in (cmdline_args or []))

    logger.info('Executing agent command locally: {cmd}'.format(cmd=" ".join(cmd_line)))

    try:
        agent_process = subprocess.Popen(cmd_line,
                                         stdin=None,
                                         stdout=subprocess.PIPE,
                                         stderr=subprocess.STDOUT,
                                         shell=False,
                                        )
        # communicate() reads the merged stdout/stderr stream and waits for
        # the process to exit. The process is not terminated explicitly (not
        # needed and not permitted as the broker agent is installed setuid
        # root).
        output, _ = agent_process.communicate()
        result = output.decode().split('\n')
        logger.info('Executing agent command succeeded.')
    except OSError as e:
        logger.warning('Executing agent command failed. Error message is: {emsg}.'.format(emsg=str(e)))
        result = None

    if result:
        logger.info('Broker agent answered: {answer}'.format(answer="; ".join(result)))

    if result and result[0].startswith('OK'):
        return (True, [r for r in result[1:] if r])
    return (False, [])
def _call_remote_broker_agent(username, task, cmdline_args=[], remote_agent=None, logger=None):
    """\
    Launch remote X2Go Broker Agent via SSH and process its output.

    The SSH server is contacted via the remote agent's hostname if its SSH
    port answers a port scan, and via its host address otherwise. The first
    line of the agent's stdout decides about success (it has to start with
    ``OK``), all further non-empty lines are returned as data. Any output on
    the agent's stderr is logged as a warning and makes the call report
    ``(False, [])``.

    :param username: run the broker agent for this user
    :type username: ``str``
    :param task: task name to execute via the broker agent (listsessions, getservers, etc.)
    :type task: ``str``
    :param cmdline_args: additional command line parameters for the broker agent
        (the default list is never modified)
    :type cmdline_args: ``list``
    :param remote_agent: information about the remote agent that is to be called.
        Keys read here: ``hostname`` and/or ``hostaddr`` (at least one of them is
        required), ``port`` (defaults to 22) and ``host_key_policy`` (defaults to
        a :class:`paramiko.WarningPolicy` instance). The dict is not modified.
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to
    :type logger: :class:`logging.<Some>Logger`

    :raises X2GoBrokerAgentException: if no remote agent is given, if neither
        hostname nor host address are given, if the SSH port cannot be reached
        or if the SSH connection or authentication fails.

    :returns: ``(<success>, <data>)``, a tuple with the <success> flag as first item
        and the data retrieved from the broker agent as second item
    :rtype: ``tuple``

    """
    if logger is None:
        logger = logger_broker

    if remote_agent is None:
        # nothing below can work without a remote agent description, so fail
        # with the documented exception instead of a TypeError further down
        emsg = 'With the SSH agent-query-mode a remote agent host (hostname, hostaddr, port) has to be specified!'
        logger_error.error(emsg)
        raise x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException(emsg)

    # the fallback policy is kept local, the caller's dict stays untouched
    if 'host_key_policy' in remote_agent:
        host_key_policy = remote_agent['host_key_policy']
    else:
        host_key_policy = paramiko.WarningPolicy()

    remote_hostaddr = remote_agent.get('hostaddr')
    remote_hostname = remote_agent.get('hostname')
    if remote_hostaddr is None and remote_hostname is None:
        raise x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException('Internal error: The remote_agent dict must always specify either a hostaddr or a hostname key!')
    remote_port = int(remote_agent.get('port', 22))

    # The command line is interpreted by a shell on the remote side, so every
    # part is shell-quoted; this keeps arguments containing blanks in one piece
    # and prevents user names or arguments from injecting shell syntax.
    cmd_parts = [
        '{x2gobroker_agent_binary}'.format(x2gobroker_agent_binary=x2gobroker.defaults.X2GOBROKER_AGENT_CMD),
        '{username}'.format(username=username),
        '{task}'.format(task=task),
    ]
    cmd_parts.extend('{arg}'.format(arg=cmdline_arg) for cmdline_arg in (cmdline_args or []))
    cmd = 'sh -c {cmd}'.format(cmd=shlex.quote(' '.join(shlex.quote(part) for part in cmd_parts)))

    remote_username = x2gobroker.defaults.X2GOBROKER_AGENT_USER

    # check how we shall connect to the remote agent's SSH host
    # (hostname is preferred over host address, unset values are skipped)
    _remote_sshserver = None
    for candidate in (remote_hostname, remote_hostaddr):
        if candidate is not None and x2gobroker.utils.portscan(candidate, remote_port):
            _remote_sshserver = candidate
            break

    if not _remote_sshserver:
        raise x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException('Could not ping remote X2Go Broker Agent host: {hostname} ({hostaddr})'.format(hostname=remote_hostname, hostaddr=remote_hostaddr))

    # now, connect and use paramiko Client to negotiate SSH2 across the connection
    client = paramiko.SSHClient()
    try:
        client.load_system_host_keys()
        if os.path.exists(os.path.expanduser("~/.ssh/known_hosts")):
            client.load_host_keys(os.path.expanduser("~/.ssh/known_hosts"))
        client.set_missing_host_key_policy(host_key_policy)
        client.connect(_remote_sshserver, remote_port, remote_username, look_for_keys=True, allow_agent=True)

        result = []
        ssh_transport = client.get_transport()
        if ssh_transport and ssh_transport.is_authenticated():
            logger.info('Executing agent command on remote host {hostname} ({hostaddr}): {cmd}'.format(hostname=remote_hostname, hostaddr=remote_hostaddr, cmd=cmd))
            (stdin, stdout, stderr) = client.exec_command(cmd)
            result = stdout.read().decode().split('\n')
            err = stderr.read().decode().replace('\n', ' ')
            if err:
                logger.warning('Remote agent command (host: {hostname} ({hostaddr})) reported an error: {err}'.format(hostname=remote_hostname, hostaddr=remote_hostaddr, err=err))
                result = None
    except paramiko.AuthenticationException as e:
        raise x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException('Authentication to remote X2Go Broker Agent Host failed (user: {user}, hostname: {hostname}, host address: {hostaddr}, port: {port}) failed'.format(user=remote_username, hostname=remote_hostname, hostaddr=remote_hostaddr, port=remote_port)) from e
    except (paramiko.SSHException, paramiko.BadHostKeyException, socket.error) as e:
        raise x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException('Query to remote X2Go Broker Agent (user: {user}, hostname: {hostname}, host address: {hostaddr}, port: {port}) failed'.format(user=remote_username, hostname=remote_hostname, hostaddr=remote_hostaddr, port=remote_port)) from e
    finally:
        # release the SSH connection on every path, including failures
        client.close()

    if result:
        logger.info('Broker agent answered: {answer}'.format(answer="; ".join(result)))

    if result and result[0].startswith('OK'):
        return (True, [r for r in result[1:] if r])
    return (False, [])
def ping(remote_agent=None, logger=None, **kwargs):
    """\
    Ping X2Go Broker Agent.

    Without a remote agent (``None`` or ``'LOCAL'``) the local broker agent is
    pinged. Otherwise the remote agent's SSH port is probed first (host address
    first, then hostname) and, if it is reachable, the ``ping`` task is run on
    the remote broker agent. Additional keyword arguments are accepted but not
    used.

    :param remote_agent: information about the remote agent that is to be called.
        Keys read here: ``hostaddr`` and/or ``hostname``, and ``port`` (defaults
        to 22).
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to
    :type logger: :class:`logging.<Some>Logger`

    :raises X2GoBrokerAgentException: if the SSH port is reachable, but the
        remote agent call itself fails (raised by the remote agent call)

    :returns: ``True`` if broker agent responds
    :rtype: ``bool``

    """
    if logger is None:
        logger = logger_broker

    # the broker agent always gets called with a user name, its value is of no
    # relevance for the ping task
    username = 'foo'

    if remote_agent in ('LOCAL', None):
        return _call_local_broker_agent(username, task='ping', logger=logger)[0]

    # hostaddr, hostname and port are all optional here, missing values are
    # skipped (or defaulted) instead of raising a KeyError
    remote_port = remote_agent.get('port', 22)
    candidates = (remote_agent.get('hostaddr'), remote_agent.get('hostname'))
    reachable = any(
        host is not None and x2gobroker.utils.portscan(host, remote_port)
        for host in candidates
    )
    if not reachable:
        return False

    return bool(_call_remote_broker_agent(username, task='ping', remote_agent=remote_agent, logger=logger)[0])
tasks['ping'] = ping
def list_sessions(username, remote_agent=None, logger=None, **kwargs):
    """\
    Ask the X2Go Broker Agent for the session list of a given user.

    Runs the broker agent task ``listsessions``; additional keyword arguments
    are handed on to :func:`call_broker_agent`.

    :param username: run the query on behalf of this username
    :type username: ``str``
    :param remote_agent: information about the remote agent that is to be called
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to (defaults to the broker logger)
    :type logger: :class:`logging.<Some>Logger`

    :returns: ``(<success>, <list-of-sessions>)``, a tuple with the <success> flag as first item
        and a session ``list`` as second item
    :rtype: ``tuple``

    """
    agent_logger = logger_broker if logger is None else logger
    return call_broker_agent(
        username,
        task='listsessions',
        remote_agent=remote_agent,
        logger=agent_logger,
        **kwargs
    )
tasks['listsessions'] = list_sessions
def suspend_session(username, session_name, remote_agent=None, logger=None, **kwargs):
    """\
    Have the X2Go Broker Agent suspend a session.

    Runs the broker agent task ``suspendsession`` with the session name as its
    only command line argument; additional keyword arguments are handed on to
    :func:`call_broker_agent`.

    :param username: suspend the session on behalf of this username
    :type username: ``str``
    :param session_name: name of the session that is to be suspended
    :type session_name: ``str``
    :param remote_agent: information about the remote agent that is to be called
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to (defaults to the broker logger)
    :type logger: :class:`logging.<Some>Logger`

    :returns: ``(<success>, [])``, a tuple with the <success> flag as first item
    :rtype: ``tuple``

    """
    agent_logger = logger_broker if logger is None else logger
    agent_args = [session_name]
    return call_broker_agent(
        username,
        task='suspendsession',
        cmdline_args=agent_args,
        remote_agent=remote_agent,
        logger=agent_logger,
        **kwargs
    )
tasks['suspendsession'] = suspend_session
def terminate_session(username, session_name, remote_agent=None, logger=None, **kwargs):
    """\
    Have the X2Go Broker Agent terminate a session.

    Runs the broker agent task ``terminatesession`` with the session name as
    its only command line argument; additional keyword arguments are handed on
    to :func:`call_broker_agent`.

    :param username: terminate the session on behalf of this username
    :type username: ``str``
    :param session_name: name of the session that is to be terminated
    :type session_name: ``str``
    :param remote_agent: information about the remote agent that is to be called
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to (defaults to the broker logger)
    :type logger: :class:`logging.<Some>Logger`

    :returns: ``(<success>, [])``, a tuple with the <success> flag as first item
    :rtype: ``tuple``

    """
    agent_logger = logger_broker if logger is None else logger
    agent_args = [session_name]
    return call_broker_agent(
        username,
        task='terminatesession',
        cmdline_args=agent_args,
        remote_agent=remote_agent,
        logger=agent_logger,
        **kwargs
    )
tasks['terminatesession'] = terminate_session
def has_sessions(username, remote_agent=None, logger=None, **kwargs):
    """\
    Query X2Go Broker Agent to detect running/suspended sessions on
    the remote X2Go Server (farm).

    Every entry of the session list is split at ``|``. Entries whose fifth
    field is ``R`` count as running, entries whose fifth field is ``S`` count
    as suspended; for each of them the fourth field is reported (presumably
    the name of the server hosting the session -- verify against the broker
    agent's output format). Entries with less than five fields are logged and
    skipped.

    :param username: run the query on behalf of this username
    :type username: ``str``
    :param remote_agent: information about the remote agent that is to be called
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to
    :type logger: :class:`logging.<Some>Logger`

    :returns: ``(<success>, <running>, <suspended>)``, a tuple with the <success>
        flag as first item, followed by one ``list`` for the running and one
        ``list`` for the suspended sessions (both empty, if there are none)
    :rtype: ``tuple``

    """
    if logger is None:
        logger = logger_broker

    _success, _session_list = list_sessions(username, remote_agent=remote_agent, logger=logger, **kwargs)
    if not isinstance(_session_list, list):
        return (False, [], [])

    running = []
    suspended = []
    for session in _session_list:
        fields = session.split('|')
        if len(fields) < 5:
            # a truncated line must not abort the evaluation of the others
            logger.warning('Ignoring malformed session list entry: {entry}'.format(entry=session))
            continue
        if fields[4] == 'R':
            running.append(fields[3])
        elif fields[4] == 'S':
            suspended.append(fields[3])

    return (_success, running, suspended)
tasks['has-sessions'] = has_sessions
def find_busy_servers(username, remote_agent=None, logger=None, **kwargs):
    """\
    Query X2Go Broker Agent for a list of servers with running
    and/or suspended sessions and a percentage that tells about
    the busy-state of the server.

    The result is independent from the username given.

    Every line of the agent's answer is expected to look like
    ``<usage>:<server>`` with ``<usage>`` being an integer. Lines that do not
    match are skipped and turn the <success> flag to ``False``.

    :param username: run the query on behalf of this username
    :type username: ``str``
    :param remote_agent: information about the remote agent that is to be called
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to
    :type logger: :class:`logging.<Some>Logger`

    :returns: ``(<success>, <server-usage>)``, a tuple with the <success> flag as first item
        and a dict reflecting the relative server usage (server name -> usage) as second item
    :rtype: ``tuple``

    """
    if logger is None:
        logger = logger_broker

    _success, server_list = call_broker_agent(username, task='findbusyservers', remote_agent=remote_agent, logger=logger, **kwargs)

    server_usage = {}
    if server_list and isinstance(server_list, list):
        for server_item in server_list:
            if ':' not in server_item:
                _success = False
                continue
            # split at the first colon only, the server part may contain
            # further colons (e.g. an IPv6 address)
            usage, server = server_item.split(':', 1)
            try:
                server_usage[server] = int(usage)
            except ValueError:
                logger.warning('Ignoring malformed server usage entry: {entry}'.format(entry=server_item))
                _success = False

    return (_success, server_usage)
tasks['findbusyservers'] = find_busy_servers
def check_load(remote_agent=None, logger=None, **kwargs):
    """\
    Query X2Go Broker Agent for a summary of system load specific
    parameters.

    The agent is expected to answer with ``<key>:<value>`` lines providing the
    numeric values ``memAvail``, ``numCPU``, ``typeCPU`` and ``loadavgXX``
    (and ``myMemAvail``, which replaces ``memAvail`` if that is zero). The
    load factor is calculated as::

        int( (memAvail / 1000) * numCPU * typeCPU * 100 / loadavgXX )

    A ``username`` keyword argument is dropped, all other keyword arguments
    are handed on to :func:`call_broker_agent`.

    :param remote_agent: information about the remote agent that is to be called
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to
    :type logger: :class:`logging.<Some>Logger`

    :returns: the queried server's load factor or, on failure, one of these
        strings: ``NO-REMOTE-AGENT``, ``REMOTE-AGENT-IS-SET-TO-LOCAL``,
        ``HOST-UNREACHABLE`` (the agent call raised an exception) or
        ``LOAD-DATA-BOGUS`` (load parameters missing, not numeric or not
        usable for the calculation)
    :rtype: ``int`` or ``str``

    """
    if logger is None:
        logger = logger_broker

    if not remote_agent:
        logger.error('no remote agent was given, can\'t query load')
        return "NO-REMOTE-AGENT"

    if remote_agent == 'LOCAL':
        logger.error('no load checking support if remote agent is set to \'LOCAL\'')
        return "REMOTE-AGENT-IS-SET-TO-LOCAL"

    # the load query always runs with a fixed user name, a user name handed
    # over by the caller would clash with it
    kwargs.pop("username", None)
    try:
        _success, _load_params = call_broker_agent(username='foo', task='checkload', remote_agent=remote_agent, logger=logger, **kwargs)
    except x2gobroker.x2gobroker_exceptions.X2GoBrokerAgentException as e:
        # a remote agent may be described by its host address only
        _hostname = remote_agent.get('hostname', remote_agent.get('hostaddr'))
        logger.error('querying remote agent on host {hostname} failed: {errmsg}'.format(hostname=_hostname, errmsg=str(e)))
        return "HOST-UNREACHABLE"

    try:
        p = {}
        for _param in _load_params:
            if ':' in _param:
                key, val = _param.split(':', 1)
                p[key] = float(val)

        if p['memAvail'] == 0:
            p['memAvail'] = p['myMemAvail']
        load_factor = int(((p['memAvail'] / 1000) * p['numCPU'] * p['typeCPU'] * 100) / p['loadavgXX'])
    except (KeyError, ValueError, ZeroDivisionError, OverflowError):
        # missing key, non-numeric value, zero load average or a result that
        # cannot be converted to an integer (inf/nan)
        return "LOAD-DATA-BOGUS"

    return load_factor
tasks['checkload'] = check_load
def add_authorized_key(username, pubkey_hash, authorized_keys_file='%h/.x2go/authorized_keys', remote_agent=None, logger=None, **kwargs):
    """\
    Have the X2Go Broker Agent add a public key hash to a user's
    authorized_keys file.

    Runs the broker agent task ``addauthkey`` with the public key hash and
    the authorized_keys file path as command line arguments; additional
    keyword arguments are handed on to :func:`call_broker_agent`.

    :param username: run the query on behalf of this username
    :type username: ``str``
    :param pubkey_hash: the public key hash as found in SSH authorized_keys files
    :type pubkey_hash: ``str``
    :param authorized_keys_file: the full path to the remote X2Go server's authorized_keys file
    :type authorized_keys_file: ``str``
    :param remote_agent: information about the remote agent that is to be called
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to (defaults to the broker logger)
    :type logger: :class:`logging.<Some>Logger`

    :returns: ``(<success>, [])``, a tuple with the <success> flag as first item
    :rtype: ``tuple``

    """
    agent_logger = logger_broker if logger is None else logger
    agent_args = [pubkey_hash, authorized_keys_file]
    return call_broker_agent(
        username,
        task='addauthkey',
        cmdline_args=agent_args,
        remote_agent=remote_agent,
        logger=agent_logger,
        **kwargs
    )
tasks['addauthkey'] = add_authorized_key
def delete_authorized_key(username, pubkey_hash, authorized_keys_file='%h/.x2go/authorized_keys', remote_agent=None, delay_deletion=0, logger=None, **kwargs):
    """\
    Remove a public key hash from the user's authorized_keys file.

    With ``delay_deletion`` greater than zero, the removal is not done right
    away: a call of this function (without delay) is handed over to
    ``delayed_execution`` and this function returns ``None``.

    :param username: run the query on behalf of this username
    :type username: ``str``
    :param pubkey_hash: the public key hash as found in SSH authorized_keys files
    :type pubkey_hash: ``str``
    :param authorized_keys_file: the full path to the remote X2Go server's authorized_keys file
    :type authorized_keys_file: ``str``
    :param remote_agent: information about the remote agent that is to be called
    :type remote_agent: ``dict``
    :param delay_deletion: delay handed to ``delayed_execution`` (reported as seconds
        in the log message); ``0`` removes the key immediately
    :type delay_deletion: ``int``
    :param logger: logger instance to report log messages to
    :type logger: :class:`logging.<Some>Logger`

    :returns: ``(<success>, [])``, a tuple with the <success> flag as first item;
        ``None`` if the deletion has only been scheduled
    :rtype: ``tuple``

    """
    if logger is None:
        logger = logger_broker

    if delay_deletion > 0:
        # NOTE(review): neither the logger nor any extra keyword arguments are
        # handed to the delayed call -- confirm that this is intended.
        delayed_execution(delete_authorized_key, delay=delay_deletion, username=username, pubkey_hash=pubkey_hash, authorized_keys_file=authorized_keys_file, remote_agent=remote_agent, )

        # this is for the logger output only; a remote agent may lack one of
        # hostname / hostaddr, that must not make the scheduling fail
        if remote_agent in ('LOCAL', None):
            _hostname = _hostaddr = 'LOCAL'
        else:
            _hostname = remote_agent.get('hostname')
            _hostaddr = remote_agent.get('hostaddr')
        logger.debug('Scheduled deletion of authorized key in {delay}s: user={user}, hostname={hostname}, hostaddr={hostaddr}'.format(delay=delay_deletion, user=username, hostname=_hostname, hostaddr=_hostaddr))
        return None

    return call_broker_agent(username, task='delauthkey', cmdline_args=[pubkey_hash, authorized_keys_file, ], remote_agent=remote_agent, logger=logger, **kwargs)
tasks['delauthkey'] = delete_authorized_key
def get_servers(username, remote_agent=None, logger=None, **kwargs):
    """\
    Ask the X2Go Broker Agent which servers are currently in use.

    The result is independent from the username given.

    Runs the broker agent task ``getservers``. Every answer line of the form
    ``<server> <number>`` adds an entry to the result; lines without a blank
    or with a non-integer number are silently skipped.

    :param username: run the query on behalf of this username
    :type username: ``str``
    :param remote_agent: information about the remote agent that is to be called
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to (defaults to the broker logger)
    :type logger: :class:`logging.<Some>Logger`

    :returns: ``(<success>, <servers>)``, a tuple with the <success> flag as first item
        and a ``dict`` (server name -> number reported for that server) as second
        item; the dict is empty, if the query failed
    :rtype: ``tuple``

    """
    agent_logger = logger_broker if logger is None else logger
    servers = {}

    success, lines = call_broker_agent(username, task='getservers', remote_agent=remote_agent, logger=agent_logger, **kwargs)

    for line in (lines if success else ()):
        server, blank, num_sessions = line.partition(" ")
        if not blank:
            continue
        try:
            servers[server] = int(num_sessions)
        except ValueError:
            continue

    return success, servers
tasks['getservers'] = get_servers
def tasks_available(username, remote_agent=None, logger=None, **kwargs):
    """\
    Ask the X2Go Broker Agent which tasks it offers.

    Depending on the remote broker agent's version, the result of this
    query can vary tremendously from X2Go Server to X2Go Server.

    Runs the broker agent task ``availabletasks``; additional keyword
    arguments are handed on to :func:`call_broker_agent`.

    :param username: run the query on behalf of this username
    :type username: ``str``
    :param remote_agent: information about the remote agent that is to be called
    :type remote_agent: ``dict``
    :param logger: logger instance to report log messages to (defaults to the broker logger)
    :type logger: :class:`logging.<Some>Logger`

    :returns: ``(<success>, <task-list>)``, a tuple with the <success> flag as first item
        and a list of available broker agent tasks as second item
    :rtype: ``tuple``

    """
    agent_logger = logger_broker if logger is None else logger
    return call_broker_agent(
        username,
        task='availabletasks',
        remote_agent=remote_agent,
        logger=agent_logger,
        **kwargs
    )
tasks['availabletasks'] = tasks_available
def genkeypair(local_username, client_address, key_type='RSA', logger=None):
    """\
    Generate an SSH pub/priv key pair without writing the private key to file.

    The public key is returned as an authorized_keys line that carries the
    options ``no-X11-forwarding,no-pty,no-user-rc`` and the comment
    ``<local_username>@<client_address>``.

    :param local_username: the key is for this user
    :type local_username: ``str``
    :param client_address: the address of the client the key is generated for
        (only used in the public key's comment field, so far)
    :type client_address: ``str``
    :param key_type: either of: RSA (2048 bit), DSA (1024 bit)
    :type key_type: ``str``
    :param logger: logger instance to report log messages to
    :type logger: :class:`logging.<Some>Logger`

    :returns: two-item tuple: ``(<pubkey>, <privkey>)``; ``(None, None)`` if
        the key type is not supported
    :rtype: ``tuple``

    """
    if logger is None:
        logger = logger_broker

    # generate key pair
    key = None
    pubkey_type = None
    if key_type == 'RSA':
        key = paramiko.RSAKey.generate(2048)
        pubkey_type = 'ssh-rsa'
    elif key_type == 'DSA':
        key = paramiko.DSSKey.generate(1024)
        pubkey_type = 'ssh-dss'

    if key is None:
        # report the problem instead of silently handing back an empty pair
        logger.warning('Cannot generate SSH key pair, unsupported key type: {key_type}'.format(key_type=key_type))
        return (None, None)

    # assemble the public key
    # FIXME: restricting the key to the client via the authorized_keys option
    # "from={client_address}" does not work properly by some reason, so it is
    # left out for now. Fix it later.
    pubkey = "no-X11-forwarding,no-pty,no-user-rc {pubkey_type} {pubkey} {local_username}@{client_address}".format(pubkey=key.get_base64(), pubkey_type=pubkey_type, local_username=local_username, client_address=client_address)

    # assemble the private key (in memory only)
    privkey_obj = io.StringIO()
    key.write_private_key(privkey_obj)
    privkey = privkey_obj.getvalue()

    return (pubkey, privkey)