Skip to content

Commit

Permalink
feat: implement bandwidth limit for proxy-cache (goharbor#20812)
Browse files Browse the repository at this point in the history
Signed-off-by: Shengwen Yu <[email protected]>
Signed-off-by: kunal-511 <[email protected]>
  • Loading branch information
Shengwen YU authored and kunal-511 committed Aug 22, 2024
1 parent 6dc9a00 commit f21f193
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 14 deletions.
4 changes: 4 additions & 0 deletions api/v2.0/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7340,6 +7340,10 @@ definitions:
type: string
description: 'The ID of the tag retention policy for the project'
x-nullable: true
proxy_speed_kb:
type: string
description: 'The bandwidth limit of proxy cache, in Kbps (kilobits per second). It limits the communication between Harbor and the upstream registry, not the client and the Harbor.'
x-nullable: true
ProjectSummary:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion src/controller/proxy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (c *controller) HeadManifest(_ context.Context, art lib.ArtifactInfo, remot
func (c *controller) ProxyBlob(ctx context.Context, p *proModels.Project, art lib.ArtifactInfo) (int64, io.ReadCloser, error) {
remoteRepo := getRemoteRepo(art)
log.Debugf("The blob doesn't exist, proxy the request to the target server, url:%v", remoteRepo)
rHelper, err := NewRemoteHelper(ctx, p.RegistryID)
rHelper, err := NewRemoteHelper(ctx, p.RegistryID, WithSpeed(p.ProxyCacheSpeed()))
if err != nil {
return 0, nil, err
}
Expand Down
37 changes: 37 additions & 0 deletions src/controller/proxy/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy

type Option func(*Options)

type Options struct {
// Speed is the data transfer speed for proxy cache from Harbor to upstream registry, no limit by default.
Speed int32
}

func NewOptions(opts ...Option) *Options {
o := &Options{}
for _, opt := range opts {
opt(o)
}

return o
}

func WithSpeed(speed int32) Option {
return func(o *Options) {
o.Speed = speed
}
}
33 changes: 33 additions & 0 deletions src/controller/proxy/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewOptions(t *testing.T) {
// test default options
o := NewOptions()
assert.Equal(t, int32(0), o.Speed)

// test with options
// with speed
withSpeed := WithSpeed(1024)
o = NewOptions(withSpeed)
assert.Equal(t, int32(1024), o.Speed)
}
17 changes: 14 additions & 3 deletions src/controller/proxy/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/docker/distribution"

"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/pkg/reg"
"github.com/goharbor/harbor/src/pkg/reg/adapter"
"github.com/goharbor/harbor/src/pkg/reg/model"
Expand All @@ -43,13 +44,16 @@ type remoteHelper struct {
regID int64
registry adapter.ArtifactRegistry
registryMgr reg.Manager
opts *Options
}

// NewRemoteHelper create a remote interface
func NewRemoteHelper(ctx context.Context, regID int64) (RemoteInterface, error) {
func NewRemoteHelper(ctx context.Context, regID int64, opts ...Option) (RemoteInterface, error) {
r := &remoteHelper{
regID: regID,
registryMgr: reg.Mgr}
registryMgr: reg.Mgr,
opts: NewOptions(opts...),
}
if err := r.init(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -83,7 +87,14 @@ func (r *remoteHelper) init(ctx context.Context) error {
}

func (r *remoteHelper) BlobReader(repo, dig string) (int64, io.ReadCloser, error) {
return r.registry.PullBlob(repo, dig)
sz, bReader, err := r.registry.PullBlob(repo, dig)
if err != nil {
return 0, nil, err
}
if r.opts != nil && r.opts.Speed > 0 {
bReader = lib.NewReader(bReader, r.opts.Speed)
}
return sz, bReader, err
}

func (r *remoteHelper) Manifest(repo string, ref string) (distribution.Manifest, string, error) {
Expand Down
5 changes: 3 additions & 2 deletions src/controller/replication/transfer/image/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

common_http "github.com/goharbor/harbor/src/common/http"
trans "github.com/goharbor/harbor/src/controller/replication/transfer"
"github.com/goharbor/harbor/src/lib"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg/reg/adapter"
"github.com/goharbor/harbor/src/pkg/reg/model"
Expand Down Expand Up @@ -380,7 +381,7 @@ func (t *transfer) copyBlobByMonolithic(srcRepo, dstRepo, digest string, sizeFro
return err
}
if speed > 0 {
data = trans.NewReader(data, speed)
data = lib.NewReader(data, speed)
}
defer data.Close()
// get size 0 from PullBlob, use size from distribution.Descriptor instead.
Expand Down Expand Up @@ -435,7 +436,7 @@ func (t *transfer) copyBlobByChunk(srcRepo, dstRepo, digest string, sizeFromDesc
}

if speed > 0 {
data = trans.NewReader(data, speed)
data = lib.NewReader(data, speed)
}
// failureEnd will only be used for adjusting content range when issue happened during push the chunk.
var failureEnd int64
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package transfer
package lib

import (
"fmt"
Expand Down
1 change: 1 addition & 0 deletions src/pkg/project/models/pro_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ const (
ProMetaAutoScan = "auto_scan"
ProMetaReuseSysCVEAllowlist = "reuse_sys_cve_allowlist"
ProMetaAutoSBOMGen = "auto_sbom_generation"
ProMetaProxySpeed = "proxy_speed_kb"
)
13 changes: 13 additions & 0 deletions src/pkg/project/models/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,19 @@ func (p *Project) AutoSBOMGen() bool {
return isTrue(auto)
}

// ProxyCacheSpeed ...
func (p *Project) ProxyCacheSpeed() int32 {
speed, exist := p.GetMetadata(ProMetaProxySpeed)
if !exist {
return 0
}
speedInt, err := strconv.ParseInt(speed, 10, 32)
if err != nil {
return 0
}
return int32(speedInt)
}

// FilterByPublic returns orm.QuerySeter with public filter
func (p *Project) FilterByPublic(_ context.Context, qs orm.QuerySeter, _ string, value interface{}) orm.QuerySeter {
subQuery := `SELECT project_id FROM project_metadata WHERE name = 'public' AND value = '%s'`
Expand Down
10 changes: 5 additions & 5 deletions src/server/middleware/repoproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func BlobGetMiddleware() func(http.Handler) http.Handler {

func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error {
ctx := r.Context()
art, p, proxyCtl, err := preCheck(ctx)
art, p, proxyCtl, err := preCheck(ctx, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -96,14 +96,14 @@ func handleBlob(w http.ResponseWriter, r *http.Request, next http.Handler) error
return nil
}

func preCheck(ctx context.Context) (art lib.ArtifactInfo, p *proModels.Project, ctl proxy.Controller, err error) {
func preCheck(ctx context.Context, withProjectMetadata bool) (art lib.ArtifactInfo, p *proModels.Project, ctl proxy.Controller, err error) {
none := lib.ArtifactInfo{}
art = lib.GetArtifactInfo(ctx)
if art == none {
return none, nil, nil, errors.New("artifactinfo is not found").WithCode(errors.NotFoundCode)
}
ctl = proxy.ControllerInstance()
p, err = project.Ctl.GetByName(ctx, art.ProjectName, project.Metadata(false))
p, err = project.Ctl.GetByName(ctx, art.ProjectName, project.Metadata(withProjectMetadata))
return
}

Expand Down Expand Up @@ -155,7 +155,7 @@ func defaultBlobURL(projectName string, name string, digest string) string {

func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) error {
ctx := r.Context()
art, p, proxyCtl, err := preCheck(ctx)
art, p, proxyCtl, err := preCheck(ctx, true)
if err != nil {
return err
}
Expand All @@ -174,7 +174,7 @@ func handleManifest(w http.ResponseWriter, r *http.Request, next http.Handler) e
next.ServeHTTP(w, r)
return nil
}
remote, err := proxy.NewRemoteHelper(r.Context(), p.RegistryID)
remote, err := proxy.NewRemoteHelper(r.Context(), p.RegistryID, proxy.WithSpeed(p.ProxyCacheSpeed()))
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions src/server/middleware/repoproxy/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TagsListMiddleware() func(http.Handler) http.Handler {
return middleware.New(func(w http.ResponseWriter, r *http.Request, next http.Handler) {
ctx := r.Context()

art, p, _, err := preCheck(ctx)
art, p, _, err := preCheck(ctx, false)
if err != nil {
libhttp.SendError(w, err)
return
Expand Down Expand Up @@ -69,7 +69,7 @@ func TagsListMiddleware() func(http.Handler) http.Handler {
util.SendListTagsResponse(w, r, tags)
}()

remote, err := proxy.NewRemoteHelper(ctx, p.RegistryID)
remote, err := proxy.NewRemoteHelper(ctx, p.RegistryID, proxy.WithSpeed(p.ProxyCacheSpeed()))
if err != nil {
logger.Warningf("failed to get remote interface, error: %v, fallback to local tags", err)
return
Expand Down
17 changes: 17 additions & 0 deletions src/server/v2.0/handler/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ func (a *projectAPI) CreateProject(ctx context.Context, params operation.CreateP
}
}

// ignore metadata.proxy_speed_kb for non-proxy-cache project
if req.RegistryID == nil {
req.Metadata.ProxySpeedKb = nil
}

// ignore enable_content_trust metadata for proxy cache project
// see https://github.com/goharbor/harbor/issues/12940 to get more info
if req.RegistryID != nil {
Expand Down Expand Up @@ -551,6 +556,11 @@ func (a *projectAPI) UpdateProject(ctx context.Context, params operation.UpdateP
}
}

// ignore metadata.proxy_speed_kb for non-proxy-cache project
if params.Project.Metadata != nil && !p.IsProxy() {
params.Project.Metadata.ProxySpeedKb = nil
}

// ignore enable_content_trust metadata for proxy cache project
// see https://github.com/goharbor/harbor/issues/12940 to get more info
if params.Project.Metadata != nil && p.IsProxy() {
Expand Down Expand Up @@ -792,6 +802,13 @@ func (a *projectAPI) validateProjectReq(ctx context.Context, req *models.Project
if !permitted {
return errors.BadRequestError(fmt.Errorf("unsupported registry type %s", string(registry.Type)))
}

// validate metadata.proxy_speed_kb. It should be an int32
if ps := req.Metadata.ProxySpeedKb; ps != nil {
if _, err := strconv.ParseInt(*ps, 10, 32); err != nil {
return errors.BadRequestError(nil).WithMessage(fmt.Sprintf("metadata.proxy_speed_kb should by an int32, but got: '%s', err: %s", *ps, err))
}
}
}

if req.StorageLimit != nil {
Expand Down
6 changes: 6 additions & 0 deletions src/server/v2.0/handler/project_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ func (p *projectMetadataAPI) validate(metas map[string]string) (map[string]strin
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid value: %s", value)
}
metas[proModels.ProMetaSeverity] = strings.ToLower(severity.String())
case proModels.ProMetaProxySpeed:
v, err := strconv.ParseInt(value, 10, 32)
if err != nil {
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid value: %s", value)
}
metas[proModels.ProMetaProxySpeed] = strconv.FormatInt(v, 10)
default:
return nil, errors.New(nil).WithCode(errors.BadRequestCode).WithMessage("invalid key: %s", key)
}
Expand Down

0 comments on commit f21f193

Please sign in to comment.