Hackfut Security File Manager
Current Path:
/opt/alt/python37/lib/python3.7/site-packages/lvestats/lib
opt
/
alt
/
python37
/
lib
/
python3.7
/
site-packages
/
lvestats
/
lib
/
📁
..
📄
__init__.py
(219 B)
📁
__pycache__
📁
chart
📄
cloudlinux_statistics.py
(24.38 KB)
📄
cloudlinux_statsnotifier.py
(5.21 KB)
📁
commons
📄
config.py
(2.94 KB)
📄
db_functions.py
(1.6 KB)
📄
dbengine.py
(9.29 KB)
📁
info
📄
jsonhandler.py
(482 B)
📄
lve_create_db.py
(1.92 KB)
📄
lve_list.py
(3.39 KB)
📄
lveinfolib.py
(39.95 KB)
📄
lveinfolib_gov.py
(13.59 KB)
📄
lvestats_server.py
(8.1 KB)
📄
notifications_helper.py
(5.45 KB)
📁
parsers
📄
server_id.py
(5.39 KB)
📄
snapshot.py
(7.39 KB)
📄
uidconverter.py
(5.11 KB)
📄
ustate.py
(8.65 KB)
Editing: server_id.py
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import absolute_import from past.builtins import basestring from uuid import uuid4 from clcommon.clconfpars import load, change_settings from lvestats.lib import config from lvestats.lib import lveinfolib from lvestats.orm.history import history, history_x60 from lvestats.orm.history_gov import history_gov from lvestats.orm.incident import incident from lvestats.orm.servers import servers from lvestats.orm.user import user from past.builtins import raw_input class UpdateOrCreateServerID(object): """Change old-style server ID 'localhost' to auto-generated uuid User can set his own server ID """ restricted_server_id_list = ('localhost',) server_id_length = 10 def __init__(self, engine, config_path=None): """ :type engine: sqlalchemy.engine.base.Engine :type config_path: Union[str, None] """ self.engine = engine self.config_path = config_path or config.GLOBAL_CONFIG_LOCATION def _get_current_server_id_cfg(self): """ :rtype: Union[None, str] """ try: cfg = load(path=self.config_path) return cfg.get('server_id') except Exception as e: print('Error: {0}'.format(e)) return None def _set_server_id_cfg(self, server_id): """Sets option 'server_id' in config file :type server_id: str :rtype: Union[None, str] """ try: change_settings({'server_id': server_id}, self.config_path) return server_id except Exception as e: print('Error: {0}'.format(e)) return None def _update_server_id_table(self, conn, table, current_server_id, new_server_id): """ updates server_id in one of the tables :type conn: sqlalchemy.engine.base.Connection :type table: Union[history, history_gov, incident, user, history_x60] :type current_server_id: str :type new_server_id: str """ sql_update = table.__table__.update().where( table.server_id == current_server_id ) conn.execute(sql_update.values( {'server_id': new_server_id} )) def _update_server_id_db(self, current_server_id, new_server_id): """ updates server_id for all tables in the database :type current_server_id: str :type new_server_id: str :rtype: Union[None, str] """ sql_tables = [history, history_gov, incident, user, history_x60] lve_version = lveinfolib.get_lve_version(self.engine, current_server_id) conn = self.engine.connect() tx = conn.begin() try: for table in sql_tables: self._update_server_id_table(conn, table, current_server_id, new_server_id) # update servers table, where server_id is a primary key conn.execute(servers.__table__.insert().values( {'server_id': new_server_id, "lve_version": lve_version} )) conn.execute(servers.__table__.delete().where(servers.server_id == current_server_id)) except Exception as e: tx.rollback() conn.close() print('Error: {0}'.format(e)) return None else: tx.commit() conn.close() return new_server_id def _resolve_localhost(self, prompt=True, current_server_id=None): """ :type current_server_id: Union[None, str] :type prompt: bool :rtype: Union[None, str] """ server_id = str(uuid4())[:self.server_id_length] if prompt: res = raw_input('Enter new server ID [\'{0}\']: '.format(server_id)) print(res) if res: server_id = res.strip() if server_id in self.restricted_server_id_list: print('Server ID cannot be \'{0}\''.format(server_id)) return None if server_id == current_server_id: print("Server ID is already set to '{0}'".format(server_id)) return server_id if len(server_id) > 255: print("Server ID length should be less than 255") return None cur_server_id = self._get_current_server_id_cfg() if cur_server_id is not None and not prompt and \ cur_server_id not in self.restricted_server_id_list: return cur_server_id if self._update_server_id_db(current_server_id or cur_server_id or 'localhost', server_id): return self._set_server_id_cfg(server_id) return server_id def prompt(self): """ :rtype: Union[None, str] """ cur_cfg_server_id = self._get_current_server_id_cfg() # if server ID = 'localhost' if isinstance(cur_cfg_server_id, basestring) and \ cur_cfg_server_id.strip() in self.restricted_server_id_list: print('Server ID cannot be \'{0}\''.format(cur_cfg_server_id)) return self._resolve_localhost(current_server_id=cur_cfg_server_id) def auto(self): """ :rtype: Union[None, str] """ return self._resolve_localhost(prompt=False)
Upload File
Create Folder