Skip to content

Commit

Permalink
[sonic-ycabled] fix grpc logic for timeout,cli HWSTATUS value retriva…
Browse files Browse the repository at this point in the history
…l logic for active-active cable (#264)

Signed-off-by: vaibhav-dahiya [email protected]
This PR adds some improvement logic to ycabled daemon

correct the loopback IP's for active-active cable type for libra gRPC read_side establishment
improves the code for timeout value injection for different RPC's for gRPC
for cli HW_STATUS the daemon tries to use cached old value, if not present try to do RPC for getting ForwardingState with timeout of 0.1 ms
Description
Motivation and Context
Required for gRPC logic improvement in few active-active scenario's

How Has This Been Tested?
UT and deploying the changes in actual testbed


Signed-off-by: vaibhav-dahiya <[email protected]>
  • Loading branch information
vdahiya12 authored Jun 16, 2022
1 parent 0d90023 commit ec84af4
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 20 deletions.
62 changes: 61 additions & 1 deletion sonic-ycabled/tests/test_y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4699,7 +4699,7 @@ def test_handle_show_mux_state_cmd_arg_tbl_notification_no_instance(self, mock_s
@patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-standby")))
@patch('os.path.isfile', MagicMock(return_value=True))
@patch('time.sleep', MagicMock(return_value=True))
def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto(self, mock_swsscommon_table, platform_sfputil):
def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto_active_standby(self, mock_swsscommon_table, platform_sfputil):

mock_table = MagicMock()
mock_table.get = MagicMock(
Expand Down Expand Up @@ -4747,6 +4747,66 @@ def get_mux_direction(self):
fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port)
assert(rc == None)

@patch('swsscommon.swsscommon.Table')
@patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil')
@patch('ycable.ycable_utilities.y_cable_helper.get_ycable_physical_port_from_logical_port', MagicMock(return_value=(0)))
@patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True))
@patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-active")))
def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto_active_active_none(self, mock_swsscommon_table, platform_sfputil):

mock_table = MagicMock()
mock_table.get = MagicMock(
side_effect=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))])
mock_swsscommon_table.return_value = mock_table

xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table
xcvrd_config_hwmode_state_rsp_tbl = mock_swsscommon_table
xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table
xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table
xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table
port_tbl = mock_swsscommon_table
asic_index = 0
port = "Ethernet0"
platform_sfputil.get_asic_id_for_logical_port = 0
fvp = {"state": "active"}
swsscommon.Table.return_value.get.return_value = (
True, {"read_side": "2", "state": "active"})

rc = handle_show_hwmode_state_cmd_arg_tbl_notification(
fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port)
assert(rc == None)

@patch('swsscommon.swsscommon.Table')
@patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil')
@patch('ycable.ycable_utilities.y_cable_helper.get_ycable_physical_port_from_logical_port', MagicMock(return_value=(0)))
@patch('ycable.ycable_utilities.y_cable_helper.logical_port_name_to_physical_port_list', MagicMock(return_value=[0]))
@patch('ycable.ycable_utilities.y_cable_helper.y_cable_wrapper_get_presence', MagicMock(return_value=True))
@patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-active")))
def test_handle_show_mux_state_cmd_arg_tbl_notification_with_instance_auto_active_active(self, mock_swsscommon_table, platform_sfputil):

mock_table = MagicMock()
mock_table.get = MagicMock(
side_effect=[(True, (('state', "auto"), ("soc_ipv4", "192.168.0.1/32"))), (True, (('index', 2), ))])
mock_swsscommon_table.return_value = mock_table

xcvrd_config_hwmode_state_cmd_sts_tbl = mock_swsscommon_table
xcvrd_config_hwmode_state_rsp_tbl = mock_swsscommon_table
xcvrd_show_hwmode_dir_cmd_sts_tbl = mock_swsscommon_table
xcvrd_show_hwmode_dir_rsp_tbl = mock_swsscommon_table
xcvrd_show_hwmode_dir_res_tbl = mock_swsscommon_table
port_tbl = mock_swsscommon_table
asic_index = 0
port = "Ethernet0"
platform_sfputil.get_asic_id_for_logical_port = 0
fvp = {"state": "active"}
swsscommon.Table.return_value.get.return_value = (
False, {"read_side": "2", "state": "active"})

rc = handle_show_hwmode_state_cmd_arg_tbl_notification(
fvp, port_tbl, xcvrd_show_hwmode_dir_cmd_sts_tbl, xcvrd_show_hwmode_dir_rsp_tbl, xcvrd_show_hwmode_dir_res_tbl, asic_index, port)
assert(rc == -1)

def test_retry_setup_grpc_channel_for_port_incorrect(self):

status = True
Expand Down
58 changes: 39 additions & 19 deletions sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@

SELECT_TIMEOUT = 1000

#gRPC timeouts for RPC
QUERY_ADMIN_FORWARDING_TIMEOUT = 0.1
SET_ADMIN_FORWARDING_TIMEOUT = 0.5

y_cable_platform_sfputil = None
y_cable_platform_chassis = None
y_cable_is_platform_vs = None
Expand All @@ -46,8 +50,8 @@

LOOPBACK_INTERFACE_T0 = "10.212.64.1/32"
LOOPBACK_INTERFACE_LT0 = "10.212.64.2/32"
LOOPBACK_INTERFACE_T0_NIC = "10.1.0.37/32"
LOOPBACK_INTERFACE_LT0_NIC = "10.1.0.38/32"
LOOPBACK_INTERFACE_T0_NIC = "10.1.0.38/32"
LOOPBACK_INTERFACE_LT0_NIC = "10.1.0.39/32"
# rename and put in right place
# port id 0 -> maps to T0
# port id 1 -> maps to LT0
Expand Down Expand Up @@ -321,6 +325,9 @@ def wrapper(*args, **kwargs):

def retry_setup_grpc_channel_for_port(port, asic_index):

global grpc_port_stubs
global grpc_port_channels

config_db, port_tbl = {}, {}
namespaces = multi_asic.get_front_end_namespaces()
for namespace in namespaces:
Expand Down Expand Up @@ -354,7 +361,6 @@ def retry_setup_grpc_channel_for_port(port, asic_index):
return True

def setup_grpc_channel_for_port(port, soc_ip):
"TODO make these configurable like RESTAPI"
"""
root_cert = open('/etc/sonic/credentials/ca-chain-bundle.cert.pem', 'rb').read()
key = open('/etc/sonic/credentials/client.key.pem', 'rb').read()
Expand Down Expand Up @@ -386,9 +392,9 @@ def setup_grpc_channel_for_port(port, soc_ip):
stub = None

if stub is None:
helper_logger.log_warning("stub was not setup gRPC ip {} port {}, no gRPC server running ".format(soc_ip, port))
helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port))
if channel is None:
helper_logger.log_warning("channel was not setup gRPC ip {} port {}, no gRPC server running".format(soc_ip, port))
helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC server running ?".format(soc_ip, port))

return channel, stub

Expand Down Expand Up @@ -536,18 +542,23 @@ def setup_grpc_channels(stop_event):
"Could not retreive port inside config_db PORT table {} for gRPC channel initiation".format(logical_port_name))


def try_grpc(callback, *args, **kwargs):
def try_grpc(callback, rpc_timeout, *args, **kwargs):
"""
Handy function to invoke the callback and catch NotImplementedError
:param callback: Callback to be invoked
:param rpc_timeout: timeout for RPC in seconds
:param args: Arguments to be passed to callback
:param kwargs: Default return value if exception occur
:return: Default return value if exception occur else return value of the callback
"""

return_val = True
try:
resp = callback(*args)
if rpc_timeout is not None:
resp = callback(*args, timeout=rpc_timeout)
else:
resp = callback(*args)

if resp is None:
return_val = False
except grpc.RpcError as e:
Expand All @@ -557,7 +568,9 @@ def try_grpc(callback, *args, **kwargs):
elif e.code() == grpc.StatusCode.UNAVAILABLE:
helper_logger.log_notice("rpc unavailable for port= {}".format(str(e.code())))
elif e.code() == grpc.StatusCode.INVALID_ARGUMENT:
helper_logger.log_notice("rpc invalid for port= {}".format(str(e.code())))
helper_logger.log_notice("rpc invalid arguement for port= {}".format(str(e.code())))
elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
helper_logger.log_notice("rpc timeout exceeded for port= {} timeout = {}".format(str(e.code()), rpc_timeout))
else:
helper_logger.log_notice("rpc exception error for port= {}".format(str(e.code())))
resp = None
Expand Down Expand Up @@ -2919,9 +2932,15 @@ def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_
helper_logger.log_warning("Could not retreive fieldvalue pairs for {}, inside state_db table while responding to cli cmd show mux status {}".format(
port, hw_mux_cable_tbl[asic_index].getTableName()))
set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port)
return -1

mux_port_dict = dict(fv)
read_side = mux_port_dict.get("read_side", None)
state = mux_port_dict.get("state", None)
if state is not None:
set_result_and_delete_port('state', state, xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port)
return

helper_logger.log_debug("Y_CABLE_DEBUG:before invoking RPC fwd_state read_side = {}".format(read_side))
# TODO state only for dummy value in this request MSG remove this
request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[int(read_side), 1 - int(read_side)], state=[0, 0])
Expand All @@ -2930,16 +2949,12 @@ def handle_show_hwmode_state_cmd_arg_tbl_notification(fvp, port_tbl, xcvrd_show_

stub = grpc_port_stubs.get(port, None)
if stub is None:
helper_logger.log_notice("stub is None for getting forwarding state RPC port {}".format(port))
retry_setup_grpc_channel_for_port(port, asic_index)
stub = grpc_port_stubs.get(port, None)
if stub is None:
helper_logger.log_warning(
"stub was None for performing cli fwd mux RPC port {}, setting it up again did not work".format(port))
# no need to retry setup channels for mux cli hw mode command
helper_logger.log_warning("stub is None for getting forwarding state RPC port for cli query {}".format(port))
set_result_and_delete_port('state', 'unknown', xcvrd_show_hwmode_dir_cmd_sts_tbl[asic_index], xcvrd_show_hwmode_dir_rsp_tbl[asic_index], port)
return

ret, response = try_grpc(stub.QueryAdminForwardingPortState, request, timeout=0.1)
ret, response = try_grpc(stub.QueryAdminForwardingPortState, QUERY_ADMIN_FORWARDING_TIMEOUT , request)

(self_state, peer_state) = parse_grpc_response_forwarding_state(ret, response, read_side)
state = self_state
Expand Down Expand Up @@ -3041,7 +3056,7 @@ def handle_fwd_state_command_grpc_notification(fvp_m, hw_mux_cable_tbl, fwd_stat
peer_state = "unknown"
stub = grpc_port_stubs.get(port, None)
if stub is None:
helper_logger.log_notice("stub is None for getting forwarding state RPC port {}".format(port))
helper_logger.log_notice("stub is None for getting admin port forwarding state RPC port {}".format(port))
retry_setup_grpc_channel_for_port(port, asic_index)
stub = grpc_port_stubs.get(port, None)
if stub is None:
Expand All @@ -3052,7 +3067,7 @@ def handle_fwd_state_command_grpc_notification(fvp_m, hw_mux_cable_tbl, fwd_stat
fwd_state_response_tbl[asic_index].set(port, fvs_updated)
return

ret, response = try_grpc(stub.QueryAdminForwardingPortState, request, timeout=0.1)
ret, response = try_grpc(stub.QueryAdminForwardingPortState, QUERY_ADMIN_FORWARDING_TIMEOUT, request)

(self_state, peer_state) = parse_grpc_response_forwarding_state(ret, response, read_side)
if response is not None:
Expand Down Expand Up @@ -3117,7 +3132,7 @@ def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_inde
state_req = 0

helper_logger.log_notice(
"calling RPC for hw mux_cable set state state peer = {} portid {} Ethernet port".format(peer, port))
"calling RPC for hw mux_cable set state state peer = {} portid Ethernet port {}".format(peer, port))

request = linkmgr_grpc_driver_pb2.AdminRequest(portid=[curr_read_side], state=[state_req])

Expand All @@ -3131,7 +3146,7 @@ def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_inde
"stub was None for performing hw mux RPC port {}, setting it up again did not work".format(port))
return

ret, response = try_grpc(stub.SetAdminForwardingPortState, request, timeout=0.1)
ret, response = try_grpc(stub.SetAdminForwardingPortState, SET_ADMIN_FORWARDING_TIMEOUT, request)
if response is not None:
# Debug only, remove this section once Server side is Finalized
hw_response_port_ids = response.portid
Expand Down Expand Up @@ -3737,6 +3752,11 @@ def task_cli_worker(self):

fvp_dict = dict(fvp_m)

(status, cable_type) = check_mux_cable_port_type(port, port_tbl, asic_index)

if status is False or cable_type != "active-standby":
break

if "state" in fvp_dict:
# check if xcvrd got a probe command
port_instance = get_ycable_port_instance_from_logical_port(port)
Expand Down

0 comments on commit ec84af4

Please sign in to comment.