Skip to content

Commit

Permalink
Feature: support managing disk information in database
Browse files Browse the repository at this point in the history
With database disk records commited by `disks.yaml`,
disk formatting cloud be performed without `format.yaml`.

And it will get and wirte disk `size` and `URI` during
disk formatting, thus record the ID of service(chunkserver)
with associated disk when deploy curvebs cluster. Use
`curveadm disks ls` to view all disk information.

Usage:
        curveadm disks commit /path/to/disks.yaml
        curveadm disks ls
        curveadm format

Signed-off-by: Lijin Xiong <[email protected]>
  • Loading branch information
Lijin Xiong committed Mar 3, 2023
1 parent 39eb77f commit de36749
Show file tree
Hide file tree
Showing 32 changed files with 1,426 additions and 53 deletions.
35 changes: 29 additions & 6 deletions cli/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ type CurveAdm struct {
memStorage *utils.SafeMap

// properties (hosts/cluster)
hosts string // hosts
clusterId int // current cluster id
clusterUUId string // current cluster uuid
clusterName string // current cluster name
clusterTopologyData string // cluster topology
clusterPoolData string // cluster pool
hosts string // hosts
disks string // disks
diskRecords []storage.Disk // disk list
clusterId int // current cluster id
clusterUUId string // current cluster uuid
clusterName string // current cluster name
clusterTopologyData string // cluster topology
clusterPoolData string // cluster pool
}

/*
Expand Down Expand Up @@ -174,6 +176,23 @@ func (curveadm *CurveAdm) init() error {
log.Field("ClusterName", cluster.Name))
}

// (8) Get Disks
var disks storage.Disks
diskses, err := s.GetDisks()
if err != nil {
log.Error("Get disks failed", log.Field("Error", err))
return errno.ERR_GET_DISKS_FAILED.E(err)
} else if len(diskses) > 0 {
disks = diskses[0]
}

// (9) Get Disk Records
diskRecords, err := s.GetDisk(comm.DISK_FILTER_ALL)
if err != nil {
log.Error("Get disk records failed", log.Field("Error", err))
return errno.ERR_GET_DISK_RECORDS_FAILED.E(err)
}

curveadm.dbpath = dbpath
curveadm.logpath = logpath
curveadm.config = config
Expand All @@ -183,6 +202,8 @@ func (curveadm *CurveAdm) init() error {
curveadm.storage = s
curveadm.memStorage = utils.NewSafeMap()
curveadm.hosts = hosts.Data
curveadm.disks = disks.Data
curveadm.diskRecords = diskRecords
curveadm.clusterId = cluster.Id
curveadm.clusterUUId = cluster.UUId
curveadm.clusterName = cluster.Name
Expand Down Expand Up @@ -264,6 +285,8 @@ func (curveadm *CurveAdm) Err() io.Writer { return curveadm.e
func (curveadm *CurveAdm) Storage() *storage.Storage { return curveadm.storage }
func (curveadm *CurveAdm) MemStorage() *utils.SafeMap { return curveadm.memStorage }
func (curveadm *CurveAdm) Hosts() string { return curveadm.hosts }
func (curveadm *CurveAdm) Disks() string { return curveadm.disks }
func (curveadm *CurveAdm) DiskRecords() []storage.Disk { return curveadm.diskRecords }
func (curveadm *CurveAdm) ClusterId() int { return curveadm.clusterId }
func (curveadm *CurveAdm) ClusterUUId() string { return curveadm.clusterUUId }
func (curveadm *CurveAdm) ClusterName() string { return curveadm.clusterName }
Expand Down
2 changes: 2 additions & 0 deletions cli/command/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/opencurve/curveadm/cli/command/client"
"github.com/opencurve/curveadm/cli/command/cluster"
"github.com/opencurve/curveadm/cli/command/config"
"github.com/opencurve/curveadm/cli/command/disks"
"github.com/opencurve/curveadm/cli/command/hosts"
"github.com/opencurve/curveadm/cli/command/pfs"
"github.com/opencurve/curveadm/cli/command/playground"
Expand Down Expand Up @@ -61,6 +62,7 @@ func addSubCommands(cmd *cobra.Command, curveadm *cli.CurveAdm) {
cluster.NewClusterCommand(curveadm), // curveadm cluster ...
config.NewConfigCommand(curveadm), // curveadm config ...
hosts.NewHostsCommand(curveadm), // curveadm hosts ...
disks.NewDisksCommand(curveadm), // curveadm disks ...
playground.NewPlaygroundCommand(curveadm), // curveadm playground ...
target.NewTargetCommand(curveadm), // curveadm target ...
pfs.NewPFSCommand(curveadm), // curveadm pfs ...
Expand Down
45 changes: 45 additions & 0 deletions cli/command/disks/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2023 NetEase Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Project: CurveAdm
* Created Date: 2023-02-24
* Author: Lijin Xiong ([email protected])
*/

package disks

import (
"github.com/opencurve/curveadm/cli/cli"
cliutil "github.com/opencurve/curveadm/internal/utils"
"github.com/spf13/cobra"
)

func NewDisksCommand(curveadm *cli.CurveAdm) *cobra.Command {
cmd := &cobra.Command{
Use: "disks",
Short: "Manage disks",
Args: cliutil.NoArgs,
RunE: cliutil.ShowHelp(curveadm.Err()),
}

cmd.AddCommand(
NewCommitCommand(curveadm),
NewShowCommand(curveadm),
NewListCommand(curveadm),
)
return cmd
}
220 changes: 220 additions & 0 deletions cli/command/disks/commit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Copyright (c) 2023 NetEase Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Project: CurveAdm
* Created Date: 2023-02-24
* Author: Lijin Xiong ([email protected])
*/

package disks

import (
"strings"

"github.com/fatih/color"
"github.com/opencurve/curveadm/cli/cli"
"github.com/opencurve/curveadm/internal/common"
comm "github.com/opencurve/curveadm/internal/common"
"github.com/opencurve/curveadm/internal/configure/disks"
"github.com/opencurve/curveadm/internal/configure/topology"
"github.com/opencurve/curveadm/internal/errno"
"github.com/opencurve/curveadm/internal/storage"
"github.com/opencurve/curveadm/internal/tui"
tuicomm "github.com/opencurve/curveadm/internal/tui/common"
"github.com/opencurve/curveadm/internal/utils"
"github.com/spf13/cobra"
)

const (
COMMIT_EXAMPLE = `Examples:
$ curveadm disks commit /path/to/disks.yaml # Commit disks`
)

type commitOptions struct {
filename string
slient bool
}

func NewCommitCommand(curveadm *cli.CurveAdm) *cobra.Command {
var options commitOptions
cmd := &cobra.Command{
Use: "commit DISKS [OPTIONS]",
Short: "Commit disks",
Args: utils.ExactArgs(1),
Example: COMMIT_EXAMPLE,
RunE: func(cmd *cobra.Command, args []string) error {
options.filename = args[0]
return runCommit(curveadm, options)
},
DisableFlagsInUseLine: true,
}

flags := cmd.Flags()
flags.BoolVarP(&options.slient, "slient", "s", false, "Slient output for disks commit")

return cmd
}

func readAndCheckDisks(curveadm *cli.CurveAdm, options commitOptions) (string, []*disks.DiskConfig, error) {
var dcs []*disks.DiskConfig
// 1) read disks from file
if !utils.PathExist(options.filename) {
return "", dcs, errno.ERR_DISKS_FILE_NOT_FOUND.
F("%s: no such file", utils.AbsPath(options.filename))
}
data, err := utils.ReadFile(options.filename)
if err != nil {
return data, dcs, errno.ERR_READ_DISKS_FILE_FAILED.E(err)
}

// 2) display disks difference
oldData := curveadm.Disks()
if !options.slient {
diff := utils.Diff(oldData, data)
curveadm.WriteOutln(diff)
}

// 3) check disks data
dcs, err = disks.ParseDisks(data, curveadm)
return data, dcs, err
}

func assambleNewDiskRecords(dcs []*disks.DiskConfig,
oldDiskRecords []storage.Disk) ([]storage.Disk, []storage.Disk) {
keySep := ":"
newDiskMap := make(map[string]bool)

var newDiskRecords, diskRecordDeleteList []storage.Disk
for _, dc := range dcs {
for _, host := range dc.GetHost() {
key := strings.Join([]string{host, dc.GetDevice()}, keySep)
newDiskMap[key] = true
newDiskRecords = append(
newDiskRecords, storage.Disk{
Host: host,
Device: dc.GetDevice(),
Size: comm.DISK_DEFAULT_NULL_SIZE,
URI: comm.DISK_DEFAULT_NULL_URI,
MountPoint: dc.GetMountPoint(),
FormatPercent: dc.GetFormatPercent(),
ChunkServerID: comm.DISK_DEFAULT_NULL_CHUNKSERVER_ID,
})
}
}

for _, dr := range oldDiskRecords {
key := strings.Join([]string{dr.Host, dr.Device}, keySep)
if _, ok := newDiskMap[key]; !ok {
diskRecordDeleteList = append(diskRecordDeleteList, dr)
}
}

return newDiskRecords, diskRecordDeleteList
}

func writeDiskRecord(dr storage.Disk, curveadm *cli.CurveAdm) error {
if diskRecords, err := curveadm.Storage().GetDisk(
common.DISK_FILTER_DEVICE, dr.Host, dr.Device); err != nil {
return err
} else if len(diskRecords) == 0 {
if err := curveadm.Storage().SetDisk(
dr.Host,
dr.Device,
dr.MountPoint,
dr.ContainerImage,
dr.FormatPercent); err != nil {
return err
}
}
return nil
}

func syncDiskRecords(data string, dcs []*disks.DiskConfig,
curveadm *cli.CurveAdm, options commitOptions) error {
oldDiskRecords := curveadm.DiskRecords()
tui.SortDiskRecords(oldDiskRecords)

newDiskRecords, diskRecordDeleteList := assambleNewDiskRecords(dcs, oldDiskRecords)
tui.SortDiskRecords(newDiskRecords)
oldDiskRecordsString := tui.FormatDisks(oldDiskRecords)
newDiskRecordsString := tui.FormatDisks(newDiskRecords)

if !options.slient {
diff := utils.Diff(oldDiskRecordsString, newDiskRecordsString)
curveadm.WriteOutln(diff)
}

pass := tuicomm.ConfirmYes("Disk changes are showing above. Do you want to continue?")
if !pass {
curveadm.WriteOut(tuicomm.PromptCancelOpetation("commit disk table"))
return errno.ERR_CANCEL_OPERATION
}

// write new disk records
for _, dr := range newDiskRecords {
if err := writeDiskRecord(dr, curveadm); err != nil {
return err
}
}

// delete obsolete disk records
for _, dr := range diskRecordDeleteList {
if dr.ChunkServerID != comm.DISK_DEFAULT_NULL_CHUNKSERVER_ID {
return errno.ERR_DELETE_SERVICE_BINDING_DISK.
F("The disk[%s:%s] is used by service[%s:%s]",
dr.Host, dr.Device, topology.ROLE_CHUNKSERVER, dr.ChunkServerID)
}

if err := curveadm.Storage().DeleteDisk(dr.Host, dr.Device); err != nil {
return errno.ERR_UPDATE_DISK_FAILED.E(err)
}
}

return nil
}

func runCommit(curveadm *cli.CurveAdm, options commitOptions) error {
// 1) read and check disks
data, dcs, err := readAndCheckDisks(curveadm, options)
if err != nil {
return err
}

// 2) confirm by user
pass := tuicomm.ConfirmYes("Do you want to continue?")
if !pass {
curveadm.WriteOut(tuicomm.PromptCancelOpetation("commit disks"))
return errno.ERR_CANCEL_OPERATION
}

// 3) add disk records
err = syncDiskRecords(data, dcs, curveadm, options)
if err != nil {
return err
}

// 4) add disks data
err = curveadm.Storage().SetDisks(data)
if err != nil {
return errno.ERR_UPDATE_DISKS_FAILED.
F("commit disks failed")
}

// 5) print success prompt
curveadm.WriteOutln(color.GreenString("Disks updated"))
return nil
}
Loading

0 comments on commit de36749

Please sign in to comment.