Getting Started
PyExecutor makes it easy to build, schedule, and publish multi-step Python workflows. Let's start with the basics.
1. Create Your First Workflow
- Go to the Workflows tab in PyExecutor
- Click "New Workflow" and give it a name (e.g., "Service Health Check")
- Add your first step by clicking the "+" button
- Configure the step (we'll cover step types next)
- Save your workflow
2. Add Steps to Your Workflow
Steps execute in order. Each step's result is stored under its Output Variable and is available to the steps that follow.
Step 1: Script
└─ Output Variable: "health_data"
Step 2: Condition
└─ Expression: health_data.get('status') == 'critical'
└─ True Branch: Send Alert
└─ False Branch: Log Success
3. Test & Deploy
- Test Manually: Click "Run Now" to execute immediately
- Schedule: Add a cron expression (e.g., "0 */6 * * *" = every 6 hours)
- Publish as API: Enable "Publish as REST Endpoint" to expose via webhook
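To sanity-check a schedule before saving it, it can help to expand a cron field by hand. The sketch below is not PyExecutor's scheduler. `expand_cron_field` is a hypothetical helper that handles only the common forms of a single field (`*`, `*/n`, `a-b`, comma lists), and it shows which hours the `*/6` hour field from the example matches.

```python
def expand_cron_field(field: str, lo: int, hi: int) -> list[int]:
    """Expand one cron field into the sorted list of values it matches.

    Hypothetical helper for illustration: supports '*', '*/n', 'a-b',
    'a-b/n', single numbers, and comma-separated combinations of those.
    """
    values: set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = lo, hi
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(base)
        values.update(range(start, end + 1, step))
    return sorted(values)


# Hour field of "0 */6 * * *": hours range from 0 to 23.
print(expand_cron_field("*/6", 0, 23))  # [0, 6, 12, 18]
```

Combined with the minute field `0`, the example schedule therefore fires at 00:00, 06:00, 12:00, and 18:00.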
Core Concepts
Context Variables
Data passed between workflow steps. Each step's output becomes available to subsequent steps.
Step 1 Output:
user_data = {...}
Step 2 Access:
{{ user_data.email }}
Output Variables
Name assigned to each step's result. Used in subsequent steps via {{variable_name}}.
Script Step Config:
Output Variable: "response"
Next Step:
Use: {{ response.total }}
Webhooks
Trigger workflows via HTTP requests. All request data (payload, query, headers) is available.
POST /api/hooks/process
Body: {"customer": "john"}
Access in workflow:
{{ webhook_payload.customer }}
Secrets
Encrypted credentials stored once, injected at runtime via environment variables.
Store Secret:
Name: "DB_PASSWORD"
Value: "secret123"
Access in Script:
import os
pwd = os.getenv('DB_PASSWORD')
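A slightly more defensive variant of the lookup above fails fast when a secret was never configured, instead of passing `None` further down the script. `require_secret` is a hypothetical helper name. Only the environment-variable injection itself comes from the description above.

```python
import os


def require_secret(name: str) -> str:
    """Return the secret injected as an environment variable, or raise."""
    value = os.getenv(name)
    if not value:
        # Report the variable name, but never log the value itself.
        raise RuntimeError(f"Secret {name!r} is not set for this step")
    return value


try:
    # Assumes a secret named DB_PASSWORD was stored as in the example above.
    pwd = require_secret("DB_PASSWORD")
    print("DB_PASSWORD loaded")
except RuntimeError as exc:
    print(exc)
```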
Step Types
PyExecutor supports 12 step types, chained together to build complete workflows.
Script Step (Multi-Language)
Execute code in Python, JavaScript (Node.js), PowerShell, Bash, or Go inside isolated Docker containers. Python, JavaScript, and PowerShell each have their own package source: PyPI, npm, and PowerShell Gallery, respectively.
Example: Process Health Check Data
import json
# Access context variables passed from previous steps
health_data = json.loads('''{{ raw_health_json }}''')
# Process the data
critical_services = [
    s for s in health_data.get('services', [])
    if s.get('status') == 'down'
]
# Return result as JSON string
result = {
    'total_services': len(health_data.get('services', [])),
    'critical_count': len(critical_services),
    'needs_alert': len(critical_services) > 0
}
print(json.dumps(result))
API Step
Call external HTTP endpoints with context variable interpolation.
Example: Fetch Customer Data
Connector: "External API"
Method: GET
URL: https://api.example.com/customers/{{ webhook_payload.customer_id }}
Headers:
Authorization: Bearer {{ CUSTOMER_API_KEY }}
Body (for POST):
{
"email": "{{ webhook_payload.email }}",
"action": "update_profile"
}
Database Step
Execute SQL queries against PostgreSQL or MySQL with interpolated context.
Example: Log Workflow Execution
Connector: "Main Database"
Query:
INSERT INTO audit_log (
user_id, action, timestamp, details
) VALUES (
{{ webhook_payload.user_id }},
'workflow_executed',
NOW(),
'{{ json.dumps(audit_details) }}'
)
RETURNING id, created_at;
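Because interpolated values end up inside the SQL text, a value containing a quote can break the statement or change its meaning. This guide does not say whether PyExecutor escapes interpolated values, so treat the following as a general precaution for Script steps that talk to a database directly. It is a minimal sketch of a parameterized insert that uses the standard-library `sqlite3` module with an in-memory database as a stand-in for PostgreSQL or MySQL. The table layout loosely mirrors the `audit_log` example above and is illustrative only.

```python
import json
import sqlite3

# In-memory SQLite stands in for the real database; the table is illustrative.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE audit_log (id INTEGER PRIMARY KEY, user_id INTEGER, "
    "action TEXT, details TEXT)"
)

user_id = 42
audit_details = {"note": "it's a value with a quote"}

# Placeholders (?) keep the data separate from the SQL text, so the quote
# inside the JSON string cannot terminate the literal early.
conn.execute(
    "INSERT INTO audit_log (user_id, action, details) VALUES (?, ?, ?)",
    (user_id, "workflow_executed", json.dumps(audit_details)),
)
conn.commit()

row = conn.execute("SELECT user_id, action, details FROM audit_log").fetchone()
print(row)
```

Common PostgreSQL and MySQL drivers for Python follow the same pattern, typically with `%s` as the placeholder instead of `?`.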
Transform Step
Python expressions to shape and combine data from previous steps.
Example: Format Report
# Variables from previous steps are in scope
import json
from datetime import datetime
result = {
    'is_healthy': health_data.get('critical_count', 0) == 0,
    'total_errors': len(error_log),
    'success_rate': (
        (customer_count - error_count) / customer_count * 100
        if customer_count > 0 else 0
    ),
    'timestamp': datetime.now().isoformat(),
    'report': f"""Health Check Report
Status: {'OK' if health_data.get('critical_count', 0) == 0 else 'CRITICAL'}
Errors: {len(error_log)}
Customers Processed: {customer_count}
"""
}
print(json.dumps(result))
Notification Step
Send alerts via Email, Teams, Slack, Discord, Telegram, and more.
Example: Alert on Critical Issues
Connector: "Slack - #alerts"
Subject: Service Health Alert
Message:
Critical Services Down:
{{ health_data.critical_services_list }}
Timestamp: {{ now }}
Action: Review dashboard at
https://dashboard.example.com
Condition Step (If/Else)
Branch workflow execution based on boolean expressions.
Example: Route by Status
Expression:
health_data.get('critical_count', 0) > 0
True Branch (Critical Issues Found):
└─ Step 1: Send Urgent Slack Alert
└─ Step 2: Create Incident (API call)
└─ Step 3: Page On-Call Engineer
False Branch (Everything OK):
└─ Step 1: Log Success to Database
└─ Step 2: Send Digest Email
Each branch can contain multiple sub-steps, creating complex workflows.
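One way to picture how a condition expression selects a branch is to evaluate it as a Python expression with the context variables as local names. This is only a sketch under that assumption. The guide does not document how PyExecutor evaluates expressions, and `pick_branch` is a hypothetical name.

```python
def pick_branch(expression: str, context: dict) -> str:
    """Evaluate a boolean expression against context variables.

    Sketch only: built-ins are removed from the evaluation scope, but eval()
    is still not a safe sandbox for untrusted input.
    """
    outcome = eval(expression, {"__builtins__": {}}, dict(context))
    return "true_branch" if outcome else "false_branch"


context = {"health_data": {"critical_count": 2}}
print(pick_branch("health_data.get('critical_count', 0) > 0", context))
```

Running the same expression with an empty `health_data` dictionary selects the false branch, which is a quick way to exercise both paths before wiring up real steps.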
Approval Step
Pause workflow execution and wait for human approval before continuing. Supports multiple designated approvers with email notifications and a full audit trail.
Key Features
- Multi-Approver — Designate multiple approvers per step (org members + free-text emails)
- Email Notifications — Sends approval request emails via configured connectors
- Comment & Audit — Approvers can leave comments; every action is logged
- Timeout — Auto-expire after configurable hours/days
- MCP / CLI — Approve or reject from LLM chat or the command line
Example: Deployment Gate
Step: "Request Deployment Approval"
Type: approval
Config:
Approver(s): ops-lead@company.com, cto@company.com
Required Approvals: 1
Timeout: 24 hours
Require Comment: true
Subject: "Approval required for {{workflow_name}}"
Message: "Production deployment ready for review."
Behaviour:
→ Sends email to all designated approvers
→ Workflow pauses and polls every 10 seconds
→ First approver to approve/reject decides outcome
→ On approval: workflow continues to next step
→ On rejection: workflow fails with reason
→ On timeout: workflow fails as expired
Approvers are snapshotted per request — changing the config later won't affect in-flight approvals.
Subflow Step
Invoke another workflow as a child step within the parent. Map inputs from current context and receive merged results.
Example: Call Data Enrichment Sub-Workflow
Step: "Enrich Customer Data"
Type: subflow
Config:
Sub-Workflow: "data-enrichment-pipeline"
Input Mapping:
{
"customer_id": "{{webhook_payload.id}}",
"source": "crm"
}
Behaviour:
→ Runs the child workflow with mapped inputs
→ Child steps execute with label prefix [SubFlow: ...]
→ Result merged back as subflow_{step_label} keys
→ Parent continues with enriched context
Delay Step
Pause workflow execution for a specified number of seconds (max 300). Useful for rate limiting or waiting for external processes.
Example: Wait Before Retry
Step: "Wait for External System"
Type: delay
Config:
Seconds: 30
Message: "Waiting for payment processor to settle..."
Output:
delay_duration: 30
delay_message: "Waiting for payment processor to settle..."
Output Step
Map context variables to a structured workflow output. The mappings are persisted to the Job record as workflow_output and returned via the API.
Example: Return Report Summary
Step: "Return Results"
Type: output
Config:
Mappings:
"total_processed": "batch_result.count"
"errors": "validation_step.error_list"
"report_url": "upload_step.public_url"
→ Supports dot-notation for nested data
→ Result saved to Job.workflow_output
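The dot-notation mappings can be read as paths into the workflow context. The sketch below shows one plausible resolution strategy. `resolve_path` and `build_workflow_output` are hypothetical names, the sample context values are made up, and returning `None` for a missing key is an assumption rather than documented behaviour.

```python
from typing import Any


def resolve_path(context: dict, path: str) -> Any:
    """Follow a dot-separated path such as 'batch_result.count'."""
    current: Any = context
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None  # assumption: missing keys resolve to None
        current = current[key]
    return current


def build_workflow_output(mappings: dict, context: dict) -> dict:
    """Apply every 'output name -> context path' mapping."""
    return {name: resolve_path(context, path) for name, path in mappings.items()}


context = {
    "batch_result": {"count": 120},
    "validation_step": {"error_list": ["row 7: missing email"]},
    "upload_step": {"public_url": "https://files.example.com/report.pdf"},
}
mappings = {
    "total_processed": "batch_result.count",
    "errors": "validation_step.error_list",
    "report_url": "upload_step.public_url",
}
workflow_output = build_workflow_output(mappings, context)
print(workflow_output)
```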
Loop Step
Iterate over an array from a previous step and execute sub-steps for each element. Each iteration has access to the current item.
Example: Process Each Order
Step: "Process Orders"
Type: loop
Config:
Array Source: "fetch_orders.data"
Item Variable: "current_order"
Sub-Steps:
└─ Step 1: Validate order (Script)
└─ Step 2: Update inventory (Database)
└─ Step 3: Send confirmation (Notification)
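Conceptually, the loop binds each array element to the item variable and runs the sub-steps with that binding. A minimal sketch, assuming sub-steps can be modelled as plain functions that receive the context (`run_loop`, `validate`, and `confirm` are illustrative names, not PyExecutor APIs):

```python
from typing import Callable


def run_loop(items: list, item_variable: str,
             sub_steps: list[Callable[[dict], None]], context: dict) -> None:
    """Run every sub-step once per item, exposing the item under item_variable."""
    for item in items:
        # Each iteration gets its own copy so one item cannot leak into the next.
        iteration_context = {**context, item_variable: item}
        for step in sub_steps:
            step(iteration_context)


processed = []


def validate(ctx: dict) -> None:
    if ctx["current_order"]["qty"] <= 0:
        raise ValueError("Invalid quantity")


def confirm(ctx: dict) -> None:
    processed.append(ctx["current_order"]["id"])


orders = [{"id": 1, "qty": 2}, {"id": 2, "qty": 1}]
run_loop(orders, "current_order", [validate, confirm], context={})
print(processed)  # [1, 2]
```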
Webhooks & Context Variables
When a workflow is triggered via webhook, all request data is automatically available as context variables.
Available Variables
| Variable | Description |
|---|---|
| {{webhook_payload}} | Full request body (parsed JSON) |
| {{webhook_payload.key}} | Nested access with dot notation |
| {{webhook_query.param}} | URL query string parameters |
| {{webhook_headers.name}} | HTTP headers (lowercase) |
| {{webhook_method}} | HTTP method (GET, POST, PUT, etc.) |
| {{webhook_path}} | Request path after domain |
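To make the table concrete, here is a rough sketch of how `{{...}}` placeholders with dot notation could be resolved against the webhook context. The regular expression, the `render` function, and the choice to leave unknown placeholders untouched are all assumptions for illustration. PyExecutor's actual template engine is not described in this guide.

```python
import re

PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render(template: str, context: dict) -> str:
    """Replace {{a.b.c}} placeholders with values looked up in context."""

    def lookup(match: re.Match) -> str:
        value = context
        for key in match.group(1).split("."):
            if not isinstance(value, dict) or key not in value:
                return match.group(0)  # assumption: unknown paths stay as-is
            value = value[key]
        return str(value)

    return PLACEHOLDER.sub(lookup, template)


context = {
    "webhook_payload": {"order_id": 12345, "customer": {"email": "john@example.com"}},
    "webhook_query": {"source": "crm"},
    "webhook_method": "POST",
}
print(render("--customer-email {{webhook_payload.customer.email}}", context))
print(render("{{ webhook_method }} from {{webhook_query.source}}", context))
```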
Example: Process Order Webhook
Incoming Webhook:
curl -X POST https://<your-pyexecutor-host>/api/hooks/process-order \
-H "Content-Type: application/json" \
-d '{
"order_id": 12345,
"customer": {
"email": "john@example.com",
"name": "John Doe"
},
"total": 199.99,
"items": 3
}'
Use in Script Step argument:
--customer-email {{webhook_payload.customer.email}}
--order-id {{webhook_payload.order_id}}
Use in Database Step:
INSERT INTO orders (order_id, customer_email, total, created_at)
VALUES ({{webhook_payload.order_id}}, '{{webhook_payload.customer.email}}', {{webhook_payload.total}}, NOW());
Use in API Step body:
{
"email": "{{webhook_payload.customer.email}}",
"name": "{{webhook_payload.customer.name}}",
"order_number": "{{webhook_payload.order_id}}"
}
Security
- ✅ API Key Authentication: Require X-API-Key header
- ✅ IP Allowlist: Restrict callers to trusted IPs
- ✅ Rate Limiting: Prevent abuse with request quotas
- ✅ TLS/HTTPS: All webhooks use encrypted connections
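From the caller's side, an authenticated webhook request might be assembled like this. The host name `pyexecutor.example.com` and the key value are placeholders, and the snippet only builds the request object. The call that would actually send it is left commented out.

```python
import json
import urllib.request

payload = {"order_id": 12345, "customer": {"email": "john@example.com"}}

request = urllib.request.Request(
    url="https://pyexecutor.example.com/api/hooks/process-order",  # placeholder host
    data=json.dumps(payload).encode("utf-8"),
    headers={
        "Content-Type": "application/json",
        "X-API-Key": "replace-with-your-key",  # placeholder, load from a secret store
    },
    method="POST",
)

print(request.get_method(), request.full_url)
# with urllib.request.urlopen(request, timeout=10) as response:
#     print(response.status, response.read().decode("utf-8"))
```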
Multi-Language Script Execution
PyExecutor supports 5 programming languages. Each script runs in an isolated Docker container with language-specific runtimes pre-installed.
Supported Languages
| Language | Runtime | Package Manager | Command |
|---|---|---|---|
| Python | Python 3.12 | PyPI (pip) | python -u -c |
| JavaScript | Node.js 20.x | npm | node -e |
| PowerShell | PowerShell 7.4 | PowerShell Gallery | pwsh -Command |
| Bash | System bash | — | bash -c |
| Go | Go (apt) | — | go run (temp file) |
JavaScript Example
const data = JSON.parse(process.env.INPUT_DATA || '{}');
const result = {
processed: true,
items: data.items?.map(i => ({
...i, score: i.value * 1.15
}))
};
console.log(JSON.stringify(result));
PowerShell Example
$services = Get-Service | Where-Object {
$_.Status -eq 'Running'
} | Select-Object Name, Status
$result = @{
count = $services.Count
services = $services
} | ConvertTo-Json
Write-Output $result
Bash Example
#!/bin/bash
DISK_USAGE=$(df -h / | awk 'NR==2 {print $5}')
MEM_FREE=$(free -m | awk 'NR==2 {print $4}')
echo "{\"disk_usage\": \"$DISK_USAGE\", \"mem_free_mb\": $MEM_FREE}"
Go Example
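package main
import (
    "encoding/json"
    "fmt"
)
// fib returns the n-th Fibonacci number (fib(0) = 0, fib(1) = 1).
func fib(n int) int {
    a, b := 0, 1
    for i := 0; i < n; i++ {
        a, b = b, a+b
    }
    return a
}
func main() {
    result := map[string]interface{}{
        "status":    "ok",
        "fibonacci": fib(10),
    }
    out, _ := json.Marshal(result)
    fmt.Println(string(out))
}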
Docker Sandbox Security
- Network isolation: --network none (no internet access from scripts)
- Memory limit: 256 MB per container
- PID limit: 64 processes max
- Read-only filesystem with tmpfs /tmp
- Per-org, per-language package volumes mounted automatically
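These limits correspond to standard `docker run` options. The sketch below assembles such a command line in Python to show how the pieces fit together. The image name and the function are assumptions, the package-volume mounts are omitted, and the command is only printed, not executed. It is not PyExecutor's actual launcher.

```python
import shlex


def build_sandbox_command(code: str, image: str = "python:3.12-slim") -> list[str]:
    """Assemble a docker run command that applies the limits listed above."""
    return [
        "docker", "run", "--rm",
        "--network", "none",      # no network access from the script
        "--memory", "256m",       # 256 MB memory limit
        "--pids-limit", "64",     # at most 64 processes
        "--read-only",            # read-only root filesystem
        "--tmpfs", "/tmp",        # writable scratch space
        image,
        "python", "-u", "-c", code,
    ]


command = build_sandbox_command("print('hello from the sandbox')")
print(shlex.join(command))
```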
Template Gallery
Browse and clone pre-built workflow templates to get started fast. Each template includes configurable fields so you can customize without touching the underlying workflow JSON.
How It Works
- Browse the gallery — filter by category (AI, Database, APIs, Logic, Data, Notifications) or difficulty
- Preview the template — see description, use cases, required connectors, and estimated time
- Clone & Configure — fill in the config fields (secrets, URLs, emails) and create your workflow
- Run — your new workflow is ready to execute immediately
📝 Ghostwriter
AI-powered content generation pipeline with drafting, review, and publishing steps.
Beginner
📄 PDF Intelligence
Extract, parse, and analyze PDF documents with AI summarization.
Intermediate
🐙 GitHub Issue Triage
Automatically classify, label, and route GitHub issues using AI.
Intermediate
📊 Featured & Trending
Browse featured templates and see what's trending by clone count.
All Levels
Template Config Fields
Each template can define typed config fields that map to {{placeholder}} values in the workflow JSON:
Config Field Types:
text, url, email, secret, select,
multiselect, number, json, textarea
Example Config:
Field: "api_key" (type: secret, required: true)
Field: "target_url" (type: url, default: "https://api.example.com")
Field: "language" (type: select, options: ["en", "es", "fr"])
RBAC & Organizations
PyExecutor includes a full role-based access control (RBAC) system with multi-tenant organizations, hierarchical roles, fine-grained permissions, and immutable audit logging.
System Roles
| Role | Permissions |
|---|---|
| Admin | Full access — manage org, roles, members, and all resources |
| Editor | Create & edit workflows, scripts, connectors; cannot manage org settings |
| Executor | Run workflows and view jobs; cannot create or edit |
| Viewer | Read-only access to workflows, jobs, and logs |
| Report Analyst | Access analytics dashboards and export reports |
Custom roles are supported with parent-role inheritance. Permission categories span 12 resource types.
API Keys with Scopes
Generate API keys with specific scopes: read, execute, edit, analytics, full. Keys support expiry, rotation, and resource constraints.
Audit Logging
Every action (create, update, delete, execute, login) is recorded in an immutable audit log with IP address, user agent, old/new values, and status (success/failed/denied). Supports compliance export.
Git Version Control
Track changes to workflows and scripts with a built-in Git-like version control system. Create branches, commit changes, view diffs, open pull requests, and tag releases.
Version Control Models
| Entity | Description |
|---|---|
| Repository | Links to a Workflow or Script; tracks current branch |
| Branch | Types: main, staging, develop, feature, hotfix, release. Protected branch support. |
| Commit | SHA256-hashed content snapshot with parent linking, additions/deletions count |
| Diff | Unified diff format with structured JSON changes between two commits |
| Pull Request | Source → target branch with approval flow (required approvals count), merge/reject |
| Tag | Named reference to a specific commit for release versioning |
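As an illustration of a SHA256-hashed snapshot with parent linking, the sketch below hashes a canonical JSON form of the content together with the parent hash. The field names and the exact hashing recipe are assumptions. The table only states that commits are SHA256-hashed and linked to a parent.

```python
import hashlib
import json
from typing import Optional


def commit_hash(content: dict, parent: Optional[str], message: str) -> str:
    """Hash a content snapshot together with its parent commit hash."""
    canonical = json.dumps(
        {"content": content, "parent": parent, "message": message},
        sort_keys=True,           # stable key order gives a stable hash
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


first = commit_hash({"steps": ["fetch"]}, parent=None, message="Initial workflow")
second = commit_hash({"steps": ["fetch", "notify"]}, parent=first, message="Add alert")
print(first)
print(second)
```

Including the parent hash in the hashed material means that changing any earlier commit would change every hash after it, which is what makes the history tamper-evident in this style of design.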
Analytics Dashboard
A built-in analytics dashboard with 10 API endpoints for monitoring platform health, execution trends, and resource utilization.
Available Analytics
📊 Dashboard Stats
Total jobs, success rate, CPU/memory/disk usage, AI token counts
📈 Daily Metrics
Daily job counts and trends over time
🔀 Trigger Breakdown
Jobs by trigger type: manual, webhook, scheduled, API
🏆 Top Resources
Most-used workflows and scripts
🔌 Connector Health
Status and latency of database, HTTP, and AI connectors
⏱ Duration Trends
Job execution duration patterns over time
📡 API Traffic
Request patterns across published API endpoints
🔒 Security Events
Auth failures, denied access, suspicious patterns
🤖 MCP Analytics
MCP tool invocation counts and performance
🌐 Endpoint Metrics
Per-endpoint latency, error rates, and throughput
Detailed Examples
📦 Example 1: E-Commerce Order Processing
Workflow triggered by webhook. Validates order, checks inventory, charges customer, sends confirmation.
Workflow: "Order Processing Pipeline"
Step 1: Validate Order (Script)
Input: Order JSON from webhook
Output: validated_order
import json
order = webhook_payload
if not order.get('customer_email'):
    raise ValueError('Missing customer_email')
if order.get('total', 0) <= 0:
    raise ValueError('Invalid order total')
print(json.dumps({
    'order_id': order['order_id'],
    'email': order['customer_email'],
    'total': order['total'],
    'valid': True
}))
Step 2: Check Inventory (API)
URL: https://inventory-api.example.com/check
Method: POST
Body: {{validated_order}}
Output: inventory_result
Step 3: Process Payment (API)
Connector: Stripe
URL: https://api.stripe.com/v1/charges
Method: POST
Output: payment_result
Step 4: Branch on Payment Status (Condition)
Expression: payment_result.get('status') == 'succeeded'
True Branch:
Step 1: Insert to Database
INSERT INTO orders (order_id, customer_email, status)
VALUES ({{validated_order.order_id}}, '{{validated_order.email}}', 'paid')
Step 2: Send Confirmation (Notification)
Email to: {{validated_order.email}}
Subject: Order #{{validated_order.order_id}} Confirmed
False Branch:
Step 1: Send Failure Notification
Email to: {{validated_order.email}}
Subject: Payment Failed - Order #{{validated_order.order_id}}
📊 Example 2: Daily Analytics Report
Scheduled workflow (daily at 8 AM) that generates analytics reports and emails them.
Workflow: "Daily Analytics Report"
Schedule: 0 8 * * * (daily at 8 AM)
Step 1: Fetch Yesterday's Data (Database)
Query:
SELECT
COUNT(*) as total_orders,
SUM(total) as revenue,
AVG(total) as avg_order_value,
COUNT(DISTINCT customer_id) as unique_customers
FROM orders
WHERE DATE(created_at) = CURRENT_DATE - 1
Output: yesterday_stats
Step 2: Compare to 7-Day Average (Database)
Query:
SELECT
AVG(total) as avg_value,
COUNT(*) as avg_count
FROM orders
WHERE DATE(created_at) BETWEEN
CURRENT_DATE - 7 AND CURRENT_DATE - 1
Output: week_avg
Step 3: Calculate Metrics (Transform)
from datetime import date
# avg_value * avg_count is the total revenue of the 7-day window;
# divide by 7 to get a daily baseline comparable to yesterday's revenue.
baseline = week_avg['avg_value'] * week_avg['avg_count'] / 7
growth_pct = (
    (yesterday_stats['revenue'] - baseline) / baseline * 100
    if baseline else 0
)
result = {
    'date': date.today().isoformat(),
    'total_orders': yesterday_stats['total_orders'],
    'revenue': yesterday_stats['revenue'],
    'growth_pct': round(growth_pct, 2),
    'status': 'healthy' if growth_pct > 0 else 'warning'
}
Output: report_data
Step 4: Generate HTML Report (Script)
html = f"""
Daily Report - {report_data['date']}
Orders: {report_data['total_orders']}
Revenue: ${report_data['revenue']}
Growth: {report_data['growth_pct']}%
"""
print(html)
Output: html_report
Step 5: Send Email (Notification)
Connector: Email (SMTP)
Recipient: {{TEAM_EMAIL}}
Subject: Daily Analytics Report - {{report_data.date}}
Body: {{html_report}}
🔄 Example 3: Bulk User Import with Notifications
Webhook receives array of users, processes each one, sends individual confirmations.
Workflow: "Bulk Import Users"
Webhook Endpoint: POST /api/import-users
Step 1: Validate Input (Script)
import json
users_list = webhook_payload.get('users', [])
if len(users_list) == 0:
    raise ValueError('No users provided')
print(json.dumps({'users': users_list, 'count': len(users_list)}))
Output: import_batch
Step 2: Process Each User (Loop)
Items: {{import_batch.users}}
Item Variable: current_user
Sub-Step 1: Create User (API)
URL: https://api.example.com/users
Method: POST
Body: {{current_user}}
Output: created_user
Sub-Step 2: Send Welcome Email (Notification)
Email to: {{current_user.email}}
Subject: Welcome {{current_user.name}}
Message: Your account has been created
Sub-Step 3: Log Creation (Database)
INSERT INTO user_imports (email, name, status, imported_at)
VALUES ('{{current_user.email}}', '{{current_user.name}}', 'success', NOW())
Step 3: Send Summary (Notification)
Email to: {{webhook_payload.admin_email}}
Subject: Import Complete - {{import_batch.count}} users processed
Body: All users from the import batch have been successfully created
Enterprise Use Cases
Real-world scenarios where PyExecutor solves critical business problems.
🏢 Use Case 1: Real-Time Compliance Monitoring
Problem: Financial institution must track suspicious transactions in real-time and generate audit logs for regulatory compliance.
Solution with PyExecutor:
Webhook triggers on every transaction:
Step 1: Fetch Risk Score (API to Fraud Detection Service)
- Check transaction against ML model
- Get risk_score (0-100)
Step 2: Query Historical Data (Database)
- Check customer's transaction history
- Identify unusual patterns
Step 3: Evaluate Risk Rules (Condition)
if risk_score > 75 OR amount > daily_limit:
Branch A: BLOCK transaction immediately
- Notify customer
- Create regulatory event
- Alert compliance team
else:
Branch B: APPROVE transaction
- Log to audit database
- Update customer profile
Step 4: Generate Compliance Report (Transform)
- Combine all signals
- Format for regulatory submission
Step 5: Archive to Immutable Log (Database)
- INSERT into immutable audit table
- Ensures regulatory compliance
Results: Sub-second decision-making, complete audit trail, automatic regulatory reporting.
🏥 Use Case 2: Health Data Pipeline with AI Analysis
Problem: Healthcare provider ingests patient data from multiple sources and needs real-time AI-powered risk analysis.
Solution with PyExecutor:
Trigger: New patient vitals uploaded
Step 1: Ingest Data (Database)
- Normalize vitals from different sensors
- Store raw data for compliance
Step 2: Process with AI (AI Step)
- Use Gemini/GPT to analyze patient data
- Generate clinical insights
- Score risk level
Step 3: Branch on Risk Level (Condition)
if risk_score == 'critical':
- Immediately page on-call physician
- Update EHR with alert
- Send push notification to patient
elif risk_score == 'elevated':
- Schedule follow-up appointment
- Alert care team
- Send email to patient
Step 4: Generate Report (Transform)
- Combine medical history + AI insights
- Create physician-ready summary
Step 5: Log & Alert (Notification + Database)
- Store decision log for compliance
- Alert appropriate parties
Results: Faster diagnoses, reduced patient risk, automated compliance, full audit trail.
📱 Use Case 3: Multi-Channel Customer Engagement
Problem: E-commerce platform must notify customers through preferred channels (email, SMS, push) based on their activity and preferences.
Solution with PyExecutor:
Trigger: Customer abandoned cart (webhook from events service)
Step 1: Fetch Customer Profile (API)
- Get notification preferences
- Communication history
- Preferred channels
Step 2: Check Customer Segments (Database)
- VIP status
- Purchase history
- Engagement level
Step 3: Generate Personalized Message (AI)
- Use customer data to create custom offer
- Reference their previous purchases
- Adjust tone for customer segment
Step 4: Route by Channel Preferences (Condition)
if customer.prefers_email:
→ Send Email with personalized deal
if customer.prefers_sms:
→ Send SMS with offer code
if customer.prefers_push:
→ Send in-app push notification
Step 5: Track Engagement (Database)
- Log which channels were used
- Record if customer returned
- Update engagement metrics for ML models
Step 6: Feed Back to AI (API)
- Send engagement data to analytics platform
- Improve future personalization
Results: Higher conversion rates, improved customer satisfaction, data-driven personalization.
🔐 Use Case 4: Infrastructure Automation & Response
Problem: DevOps team needs to automatically respond to infrastructure alerts and failures with consistent, auditable actions.
Solution with PyExecutor:
Trigger: High memory usage alert from monitoring system
Step 1: Collect Diagnostics (API)
- Query Kubernetes cluster
- Get detailed pod metrics
- Check application logs
Step 2: Analyze Severity (Script)
- Calculate memory pressure
- Estimate time to OOM
- Determine response urgency
Step 3: Escalation Condition (Condition)
if memory_pressure > 90% AND trending_up:
Branch A: CRITICAL - Immediate action
- Trigger auto-scaling
- Page SRE team
- Start diagnostics collection
elif memory_pressure > 75%:
Branch B: WARNING - Monitor closely
- Increase monitoring frequency
- Send Slack alert to on-call
- Create incident ticket
else:
Branch C: INFO - Track for patterns
- Log to monitoring database
- Update trends
Step 4: Execute Remediation (Script + API)
- If auto-scaling enabled, trigger scale command
- Collect heap dumps for analysis
- Create diagnostic artifacts
Step 5: Notify & Document (Notification + Database)
- Alert team via Slack/PagerDuty
- Create incident record
- Store full context for post-mortem
Results: Faster incident response, reduced manual work, consistent remediation, audit trail for all actions.
Best Practices
✅ Do This
- ✓ Use descriptive output variable names (e.g., "user_data" not "x")
- ✓ Store secrets in Secrets Manager, never hardcode credentials
- ✓ Add logging to script steps for debugging
- ✓ Use conditions to handle different scenarios
- ✓ Test workflows manually before enabling schedules
- ✓ Set appropriate API key & rate limits on published endpoints
- ✓ Version your workflows before major changes
- ✓ Use loops for batch operations instead of multiple similar steps
❌ Avoid This
- ✗ Don't hardcode API keys or database passwords in steps
- ✗ Avoid workflows with >20 sequential steps (use sub-workflows if needed)
- ✗ Don't repeat similar step configurations (use loops instead)
- ✗ Avoid relying on external services without error handling
- ✗ Don't publish workflows without API security enabled
- ✗ Avoid very long script step code (extract to mounted scripts)
- ✗ Don't forget to test the "false branch" of condition steps
Performance Tips
- Database Queries: Add WHERE clauses to limit result sets
- API Calls: Use connection pooling and timeout configuration
- Large Data: Process in batches rather than loading all at once
- Caching: Store API responses in context to avoid redundant calls
- Parallel Workflows: Publish multiple endpoints instead of one mega-workflow
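For the batching tip, a small generator is usually enough inside a Script step. `in_batches` is a hypothetical helper, and the batch size of 2 is only for the demonstration.

```python
from typing import Iterable, Iterator


def in_batches(items: Iterable, size: int) -> Iterator[list]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly shorter batch
        yield batch


for batch in in_batches(range(5), 2):
    print(batch)
```

Because the helper is a generator, only one batch is held in memory at a time, which is the point of processing large inputs incrementally.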
Troubleshooting
❓ "Output Variable Not Found" Error
Cause: A step references an output variable that is produced by a later step, or by a step that has not run yet.
Solution: Make sure the step that creates the variable runs before any step that uses it.
Correct order:
Step 1: fetch_data (Output: "results")
Step 2: use {{results.field}} in condition
Step 3: use {{results.field}} in next step
Wrong order:
Step 1: use {{results.field}} ← Error! results doesn't exist yet
Step 2: fetch_data (Output: "results")
❓ "Context Variable Not Available" in Script Step
Cause: Trying to use context variable syntax {{var}} in script code (Python doesn't parse {{}} syntax).
Solution: Access context variables directly by name in Python, or pass via script args.
Wrong in Script Step code:
message = "User: {{user_email}}"
Correct way 1 (from args):
# Script Args: --email {{webhook_payload.email}}
import sys
email = sys.argv[sys.argv.index('--email') + 1]
Correct way 2 (via Environment):
import os
# USER_EMAIL must be set as an environment variable in the step config
email = os.getenv('USER_EMAIL')
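When a script takes more than one argument, `argparse` from the standard library is less brittle than indexing into `sys.argv`. A sketch, assuming the step's Script Args are `--email {{webhook_payload.email}} --order-id {{webhook_payload.order_id}}` (the second argument is added here purely for illustration):

```python
import argparse


def parse_step_args(argv=None) -> argparse.Namespace:
    """Parse the arguments that the step config passes to the script."""
    parser = argparse.ArgumentParser(description="Example script step")
    parser.add_argument("--email", required=True, help="customer email address")
    parser.add_argument("--order-id", type=int, default=None, help="optional order id")
    return parser.parse_args(argv)


# Passing a list makes the function easy to try outside a workflow;
# inside a step, call parse_step_args() with no arguments to read sys.argv.
args = parse_step_args(["--email", "john@example.com", "--order-id", "12345"])
print(args.email, args.order_id)
```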
❓ API Step Returns Empty Response
Cause: API endpoint not accessible, authentication failed, or malformed request.
Solution: Check job logs for actual API error. Verify:
- URL is correct (no typos, proper interpolation)
- Authentication headers are included (API key, Bearer token)
- Request body is valid JSON
- External API is reachable from your network
❓ Scheduled Workflow Not Running
Cause: Workflow not enabled, cron expression wrong, or scheduler service not running.
Solution:
- Verify "is_active" is enabled for the workflow
- Test cron expression at crontab.guru
- Check scheduler logs: docker logs pyexec-scheduler-1
- Click "Run Now" to verify that the workflow itself works
❓ Database Step Fails with Encoding Error
Cause: Non-UTF8 characters in context variable data being inserted into database.
Solution: Use Transform step to clean data before database insert:
Transform Step before Database:
# Clean non-UTF8 characters
cleaned = user_data.get('name', '').encode('utf-8', 'ignore').decode('utf-8')
result = {'clean_name': cleaned}
🔍 How to Debug
- Check Job Logs: Click on failed job to see detailed logs with full context
- Enable Debug Mode: Add verbose logging to script steps
- Test Step Isolation: Create a test workflow with just one step to verify it works
- Print Context: Use Transform steps to inspect variable contents before using them
- Verify Secrets: Ensure secrets are stored correctly (they don't show their values in logs)
Next Steps
Ready to build your first workflow?
🚀 Start Building
Open PyExecutor dashboard and create your first workflow
❓ Have Questions?
Join our community or reach out to support