Skip to content

Commit

Permalink
Manifest Kubernetes resources for reset command (#563)
Browse files Browse the repository at this point in the history
Closes #556
  • Loading branch information
raminqaf authored Dec 13, 2024
1 parent 28f3e29 commit 658242e
Show file tree
Hide file tree
Showing 8 changed files with 446 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/docs/user/references/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ $ kpops reset [OPTIONS] PIPELINE_PATHS...
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel]
* `--operation-mode [argo|manifest|managed]`: How KPOps should operate. [env var: KPOPS_OPERATION_MODE; default: managed]
* `--help`: Show this message and exit.

## `kpops schema`
Expand Down
25 changes: 25 additions & 0 deletions kpops/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,31 @@ def manifest_destroy(
yield resource


def manifest_reset(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: set[str] | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
verbose: bool = True,
operation_mode: OperationMode = OperationMode.MANIFEST,
) -> Iterator[tuple[KubernetesManifest, ...]]:
pipeline = generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
operation_mode=operation_mode,
)
for component in pipeline.components:
resource = component.manifest_reset()
yield resource


def manifest_clean(
pipeline_path: Path,
dotenv: list[Path] | None = None,
Expand Down
42 changes: 30 additions & 12 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,19 +286,37 @@ def reset(
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
operation_mode: OperationMode = OPERATION_MODE_OPTION,
):
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.reset(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
match operation_mode:
case OperationMode.MANAGED:
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.reset(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
case _:
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
resources = kpops.manifest_reset(
pipeline_file_path,
dotenv,
config,
parse_steps(steps),
filter_type,
environment,
verbose,
operation_mode,
)
for resource in resources:
for rendered_manifest in resource:
print_yaml(rendered_manifest.model_dump())


@app.command(help="Clean pipeline steps")
Expand Down
23 changes: 23 additions & 0 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from kpops.const.file_type import DEFAULTS_YAML, PIPELINE_YAML
from kpops.manifests.argo import ArgoHook, enrich_annotations
from kpops.manifests.kubernetes import KubernetesManifest
from kpops.manifests.strimzi.kafka_topic import StrimziKafkaTopic
from kpops.utils.docstring import describe_attr

log = logging.getLogger("StreamsApp")
Expand Down Expand Up @@ -66,6 +67,19 @@ def manifest_deploy(self) -> tuple[KubernetesManifest, ...]:
self.template_flags,
)

@override
def manifest_reset(self) -> tuple[KubernetesManifest, ...]:
self.values.kafka.delete_output = False
values = self.to_helm_values()

return self.helm.template(
self.helm_release_name,
self.helm_chart,
self.namespace,
values,
self.template_flags,
)

async def clean_pvcs(self, dry_run: bool) -> None:
app_full_name = super(HelmApp, self).full_name
pvc_handler = PVCHandler(app_full_name, self.namespace)
Expand Down Expand Up @@ -185,6 +199,15 @@ def manifest_deploy(self) -> tuple[KubernetesManifest, ...]:

return manifests

@override
def manifest_reset(self) -> tuple[KubernetesManifest, ...]:
resource = self._cleaner.manifest_reset()
if self.to:
resource = resource + tuple(
StrimziKafkaTopic.from_topic(topic) for topic in self.to.kafka_topics
)
return resource

@override
def manifest_clean(self) -> tuple[KubernetesManifest, ...]:
if get_config().operation_mode is OperationMode.MANIFEST:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
---
apiVersion: batch/v1
kind: Job
metadata:
labels:
app: resources-manifest-pipeline-my-streams-app-clean
chart: streams-app-cleanup-job-3.0.3
release: resources-manifest-pipeline-my-streams-app-clean
name: resources-manifest-pipeline-my-streams-app-clean
spec:
backoffLimit: 6
template:
metadata:
labels:
app: resources-manifest-pipeline-my-streams-app-clean
release: resources-manifest-pipeline-my-streams-app-clean
spec:
containers:
- args:
- reset
env:
- name: ENV_PREFIX
value: APP_
- name: APP_BOOTSTRAP_SERVERS
value: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092
- name: APP_SCHEMA_REGISTRY_URL
value: http://localhost:8081/
- name: APP_INPUT_TOPICS
value: my-input-topic
- name: APP_INPUT_PATTERN
value: my-input-pattern
- name: APP_OUTPUT_TOPIC
value: my-output-topic
- name: APP_ERROR_TOPIC
value: resources-manifest-pipeline-my-streams-app-error
- name: APP_LABELED_OUTPUT_TOPICS
value: my-output-topic-label=my-labeled-topic-output,
- name: APP_LABELED_INPUT_TOPICS
value: my-input-topic-label=my-labeled-input-topic,
- name: APP_LABELED_INPUT_PATTERNS
value: my-input-topic-labeled-pattern=my-labeled-input-pattern,
- name: APP_APPLICATION_ID
value: my-streams-app-id
- name: JAVA_TOOL_OPTIONS
value: '-XX:MaxRAMPercentage=75.0 '
image: my-registry/my-streams-app-image:1.0.0
imagePullPolicy: Always
name: resources-manifest-pipeline-my-streams-app-clean
resources:
limits:
cpu: 500m
memory: 2G
requests:
cpu: 200m
memory: 300Mi
restartPolicy: OnFailure
ttlSecondsAfterFinished: 30

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-output-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-error-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-labeled-topic-output
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: resources-manifest-pipeline-my-streams-app-error
spec:
config:
cleanup.policy: compact,delete
partitions: 1
replicas: 1

Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
---
apiVersion: batch/v1
kind: Job
metadata:
labels:
app: resources-manifest-pipeline-my-streams-app-clean
chart: streams-app-cleanup-job-3.0.3
release: resources-manifest-pipeline-my-streams-app-clean
name: resources-manifest-pipeline-my-streams-app-clean
spec:
backoffLimit: 6
template:
metadata:
labels:
app: resources-manifest-pipeline-my-streams-app-clean
release: resources-manifest-pipeline-my-streams-app-clean
spec:
containers:
- args:
- reset
env:
- name: ENV_PREFIX
value: APP_
- name: APP_BOOTSTRAP_SERVERS
value: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092
- name: APP_SCHEMA_REGISTRY_URL
value: http://localhost:8081/
- name: APP_INPUT_TOPICS
value: my-input-topic
- name: APP_INPUT_PATTERN
value: my-input-pattern
- name: APP_OUTPUT_TOPIC
value: my-output-topic
- name: APP_ERROR_TOPIC
value: resources-manifest-pipeline-my-streams-app-error
- name: APP_LABELED_OUTPUT_TOPICS
value: my-output-topic-label=my-labeled-topic-output,
- name: APP_LABELED_INPUT_TOPICS
value: my-input-topic-label=my-labeled-input-topic,
- name: APP_LABELED_INPUT_PATTERNS
value: my-input-topic-labeled-pattern=my-labeled-input-pattern,
- name: APP_APPLICATION_ID
value: my-streams-app-id
- name: JAVA_TOOL_OPTIONS
value: '-XX:MaxRAMPercentage=75.0 '
image: my-registry/my-streams-app-image:1.0.0
imagePullPolicy: Always
name: resources-manifest-pipeline-my-streams-app-clean
resources:
limits:
cpu: 500m
memory: 2G
requests:
cpu: 200m
memory: 300Mi
restartPolicy: OnFailure
ttlSecondsAfterFinished: 30

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-output-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-error-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-labeled-topic-output
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: resources-manifest-pipeline-my-streams-app-error
spec:
config:
cleanup.policy: compact,delete
partitions: 1
replicas: 1

Loading

0 comments on commit 658242e

Please sign in to comment.