Файловый менеджер - Редактировать - /home/lakoyani/lakoyani.com.fj/utils.tar
Назад
libcloudlinux.py 0000644 00000067247 14711144154 0010015 0 ustar 00 # coding:utf-8 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import division from __future__ import absolute_import import copy import sys import json import argparse import socket import base64 import os import subprocess import re from enum import Enum from dataclasses import dataclass from typing import Callable, List, Optional from past.builtins import basestring, unicode # noqa from future.utils import iteritems from clcommon.utils import silence_stdout_until_process_exit, get_cl_version from cllicense import CloudlinuxLicenseLib from cpanel_api import get_cpanel_api_class from clcommon.lib.cledition import is_cl_solo_edition, is_container from clcommon.cpapi import is_hitting_max_accounts_limit, get_main_username_by_uid LVEMANAGER_PLUGIN_NAMES = { 'python_selector': 'Python Selector', 'nodejs_selector': 'Node.js Selector', 'php_selector': 'PHP Selector', 'resource_usage': 'Resource Usage', 'wpos': 'AccelerateWP' } PASSENGER_DEPEND_PLUGINS = ['python_selector', 'nodejs_selector'] DEFAULT_PLUGIN_NAME = 'CloudLinux Manager' CAGEFS_ENTER_PROXIED_BIN = '/usr/bin/cagefs_enter.proxied' if not os.path.exists(CAGEFS_ENTER_PROXIED_BIN): CAGEFS_ENTER_PROXIED_BIN = '/bin/cagefs_enter.proxied' def is_json(data): try: json.loads(data) return True except ValueError as error: return False class CloudlinuxCliBase(object): request_data = {} result = None available_request_params = [ 'owner', 'command', 'method', 'params', 'user_info', 'mockJson', 'attachments', 'plugin_name', 'lang' ] NOT_FLAGGED_PARAMS = ['config-files', 'content', 'passenger-log-file', 'ignore-list', 'wp-path', 'upgrade-url'] license_is_checked = False current_plugin_name = '' licence = CloudlinuxLicenseLib() def __init__(self): self.skip_cagefs_check = False self.user_info = {} self.parsing_request_data() self.check_xss() self.drop_permission() self.command_methods = { 'spa-ping': self.spa_ping, 'cloudlinux-top': self.cl_top, 'cloudlinux-selector': self.cl_selector, 'cloudlinux-statistics': self.cl_statistics, 'cloudlinux-charts': self.cl_chart, 'cloudlinux-quota': self.cl_quota, 'cpanel-api': self.cpanel_api, 'cloudlinux-xray-user-manager': self.cl_xray_user_manager, 'cloudlinux-statsnotifier': self.cl_statsnotifier, 'cloudlinux-awp-user': self.cloudlinux_awp_user, 'cl-smart-advice-user': self.cl_smart_advice_user, 'cl-install-plugin': self.cl_install_plugin } def check_xss(self): for key in self.request_data.keys(): if key not in self.available_request_params: self.exit_with_error('BAD REQUEST 1:' + key) for name, val in iteritems(self.request_data): if isinstance(val, dict): # if post key is "params" for key, inner_value in iteritems(val): self.check_param_key(key) if self.request_data['command'] == 'cloudlinux-packages' \ and name == 'params' \ and key == 'package': self.request_data[name][key] = self.escape_param_value(inner_value) elif self.request_data['command'] == 'cloudlinux-support': pass elif self.request_data['command'] == 'cloudlinux-selector' \ and name == 'params' \ and key == 'options': pass elif self.request_data['command'] == 'lvectl' \ and name == 'params' \ and key == 'stdin': pass elif self.request_data['command'] == 'cloudlinux-selector' \ and name == 'params' \ and key == 'env-vars': pass elif self.request_data['command'] == 'cloudlinux-xray-manager' \ and name == 'params' \ and key == 'url': pass elif self.request_data['command'] == 'cloudlinux-xray-user-manager' \ and name == 'params' \ and key == 'url': pass elif self.request_data['command'] == 'wmt-api' \ and name == 'params' \ and key == 'config-change': pass elif self.request_data['command'] == 'cloudlinux-xray-manager' \ and name == 'params' \ and key == 'email': pass elif self.request_data['command'] == 'cloudlinux-awp-admin' \ and name == 'params' \ and key == 'upgrade-url': pass else: self.check_param_value(inner_value) else: self.check_param_value(val) def get_env(self): """ Get env for subprocess call """ env_copy = os.environ.copy() if self.request_data.get('lang'): lang = self.request_data.get('lang') if not re.match(r'^[a-z]{2}$', lang): lang = 'en' env_copy['LC_ALL'] = lang return env_copy def get_server_ip(self): """ Get the server's IP address. """ try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) return s.getsockname()[0] except Exception as e: return None def check_param_key(self, key): if not re.search('^[\w\-]+$', key): self.exit_with_error('BAD REQUEST 2') def check_param_value(self, val): if isinstance(val, basestring): if re.search('[`\|\$;&\n]', val, re.M): self.exit_with_error('BAD REQUEST 3') def escape_param_value(self, val): chars = "\\\"\'" for c in chars: val = val.replace(c, "\\" + c) return val def main(self): command = self.request_data['command'] endpoint = self.command_methods.get(command) allowed_methods = ['cloudlinux-license', 'external-info', 'spa-get-user-info'] # not requires license check if endpoint: if not self.license_is_checked and command not in allowed_methods: self.check_license() if 'mockJson' in self.request_data: self.spa_mock(self.request_data['mockJson']) endpoint() else: if command: self.exit_with_error("No such module " + command) else: self.exit_with_error("Command not defined") def parsing_request_data(self): """ parsing entry data, encode it from base64 to dictionary :return: """ parser = argparse.ArgumentParser() parser.add_argument('--data') parser.add_argument('--skip-cagefs-check', action='store_true', default=False) try: arguments = parser.parse_args() except: self.exit_with_error("Unknown param in request") if arguments.data: data_in_base64 = arguments.data data_in_json = base64.b64decode(data_in_base64).decode("utf-8") try: self.request_data = json.loads(data_in_json) self.skip_cagefs_check = arguments.skip_cagefs_check except ValueError: self.exit_with_error("Need json-array") self.user_info = self.get_user_info() self.define_current_plugin() else: self.exit_with_error("No --data param in request") def get_user_info(self): user_info = self.request_data.get('user_info') or {} if self.request_data['owner'] == 'user' and any(value is None for value in user_info.values()): euid = os.geteuid() username = get_main_username_by_uid(euid) user_info = {'username': username, 'lve-id': euid} return user_info def cl_top(self): # This imports from other package (cagefs), so we turn off pylint import checker for this line from lvestats.lib.info.cloudlinux_top import CloudLinuxTop #pylint: disable=E0401 import lvestats.lib.config as config #pylint: disable=E0401 list_to_request = self.prepair_params_for_command() result = '' try: result, exitcode = CloudLinuxTop(config.read_config()).main(*list_to_request) except config.ConfigError as ce: ce.log_and_exit() self.exit_with_error(str(ce)) if self.request_data.get('owner') == 'user': json_result = {} try: json_result = json.loads(result) except: self.exit_with_error(result) if json_result.get('result') != 'success': self.exit_with_error(json_result.get('result'), json_result.get('context'), ignore_errors=True) print(result) silence_stdout_until_process_exit() sys.exit(exitcode) def cl_quota(self): list_to_request = self.prepair_params_for_command() result = self.run_util('/usr/bin/cl-quota', *list_to_request, ignore_errors=True) print(result) def cl_xray_user_manager(self): list_to_request = self.prepair_params_for_command() list_to_request.remove("--json") result = self.run_util('/opt/alt/php-xray/cloudlinux-xray-user-manager', *list_to_request, ignore_errors=False) print(result) def cl_smart_advice_user(self): cli_command = '/opt/alt/php-xray/cl-smart-advice-user' list_to_request = self.prepair_params_for_command(with_json=False) # Workaround to run the command in background if '--async' in list_to_request: subprocess.Popen([cli_command, *list_to_request], stdin=subprocess.PIPE,stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) self.exit_with_success() result = self.run_util(cli_command, *list_to_request, ignore_errors=False) print(result) def cpanel_api(self): owner = self.request_data.get('owner') method = self.request_data.pop('method') list_to_request = self.prepair_params_for_command(with_json=False, add_dash=False) cpanel_api = get_cpanel_api_class(owner) self.exit_with_success({'data': cpanel_api.run(method, list_to_request)}) def cl_chart(self): list_to_request = self.prepair_params_for_command() try: list_to_request.remove("--json") except ValueError: pass for param in list_to_request: if param.startswith('--output'): self.exit_with_error('BAD REQUEST 2') list_to_request.insert(0, '/usr/sbin/lvechart') response = subprocess.check_output(list_to_request, shell=False, text=True) print(json.dumps({"result": "success", "chart": response})) silence_stdout_until_process_exit() sys.exit(0) def drop_permission(self): """ Drop permission to users, if owner of script is user :return: """ data = self.request_data if data['owner'] in ['reseller', 'user'] and\ ('lve-id' not in self.user_info or 'username' not in self.user_info): self.exit_with_error("User id does not specified") def prepair_params_for_command(self, with_json=True, escaped_strings=False, add_dash=True): """ Method that converts given dict of parameters into list of strings that should be passed as arguments command-line application :param with_json: add --json argument :param escaped_strings: ONLY FOR BACKWARDS COMPATIBILITY! SHOULD BE False FOR ALL NEW METHODS! :param add_dash: if we need to add dashes to params :return: """ value_template = "--{0}={1}" if add_dash else "{0}={1}" data = copy.deepcopy(self.request_data) list_to_request = [] if "method" in data: for method in data["method"].split(' '): list_to_request.append(method) if "params" not in data: data['params'] = {} if "json" not in data['params'] and with_json: data['params']['json'] = '' for param, value in iteritems(data['params']): if param != 'additional-params': # TODO: looks like we can remove option escaped_strings # and always use value.encode('utf-8') here # same goal may be reached using utils.byteify(json.loads(...)) # but to do that, we need some tests covering unicode params # (especially for cloudlinux-packages) # unfortunately, we do not have one ;( # THIS IS NEEDED ONLY FOR CL-PACKAGES UTILITY if value and escaped_strings is True: list_to_request.append(value_template.format(param, value.encode('unicode-escape').decode())) elif (value or param in self.NOT_FLAGGED_PARAMS) and escaped_strings is False: list_to_request.append(value_template.format(param, value)) else: list_to_request.append("--{0}".format(param)) if self.request_data['owner'] == 'reseller': list_to_request.append('--for-reseller={0}'.format(self.user_info['username'])) if 'additional-params' in data['params'] \ and data['params']['additional-params'] != '': list_to_request.append("--") for param in data['params']['additional-params'].split(): list_to_request.append("{0}".format(param)) return list_to_request def is_edition_migration_available(self): # check if edition migration is supported return os.path.isfile('/usr/sbin/clncheck') def update_license(self): # Register by broken license with open(os.devnull, 'w') as devnull: clnreg_cmd = ['/usr/sbin/clnreg_ks', '--force'] if self.is_edition_migration_available(): clnreg_cmd.append('--migrate-silently') subprocess.call(clnreg_cmd, stderr=devnull, stdout=devnull, shell=False) subprocess.call(['/usr/bin/cldetect', '--update-license'], stderr=devnull, stdout=devnull, shell=False) self.check_license(False) def check_license(self, with_recovery=True): if not self.kernel_is_supported(): if self.request_data['owner'] in ['reseller']: self.exit_with_error( code=503, error_id='ERROR.not_available_plugin', context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)}, icon='disabled') elif self.request_data['owner'] in ['admin']: self.exit_with_error('Kernel is not supported') if is_hitting_max_accounts_limit(): if self.request_data['owner'] == 'admin': self.exit_with_error('ERROR.hitting_max_accounts_limit') if self.request_data['owner'] == 'user': self.exit_with_error( code=503, error_id='ERROR.not_available_plugin', context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)}, icon='disabled') if not self.licence.get_license_status(): if self.request_data['owner'] in ['reseller', 'user']: interpreter = 'nodejs' if self.request_data.get('params') \ and self.request_data['params'].get('interpreter'): interpreter = self.request_data['params']['interpreter'] pluginNames = { 'reseller': 'CloudLinux Manager', 'user': {'python': 'Python Selector', 'nodejs':'Node.js Selector'} .get(interpreter, 'Node.js Selector') } self.exit_with_error( code=503, error_id='ERROR.not_available_plugin', context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)}, icon='disabled') else: if with_recovery: self.update_license() else: self.exit_with_error('License is not valid') else: self.license_is_checked = True def exit_with_error(self, error_string='', context=None, code=None, error_id=None, icon=None, ignore_errors=False): result = {"result": error_string} if context: result['context'] = context if code: result['code'] = code if error_id: result['error_id'] = error_id if icon: result['icon'] = icon if ignore_errors: result['ignore'] = ignore_errors print(json.dumps(result)) sys.exit(1) def exit_with_success(self, response=None): data = copy.deepcopy(response) if response else {} data['result'] = 'success' print(json.dumps(data)) sys.exit(0) def cl_statistics(self): # This imports from other package (cagefs), so we turn off pylint import checker for this line from lvestats.lib.cloudlinux_statistics import main #pylint: disable=E0401 import lvestats.lib.config as config #pylint: disable=E0401 from lvestats.lib.dbengine import make_db_engine #pylint: disable=E0401 list_to_request = self.prepair_params_for_command() try: cnf = config.read_config() dbengine = make_db_engine(cnf) main(dbengine, argv_=list_to_request,server_id=cnf.get('server_id', 'localhost')) silence_stdout_until_process_exit() sys.exit(0) except config.ConfigError as ce: ce.log_and_exit() self.exit_with_error(ce) def spa_mock(self, file): file_path = '/usr/share/l.v.e-manager/spa/src/jsons/%s.json' % (file) # check if passed file param doesn't use relative path. E.g.: '../../file' if os.path.realpath(file_path) != file_path: self.exit_with_error('BAD REQUEST 3') with open(file_path, 'r') as f: print(f.read()) sys.exit(0) def get_lve_version(self): try: ver = subprocess.check_output( 'cat /proc/lve/list | grep -Po \'^\d{1,2}:\'', shell=True, executable='/bin/bash', text=True ).strip() return int(ver[:-1]) except: return 0 def get_cloudlinux_version(self): return subprocess.check_output( 'uname -r | grep -Po \'el\d\w?\'', shell=True, executable='/bin/bash', text=True ).strip() # Common methods def spa_ping(self): self.exit_with_success() def cl_selector(self): try: from clselector.cl_selector import CloudlinuxSelector except: self.exit_with_error('Module unavailable') if self.user_info.get('username') and 'interpreter' in self.request_data['params']\ and self.request_data['params']['interpreter'] == 'php': self.check_php_selector_user_availablility() list_to_request = self.prepair_params_for_command() cll = CloudlinuxSelector() cll.run(list_to_request) def check_php_selector_user_availablility(self): """ Additional check only for php selector :return: """ try: LIBDIR = '/usr/share/cagefs' sys.path.append(LIBDIR) import cagefsctl if not cagefsctl.cagefs_is_enabled or \ not cagefsctl.is_user_enabled(self.user_info['username']): raise RuntimeError('Cagefs is disabled or missing') except (ImportError, RuntimeError): self.exit_with_error( code=503, error_id='ERROR.cagefsDisabled', ) from clselect.clselectexcept import BaseClSelectException try: from clselect import ClSelect ClSelect.check_multiphp_system_default_version() except (BaseClSelectException): self.exit_with_error( code=503, error_id='ERROR.systemVersionAltPHP', ) def define_current_plugin(self): self.current_plugin_name = self.request_data.get('plugin_name') def is_error_response_default(self, json_result): return json_result.get('result') != 'success' and json_result.get('success') != 1 def run_util(self, name, *args, **kwargs): command = [name] + list(args) error_checker = kwargs.get('error_checker', self.is_error_response_default) try: p = subprocess.Popen( command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=self.get_env()) (result, err) = p.communicate(kwargs.pop('stdin', None)) is_error = p.returncode != 0 or not is_json(result) if not is_error: json_result = json.loads(result) is_error = error_checker(json_result) if is_error: result = result + err if is_json(result): json_result = json.loads(result) if json_result.get('message'): json_result['result'] = json_result.pop('message') result = json.dumps(json_result) if kwargs.get("ignore_errors", False): # Check new result concatenated with error if is_json(result): result = json.loads(result) result['ignore'] = True result = json.dumps(result) else: result = self.ignored_error_message(result) print(result) exit(1) return result except Exception as e: self.exit_with_error("Can't run %(command)s", context={'command': ' '.join(command)}, ignore_errors=True) def ignored_error_message(self, message): return json.dumps({ "result": message, "ignore": True }) def kernel_is_supported(self): try: if is_container(): return True if is_cl_solo_edition(skip_jwt_check=True): # CL9 uses Alma kernel which doesn't have 'lve' in its name if get_cl_version() == 'cl9': return True uname = subprocess.check_output('uname -r', shell=True, executable='/bin/bash', text=True) return 'lve' in uname else: f = open('/proc/lve/list', 'r') line = f.readline() f.close() return bool(line) except IOError: return False def cl_statsnotifier(self): from lvestats.lib.cloudlinux_statsnotifier import main #pylint: disable=E0401 list_to_request = self.prepair_params_for_command(with_json=True) exit_code = main(args_=list_to_request) #pylint: disable=E0401 sys.exit(exit_code) def cloudlinux_awp_user(self): cli_command = '/usr/bin/cloudlinux-awp-user' list_to_request = self.prepair_params_for_command(with_json=False) result = self.run_util(cli_command, *list_to_request, ignore_errors=False) print(result) def cl_install_plugin(self): """ This method is needed just for dev server to allow work with mocks """ self.exit_with_success() # user cli parts class CommandType(Enum): HEAVY = 'heavy' SIMPLE = 'simple' class ConfigLimitValue(Enum): ALL = 'all' # limit all requests HEAVY = 'heavy' # don't limit white-listed 'simple' requests UNLIMITED = 'unlimited' # don't limit at all @classmethod def _missing_(cls, value): return cls.ALL @dataclass class Rule: callable: Callable result: CommandType class LimitStrategyBase: """ Base limits strategy to decide - run incoming request with or without cagefs limits Strategy execution means that used script (cloudlinux_cli_user.py) will be re-executed with (or not) additional cagefs flags """ cagefs_args: List[str] def execute(self, command: str, args: List[str], request_data: dict) -> Optional[int]: full_command = self.get_full_command(command, args, request_data) p = subprocess.Popen(full_command) p.communicate() return p.returncode def get_full_command(self, command: str, args: List[str], request_data: dict) -> List[str]: cmd = [*sys.argv, f'--skip-cagefs-check'] return [CAGEFS_ENTER_PROXIED_BIN, *self.cagefs_args, *cmd] class NoCagefsStrategy(LimitStrategyBase): """ Strategy for hardcoded commands, that should always run even without the cagefs with unknown reason This strategy does not re-executes the script with `cagefs_enter.proxied`, just letting them to finish as is TODO: LVEMAN-1767 """ def execute(self, *args, **kwargs) -> Optional[int]: return None class AllLimitStrategy(LimitStrategyBase): """ Strategy to limit all commands """ cagefs_args = [] class NoLimitStrategy(LimitStrategyBase): """ Strategy to don't limit all commands """ cagefs_args = ['--no-io-and-memory-limit', '--no-cpu-limit', '--no-max-enter'] class LimitStrategyHeavy(LimitStrategyBase): """ Strategy to don't limit whitelisted commands By default - all commands are HEAVY and will be limited Add `rules` to mark any command as SIMPLE and run without limits """ cagefs_args = [] default_rule = Rule(callable=lambda args: True, result=CommandType.HEAVY) rules = { 'cloudlinux-selector': [ Rule(callable=lambda args: 'get' in args, result=CommandType.SIMPLE), Rule(callable=lambda args: 'start' in args, result=CommandType.SIMPLE), Rule(callable=lambda args: 'restart' in args, result=CommandType.SIMPLE), Rule(callable=lambda args: 'stop' in args, result=CommandType.SIMPLE), ] } def _check_rules(self, command: str, args: List[str]) -> CommandType: command_type = None for rule in self.rules.get(command, []) + [self.default_rule]: if rule.callable(args): command_type = rule.result break if command_type == CommandType.SIMPLE: self.cagefs_args = ['--no-io-and-memory-limit', '--no-cpu-limit', '--no-max-enter'] else: self.cagefs_args = [] return command_type def get_full_command(self, command: str, args: List[str], request_data: dict) -> List[str]: self._check_rules(command, args) return super().get_full_command(command, args, request_data) cloudlinux-selector.py 0000755 00000001216 14711144154 0011127 0 ustar 00 #!/opt/cloudlinux/venv/bin/python3 -sbb # -*- coding: utf-8 -*- # cloudlinux-selector Utility to check Cloudlinux license # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2022 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import division from __future__ import absolute_import import sys from clselector.cl_selector import CloudlinuxSelector def main(argv): """ Main run function """ cll = CloudlinuxSelector() return cll.run(argv) if __name__ == "__main__": sys.exit(main(sys.argv[1:])) cloudlinux_cli_user.py 0000755 00000026243 14711144154 0011205 0 ustar 00 # coding:utf-8 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import division from __future__ import absolute_import import json import logging import subprocess import os import sys from libcloudlinux import ( CloudlinuxCliBase, LVEMANAGER_PLUGIN_NAMES, DEFAULT_PLUGIN_NAME, PASSENGER_DEPEND_PLUGINS, AllLimitStrategy, NoLimitStrategy, LimitStrategyHeavy, NoCagefsStrategy, LimitStrategyBase, ConfigLimitValue, ) from clselector.clpassenger_detectlib import is_clpassenger_active from clcommon import ClPwd from clcommon.utils import is_litespeed_running from clcommon.lib.cledition import is_cl_solo_edition from cldetectlib import get_param_from_file from clcommon.const import Feature from clcommon.cpapi import is_panel_feature_supported CONFIG = '/etc/sysconfig/cloudlinux' SMART_ADVICE_USER_CLI = '/opt/alt/php-xray/cl-smart-advice-user' PERCENTS_STATS_MODE_FLAG = '/opt/cloudlinux/flags/enabled-flags.d/percentage-user-stats-mode.flag' # NB: this logger's out is stderr, result JSON out is stdout - so with active logger web will not work properly # because of stderr redirection 2>&1 # so it is MUST be silent(NOTSET) in normal situation # also it is not possible to use file logger here - script works inside the cagefs with user's rights logger = logging.getLogger(__name__) logger.setLevel(logging.NOTSET) init_formatter = logging.Formatter('[%(asctime)s] %(funcName)s:%(lineno)s - %(message)s') cagefs_formatter = logging.Formatter('{cagefs} [%(asctime)s] %(funcName)s:%(lineno)s - %(message)s') h = logging.StreamHandler() h.setFormatter(init_formatter) logger.addHandler(h) logger.debug('cli start') class CloudlinuxCliUser(CloudlinuxCliBase): limit_strategy: LimitStrategyBase def __init__(self): self.web_resource_limit_mode = ConfigLimitValue.ALL limit_mode = get_param_from_file(CONFIG, 'web_resource_limit_mode', '=', ConfigLimitValue.ALL.value) self.web_resource_limit_mode = ConfigLimitValue(limit_mode) super(CloudlinuxCliUser, self).__init__() self.command_methods.update({ 'spa-get-domains': self.spa_user_domains, 'spa-get-homedir': self.spa_user_homedir, 'cloudlinux-snapshots': self.cl_snapshots, 'spa-get-user-info': self.spa_get_user_info }) def __init_limit_strategy(self): """ Set default strategy from the `CONFIG` values """ if self.skip_cagefs_check: logger.handlers[0].setFormatter(cagefs_formatter) # update log format to easier log review # we cannot use cagefs when it is not available if not is_panel_feature_supported(Feature.CAGEFS): self.limit_strategy = NoCagefsStrategy() else: self.limit_strategy = { ConfigLimitValue.ALL: AllLimitStrategy, ConfigLimitValue.HEAVY: LimitStrategyHeavy, ConfigLimitValue.UNLIMITED: NoLimitStrategy, }.get(self.web_resource_limit_mode, AllLimitStrategy)() logger.debug( f'Limits strategy inited as {self.limit_strategy.__class__}' f'\n\tBecause of:' f'\n\tself.web_resource_limit_mode: {self.web_resource_limit_mode}' ) def set_limit_strategy(self, strategy: LimitStrategyBase): logger.debug(f'Limit strategy is explicitly set to {strategy.__class__}') self.limit_strategy = strategy def __check_exclusive_commands(self): """ Check is command currently run without cagefs; commands list is taken from Spa.php `processRequest()` This function should be removed in the same task as the `LimitStrategyNoCagefs` """ data = self.request_data if data.get('params', {}).get('interpreter') == 'php' or data.get('command') in { 'cloudlinux-statistics', 'cloudlinux-quota', 'cloudlinux-top', 'cloudlinux-snapshots', 'cloudlinux-charts', 'cloudlinux-statsnotifier', 'spa-get-user-info', 'cloudlinux-awp-user', 'cloudlinux-xray-user-manager', 'spa-get-domains', 'cpanel-api', 'cl-smart-advice-user', }: logger.debug('Executable command found in the exclusive list') self.set_limit_strategy(NoCagefsStrategy()) def drop_permission(self): """ Drop permission to users, if owner of script is user :return: """ logger.debug( 'drop permissions start' f'\n\targv is: {sys.argv}' f'\n\trequest data is: {self.request_data}' ) self.__init_limit_strategy() data = self.request_data if data['owner'] != 'user': self.exit_with_error("User not allowed") super(CloudlinuxCliUser, self).drop_permission() args = self.prepair_params_for_command() logger.debug(f'prepared args is: {args}') if data.get('command'): if self.skip_cagefs_check: logger.debug('cagefs skipped: --skip-cagefs-check arg found') else: self.__check_exclusive_commands() # if rc is None - script won't enter the cagefs # otherwise - command is executed in the cagefs rc = self.limit_strategy.execute(data['command'], args, self.request_data) if rc is not None: logger.debug(f'command executed inside of the cagefs with rc: {rc}') sys.exit(rc) else: logger.debug(f'cagefs skipped: strategy is {self.limit_strategy.__class__}') # skip checking plugin availability on spa-get-user-info if data.get('command') != 'spa-get-user-info': self.check_plugin_availability() logger.debug('drop permissons end') def spa_user_domains(self): print(json.dumps({"result": "success", "list": self.get_user_domains()})) sys.exit(0) def spa_user_homedir(self): print(json.dumps({"result": "success", "homedir": self.get_user_homedir()})) sys.exit(0) def spa_get_user_info(self): try: print(json.dumps( { "result": "success", "domains": self.get_user_domains(), "homedir": self.get_user_homedir(), "is_litespeed_running": is_litespeed_running(), "is_cl_solo_edition": is_cl_solo_edition(skip_jwt_check=True), "smart_advice": os.path.isfile(SMART_ADVICE_USER_CLI), "is_lve_supported": is_panel_feature_supported(Feature.LVE), "user_stats_mode": self.get_stats_mode(), "server_ip": self.get_server_ip() })) except: self.exit_with_error('Module unavailable') sys.exit(0) def get_user_domains(self): try: from clcommon.cpapi import userdomains except: self.exit_with_error('Module unavailable') return [x[0] for x in userdomains(self.user_info['username'])] def get_stats_mode(self): if os.path.isfile(PERCENTS_STATS_MODE_FLAG): return 'percent' return 'default' def get_user_homedir(self): try: pwdir = ClPwd().get_homedir(self.user_info['username']) return pwdir + "/" except KeyError: self.exit_with_error('No such user') def cl_snapshots(self): list_to_request = self.prepair_params_for_command() try: output = self.run_util('/usr/sbin/lve-read-snapshot', *list_to_request) except subprocess.CalledProcessError as processError: output = processError.output try: result = json.loads(output) except: self.exit_with_error(output) return self.exit_with_success({'data': result['data']}) sys.exit(0) def check_plugin_availability(self): plugin_names = { 'nodejs_selector': 'Node.js Selector', 'python_selector': 'Python Selector', } selector_enabled = True manager = None try: if self.current_plugin_name == 'nodejs_selector': from clselect.clselectnodejs.node_manager import NodeManager manager = NodeManager() if self.current_plugin_name == 'python_selector': from clselect.clselectpython.python_manager import PythonManager manager = PythonManager() if manager: selector_enabled = manager.selector_enabled except: selector_enabled = False if not selector_enabled: self.exit_with_error( code=503, error_id='ERROR.not_available_plugin', context={'pluginName': plugin_names.get(self.current_plugin_name, 'Plugin')}, icon='disabled') plugin_available_checker = { 'nodejs_selector': self._plugin_available_nodejs, 'python_selector': self._plugin_available_python, 'php_selector': self._plugin_available_php, 'resource_usage': self._plugin_available_resource_usage, }.get(self.current_plugin_name) if plugin_available_checker: plugin_available = plugin_available_checker() else: plugin_available = True if not is_clpassenger_active() and self.current_plugin_name in PASSENGER_DEPEND_PLUGINS: self.exit_with_error( code=503, error_id='ERROR.not_available_passenger', context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)}, icon='disabled') if not plugin_available: self.exit_with_error( code=503, error_id='ERROR.not_available_plugin', context={'pluginName': LVEMANAGER_PLUGIN_NAMES.get(self.current_plugin_name, DEFAULT_PLUGIN_NAME)}, icon='disabled') def _plugin_available_nodejs(self): try: from clselect.clselectnodejs.node_manager import NodeManager manager = NodeManager() if not manager.selector_enabled or not is_clpassenger_active(): return False except: return False return True def _plugin_available_python(self): try: from clselect.clselectpython.python_manager import PythonManager manager = PythonManager() if not manager.selector_enabled or not is_clpassenger_active(): return False except: return False return True def _plugin_available_php(self): try: from clselect.clselectphp.php_manager import PhpManager manager = PhpManager() if not manager.selector_enabled: return False except: return False return True def _plugin_available_resource_usage(self): return True cpanel_api.py 0000644 00000023140 14711144154 0007213 0 ustar 00 # coding:utf-8 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import absolute_import from __future__ import print_function from lvemanager.helpers import exit_with_error, run_command import json from abc import abstractmethod from clcommon.lib.whmapi_lib import WhmApiRequest, WhmApiError import sys OWNER_ADMIN = 'admin' OWNER_USER = 'user' UAPI_IGNORED_ERRORS = ["The event UAPI::LangPHP::php_set_vhost_versions was handled successfully."] def get_cpanel_api_class(owner): """ Factory method that returns certain class depending on owner's type :param owner: Can be 'admin' or 'user' :return: """ if owner == OWNER_ADMIN: return CPanelAdminApi() return CPanelUserApi() class CPanelApi(object): """ Abstract method that defines abstract methods that must be implemented and common methods of the derived classes """ RETURN_JSON = '--output=json' ALLOWED_OPERATIONS = {} @abstractmethod def check_operation_allowed(self, called_method): """ Checks whether operation is allowed to perform :param called_method: :return: """ raise NotImplementedError('Method check_operation_allowed must be implemented!') @abstractmethod def prepare_running_command(self, called_method, params, return_json): """ Depending on the passed arguments, the method builds running command """ raise NotImplementedError('Method prepare_running_command must be implemented!') @abstractmethod def parse_result(self, called_method, result): """ Some results are parsed before sending back to the called. This method parses it and returns transformed result. """ raise NotImplementedError('Method parse_result must be implemented!') @staticmethod def check_method_allowed(method_name, methods): """ CPanel API contains several methods and not all of them if supported by this utility. """ for method_object in methods: if method_name == method_object['method']: return True return False def check_for_errors(self, result): """ Checks whether CPanel API returned error or not :param result: :return: """ raise NotImplementedError('Method check_for_errors must be implemented!') @staticmethod def get_method_parser(method_name, methods): """ Some result must be parsed before they are sent back to the caller. This method extracts the method that must be executed upon the result to transform it. """ for method_object in methods: if method_name == method_object['method']: return method_object.get('parser', None) def get_ignore_errors(self, method_name, methods): """ return ignore error flag for method :return: """ for method_object in methods: if method_name == method_object['method']: return method_object.get('ignore_errors', None) @staticmethod def parse_vhost_versions(result): """ Method parses the result of the method 'parse_vhost_versions'. The host will be inherited when its field 'sys_default' inside 'php_version_source' is equal to 1. """ parsed_result = [] for r in result: parsed_result.append({ 'version': r.get('version'), 'host': r.get('vhost'), 'php_fpm': True if r.get('php_fpm') else False, 'inherited': True if r.get('php_version_source', {}).get('sys_default') == 1 else False }) return parsed_result def run(self, called_method, params, return_json=True): """ Default method which first checks whether operation is allowed then if allowed runs prepared command. After receiving the result, it will call parse_result method to transform result into desired format. """ if self.check_operation_allowed(called_method): prepared_command = self.prepare_running_command(called_method, params, return_json) output = run_command(prepared_command) try: result = json.loads(output) return self.parse_result(called_method, result) except ValueError as e: exit_with_error(e) else: exit_with_error('Not allowed operation: {}'.format(called_method)) class CPanelAdminApi(CPanelApi): COMMAND = '/usr/local/cpanel/bin/whmapi1' ALLOWED_OPERATIONS = [ { 'method': 'php_get_vhost_versions', 'parser': lambda result: CPanelAdminApi.parse_vhost_versions(result) }, {'method': 'php_get_system_default_version'}, {'method': 'php_set_vhost_versions'}, {'method': 'php_get_installed_versions'} ] def check_operation_allowed(self, called_method): return self.check_method_allowed(called_method, self.ALLOWED_OPERATIONS) def prepare_running_command(self, called_method, params, return_json): transformed_params = {} for param in params: key, value = param.split('=') transformed_params[key] = value return transformed_params def parse_result(self, called_method, result): parser = self.get_method_parser(called_method, self.ALLOWED_OPERATIONS) if parser is not None: return parser(result) return result @staticmethod def parse_vhost_versions(result): parsed_result = [] for r in result['versions']: parsed_result.append({ 'version': r.get('version'), 'host': r.get('vhost'), 'user': r.get('account'), 'php_fpm': True if r.get('php_fpm') else False, 'inherited': True if r.get('php_version_source', {}).get('sys_default') == 1 else False }) return parsed_result def run(self, called_method, params, return_json=True): params = self.prepare_running_command(called_method, params, return_json) if self.check_operation_allowed(called_method): try: return self.parse_result(called_method, WhmApiRequest(called_method).with_arguments(**params).call()) except WhmApiError as e: print(e) sys.exit(1) else: exit_with_error('Not allowed operation: {}'.format(called_method)) class CPanelUserApi(CPanelApi): COMMAND = '/usr/bin/uapi' ALLOWED_OPERATIONS = { 'LangPHP': [ {'method': 'php_set_vhost_versions'}, { 'method': 'php_get_installed_versions', 'ignore_errors': True, }, { 'method': 'php_get_vhost_versions', 'parser': lambda result: CPanelApi.parse_vhost_versions(result), 'ignore_errors': True, }, { 'method': 'php_get_system_default_version', 'ignore_errors': True, } ] } def check_class_allowed(self, class_name): """ CPanel UAPI requires to pass class name and only few of classes are supported by this utility. """ return class_name in self.ALLOWED_OPERATIONS.keys() def check_operation_allowed(self, called_method): """ Checks whether operation is allowed or not. By CPanel UAPI supports several operations but only few of them can be called via this utility. """ class_name, method_name = self.extract_class_and_method(called_method) if self.check_class_allowed(class_name): return self.check_method_allowed(method_name, self.ALLOWED_OPERATIONS[class_name]) return False @staticmethod def extract_class_and_method(called_method): try: class_name, method_name = called_method.split('::') return class_name, method_name except ValueError: exit_with_error('Invalid method is passed: {}'.format(called_method)) def prepare_running_command(self, called_method, params, return_json): class_name, method_name = self.extract_class_and_method(called_method) params = [self.COMMAND, class_name, method_name] + params if return_json: params.append(self.RETURN_JSON) return params def parse_result(self, called_method, result): class_name, method_name = self.extract_class_and_method(called_method) ignore_errors = self.get_ignore_errors(method_name, self.ALLOWED_OPERATIONS[class_name]) self.check_for_errors(result, ignore_errors) parser = self.get_method_parser(method_name, self.ALLOWED_OPERATIONS[class_name]) if parser is not None: return parser(result['result']['data']) return result['result']['data'] def check_for_errors(self, result, ignore_errors = False): errors = result['result'].get('errors') if not errors: return for error in errors: # TODO: The next line is workaround of cPanel issue CPANEL-35122 # see https://support.cpanel.net/hc/en-us/articles/1500004317181-PHP-Selector-change-to-CloudLinux-PHP-version-reports-a-false-error if error in UAPI_IGNORED_ERRORS: continue exit_with_error(' '.join(errors), ignore_errors=ignore_errors) python_wrapper 0000755 00000001070 14711144154 0007553 0 ustar 00 #!/bin/bash # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # if [[ $EUID -eq 0 ]]; then echo "This program is not intended to be run as root." 1>&2 exit 1 fi CWD=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) source ${CWD}/activate CL_PYTHON_VERSION="$(echo "${VIRTUAL_ENV}" | awk -F '/' '{print $NF}')" eval $(${CWD}/set_env_vars.py python) ABSOLUTE_PATH="${CWD}/python${CL_PYTHON_VERSION}_bin" exec "${ABSOLUTE_PATH}" "$@" npm_wrapper 0000755 00000003302 14711144154 0007024 0 ustar 00 #!/bin/bash if [[ $EUID -eq 0 ]]; then echo "This program is not intended to be run as root." 1>&2 exit 1 fi error_msg="Cloudlinux NodeJS Selector demands to store node modules for application in separate folder \ (virtual environment) pointed by symlink called \"node_modules\". That's why application should not contain \ folder/file with such name in application root" CWD=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) source "${CWD}/activate" eval $(${CWD}/set_env_vars.py nodejs) app_node_modules="${HOME}/${CL_APP_ROOT}/node_modules" venv_node_modules="${CL_VIRTUAL_ENV}/lib/node_modules" nodejs_npm="$CL_NODEHOME/usr/bin/npm" # install with its aliases and list with its alias without arguments or +args if [[ "$@" =~ ^(install|i|add|list|la|ll)$ || "$@" =~ ^(install|i|add|list|la|ll)[[:space:]].*$ ]]; then # We remove old symlink `~/app_root/node_modules` if it exists if [[ -L "${app_node_modules}" ]]; then rm -f "${app_node_modules}" || (echo "Can't remove symlink "${app_node_modules} 1>&2 && exit 1) # We print error end exit 1 if `~/app_root/node_modules` is dir or file elif [[ -d "${app_node_modules}" || -f "${app_node_modules}" ]]; then echo "${error_msg}" 1>&2 && exit 1 fi # we should create venv/node_modules, https://docs.cloudlinux.com/index.html?link_traversal_protection.html mkdir -p "${venv_node_modules}" # Create symlink ~/app_root/node_modules to ~/nodevenv/app_root/int_version/lib/node_modules ln -fs "${venv_node_modules}" "${app_node_modules}" ln -sf "${HOME}/${CL_APP_ROOT}/package.json" "${CL_VIRTUAL_ENV}/lib/package.json" exec "${nodejs_npm}" "$@" --prefix="${CL_VIRTUAL_ENV}/lib" else exec "${nodejs_npm}" "$@" fi node_wrapper 0000755 00000000414 14711144154 0007160 0 ustar 00 #!/bin/bash if [[ $EUID -eq 0 ]]; then echo "This program is not intended to be run as root." 1>&2 exit 1 fi CWD=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) source ${CWD}/activate eval $(${CWD}/set_env_vars.py nodejs) exec "${CL_NODEHOME}/usr/bin/node" "$@"