What's the issue?
I'm trying to create a schedule dynamically based on a trigger (a sensor event). The code appears to run fine: no exceptions are raised and every log message shows up as expected. However, no schedule is ever created.
import json

from dagster import RunRequest, ScheduleDefinition, sensor

# Assumed to be defined elsewhere in this module: redis_client (the Redis
# connection), load_pipelines() (returns the job definitions), and
# dynamic_schedules (the module-level list returned by the repository).


@sensor(name="redis_pipeline_sensor", jobs=load_pipelines())
def redis_pipeline_sensor(context):
    """
    Sensor that polls Redis for pipeline trigger messages and dynamically triggers jobs.
    """
    # Check for a message in the Redis queue
    message = redis_client.lpop("dagster_pipeline_queue")
    if message:
        context.log.info("✅ Message received from Redis")
        message_data = json.loads(message)

        # Extract config and derive job_name
        run_config = message_data.get("config", {})
        ops = run_config.get("ops", {})
        if len(ops) != 1:
            context.log.error("No valid ops found in config to derive job_name. Skipping.")
            return
        op_name = next(iter(ops))  # Get the first (and assumed only) key in `ops`
        if not op_name.startswith("fetch_"):
            context.log.error("Unable to derive job_name from op name. Skipping.")
            return
        job_name = op_name.replace("fetch_", "") + "_pipeline"
        context.log.info(f"Derived job_name '{job_name}' from op '{op_name}'.")

        # Dynamically find the job by name
        pipelines = {job.name: job for job in load_pipelines()}
        if job_name not in pipelines:
            context.log.error(f"Job '{job_name}' not found. Skipping.")
            return

        # Handle scheduling logic
        schedule_cron = message_data.get("schedule_cron")  # New: cron schedule for the job
        run_key = message_data.get("run_key", None)
        if schedule_cron:
            context.log.info(f"Creating a schedule for job '{job_name}' with cron '{schedule_cron}'.")

            # Create a schedule definition and add it to the global registry
            def schedule_fn(_context):
                return RunRequest(run_key=run_key, run_config=run_config)

            dynamic_schedule = ScheduleDefinition(
                name=f"{job_name}_schedule",
                cron_schedule=schedule_cron,
                job=pipelines[job_name],
                run_config_fn=schedule_fn,
            )
            dynamic_schedules.append(dynamic_schedule)
            context.log.info(f"Schedule created for job '{job_name}' with cron '{schedule_cron}'.")
        else:
            # Trigger the job immediately
            context.log.info(f"Triggering job '{job_name}' immediately.")
            yield RunRequest(run_key=run_key, run_config=run_config, job_name=job_name)
What did you expect to happen?
If creating schedules dynamically like this is possible and fits Dagster's design, the new schedule should appear in the list of schedules in the UI. If it is not possible, an error should be raised (as with GraphQL, for example).
How to reproduce?
Use the same code as above, with dynamic_schedules as a module-level (global) list that is returned by the repository; a minimal sketch of that repository is shown below.
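For reference, a simplified sketch of how the repository is wired (the repository name here is illustrative; load_pipelines, redis_pipeline_sensor, and dynamic_schedules are the same names used in the sensor code above):

from dagster import repository

# Module-level registry that the sensor appends ScheduleDefinitions to
# at evaluation time.
dynamic_schedules = []


@repository
def redis_pipeline_repository():
    # Return the statically loaded jobs, the sensor, and whatever
    # schedules have been appended to the global list so far.
    return [*load_pipelines(), redis_pipeline_sensor, *dynamic_schedules]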
Dagster version
dagster, version 1.9.3
Deployment type
Docker Compose
Deployment details
No response
Additional information
No response
Message from the maintainers
Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
By submitting this issue, you agree to follow Dagster's Code of Conduct.