Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

channelz: pass parent pointer instead of parent ID to RegisterSubChannel #7101

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions channelz/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (s) TestGetChannel(t *testing.T) {
},
})

subChan := channelz.RegisterSubChannel(cids[0].ID, refNames[2])
subChan := channelz.RegisterSubChannel(cids[0], refNames[2])
channelz.AddTraceEvent(logger, subChan, 0, &channelz.TraceEvent{
Desc: "SubChannel Created",
Severity: channelz.CtInfo,
Expand Down Expand Up @@ -425,7 +425,7 @@ func (s) TestGetSubChannel(t *testing.T) {
Desc: "Channel Created",
Severity: channelz.CtInfo,
})
subChan := channelz.RegisterSubChannel(chann.ID, refNames[1])
subChan := channelz.RegisterSubChannel(chann, refNames[1])
defer channelz.RemoveEntry(subChan.ID)
channelz.AddTraceEvent(logger, subChan, 0, &channelz.TraceEvent{
Desc: subchanCreated,
Expand Down
2 changes: 1 addition & 1 deletion clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
addrs: copyAddressesWithoutBalancerAttributes(addrs),
scopts: opts,
dopts: cc.dopts,
channelz: channelz.RegisterSubChannel(cc.channelz.ID, ""),
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
resetBackoff: make(chan struct{}),
stateChan: make(chan struct{}),
}
Expand Down
21 changes: 11 additions & 10 deletions internal/channelz/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,20 +143,21 @@ func RegisterChannel(parent *Channel, target string) *Channel {
// Returns a unique channelz identifier assigned to this subChannel.
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterSubChannel(pid int64, ref string) *SubChannel {
func RegisterSubChannel(parent *Channel, ref string) *SubChannel {
id := IDGen.genID()
if !IsOn() {
return &SubChannel{ID: id}
}

sc := &SubChannel{
RefName: ref,
ID: id,
sockets: make(map[int64]string),
parent: db.getChannel(pid),
trace: &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())},
RefName: ref,
parent: parent,
}
db.addSubChannel(id, sc, pid)

if !IsOn() {
return sc
}

sc.sockets = make(map[int64]string)
sc.trace = &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())}
db.addSubChannel(id, sc, parent.ID)
return sc
}

Expand Down
19 changes: 13 additions & 6 deletions internal/transport/keepalive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,16 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
}
}

func channelzSubChannel(t *testing.T) *channelz.SubChannel {
ch := channelz.RegisterChannel(nil, "test chan")
sc := channelz.RegisterSubChannel(ch, "test subchan")
t.Cleanup(func() {
channelz.RemoveEntry(sc.ID)
channelz.RemoveEntry(ch.ID)
})
return sc
}

// TestKeepaliveClientClosesUnresponsiveServer creates a server which does not
// respond to keepalive pings, and makes sure that the client closes the
// transport once the keepalive logic kicks in. Here, we set the
Expand All @@ -257,14 +267,13 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
copts := ConnectOptions{
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchan"),
ChannelzParent: channelzSubChannel(t),
KeepaliveParams: keepalive.ClientParameters{
Time: 10 * time.Millisecond,
Timeout: 10 * time.Millisecond,
PermitWithoutStream: true,
},
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))
Expand All @@ -288,13 +297,12 @@ func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
copts := ConnectOptions{
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchan"),
ChannelzParent: channelzSubChannel(t),
KeepaliveParams: keepalive.ClientParameters{
Time: 10 * time.Millisecond,
Timeout: 10 * time.Millisecond,
},
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))
Expand All @@ -319,13 +327,12 @@ func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
connCh := make(chan net.Conn, 1)
copts := ConnectOptions{
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchan"),
ChannelzParent: channelzSubChannel(t),
KeepaliveParams: keepalive.ClientParameters{
Time: 500 * time.Millisecond,
Timeout: 500 * time.Millisecond,
},
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
// TODO(i/6099): Setup a server which can ping and no-ping based on a flag to
// reduce the flakiness in this test.
client, cancel := setUpWithNoPingServer(t, copts, connCh)
Expand Down
15 changes: 5 additions & 10 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,7 @@ func setUp(t *testing.T, port int, ht hType) (*server, *http2Client, func()) {
func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) {
server := setUpServerOnly(t, port, sc, ht)
addr := resolver.Address{Addr: "localhost:" + server.port}
copts.ChannelzParent = channelz.RegisterSubChannel(-1, "test channel")
t.Cleanup(func() { channelz.RemoveEntry(copts.ChannelzParent.ID) })
copts.ChannelzParent = channelzSubChannel(t)

connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func(GoAwayReason) {})
Expand Down Expand Up @@ -1321,9 +1320,8 @@ func (s) TestClientHonorsConnectContext(t *testing.T) {
connectCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
time.AfterFunc(100*time.Millisecond, cancel)

parent := channelz.RegisterSubChannel(-1, "test channel")
parent := channelzSubChannel(t)
copts := ConnectOptions{ChannelzParent: parent}
defer channelz.RemoveEntry(parent.ID)
_, err = NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
if err == nil {
t.Fatalf("NewClientTransport() returned successfully; wanted error")
Expand Down Expand Up @@ -1414,8 +1412,7 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) {
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
defer cancel()

parent := channelz.RegisterSubChannel(-1, "test channel")
defer channelz.RemoveEntry(parent.ID)
parent := channelzSubChannel(t)
copts := ConnectOptions{ChannelzParent: parent}
ct, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
if err != nil {
Expand Down Expand Up @@ -2425,9 +2422,8 @@ func (s) TestClientHandshakeInfo(t *testing.T) {

copts := ConnectOptions{
TransportCredentials: creds,
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchannel"),
ChannelzParent: channelzSubChannel(t),
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
Expand Down Expand Up @@ -2467,9 +2463,8 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) {

copts := ConnectOptions{
Dialer: dialer,
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchannel"),
ChannelzParent: channelzSubChannel(t),
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
Expand Down
4 changes: 2 additions & 2 deletions test/channelz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,8 @@ func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) {
// Socket1 Socket2

topChan := channelz.RegisterChannel(nil, "")
subChan1 := channelz.RegisterSubChannel(topChan.ID, "")
subChan2 := channelz.RegisterSubChannel(topChan.ID, "")
subChan1 := channelz.RegisterSubChannel(topChan, "")
subChan2 := channelz.RegisterSubChannel(topChan, "")
skt1 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan1})
skt2 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan1})

Expand Down
Loading