Linux iad1-shared-b7-18 6.6.49-grsec-jammy+ #10 SMP Thu Sep 12 23:23:08 UTC 2024 x86_64
Apache
: 67.205.6.31 | : 216.73.216.47
Cant Read [ /etc/named.conf ]
8.2.29
fernandoquevedo
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
README
+ Create Folder
+ Create File
/
usr /
local /
lib /
python3.10 /
dist-packages /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
pyzabbix
[ DIR ]
drwxr-xr-x
pyzabbix-1.3.1.dist-info
[ DIR ]
drwxr-xr-x
zabbix_api-0.5.6.dist-info
[ DIR ]
drwxr-xr-x
zabbix_api.py
12.96
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : zabbix_api.py
# This is a port of the ruby zabbix api found here: # http://trac.red-tux.net/browser/ruby/api/zbx_api.rb # # LGPL 2.1 http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html # Zabbix API Python Library. # Original Ruby Library is Copyright (C) 2009 Andrew Nelson nelsonab(at)red-tux(dot)net # Python Library is Copyright (C) 2009 Brett Lentz brett.lentz(at)gmail(dot)com # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA # NOTES: # The API requires zabbix 1.8 or later. # Currently, not all of the API is implemented, and some functionality is # broken. This is a work in progress. import base64 import hashlib import logging import string import sys import ssl import socket try: import urllib2 except ImportError: import urllib.request as urllib2 # python3 import re from collections import deque try: from ssl import _create_unverified_context HAS_SSLCONTEXT = True except ImportError: HAS_SSLCONTEXT = False default_log_handler = logging.StreamHandler(sys.stdout) __logger = logging.getLogger("zabbix_api") __logger.addHandler(default_log_handler) __logger.log(10, "Starting logging") try: # Separate module or Python <2.6 import simplejson as json __logger.log(15, "Using simplejson library") except ImportError: # Python >=2.6 import json __logger.log(15, "Using native json library") def checkauth(fn): """ Decorator to check authentication of the decorated method """ def ret(self, *args): self.__checkauth__() return fn(self, args) return ret def dojson(fn): def wrapper(self, method, opts): self.logger.log(logging.DEBUG, "Going to do_request for %s with opts %s" % (repr(fn), repr(opts))) return self.do_request(self.json_obj(method, opts))['result'] return wrapper def version_compare(v1, v2): """ The result is 0 if v1 == v2, -1 if v1 < v2, and +1 if v1 > v2 """ for v1_part, v2_part in zip(v1.split("."), v2.split(".")): if v1_part.isdecimal() and v2_part.isdecimal(): if int(v1_part) > int(v2_part): return 1 elif int(v1_part) < int(v2_part): return -1 else: if v1 > v2: return 1 elif v1 < v2: return -1 return 0 class ZabbixAPIException(Exception): """ generic zabbix api exception code list: -32602 - Invalid params (eg already exists) -32500 - no permissions """ pass class Already_Exists(ZabbixAPIException): pass class InvalidProtoError(ZabbixAPIException): """ Recived an invalid proto """ pass class APITimeout(ZabbixAPIException): pass class ZabbixAPI(object): __username__ = '' __password__ = '' __tokenauth__ = False auth = '' url = '/api_jsonrpc.php' params = None method = None # HTTP or HTTPS proto = 'http' # HTTP authentication httpuser = None httppasswd = None timeout = 10 validate_certs = None # sub-class instances. # Constructor Params: # server: Server to connect to # path: Path leading to the zabbix install # proto: Protocol to use. http or https # We're going to use proto://server/path to find the JSON-RPC api. # # user: HTTP auth username # passwd: HTTP auth password # log_level: logging level # r_query_len: max len query history # **kwargs: Data to pass to each api module def __init__(self, server='http://localhost/zabbix', user=httpuser, passwd=httppasswd, log_level=logging.WARNING, timeout=10, r_query_len=10, validate_certs=True, **kwargs): """ Create an API object. """ self._setuplogging() self.set_log_level(log_level) self.server = server self.url = server + '/api_jsonrpc.php' self.proto = self.server.split("://")[0] # self.proto=proto self.httpuser = user self.httppasswd = passwd self.timeout = timeout self.kwargs = kwargs self.id = 0 self.r_query = deque([], maxlen=r_query_len) self.validate_certs = validate_certs self.debug(logging.INFO, "url: " + self.url) def _setuplogging(self): self.logger = logging.getLogger("zabbix_api.%s" % self.__class__.__name__) def set_log_level(self, level): self.debug(logging.INFO, "Set logging level to %d" % level) self.logger.setLevel(level) def recent_query(self): """ return recent query """ return list(self.r_query) def debug(self, level, var="", msg=None): strval = str(level) + ": " if msg: strval = strval + str(msg) if var != "": strval = strval + str(var) self.logger.log(level, strval) def json_obj(self, method, params={}, auth=True): obj = {'jsonrpc': '2.0', 'method': method, 'params': params, 'auth': self.auth, 'id': self.id} if not auth: del obj['auth'] self.debug(logging.DEBUG, "json_obj: " + str(obj)) return json.dumps(obj) def login(self, user='', password='', save=True, api_token=None): if api_token is not None: # due to ZBX-21688 we are unable to check if the token is valid # obj = self.json_obj('user.checkAuthentication', {'sessionid': api_token}, auth=False) # result = self.do_request(obj) self.debug(logging.DEBUG, "Using API Token for auth") self.auth=api_token self.__tokenauth__ = True return if user != '': l_user = user l_password = password if save: self.__username__ = user self.__password__ = password elif self.__username__ != '': l_user = self.__username__ l_password = self.__password__ else: raise ZabbixAPIException("No authentication information available.") # don't print the raw password. hashed_pw_string = "md5(" + hashlib.md5(l_password.encode('utf-8')).hexdigest() + ")" self.debug(logging.DEBUG, "Trying to login with %s:%s" % (repr(l_user), repr(hashed_pw_string))) if version_compare(self.api_version(), '5.4') >= 0: login_arg = {'username': l_user, 'password': l_password} else: login_arg = {'user': l_user, 'password': l_password} obj = self.json_obj('user.login', login_arg, auth=False) result = self.do_request(obj) self.auth = result['result'] def logout(self): if self.__tokenauth__: # Do nothing for logout for API tokens. self.debug(logging.DEBUG, "Clearing auth information due to use of API Token") self.auth = '' self.__username__ = '' self.__password__ = '' self.__tokenauth__ = False return if self.auth == '': raise ZabbixAPIException("No authentication information available.") self.debug(logging.DEBUG, "Trying to logout user: %s." % self.__username__) obj = self.json_obj('user.logout', auth=True) result = self.do_request(obj) if result['result']: self.auth = '' self.__username__ = '' self.__password__ = '' def test_login(self): if self.auth != '': obj = self.json_obj('user.checkAuthentication', {'sessionid': self.auth}) result = self.do_request(obj) if not result['result']: self.auth = '' return False # auth hash bad return True # auth hash good else: return False def do_request(self, json_obj): headers = {'Content-Type': 'application/json-rpc', 'User-Agent': 'python/zabbix_api'} if self.httpuser: self.debug(logging.INFO, "HTTP Auth enabled") credentials = (self.httpuser + ':' + self.httppasswd).encode('ascii') auth = 'Basic ' + base64.b64encode(credentials).decode("ascii") headers['Authorization'] = auth self.r_query.append(str(json_obj)) self.debug(logging.INFO, "Sending: " + str(json_obj)) self.debug(logging.DEBUG, "Sending headers: " + str(headers)) request = urllib2.Request(url=self.url, data=json_obj.encode('utf-8'), headers=headers) if self.proto == "https": if HAS_SSLCONTEXT and not self.validate_certs: https_handler = urllib2.HTTPSHandler(debuglevel=0, context=_create_unverified_context()) else: https_handler = urllib2.HTTPSHandler(debuglevel=0) opener = urllib2.build_opener(https_handler) elif self.proto == "http": http_handler = urllib2.HTTPHandler(debuglevel=0) opener = urllib2.build_opener(http_handler) else: raise ZabbixAPIException("Unknow protocol %s" % self.proto) urllib2.install_opener(opener) try: response = opener.open(request, timeout=self.timeout) except ssl.SSLError as e: if hasattr(e, 'message'): e = e.message raise ZabbixAPIException("ssl.SSLError - %s" % e) except socket.timeout as e: raise APITimeout("HTTP read timeout",) except urllib2.URLError as e: if hasattr(e, 'message') and e.message: e = e.message elif hasattr(e, 'reason'): e = e.reason raise ZabbixAPIException("urllib2.URLError - %s" % e) self.debug(logging.INFO, "Response Code: " + str(response.code)) # NOTE: Getting a 412 response code means the headers are not in the # list of allowed headers. if response.code != 200: raise ZabbixAPIException("HTTP ERROR %s: %s" % (response.status, response.reason)) reads = response.read() if len(reads) == 0: raise ZabbixAPIException("Received zero answer") try: jobj = json.loads(reads.decode('utf-8')) except ValueError as msg: print ("unable to decode. returned string: %s" % reads) sys.exit(-1) self.debug(logging.DEBUG, "Response Body: " + str(jobj)) self.id += 1 if 'error' in jobj: # some exception msg = "Error %s: %s, %s while sending %s" % (jobj['error']['code'], jobj['error']['message'], jobj['error']['data'], str(json_obj)) if re.search(".*already\sexists.*", jobj["error"]["data"], re.I): # already exists raise Already_Exists(msg, jobj['error']['code']) else: raise ZabbixAPIException(msg, jobj['error']['code']) return jobj def logged_in(self): if self.auth != '': return True return False def api_version(self, **options): obj = self.do_request(self.json_obj('apiinfo.version', options, auth=False)) return obj['result'] def __checkauth__(self): if not self.logged_in(): raise ZabbixAPIException("Not logged in.") def __getattr__(self, name): return ZabbixAPISubClass(self, dict({"prefix": name}, **self.kwargs)) class ZabbixAPISubClass(ZabbixAPI): """ wrapper class to ensure all calls go through the parent object """ parent = None data = None def __init__(self, parent, data, **kwargs): self._setuplogging() self.debug(logging.INFO, "Creating %s" % self.__class__.__name__) self.data = data self.parent = parent # Save any extra info passed in for key, val in kwargs.items(): setattr(self, key, val) self.debug(logging.WARNING, "Set %s:%s" % (repr(key), repr(val))) def __getattr__(self, name): if self.data["prefix"] == "configuration" and name == "import_": # workaround for "import" method name = "import" def method(*opts): return self.universal("%s.%s" % (self.data["prefix"], name), opts[0]) return method def __checkauth__(self): self.parent.__checkauth__() def do_request(self, req): return self.parent.do_request(req) def json_obj(self, method, param): return self.parent.json_obj(method, param) @dojson @checkauth def universal(self, **opts): return opts
Close