Skip to content

Commit

Permalink
Enable archival config per domain (#1351)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Dec 21, 2018
1 parent f419d49 commit 0fe330b
Show file tree
Hide file tree
Showing 31 changed files with 1,836 additions and 148 deletions.
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

446 changes: 440 additions & 6 deletions .gen/go/shared/types.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (s *server) startService() common.Daemon {
s.cfg.ClustersInfo.CurrentClusterName,
s.cfg.ClustersInfo.ClusterInitialFailoverVersions,
s.cfg.ClustersInfo.ClusterAddress,
s.cfg.ClustersInfo.DeploymentGroup,
)
params.DispatcherProvider = client.NewIPYarpcDispatcherProvider()
// TODO: We need to switch Cadence to use zap logger, until then just pass zap.NewNop
Expand Down
14 changes: 13 additions & 1 deletion common/cluster/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type (
ClusterNameForFailoverVersion(failoverVersion int64) string
// GetAllClientAddress return the frontend address for each cluster name
GetAllClientAddress() map[string]config.Address
// GetDeploymentGroup returns the deployment group of cluster
GetDeploymentGroup() string
}

metadataImpl struct {
Expand All @@ -68,21 +70,25 @@ type (
initialFailoverVersionClusters map[int64]string
// clusterToAddress contains the cluster name to corresponding frontend client
clusterToAddress map[string]config.Address
// deploymentGroup is the deployment group name of cluster
deploymentGroup string
}
)

// NewMetadata create a new instance of Metadata
func NewMetadata(enableGlobalDomain dynamicconfig.BoolPropertyFn, failoverVersionIncrement int64,
masterClusterName string, currentClusterName string,
clusterInitialFailoverVersions map[string]int64,
clusterToAddress map[string]config.Address) Metadata {
clusterToAddress map[string]config.Address, deploymentGroup string) Metadata {

if len(clusterInitialFailoverVersions) < 0 {
panic("Empty initial failover versions for cluster")
} else if len(masterClusterName) == 0 {
panic("Master cluster name is empty")
} else if len(currentClusterName) == 0 {
panic("Current cluster name is empty")
} else if len(deploymentGroup) == 0 {
panic("Deployment group name is empty")
}
initialFailoverVersionClusters := make(map[int64]string)
for clusterName, initialFailoverVersion := range clusterInitialFailoverVersions {
Expand Down Expand Up @@ -120,6 +126,7 @@ func NewMetadata(enableGlobalDomain dynamicconfig.BoolPropertyFn, failoverVersio
clusterInitialFailoverVersions: clusterInitialFailoverVersions,
initialFailoverVersionClusters: initialFailoverVersionClusters,
clusterToAddress: clusterToAddress,
deploymentGroup: deploymentGroup,
}
}

Expand Down Expand Up @@ -189,3 +196,8 @@ func (metadata *metadataImpl) ClusterNameForFailoverVersion(failoverVersion int6
func (metadata *metadataImpl) GetAllClientAddress() map[string]config.Address {
return metadata.clusterToAddress
}

// GetDeploymentGroup returns the deployment group name for cluster
func (metadata *metadataImpl) GetDeploymentGroup() string {
return metadata.deploymentGroup
}
3 changes: 3 additions & 0 deletions common/cluster/metadataTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
TestCurrentClusterFrontendAddress = "127.0.0.1:7104"
// TestAlternativeClusterFrontendAddress is the ip port address of alternative cluster
TestAlternativeClusterFrontendAddress = "127.0.0.1:8104"
// TestDeploymentGroup is alternative deployment group used for test
TestDeploymentGroup = "test"
)

var (
Expand Down Expand Up @@ -69,5 +71,6 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool) Metad
TestCurrentClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestCurrentClusterFrontendAddress},
TestAlternativeClusterName: config.Address{RPCName: common.FrontendServiceName, RPCAddress: TestAlternativeClusterFrontendAddress},
},
TestDeploymentGroup,
)
}
5 changes: 5 additions & 0 deletions common/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func ChildWorkflowExecutionFailedCausePtr(t s.ChildWorkflowExecutionFailedCause)
return &t
}

// ArchivalStatusPtr makes a copy and returns the pointer to an ArchivalStatus.
func ArchivalStatusPtr(t s.ArchivalStatus) *s.ArchivalStatus {
return &t
}

// StringDefault returns value if string pointer is set otherwise default value of string
func StringDefault(v *string) string {
var defaultString string
Expand Down
14 changes: 14 additions & 0 deletions common/mocks/ClusterMetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,20 @@ func (_m *ClusterMetadata) GetMasterClusterName() string {
return r0
}

// GetDeploymentGroup provides a mock function with given fields:
func (_m *ClusterMetadata) GetDeploymentGroup() string {
ret := _m.Called()

var r0 string
if rf, ok := ret.Get(0).(func() string); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(string)
}

return r0
}

// GetNextFailoverVersion provides a mock function with given fields: _a0, _a1
func (_m *ClusterMetadata) GetNextFailoverVersion(_a0 string, _a1 int64) int64 {
ret := _m.Called(_a0, _a1)
Expand Down
13 changes: 11 additions & 2 deletions common/persistence/cassandra/cassandraMetadataPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ const (

templateDomainConfigType = `{` +
`retention: ?, ` +
`emit_metric: ?` +
`emit_metric: ?, ` +
`archival_bucket: ?, ` +
`archival_status: ?` +
`}`

templateDomainReplicationConfigType = `{` +
Expand All @@ -64,6 +66,7 @@ const (

templateGetDomainByNameQuery = `SELECT domain.id, domain.name, domain.status, domain.description, ` +
`domain.owner_email, domain.data, config.retention, config.emit_metric, ` +
`config.archival_bucket, config.archival_status, ` +
`replication_config.active_cluster_name, replication_config.clusters, ` +
`is_global_domain, ` +
`config_version, ` +
Expand Down Expand Up @@ -152,6 +155,8 @@ func (m *cassandraMetadataPersistence) CreateDomain(request *p.CreateDomainReque
request.Info.Data,
request.Config.Retention,
request.Config.EmitMetric,
request.Config.ArchivalBucket,
request.Config.ArchivalStatus,
request.ReplicationConfig.ActiveClusterName,
p.SerializeClusterConfigs(request.ReplicationConfig.Clusters),
request.IsGlobalDomain,
Expand Down Expand Up @@ -245,6 +250,8 @@ func (m *cassandraMetadataPersistence) GetDomain(request *p.GetDomainRequest) (*
&info.Data,
&config.Retention,
&config.EmitMetric,
&config.ArchivalBucket,
&config.ArchivalStatus,
&replicationConfig.ActiveClusterName,
&replicationClusters,
&isGlobalDomain,
Expand Down Expand Up @@ -289,6 +296,8 @@ func (m *cassandraMetadataPersistence) UpdateDomain(request *p.UpdateDomainReque
request.Info.Data,
request.Config.Retention,
request.Config.EmitMetric,
request.Config.ArchivalBucket,
request.Config.ArchivalStatus,
request.ReplicationConfig.ActiveClusterName,
p.SerializeClusterConfigs(request.ReplicationConfig.Clusters),
request.ConfigVersion,
Expand Down Expand Up @@ -330,7 +339,7 @@ func (m *cassandraMetadataPersistence) DeleteDomain(request *p.DeleteDomainReque
func (m *cassandraMetadataPersistence) DeleteDomainByName(request *p.DeleteDomainByNameRequest) error {
var ID string
query := m.session.Query(templateGetDomainByNameQuery, request.Name)
err := query.Scan(&ID, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
err := query.Scan(&ID, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
if err != nil {
if err == gocql.ErrNotFound {
return nil
Expand Down
13 changes: 11 additions & 2 deletions common/persistence/cassandra/cassandraMetadataPersistenceV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (

templateGetDomainByNameQueryV2 = `SELECT domain.id, domain.name, domain.status, domain.description, ` +
`domain.owner_email, domain.data, config.retention, config.emit_metric, ` +
`config.archival_bucket, config.archival_status, ` +
`replication_config.active_cluster_name, replication_config.clusters, ` +
`is_global_domain, ` +
`config_version, ` +
Expand Down Expand Up @@ -78,6 +79,7 @@ const (

templateListDomainQueryV2 = `SELECT name, domain.id, domain.name, domain.status, domain.description, ` +
`domain.owner_email, domain.data, config.retention, config.emit_metric, ` +
`config.archival_bucket, config.archival_status, ` +
`replication_config.active_cluster_name, replication_config.clusters, ` +
`is_global_domain, ` +
`config_version, ` +
Expand Down Expand Up @@ -157,6 +159,8 @@ func (m *cassandraMetadataPersistenceV2) CreateDomain(request *p.CreateDomainReq
request.Info.Data,
request.Config.Retention,
request.Config.EmitMetric,
request.Config.ArchivalBucket,
request.Config.ArchivalStatus,
request.ReplicationConfig.ActiveClusterName,
p.SerializeClusterConfigs(request.ReplicationConfig.Clusters),
request.IsGlobalDomain,
Expand Down Expand Up @@ -213,6 +217,8 @@ func (m *cassandraMetadataPersistenceV2) UpdateDomain(request *p.UpdateDomainReq
request.Info.Data,
request.Config.Retention,
request.Config.EmitMetric,
request.Config.ArchivalBucket,
request.Config.ArchivalStatus,
request.ReplicationConfig.ActiveClusterName,
p.SerializeClusterConfigs(request.ReplicationConfig.Clusters),
request.ConfigVersion,
Expand Down Expand Up @@ -303,6 +309,8 @@ func (m *cassandraMetadataPersistenceV2) GetDomain(request *p.GetDomainRequest)
&info.Data,
&config.Retention,
&config.EmitMetric,
&config.ArchivalBucket,
&config.ArchivalStatus,
&replicationConfig.ActiveClusterName,
&replicationClusters,
&isGlobalDomain,
Expand Down Expand Up @@ -360,12 +368,13 @@ func (m *cassandraMetadataPersistenceV2) ListDomains(request *p.ListDomainsReque
&name,
&domain.Info.ID, &domain.Info.Name, &domain.Info.Status, &domain.Info.Description, &domain.Info.OwnerEmail, &domain.Info.Data,
&domain.Config.Retention, &domain.Config.EmitMetric,
&domain.Config.ArchivalBucket, &domain.Config.ArchivalStatus,
&domain.ReplicationConfig.ActiveClusterName, &replicationClusters,
&domain.IsGlobalDomain, &domain.ConfigVersion, &domain.FailoverVersion,
&domain.FailoverNotificationVersion, &domain.NotificationVersion,
) {
if name != domainMetadataRecordName {
// do not inlcude the metadata record
// do not include the metadata record
if domain.Info.Data == nil {
domain.Info.Data = map[string]string{}
}
Expand Down Expand Up @@ -411,7 +420,7 @@ func (m *cassandraMetadataPersistenceV2) DeleteDomain(request *p.DeleteDomainReq
func (m *cassandraMetadataPersistenceV2) DeleteDomainByName(request *p.DeleteDomainByNameRequest) error {
var ID string
query := m.session.Query(templateGetDomainByNameQueryV2, constDomainPartition, request.Name)
err := query.Scan(&ID, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
err := query.Scan(&ID, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
if err != nil {
if err == gocql.ErrNotFound {
return nil
Expand Down
6 changes: 4 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -969,8 +969,10 @@ type (
// DomainConfig describes the domain configuration
DomainConfig struct {
// NOTE: this retention is in days, not in seconds
Retention int32
EmitMetric bool
Retention int32
EmitMetric bool
ArchivalBucket string
ArchivalStatus workflow.ArchivalStatus
}

// DomainReplicationConfig describes the cross DC domain replication configuration
Expand Down
Loading

0 comments on commit 0fe330b

Please sign in to comment.