-
Notifications
You must be signed in to change notification settings - Fork 116
/
ciscoEntityFruControlMIB.py
179 lines (143 loc) · 5.51 KB
/
ciscoEntityFruControlMIB.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
from enum import Enum, unique
from sonic_ax_impl import mibs
from ax_interface import MIBMeta, ValueType, SubtreeMIBEntry
CHASSIS_INFO_KEY_TEMPLATE = 'chassis {}'
PSU_INFO_KEY_TEMPLATE = 'PSU {}'
PSU_PRESENCE_OK = 'true'
PSU_STATUS_OK = 'true'
@unique
class CHASSISInfoDB(str, Enum):
"""
CHASSIS info keys
"""
PSU_NUM = "psu_num"
@unique
class PSUInfoDB(str, Enum):
"""
PSU info keys
"""
PRESENCE = "presence"
STATUS = "status"
def get_chassis_data(chassis_info):
"""
:param chassis_info: chassis info dict
:return: tuple (psu_num) of chassis;
Empty string if field not in chassis_info
"""
return tuple(chassis_info.get(chassis_field.value, "") for chassis_field in CHASSISInfoDB)
def get_psu_data(psu_info):
"""
:param psu_info: psu info dict
:return: tuple (presence, status) of psu;
Empty string if field not in psu_info
"""
return tuple(psu_info.get(psu_field.value, "") for psu_field in PSUInfoDB)
class PowerStatusHandler:
"""
Class to handle the SNMP request
"""
def __init__(self):
"""
init the handler
"""
self.statedb = mibs.init_db()
self.statedb.connect(self.statedb.STATE_DB)
def _get_num_psus(self):
"""
Get PSU number
:return: the number of supported PSU
"""
chassis_name = CHASSIS_INFO_KEY_TEMPLATE.format(1)
chassis_info = self.statedb.get_all(self.statedb.STATE_DB, mibs.chassis_info_table(chassis_name))
num_psus = get_chassis_data(chassis_info)
return int(num_psus[0])
def _get_psu_presence(self, psu_index):
"""
Get PSU presence
:return: the presence of particular PSU
"""
psu_name = PSU_INFO_KEY_TEMPLATE.format(psu_index)
psu_info = self.statedb.get_all(self.statedb.STATE_DB, mibs.psu_info_table(psu_name))
presence, status = get_psu_data(psu_info)
return presence == PSU_PRESENCE_OK
def _get_psu_status(self, psu_index):
"""
Get PSU status
:return: the status of particular PSU
"""
psu_name = PSU_INFO_KEY_TEMPLATE.format(psu_index)
psu_info = self.statedb.get_all(self.statedb.STATE_DB, mibs.psu_info_table(psu_name))
presence, status = get_psu_data(psu_info)
return status == PSU_STATUS_OK
def _get_psu_index(self, sub_id):
"""
Get the PSU index from sub_id
:return: the index of supported PSU
"""
if not sub_id or len(sub_id) > 1:
return None
psu_index = int(sub_id[0])
try:
num_psus = self._get_num_psus()
except Exception:
# Any unexpected exception or error, log it and keep running
mibs.logger.exception("PowerStatusHandler._get_psu_index() caught an unexpected exception during _get_num_psus()")
return None
if psu_index < 1 or psu_index > num_psus:
return None
return psu_index
def get_next(self, sub_id):
"""
:param sub_id: The 1-based snmp sub-identifier query.
:return: the next sub id.
"""
if not sub_id:
return (1,)
psu_index = self._get_psu_index(sub_id)
try:
num_psus = self._get_num_psus()
except Exception:
# Any unexpected exception or error, log it and keep running
mibs.logger.exception("PowerStatusHandler.get_next() caught an unexpected exception during _get_num_psus()")
return None
if psu_index and psu_index + 1 <= num_psus:
return (psu_index + 1,)
return None
def get_psu_status(self, sub_id):
"""
:param sub_id: The 1-based sub-identifier query.
:return: the status of requested PSU according to cefcModuleOperStatus ModuleOperType
2 - PSU has correct functionalling - ok
7 - PSU has a problem with functionalling - failed
8 - the module is provisioned, but it is missing. This is a failure state.
:ref: https://www.cisco.com/c/en/us/td/docs/switches/wan/mgx/mgx_8850/software/mgx_r2-0-10/pxm/reference/guide/pxm/cscoent.html
"""
psu_index = self._get_psu_index(sub_id)
if not psu_index:
return None
try:
psu_presence = self._get_psu_presence(psu_index)
except Exception:
# Any unexpected exception or error, log it and keep running
mibs.logger.exception("PowerStatusHandler.get_psu_status() caught an unexpected exception during _get_psu_presence()")
return None
if psu_presence:
try:
psu_status = self._get_psu_status(psu_index)
except Exception:
# Any unexpected exception or error, log it and keep running
mibs.logger.exception("PowerStatusHandler.get_psu_status() caught an unexpected exception during _get_psu_status()")
return None
if psu_status:
return 2
return 7
else:
return 8
class cefcFruPowerStatusTable(metaclass=MIBMeta, prefix='.1.3.6.1.4.1.9.9.117.1.1.2'):
"""
'cefcFruPowerStatusTable' http://oidref.com/1.3.6.1.4.1.9.9.117.1.1.2
"""
power_status_handler = PowerStatusHandler()
# cefcFruPowerStatusTable = '1.3.6.1.4.1.9.9.117.1.1.2'
# csqIfQosGroupStatsEntry = '1.3.6.1.4.1.9.9.117.1.1.2.1'
psu_status = SubtreeMIBEntry('1.2', power_status_handler, ValueType.INTEGER, power_status_handler.get_psu_status)