Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batching #343

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
si := dependencies.Si
bstore := dependencies.Bstore
machine := dependencies.ListenAddr
prover := dependencies.Prover
var activeTasks []harmonytask.TaskInterface

sender, sendTask := message.NewSender(full, full, db)
Expand Down Expand Up @@ -195,7 +196,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
cfg.Subsystems.EnableUpdateSubmit

if hasAnySealingTask {
sealingTasks, err := addSealingTasks(ctx, hasAnySealingTask, db, full, sender, as, cfg, slrLazy, asyncParams, si, stor, bstore, machine)
sealingTasks, err := addSealingTasks(ctx, hasAnySealingTask, db, full, sender, as, cfg, slrLazy, asyncParams, si, stor, bstore, machine, prover)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -243,14 +244,18 @@ func addSealingTasks(
ctx context.Context, hasAnySealingTask bool, db *harmonydb.DB, full api.Chain, sender *message.Sender,
as *multictladdr.MultiAddressSelector, cfg *config.CurioConfig, slrLazy *lazy.Lazy[*ffi.SealCalls],
asyncParams func() func() (bool, error), si paths.SectorIndex, stor *paths.Remote,
bstore curiochain.CurioBlockstore, machineHostPort string) ([]harmonytask.TaskInterface, error) {
bstore curiochain.CurioBlockstore, machineHostPort string, prover storiface.Prover) ([]harmonytask.TaskInterface, error) {
var activeTasks []harmonytask.TaskInterface
// Sealing / Snap

var sp *seal.SealPoller
var slr *ffi.SealCalls
var err error
if hasAnySealingTask {
sp = seal.NewPoller(db, full)
sp, err = seal.NewPoller(db, full, cfg)
if err != nil {
return nil, xerrors.Errorf("creating seal poller: %w", err)
}
go sp.RunPoller(ctx)

slr = must.One(slrLazy.Val())
Expand Down Expand Up @@ -303,7 +308,7 @@ func addSealingTasks(
}

if cfg.Subsystems.EnableSendPrecommitMsg {
precommitTask := seal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee, cfg.Fees.CollateralFromMinerBalance, cfg.Fees.DisableCollateralFallback)
precommitTask := seal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg)
activeTasks = append(activeTasks, precommitTask)
}
if cfg.Subsystems.EnablePoRepProof {
Expand All @@ -321,7 +326,7 @@ func addSealingTasks(
}
}
if cfg.Subsystems.EnableSendCommitMsg {
commitTask := seal.NewSubmitCommitTask(sp, db, full, sender, as, cfg)
commitTask := seal.NewSubmitCommitTask(sp, db, full, sender, as, cfg, prover)
activeTasks = append(activeTasks, commitTask)
}

Expand Down
60 changes: 60 additions & 0 deletions deps/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 50 additions & 0 deletions deps/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package config
import (
"time"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/lotus/chain/types"
)

Expand Down Expand Up @@ -70,6 +73,18 @@ func DefaultCurioConfig() *CurioConfig {
AlertManagerURL: "http://localhost:9093/api/v2/alerts",
},
},
Batching: CurioBatchingConfig{
PreCommit: PreCommitBatchingConfig{
BaseFeeThreshold: types.MustParseFIL("0.005"),
Timeout: Duration(4 * time.Hour),
Slack: Duration(6 * time.Hour),
},
Commit: CommitBatchingConfig{
BaseFeeThreshold: types.MustParseFIL("0.005"),
Timeout: Duration(1 * time.Hour),
Slack: Duration(1 * time.Hour),
},
},
}
}

Expand All @@ -85,6 +100,7 @@ type CurioConfig struct {
Seal CurioSealConfig
Apis ApisConfig
Alerting CurioAlertingConfig
Batching CurioBatchingConfig
}

func DefaultDefaultMaxFee() types.FIL {
Expand All @@ -96,6 +112,10 @@ type BatchFeeConfig struct {
PerSector types.FIL
}

func (b *BatchFeeConfig) FeeForSectors(nSectors int) abi.TokenAmount {
return big.Add(big.Int(b.Base), big.Mul(big.NewInt(int64(nSectors)), big.Int(b.PerSector)))
}

type CurioSubsystemsConfig struct {
// EnableWindowPost enables window post to be executed on this curio instance. Each machine in the cluster
// with WindowPoSt enabled will also participate in the window post scheduler. It is possible to have multiple
Expand Down Expand Up @@ -500,3 +520,33 @@ type ApisConfig struct {
// Chain API auth secret for the Curio nodes to use.
StorageRPCSecret string
}

type CurioBatchingConfig struct {
// Precommit Batching configuration
PreCommit PreCommitBatchingConfig

// Commit batching configuration
Commit CommitBatchingConfig
}

type PreCommitBatchingConfig struct {
// Base fee value below which we should try to send Precommit messages immediately
BaseFeeThreshold types.FIL

// Maximum amount of time any given sector in the batch can wait for the batch to accumulate
Timeout Duration

// Time buffer for forceful batch submission before sectors/deal in batch would start expiring
Slack Duration
}

type CommitBatchingConfig struct {
// Base fee value below which we should try to send Commit messages immediately
BaseFeeThreshold types.FIL

// Maximum amount of time any given sector in the batch can wait for the batch to accumulate
Timeout Duration

// Time buffer for forceful batch submission before sectors/deals in batch would start expiring
Slack Duration
}
5 changes: 5 additions & 0 deletions deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ type Deps struct {
ListenAddr string
Name string
Alert *alertmanager.AlertNow
Prover storiface.Prover
}

const (
Expand Down Expand Up @@ -348,6 +349,10 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`,
deps.Name = cctx.String("name")
}

if deps.Prover == nil {
deps.Prover = ffiwrapper.ProofProver
}

return nil
}

Expand Down
34 changes: 34 additions & 0 deletions documentation/en/configuration/default-curio-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -502,4 +502,38 @@ description: The default curio configuration
# type: string
#WebHookURL = ""


[Batching]
[Batching.PreCommit]
# Base fee value below which we should try to send Precommit messages immediately
#
# type: types.FIL
#BaseFeeThreshold = "0.005 FIL"

# Maximum amount of time any given sector in the batch can wait for the batch to accumulate
#
# type: Duration
#Timeout = "4h0m0s"

# Time buffer for forceful batch submission before sectors/deal in batch would start expiring
#
# type: Duration
#Slack = "6h0m0s"

[Batching.Commit]
# Base fee value below which we should try to send Commit messages immediately
#
# type: types.FIL
#BaseFeeThreshold = "0.005 FIL"

# Maximum amount of time any given sector in the batch can wait for the batch to accumulate
#
# type: Duration
#Timeout = "1h0m0s"

# Time buffer for forceful batch submission before sectors/deals in batch would start expiring
#
# type: Duration
#Slack = "1h0m0s"

```
8 changes: 6 additions & 2 deletions harmony/harmonydb/sql/20231217-sdr-pipeline.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ create table sectors_sdr_pipeline (
after_tree_r bool not null default false,

-- synthetic proofs (Added in 20240617-synthetic-proofs.sql)
-- task_id_synth bigint,
-- after_synth bool not null default false,
-- task_id_synth bigint,
-- after_synth bool not null default false,

-- precommit_ready_at (Added in 20241210-sdr-batching.sql)

-- precommit message sending
precommit_msg_cid text,
Expand Down Expand Up @@ -66,6 +68,8 @@ create table sectors_sdr_pipeline (
task_id_move_storage bigint,
after_move_storage bool not null default false,

-- commit_ready_at (Added in 20241210-sdr-batching.sql)

-- Commit message sending
commit_msg_cid text,

Expand Down
46 changes: 46 additions & 0 deletions harmony/harmonydb/sql/20241210-sdr-batching.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
ALTER TABLE sectors_sdr_pipeline ADD COLUMN precommit_ready_at TIMESTAMPTZ;
ALTER TABLE sectors_sdr_pipeline ADD COLUMN commit_ready_at TIMESTAMPTZ;
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved

-- Function to precommit_ready_at value. Used by the trigger
CREATE OR REPLACE FUNCTION set_precommit_ready_at()
RETURNS TRIGGER AS $$
BEGIN
-- Check if after_tree_r column is changing from FALSE to TRUE
IF OLD.after_tree_r = FALSE AND NEW.after_tree_r = TRUE THEN
-- Explicitly set precommit_ready_at to the current UTC timestamp
UPDATE sectors_sdr_pipeline SET precommit_ready_at = CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
WHERE sp_id = NEW.sp_id AND sector_number = NEW.sector_number;
END IF;

-- Return the modified row
RETURN NEW;
END;
$$ LANGUAGE plpgsql;


-- Function to set commit_ready_at. Used by trigger
CREATE OR REPLACE FUNCTION set_commit_ready_at()
RETURNS TRIGGER AS $$
BEGIN
-- Check if after_porep column is changing from FALSE to TRUE
IF OLD.after_porep = FALSE AND NEW.after_porep = TRUE THEN
-- Explicitly set precommit_ready_at to the current UTC timestamp
UPDATE sectors_sdr_pipeline SET commit_ready_at = CURRENT_TIMESTAMP AT TIME ZONE 'UTC'
WHERE sp_id = NEW.sp_id AND sector_number = NEW.sector_number;
END IF;

-- Return the modified row
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_precommit_ready_at
AFTER INSERT OR UPDATE OR DELETE ON sectors_sdr_pipeline
FOR EACH ROW EXECUTE FUNCTION set_precommit_ready_at();


CREATE TRIGGER update_commit_ready_at
AFTER INSERT OR UPDATE OR DELETE ON sectors_sdr_pipeline
FOR EACH ROW EXECUTE FUNCTION set_commit_ready_at();


3 changes: 3 additions & 0 deletions itests/curio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func TestCurioHappyPath(t *testing.T) {

require.Contains(t, baseCfg.Addresses[0].MinerAddresses, maddr.String())

baseCfg.Batching.PreCommit.Timeout = config.Duration(5 * time.Second)
baseCfg.Batching.Commit.Timeout = config.Duration(5 * time.Minute)

temp := os.TempDir()
dir, err := os.MkdirTemp(temp, "curio")
require.NoError(t, err)
Expand Down
Loading
Loading