Skip to content

Commit

Permalink
Add ZeroMQ communication channel between sairedis and syncd (#659)
Browse files Browse the repository at this point in the history
* [sairedis] Use separate db connector for VID index generator

* [sairedis] Add Channel class

* [sairedis] Start using Channel class in RedisChannel

* [sairedis] Add zmq configuration to ContextConfig

* [sairedis] Change reddis channel class to base channel class

* [syncd] Fix aspell

* [sairedis] Disable zeromq by default

* [sairedis] Add ZeroMQChannel class

* Update aspell

* [syncd] Add NotificationProducerBase class

* [syncd] Add RedisNotificationProducer class

* [syncd] Start using RediisNotificationProducer class

* [syncd] Add ZeroMQNotificationProducer class

* [syncd] Start uisng ZeroMQNotificationProducer class

* [sairedis] Start using ZeroMQChannel clas

* [saiplayer] Use lib zmq in saiplayer Makefile

* [tests] Add libzmq to makefile

* [sairedis] Force sync mode when zmq enabled

* [syncd] Force sync mode when zmq enabled

* [syncd] Add libzmq to makefile

* [syncd] Add SelectableChannel class

* [syncd] Add RedisSelectableChannel class

* [syncd] Start using SelectableChannel base

* [saidump] Add libzmq to makefile

* [syncd] Add ZeroMQSelectableChannel class

* [syncd] Start using ZeroMQSelectableChannel

* [sairedis] Remove unused includes from Channel class

* [sairedis] Fix ZeroMQChannel error checks

* [syncd] Use zmq_poll in separate thread to ignore edge trigger poll

* [sairedis] Fix aspell

* [tests] Update aspell dict

* [tests] Add zmq channel unittests

* [syncd] Fix ZeroMQNotificationProducer connect error code

* Update aspell

* [syncd] Increase ZeroMQSelectableChannel timeout to 2min

* Update aspell

* Modify libzmq order in Makefiles

* [syncd] Fix merge issues
  • Loading branch information
kcudnik authored Oct 16, 2020
1 parent 017056a commit 1d84b90
Show file tree
Hide file tree
Showing 39 changed files with 1,495 additions and 169 deletions.
76 changes: 76 additions & 0 deletions lib/inc/Channel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#pragma once

#include "swss/producertable.h"
#include "swss/consumertable.h"
#include "swss/notificationconsumer.h"
#include "swss/selectableevent.h"
#include "swss/sal.h"

extern "C" {
#include "sai.h"
}

#include <memory>
#include <functional>

namespace sairedis
{
class Channel
{
public:

typedef std::function<void(const std::string&,const std::string&, const std::vector<swss::FieldValueTuple>&)> Callback;

public:

Channel(
_In_ Callback callback);

virtual ~Channel();

public:

virtual void setBuffered(
_In_ bool buffered) = 0;

virtual void flush() = 0;

virtual void set(
_In_ const std::string& key,
_In_ const std::vector<swss::FieldValueTuple>& values,
_In_ const std::string& command) = 0;

virtual void del(
_In_ const std::string& key,
_In_ const std::string& command) = 0;

virtual sai_status_t wait(
_In_ const std::string& command,
_Out_ swss::KeyOpFieldsValuesTuple& kco) = 0;

protected:

virtual void notificationThreadFunction() = 0;

protected:

Callback m_callback;

protected: // notification

/**
* @brief Indicates whether notification thread should be running.
*/
volatile bool m_runNotificationThread;

/**
* @brief Event used to nice end notifications thread.
*/
swss::SelectableEvent m_notificationThreadShouldEndEvent;

/**
* @brief Notification thread
*/
std::shared_ptr<std::thread> m_notificationThread;
};
}
6 changes: 6 additions & 0 deletions lib/inc/ContextConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ namespace sairedis

std::string m_dbState;

bool m_zmqEnable;

std::string m_zmqEndpoint;

std::string m_zmqNtfEndpoint;

std::shared_ptr<SwitchConfigContainer> m_scc;
};
}
51 changes: 15 additions & 36 deletions lib/inc/RedisChannel.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "Channel.h"
#include "RemoteSaiInterface.h"
#include "SwitchContainer.h"
#include "VirtualObjectIdManager.h"
Expand All @@ -17,49 +18,42 @@

namespace sairedis
{
class RedisChannel
class RedisChannel:
public Channel
{
public:

typedef std::function<void(const std::string&,const std::string&, const std::vector<swss::FieldValueTuple>&)> Callback;

public:

RedisChannel(
_In_ const std::string& dbAsic,
_In_ Callback callback);
_In_ Channel::Callback callback);

virtual ~RedisChannel();

public:

std::shared_ptr<swss::DBConnector> getDbConnector() const;

void setBuffered(
_In_ bool buffered);
virtual void setBuffered(
_In_ bool buffered) override;

void flush();
virtual void flush() override;

void set(
virtual void set(
_In_ const std::string& key,
_In_ const std::vector<swss::FieldValueTuple>& values,
_In_ const std::string& command);
_In_ const std::string& command) override;

void del(
virtual void del(
_In_ const std::string& key,
_In_ const std::string& command);
_In_ const std::string& command) override;

sai_status_t wait(
virtual sai_status_t wait(
_In_ const std::string& command,
_Out_ swss::KeyOpFieldsValuesTuple& kco);

private:

void notificationThreadFunction();
_Out_ swss::KeyOpFieldsValuesTuple& kco) override;

private:
protected:

Callback m_callback;
virtual void notificationThreadFunction() override;

private:

Expand All @@ -85,11 +79,6 @@ namespace sairedis

private: // notification

/**
* @brief Indicates whether notification thread should be running.
*/
volatile bool m_runNotificationThread;

/**
* @brief Database connector used for notifications.
*/
Expand All @@ -99,15 +88,5 @@ namespace sairedis
* @brief Notification consumer.
*/
std::shared_ptr<swss::NotificationConsumer> m_notificationConsumer;

/**
* @brief Event used to nice end notifications thread.
*/
swss::SelectableEvent m_notificationThreadShouldEndEvent;

/**
* @brief Notification thread
*/
std::shared_ptr<std::thread> m_notificationThread;
};
}
15 changes: 6 additions & 9 deletions lib/inc/RedisRemoteSaiInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "SkipRecordAttrContainer.h"
#include "RedisChannel.h"
#include "SwitchConfigContainer.h"
#include "ContextConfig.h"

#include "swss/producertable.h"
#include "swss/consumertable.h"
Expand Down Expand Up @@ -72,9 +73,7 @@ namespace sairedis
public:

RedisRemoteSaiInterface(
_In_ uint32_t globalContext,
_In_ std::shared_ptr<SwitchConfigContainer> scc,
_In_ const std::string& dbAsic,
_In_ std::shared_ptr<ContextConfig> contextConfig,
_In_ std::function<sai_switch_notifications_t(std::shared_ptr<Notification>)> notificationCallback,
_In_ std::shared_ptr<Recorder> recorder);

Expand Down Expand Up @@ -438,9 +437,7 @@ namespace sairedis

private:

uint32_t m_globalContext;

std::shared_ptr<SwitchConfigContainer> m_switchConfigContainer;
std::shared_ptr<ContextConfig> m_contextConfig;

bool m_asicInitViewMode;

Expand All @@ -456,18 +453,18 @@ namespace sairedis

std::shared_ptr<VirtualObjectIdManager> m_virtualObjectIdManager;

std::shared_ptr<swss::DBConnector> m_db;

std::shared_ptr<RedisVidIndexGenerator> m_redisVidIndexGenerator;

std::weak_ptr<saimeta::Meta> m_meta;

std::shared_ptr<SkipRecordAttrContainer> m_skipRecordAttrContainer;

std::shared_ptr<RedisChannel> m_redisChannel;
std::shared_ptr<Channel> m_communicationChannel;

std::function<sai_switch_notifications_t(std::shared_ptr<Notification>)> m_notificationCallback;

std::string m_dbAsic;

std::map<sai_object_id_t, swss::TableDump> m_tableDump;
};
}
68 changes: 68 additions & 0 deletions lib/inc/ZeroMQChannel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#pragma once

#include "Channel.h"

#include "swss/producertable.h"
#include "swss/consumertable.h"
#include "swss/notificationconsumer.h"
#include "swss/selectableevent.h"

#include <memory>
#include <functional>

namespace sairedis
{
class ZeroMQChannel:
public Channel
{
public:

ZeroMQChannel(
_In_ const std::string& endpoint,
_In_ const std::string& ntfEndpoint,
_In_ Channel::Callback callback);

virtual ~ZeroMQChannel();

public:

virtual void setBuffered(
_In_ bool buffered) override;

virtual void flush() override;

virtual void set(
_In_ const std::string& key,
_In_ const std::vector<swss::FieldValueTuple>& values,
_In_ const std::string& command) override;

virtual void del(
_In_ const std::string& key,
_In_ const std::string& command) override;

virtual sai_status_t wait(
_In_ const std::string& command,
_Out_ swss::KeyOpFieldsValuesTuple& kco) override;

protected:

virtual void notificationThreadFunction() override;

private:

std::string m_endpoint;

std::string m_ntfEndpoint;

std::vector<uint8_t> m_buffer;

void* m_context;

void* m_socket;

void* m_ntfContext;

void* m_ntfSocket;

};
}
21 changes: 21 additions & 0 deletions lib/src/Channel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include "Channel.h"

#include "swss/logger.h"

using namespace sairedis;

Channel::Channel(
_In_ Callback callback):
m_callback(callback)
{
SWSS_LOG_ENTER();

// empty
}

Channel::~Channel()
{
SWSS_LOG_ENTER();

// empty
}
4 changes: 1 addition & 3 deletions lib/src/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ Context::Context(

// will create notification thread
m_redisSai = std::make_shared<RedisRemoteSaiInterface>(
m_contextConfig->m_guid,
m_contextConfig->m_scc,
m_contextConfig->m_dbAsic,
m_contextConfig,
std::bind(&Context::handle_notification, this, _1),
m_recorder);

Expand Down
3 changes: 2 additions & 1 deletion lib/src/ContextConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ ContextConfig::ContextConfig(
m_dbAsic(dbAsic),
m_dbCounters(dbCounters),
m_dbFlex(dbFlex),
m_dbState(dbState)
m_dbState(dbState),
m_zmqEnable(false)
{
SWSS_LOG_ENTER();

Expand Down
13 changes: 11 additions & 2 deletions lib/src/ContextConfigContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ std::shared_ptr<ContextConfigContainer> ContextConfigContainer::getDefault()
auto sc = std::make_shared<SwitchConfig>(0, "");

cc->insert(sc);

ccc->m_map[0] = cc;

return ccc;
}

Expand Down Expand Up @@ -113,6 +113,15 @@ std::shared_ptr<ContextConfigContainer> ContextConfigContainer::loadFromFile(

auto cc = std::make_shared<ContextConfig>(guid, name, dbAsic, dbCounters, dbFlex, dbState);

cc->m_zmqEnable = item["zmq_enable"];
cc->m_zmqEndpoint = item["zmq_endpoint"];
cc->m_zmqNtfEndpoint = item["zmq_ntf_endpoint"];

SWSS_LOG_NOTICE("contextConfig zmq enable %s, endpoint: %s, ntf endpoint: %s",
(cc->m_zmqEnable) ? "true" : "false",
cc->m_zmqEndpoint.c_str(),
cc->m_zmqNtfEndpoint.c_str());

for (size_t k = 0; k < item["switches"].size(); k++)
{
json& sw = item["switches"][k];
Expand Down
4 changes: 3 additions & 1 deletion lib/src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ lib_LTLIBRARIES = libsairedis.la

noinst_LIBRARIES = libSaiRedis.a
libSaiRedis_a_SOURCES = \
ZeroMQChannel.cpp \
Channel.cpp \
Context.cpp \
ContextConfigContainer.cpp \
ContextConfig.cpp \
Expand Down Expand Up @@ -92,7 +94,7 @@ bin_PROGRAMS = tests

tests_SOURCES = tests.cpp
tests_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON)
tests_LDADD = -lhiredis -lswsscommon -lpthread $(top_srcdir)/meta/libsaimetadata.la $(top_srcdir)/meta/libsaimeta.la libsairedis.la
tests_LDADD = -lhiredis -lswsscommon -lpthread $(top_srcdir)/meta/libsaimetadata.la $(top_srcdir)/meta/libsaimeta.la libsairedis.la -lzmq

TESTS = tests

Loading

0 comments on commit 1d84b90

Please sign in to comment.