Skip to content

Commit

Permalink
Improve savepoint and update (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
elanv committed Jul 7, 2021
2 parents c56cd0a + 57d0b7c commit 293d407
Show file tree
Hide file tree
Showing 22 changed files with 1,922 additions and 1,524 deletions.
23 changes: 11 additions & 12 deletions api/v1beta1/flinkcluster_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,17 @@ func TestSetDefault(t *testing.T) {
var defaultJmBlobPort = int32(6124)
var defaultJmQueryPort = int32(6125)
var defaultJmUIPort = int32(8081)
var defaultJmIngressTLSUse = false
var defaultTmDataPort = int32(6121)
var defaultTmRPCPort = int32(6122)
var defaultTmQueryPort = int32(6125)
var defaultJobAllowNonRestoredState = false
var defaultJobParallelism = int32(1)
var defaultJobNoLoggingToStdout = false
var defaultJobRestartPolicy = JobRestartPolicyNever
var defatulJobManagerIngressTLSUse = false
var defaultMemoryOffHeapRatio = int32(25)
var defaultMemoryOffHeapMin = resource.MustParse("600M")
defaultRecreateOnUpdate := new(bool)
*defaultRecreateOnUpdate = true
var defaultRecreateOnUpdate = true
var expectedCluster = FlinkCluster{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Expand All @@ -70,7 +69,7 @@ func TestSetDefault(t *testing.T) {
Replicas: &defaultJmReplicas,
AccessScope: "Cluster",
Ingress: &JobManagerIngressSpec{
UseTLS: &defatulJobManagerIngressTLSUse,
UseTLS: &defaultJmIngressTLSUse,
},
Ports: JobManagerPorts{
RPC: &defaultJmRPCPort,
Expand Down Expand Up @@ -115,7 +114,7 @@ func TestSetDefault(t *testing.T) {
MountPath: "/etc/hadoop/conf",
},
EnvVars: nil,
RecreateOnUpdate: defaultRecreateOnUpdate,
RecreateOnUpdate: &defaultRecreateOnUpdate,
},
Status: FlinkClusterStatus{},
}
Expand All @@ -134,23 +133,22 @@ func TestSetNonDefault(t *testing.T) {
var jmBlobPort = int32(8124)
var jmQueryPort = int32(8125)
var jmUIPort = int32(9081)
var jmIngressTLSUse = true
var tmDataPort = int32(8121)
var tmRPCPort = int32(8122)
var tmQueryPort = int32(8125)
var jobAllowNonRestoredState = true
var jobParallelism = int32(2)
var jobNoLoggingToStdout = true
var jobRestartPolicy = JobRestartPolicyFromSavepointOnFailure
var jobManagerIngressTLSUse = true
var memoryOffHeapRatio = int32(50)
var memoryOffHeapMin = resource.MustParse("600M")
var recreateOnUpdate = false
var securityContextUserGroup = int64(9999)
var securityContext = corev1.PodSecurityContext{
RunAsUser: &securityContextUserGroup,
RunAsGroup: &securityContextUserGroup,
}
defaultRecreateOnUpdate := new(bool)
*defaultRecreateOnUpdate = true
var cluster = FlinkCluster{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{},
Expand All @@ -164,7 +162,7 @@ func TestSetNonDefault(t *testing.T) {
Replicas: &jmReplicas,
AccessScope: "Cluster",
Ingress: &JobManagerIngressSpec{
UseTLS: &jobManagerIngressTLSUse,
UseTLS: &jmIngressTLSUse,
},
Ports: JobManagerPorts{
RPC: &jmRPCPort,
Expand Down Expand Up @@ -208,7 +206,8 @@ func TestSetNonDefault(t *testing.T) {
HadoopConfig: &HadoopConfig{
MountPath: "/opt/flink/hadoop/conf",
},
EnvVars: nil,
EnvVars: nil,
RecreateOnUpdate: &recreateOnUpdate,
},
Status: FlinkClusterStatus{},
}
Expand All @@ -228,7 +227,7 @@ func TestSetNonDefault(t *testing.T) {
Replicas: &jmReplicas,
AccessScope: "Cluster",
Ingress: &JobManagerIngressSpec{
UseTLS: &jobManagerIngressTLSUse,
UseTLS: &jmIngressTLSUse,
},
Ports: JobManagerPorts{
RPC: &jmRPCPort,
Expand Down Expand Up @@ -273,7 +272,7 @@ func TestSetNonDefault(t *testing.T) {
MountPath: "/opt/flink/hadoop/conf",
},
EnvVars: nil,
RecreateOnUpdate: defaultRecreateOnUpdate,
RecreateOnUpdate: &recreateOnUpdate,
},
Status: FlinkClusterStatus{},
}
Expand Down
123 changes: 71 additions & 52 deletions api/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,17 @@ const (

// JobState defines states for a Flink job deployment.
const (
JobStatePending = "Pending"
JobStateRunning = "Running"
JobStateUpdating = "Updating"
JobStateSucceeded = "Succeeded"
JobStateFailed = "Failed"
JobStateCancelled = "Cancelled"
JobStateSuspended = "Suspended"
JobStateUnknown = "Unknown"
JobStateLost = "Lost"
JobStatePending = "Pending"
JobStateUpdating = "Updating"
JobStateRestarting = "Restarting"
JobStateDeploying = "Deploying"
JobStateDeployFailed = "DeployFailed"
JobStateRunning = "Running"
JobStateSucceeded = "Succeeded"
JobStateCancelled = "Cancelled"
JobStateFailed = "Failed"
JobStateLost = "Lost"
JobStateUnknown = "Unknown"
)

// AccessScope defines the access scope of JobManager service.
Expand Down Expand Up @@ -85,24 +87,23 @@ const (
ControlNameJobCancel = "job-cancel"

// control state
ControlStateProgressing = "Progressing"
ControlStateSucceeded = "Succeeded"
ControlStateFailed = "Failed"
ControlStateRequested = "Requested"
ControlStateInProgress = "InProgress"
ControlStateSucceeded = "Succeeded"
ControlStateFailed = "Failed"
)

// Savepoint status
const (
SavepointStateNotTriggered = "NotTriggered"
SavepointStateInProgress = "InProgress"
SavepointStateTriggerFailed = "TriggerFailed"
SavepointStateFailed = "Failed"
SavepointStateSucceeded = "Succeeded"

SavepointTriggerReasonUserRequested = "user requested"
SavepointTriggerReasonScheduled = "scheduled"
SavepointTriggerReasonScheduledInitial = "scheduled initial" // The first triggered savepoint has slightly different flow
SavepointTriggerReasonJobCancel = "job cancel"
SavepointTriggerReasonUpdate = "update"
SavepointTriggerReasonUserRequested = "user requested"
SavepointTriggerReasonJobCancel = "job cancel"
SavepointTriggerReasonScheduled = "scheduled"
SavepointTriggerReasonUpdate = "update"
)

// ImageSpec defines Flink image of JobManager and TaskManager containers.
Expand Down Expand Up @@ -348,12 +349,20 @@ type JobSpec struct {
// Allow non-restored state, default: false.
AllowNonRestoredState *bool `json:"allowNonRestoredState,omitempty"`

// Should take savepoint before upgrading the job, default: false.
TakeSavepointOnUpgrade *bool `json:"takeSavepointOnUpgrade,omitempty"`

// Savepoints dir where to store savepoints of the job.
SavepointsDir *string `json:"savepointsDir,omitempty"`

// Should take savepoint before updating job, default: true.
// If this is set to false, maxStateAgeToRestoreSeconds must be provided to limit the age of the savepoint to restore from.
TakeSavepointOnUpdate *bool `json:"takeSavepointOnUpdate,omitempty"`

// Maximum age of the savepoint that is allowed to be used to restore state.
// This is applied to auto restart on failure, update from stopped state and update without taking savepoint.
// If nil, job can be restarted only when the latest savepoint is the final job state (created by "stop with savepoint")
// - that is, only when job can be resumed from the suspended state.
// +kubebuilder:validation:Minimum=0
MaxStateAgeToRestoreSeconds *int32 `json:"maxStateAgeToRestoreSeconds,omitempty"`

// Automatically take a savepoint to the `savepointsDir` every n seconds.
AutoSavepointSeconds *int32 `json:"autoSavepointSeconds,omitempty"`

Expand Down Expand Up @@ -555,7 +564,7 @@ type JobStatus struct {
// The ID of the Flink job.
ID string `json:"id,omitempty"`

// The state of the Kubernetes job.
// The state of the Flink job deployment.
State string `json:"state"`

// The actual savepoint from which this job started.
Expand All @@ -571,21 +580,26 @@ type JobStatus struct {
// Savepoint location.
SavepointLocation string `json:"savepointLocation,omitempty"`

// Last savepoint trigger ID.
LastSavepointTriggerID string `json:"lastSavepointTriggerID,omitempty"`
// Last successful savepoint completed timestamp.
SavepointTime string `json:"savepointTime,omitempty"`

// Last savepoint trigger time. This is updated to make sure multiple
// savepoints will not be taken simultaneously.
LastSavepointTriggerTime string `json:"lastSavepointTriggerTime,omitempty"`
// The savepoint recorded in savepointLocation is the final state of the job.
FinalSavepoint bool `json:"finalSavepoint,omitempty"`

// Last successful or failed savepoint operation timestamp.
LastSavepointTime string `json:"lastSavepointTime,omitempty"`
// The timestamp of the Flink job deployment that creates the job submitter.
DeployTime string `json:"deployTime,omitempty"`

// The Flink job started timestamp.
StartTime string `json:"startTime,omitempty"`

// The Flink job ended timestamp.
EndTime string `json:"endTime,omitempty"`

// The number of restarts.
RestartCount int32 `json:"restartCount,omitempty"`
}

// SavepointStatus defines the status of savepoint progress
// SavepointStatus is the status of savepoint progress.
type SavepointStatus struct {
// The ID of the Flink job.
JobID string `json:"jobID,omitempty"`
Expand All @@ -599,8 +613,8 @@ type SavepointStatus struct {
// Savepoint triggered reason.
TriggerReason string `json:"triggerReason,omitempty"`

// Savepoint requested time.
RequestTime string `json:"requestTime,omitempty"`
// Savepoint status update time.
UpdateTime string `json:"requestTime,omitempty"`

// Savepoint state.
State string `json:"state"`
Expand All @@ -609,6 +623,27 @@ type SavepointStatus struct {
Message string `json:"message,omitempty"`
}

// RevisionStatus holds the revision information of a FlinkCluster: the
// current revision name, the next revision name, and the hash collision count.
type RevisionStatus struct {
// When the controller creates a new ControllerRevision, it generates a hash string from the FlinkCluster spec
// which is to be stored in the ControllerRevision and uses it to compose the ControllerRevision name.
// Then the controller updates nextRevision to the ControllerRevision name.
// When the update process is completed, the controller updates currentRevision to nextRevision.
// currentRevision and nextRevision are composed like this:
// <FLINK_CLUSTER_NAME>-<FLINK_CLUSTER_SPEC_HASH>-<REVISION_NUMBER_IN_CONTROLLERREVISION>
// e.g., myflinkcluster-c464ff7-5

// CurrentRevision indicates the version of FlinkCluster.
CurrentRevision string `json:"currentRevision,omitempty"`

// NextRevision indicates the version of FlinkCluster updating.
NextRevision string `json:"nextRevision,omitempty"`

// CollisionCount is the count of hash collisions for the FlinkCluster. The controller
// uses this field as a collision avoidance mechanism when it needs to create the name for the
// newest ControllerRevision.
CollisionCount *int32 `json:"collisionCount,omitempty"`
}

// JobManagerIngressStatus defines the status of a JobManager ingress.
type JobManagerIngressStatus struct {
// The name of the Kubernetes ingress resource.
Expand Down Expand Up @@ -644,30 +679,14 @@ type FlinkClusterStatus struct {
// The status of the components.
Components FlinkClusterComponentsStatus `json:"components"`

// The status of control requested by user
// The status of control requested by user.
Control *FlinkClusterControlStatus `json:"control,omitempty"`

// The status of savepoint progress
// The status of savepoint progress.
Savepoint *SavepointStatus `json:"savepoint,omitempty"`

// When the controller creates new ControllerRevision, it generates hash string from the FlinkCluster spec
// which is to be stored in ControllerRevision and uses it to compose the ControllerRevision name.
// Then the controller updates nextRevision to the ControllerRevision name.
// When update process is completed, the controller updates currentRevision as nextRevision.
// currentRevision and nextRevision is composed like this:
// <FLINK_CLUSTER_NAME>-<FLINK_CLUSTER_SPEC_HASH>-<REVISION_NUMBER_IN_CONTROLLERREVISION>
// e.g., myflinkcluster-c464ff7-5

// CurrentRevision indicates the version of FlinkCluster.
CurrentRevision string `json:"currentRevision,omitempty"`

// NextRevision indicates the version of FlinkCluster updating.
NextRevision string `json:"nextRevision,omitempty"`

// collisionCount is the count of hash collisions for the FlinkCluster. The controller
// uses this field as a collision avoidance mechanism when it needs to create the name for the
// newest ControllerRevision.
CollisionCount *int32 `json:"collisionCount,omitempty"`
// The status of revision.
Revision RevisionStatus `json:"revision,omitempty"`

// Last update timestamp for this status.
LastUpdateTime string `json:"lastUpdateTime,omitempty"`
Expand Down
Loading

0 comments on commit 293d407

Please sign in to comment.