Guide: Build production-ready workflows in minutes

PyExecutor User Guide

Learn to build, deploy, and scale Python workflows with practical examples, best practices, and real-world enterprise use cases.

Getting Started

PyExecutor makes it easy to build, schedule, and publish multi-step Python workflows. Let's start with the basics.

1. Create Your First Workflow

  1. Go to the Workflows tab in PyExecutor
  2. Click "New Workflow" and give it a name (e.g., "Service Health Check")
  3. Add your first step by clicking the "+" button
  4. Configure the step (we'll cover step types next)
  5. Save your workflow

2. Add Steps to Your Workflow

Steps execute in order; each step's output is made available to subsequent steps via its Output Variable.

Step 1: Script
  └─ Output Variable: "health_data"
     
Step 2: Condition
  └─ Expression: health_data.get('status') == 'critical'
  └─ True Branch: Send Alert
  └─ False Branch: Log Success

3. Test & Deploy

  • Test Manually: Click "Run Now" to execute immediately
  • Schedule: Add cron expression (e.g., "0 */6 * * *" = every 6 hours)
  • Publish as API: Enable "Publish as REST Endpoint" to expose via webhook
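
Once published, the endpoint behaves like any webhook. A minimal stdlib-only sketch of calling it from Python (the URL and API key below are placeholders for your own deployment):

```python
import json
import urllib.request

# Hypothetical endpoint and key -- substitute your deployment's values
url = "https://pyexecutor.example.com/api/hooks/service-health-check"
payload = {"source": "manual-test"}

req = urllib.request.Request(
    url,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json", "X-API-Key": "YOUR_KEY"},
    method="POST",
)
# urllib.request.urlopen(req)  # uncomment to actually send the request
```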

Core Concepts

Context Variables

Data passed between workflow steps. Each step's output becomes available to subsequent steps.

Step 1 Output:
  user_data = {...}
  
Step 2 Access:
  {{ user_data.email }}

Output Variables

Name assigned to each step's result. Used in subsequent steps via {{variable_name}}.

Script Step Config:
  Output Variable: "response"
  
Next Step:
  Use: {{ response.total }}
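
The `{{ variable.path }}` convention can be pictured as a simple substitution pass over nested dicts. This is an illustration only, not PyExecutor's actual interpolation engine:

```python
import re

def interpolate(template, context):
    """Replace {{ dotted.path }} tokens with values from the context dict.

    Illustrative sketch only -- the real interpolation engine may differ.
    """
    def resolve(match):
        value = context
        for part in match.group(1).strip().split("."):
            value = value[part]
        return str(value)
    return re.sub(r"\{\{(.*?)\}\}", resolve, template)

print(interpolate("Total: {{ response.total }}", {"response": {"total": 42}}))  # Total: 42
```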

Webhooks

Trigger workflows via HTTP requests. All request data (payload, query, headers) is available.

POST /api/hooks/process
Body: {"customer": "john"}

Access in workflow:
  {{ webhook_payload.customer }}

Secrets

Encrypted credentials stored once, injected at runtime via environment variables.

Store Secret:
  Name: "DB_PASSWORD"
  Value: "secret123"
  
Access in Script:
  import os
  pwd = os.getenv('DB_PASSWORD')
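
Since a missing secret surfaces as an empty environment variable rather than an error, a small guard in script steps fails fast (a sketch; the `require_secret` helper is hypothetical, not a PyExecutor API):

```python
import os

def require_secret(name):
    """Fetch a secret injected at runtime, failing loudly if it is absent."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Secret {name!r} is not set for this step")
    return value

os.environ["DB_PASSWORD"] = "secret123"  # normally injected by the platform
pwd = require_secret("DB_PASSWORD")
```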

Step Types

PyExecutor supports 12 step types, chained together to build complete workflows.

🐍 Script Step (Multi-Language)

Execute code in Python, JavaScript (Node.js), PowerShell, Bash, or Go inside isolated Docker containers. Python, JavaScript, and PowerShell each have their own package management: PyPI, npm, and the PowerShell Gallery, respectively.

Example: Process Health Check Data

import json

# Access context variables passed from previous steps
health_data = json.loads('''{{ raw_health_json }}''')

# Process the data
critical_services = [
    s for s in health_data.get('services', [])
    if s.get('status') == 'down'
]

# Return result as JSON string
result = {
    'total_services': len(health_data.get('services', [])),
    'critical_count': len(critical_services),
    'needs_alert': len(critical_services) > 0
}

print(json.dumps(result))

🌐 API Step

Call external HTTP endpoints with context variable interpolation.

Example: Fetch Customer Data

Connector: "External API"
Method: GET
URL: https://api.example.com/customers/{{ webhook_payload.customer_id }}
Headers:
  Authorization: Bearer {{ CUSTOMER_API_KEY }}

Body (for POST):
  {
    "email": "{{ webhook_payload.email }}",
    "action": "update_profile"
  }

🗄️ Database Step

Execute SQL queries against PostgreSQL or MySQL with interpolated context.

Example: Log Workflow Execution

Connector: "Main Database"
Query:
INSERT INTO audit_log (
  user_id, action, timestamp, details
) VALUES (
  {{ webhook_payload.user_id }},
  'workflow_executed',
  NOW(),
  '{{ json.dumps(audit_details) }}'
)
RETURNING id, created_at;
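
Interpolating values into SQL text works, but quotes inside the data can break the statement. A Script step can use parameterized queries instead; a sketch using the stdlib sqlite3 driver for illustration (Postgres/MySQL drivers use the same placeholder idea):

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE audit_log (user_id INTEGER, action TEXT, details TEXT)"
)

audit_details = {"workflow": "health-check", "result": "ok"}
# Placeholders let the driver handle quoting -- no string interpolation needed
conn.execute(
    "INSERT INTO audit_log (user_id, action, details) VALUES (?, ?, ?)",
    (42, "workflow_executed", json.dumps(audit_details)),
)
row = conn.execute("SELECT action, details FROM audit_log").fetchone()
```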

⚙️ Transform Step

Python expressions to shape and combine data from previous steps.

Example: Format Report

# Variables from previous steps are in scope
result = {
    'is_healthy': health_data.get('critical_count', 0) == 0,
    'total_errors': len(error_log),
    'success_rate': (
        (customer_count - error_count) / customer_count * 100
        if customer_count > 0 else 0
    ),
    'timestamp': datetime.now().isoformat(),
    'report': f"""Health Check Report
    Status: {'OK' if health_data.get('critical_count', 0) == 0 else 'CRITICAL'}
    Errors: {len(error_log)}
    Customers Processed: {customer_count}
    """
}
print(json.dumps(result))

📧 Notification Step

Send alerts via Email, Teams, Slack, Discord, Telegram, and more.

Example: Alert on Critical Issues

Connector: "Slack - #alerts"
Subject: Service Health Alert
Message: 
  Critical Services Down:
  {{ health_data.critical_services_list }}

  Timestamp: {{ now }}
  Action: Review dashboard at
  https://dashboard.example.com

Condition Step (If/Else)

Branch workflow execution based on boolean expressions.

Example: Route by Status

Expression:
  health_data.get('critical_count', 0) > 0

True Branch (Critical Issues Found):
  └─ Step 1: Send Urgent Slack Alert
  └─ Step 2: Create Incident (API call)
  └─ Step 3: Page On-Call Engineer

False Branch (Everything OK):
  └─ Step 1: Log Success to Database
  └─ Step 2: Send Digest Email

Each branch can contain multiple sub-steps, creating complex workflows.

Approval Step

Pause workflow execution and wait for human approval before continuing. Supports multiple designated approvers with email notifications and full audit trail.

Key Features

  • Multi-Approver — Designate multiple approvers per step (org members + free-text emails)
  • Email Notifications — Sends approval request emails via configured connectors
  • Comment & Audit — Approvers can leave comments; every action is logged
  • Timeout — Auto-expire after configurable hours/days
  • MCP / CLI — Approve or reject from LLM chat or the command line

Example: Deployment Gate

Step: "Request Deployment Approval"
Type: approval

Config:
  Approver(s): ops-lead@company.com, cto@company.com
  Required Approvals: 1
  Timeout: 24 hours
  Require Comment: true
  Subject: "Approval required for {{workflow_name}}"
  Message: "Production deployment ready for review."

Behaviour:
  → Sends email to all designated approvers
  → Workflow pauses and polls every 10 seconds
  → First approver to approve/reject decides outcome
  → On approval: workflow continues to next step
  → On rejection: workflow fails with reason
  → On timeout: workflow fails as expired

Approvers are snapshotted per request — changing the config later won't affect in-flight approvals.
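
The pause, poll, and timeout behaviour above can be sketched as a polling loop (illustrative only; `check_status` is a hypothetical callable, not a PyExecutor API):

```python
import time

def wait_for_approval(check_status, timeout_s=86400, poll_s=10,
                      clock=time.monotonic, sleep=time.sleep):
    """Poll an approval request until it resolves or the timeout expires.

    check_status is assumed to return 'pending', 'approved', or 'rejected'.
    """
    deadline = clock() + timeout_s
    while clock() < deadline:
        status = check_status()
        if status in ("approved", "rejected"):
            return status  # first approver's decision wins
        sleep(poll_s)
    return "expired"

# Simulated approver: pending twice, then approved
responses = iter(["pending", "pending", "approved"])
outcome = wait_for_approval(lambda: next(responses),
                            timeout_s=60, poll_s=0, sleep=lambda s: None)
```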

Subflow Step

Invoke another workflow as a child step within the parent. Map inputs from current context and receive merged results.

Example: Call Data Enrichment Sub-Workflow

Step: "Enrich Customer Data"
Type: subflow

Config:
  Sub-Workflow: "data-enrichment-pipeline"
  Input Mapping:
    {
      "customer_id": "{{webhook_payload.id}}",
      "source": "crm"
    }

Behaviour:
  → Runs the child workflow with mapped inputs
  → Child steps execute with label prefix [SubFlow: ...]
  → Result merged back as subflow_{step_label} keys
  → Parent continues with enriched context
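
The merge back into the parent can be pictured like this (a sketch; the exact key normalization applied to step labels is an assumption here):

```python
def merge_subflow_result(parent_context, step_label, child_result):
    """Merge a child workflow's output back under a subflow_{step_label} key.

    Illustrative sketch of the convention above; the lower/underscore
    normalization of the label is assumed, not documented behaviour.
    """
    key = f"subflow_{step_label.lower().replace(' ', '_')}"
    merged = dict(parent_context)
    merged[key] = child_result
    return merged

ctx = merge_subflow_result(
    {"webhook_payload": {"id": 7}},
    "Enrich Customer Data",
    {"segment": "vip"},
)
```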

Delay Step

Pause workflow execution for a specified number of seconds (max 300). Useful for rate limiting or waiting for external processes.

Example: Wait Before Retry

Step: "Wait for External System"
Type: delay

Config:
  Seconds: 30
  Message: "Waiting for payment processor to settle..."

Output:
  delay_duration: 30
  delay_message: "Waiting for payment processor to settle..."

📤 Output Step

Map context variables to a structured workflow output. The mappings are persisted to the Job record as workflow_output and returned via the API.

Example: Return Report Summary

Step: "Return Results"
Type: output

Config:
  Mappings:
    "total_processed": "batch_result.count"
    "errors": "validation_step.error_list"
    "report_url": "upload_step.public_url"

→ Supports dot-notation for nested data
→ Result saved to Job.workflow_output
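
The dot-notation mapping can be sketched as a small resolver (illustrative, not PyExecutor's internal implementation):

```python
def resolve_path(context, path):
    """Walk a dot-notation path like 'batch_result.count' through nested dicts."""
    value = context
    for part in path.split("."):
        value = value[part]
    return value

def build_output(context, mappings):
    """Map context variables to a structured workflow output (sketch)."""
    return {name: resolve_path(context, path) for name, path in mappings.items()}

context = {
    "batch_result": {"count": 120},
    "upload_step": {"public_url": "https://example.com/r1"},
}
output = build_output(context, {
    "total_processed": "batch_result.count",
    "report_url": "upload_step.public_url",
})
```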

🔁 Loop Step

Iterate over an array from a previous step and execute sub-steps for each element. Each iteration has access to the current item.

Example: Process Each Order

Step: "Process Orders"
Type: loop

Config:
  Array Source: "fetch_orders.data"
  Item Variable: "current_order"
  Sub-Steps:
    └─ Step 1: Validate order (Script)
    └─ Step 2: Update inventory (Database)
    └─ Step 3: Send confirmation (Notification)
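
Loop semantics can be pictured as running the sub-steps once per element, with the item variable injected into the context (a sketch; sub-steps are modeled as plain callables):

```python
def run_loop(items, item_var, sub_steps, context):
    """Execute each sub-step once per array element (sketch of Loop semantics)."""
    for item in items:
        # Each iteration sees the shared context plus the current item
        iter_ctx = dict(context, **{item_var: item})
        for step in sub_steps:
            step(iter_ctx)

processed = []
validate = lambda ctx: processed.append(ctx["current_order"]["id"])
orders = [{"id": 1}, {"id": 2}, {"id": 3}]
run_loop(orders, "current_order", [validate], {})
```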

Webhooks & Context Variables

When a workflow is triggered via webhook, all request data is automatically available as context variables.

Available Variables

Variable                  Description
{{webhook_payload}}       Full request body (parsed JSON)
{{webhook_payload.key}}   Nested access with dot notation
{{webhook_query.param}}   URL query string parameters
{{webhook_headers.name}}  HTTP headers (lowercase)
{{webhook_method}}        HTTP method (GET, POST, PUT, etc.)
{{webhook_path}}          Request path after domain

Example: Process Order Webhook

Incoming Webhook:
POST /api/hooks/process-order \
  -H "Content-Type: application/json" \
  -d '{
    "order_id": 12345,
    "customer": {
      "email": "john@example.com",
      "name": "John Doe"
    },
    "total": 199.99,
    "items": 3
  }'

Use in Script Step argument:
  --customer-email {{webhook_payload.customer.email}}
  --order-id {{webhook_payload.order_id}}
  
Use in Database Step:
  INSERT INTO orders (order_id, customer_email, total, created_at)
  VALUES ({{webhook_payload.order_id}}, '{{webhook_payload.customer.email}}', {{webhook_payload.total}}, NOW());
  
Use in API Step body:
  {
    "email": "{{webhook_payload.customer.email}}",
    "name": "{{webhook_payload.customer.name}}",
    "order_number": "{{webhook_payload.order_id}}"
  }

Security

  • API Key Authentication: Require X-API-Key header
  • IP Allowlist: Restrict callers to trusted IPs
  • Rate Limiting: Prevent abuse with request quotas
  • TLS/HTTPS: All webhooks use encrypted connections

Multi-Language Script Execution

PyExecutor supports 5 programming languages. Each script runs in an isolated Docker container with language-specific runtimes pre-installed.

Supported Languages

Language     Runtime          Package Manager      Command
Python       Python 3.12      PyPI (pip)           python -u -c
JavaScript   Node.js 20.x     npm                  node -e
PowerShell   PowerShell 7.4   PowerShell Gallery   pwsh -Command
Bash         System bash      (none)               bash -c
Go           Go (apt)         (none)               go run (temp file)

JavaScript Example

const data = JSON.parse(process.env.INPUT_DATA || '{}');
const result = {
  processed: true,
  items: data.items?.map(i => ({
    ...i, score: i.value * 1.15
  }))
};
console.log(JSON.stringify(result));

PowerShell Example

$services = Get-Service | Where-Object {
  $_.Status -eq 'Running'
} | Select-Object Name, Status
$result = @{
  count = $services.Count
  services = $services
} | ConvertTo-Json
Write-Output $result

Bash Example

#!/bin/bash
DISK_USAGE=$(df -h / | awk 'NR==2 {print $5}')
MEM_FREE=$(free -m | awk 'NR==2 {print $4}')
echo "{\"disk_usage\": \"$DISK_USAGE\", \"mem_free_mb\": $MEM_FREE}"

Go Example

package main

import (
  "encoding/json"
  "fmt"
)

// fib returns the nth Fibonacci number iteratively
func fib(n int) int {
  a, b := 0, 1
  for i := 0; i < n; i++ {
    a, b = b, a+b
  }
  return a
}

func main() {
  result := map[string]interface{}{
    "status":    "ok",
    "fibonacci": fib(10),
  }
  out, _ := json.Marshal(result)
  fmt.Println(string(out))
}

Docker Sandbox Security

  • Network isolation: --network none — no internet access from scripts
  • Memory limit: 256 MB per container
  • PID limit: 64 processes max
  • Read-only filesystem with tmpfs /tmp
  • Per-org, per-language package volumes mounted automatically

RBAC & Organizations

PyExecutor includes a full role-based access control (RBAC) system with multi-tenant organizations, hierarchical roles, fine-grained permissions, and immutable audit logging.

System Roles

Role            Permissions
Admin           Full access — manage org, roles, members, and all resources
Editor          Create & edit workflows, scripts, connectors; cannot manage org settings
Executor        Run workflows and view jobs; cannot create or edit
Viewer          Read-only access to workflows, jobs, and logs
Report Analyst  Access analytics dashboards and export reports

Custom roles are supported with parent-role inheritance. Permission categories span 12 resource types.

API Keys with Scopes

Generate API keys with specific scopes: read, execute, edit, analytics, full. Keys support expiry, rotation, and resource constraints.

Audit Logging

Every action (create, update, delete, execute, login) is recorded in an immutable audit log with IP address, user agent, old/new values, and status (success/failed/denied). Supports compliance export.

Git Version Control

Track changes to workflows and scripts with a built-in Git-like version control system. Create branches, commit changes, view diffs, open pull requests, and tag releases.

Version Control Models

Entity        Description
Repository    Links to a Workflow or Script; tracks current branch
Branch        Types: main, staging, develop, feature, hotfix, release. Protected branch support.
Commit        SHA256-hashed content snapshot with parent linking, additions/deletions count
Diff          Unified diff format with structured JSON changes between two commits
Pull Request  Source → target branch with approval flow (required approvals count), merge/reject
Tag           Named reference to a specific commit for release versioning

Analytics Dashboard

A built-in analytics dashboard with 10 API endpoints for monitoring platform health, execution trends, and resource utilization.

Available Analytics

📊 Dashboard Stats

Total jobs, success rate, CPU/memory/disk usage, AI token counts

📈 Daily Metrics

Daily job counts and trends over time

🔀 Trigger Breakdown

Jobs by trigger type: manual, webhook, scheduled, API

🏆 Top Resources

Most-used workflows and scripts

🔌 Connector Health

Status and latency of database, HTTP, and AI connectors

⏱ Duration Trends

Job execution duration patterns over time

📡 API Traffic

Request patterns across published API endpoints

🔒 Security Events

Auth failures, denied access, suspicious patterns

🤖 MCP Analytics

MCP tool invocation counts and performance

🌐 Endpoint Metrics

Per-endpoint latency, error rates, and throughput

Detailed Examples

📦 Example 1: E-Commerce Order Processing

Workflow triggered by webhook. Validates order, checks inventory, charges customer, sends confirmation.

Workflow: "Order Processing Pipeline"

Step 1: Validate Order (Script)
  Input: Order JSON from webhook
  Output: validated_order
  
  import json
  order = webhook_payload
  
  if not order.get('customer_email'):
      raise ValueError('Missing customer_email')
  if order.get('total', 0) <= 0:
      raise ValueError('Invalid order total')
  
  print(json.dumps({
      'order_id': order['order_id'],
      'email': order['customer_email'],
      'total': order['total'],
      'valid': True
  }))

Step 2: Check Inventory (API)
  URL: https://inventory-api.example.com/check
  Method: POST
  Body: {{validated_order}}
  Output: inventory_result

Step 3: Process Payment (API)
  Connector: Stripe
  URL: https://api.stripe.com/v1/charges
  Method: POST
  Output: payment_result

Step 4: Branch on Payment Status (Condition)
  Expression: payment_result.get('status') == 'succeeded'
  
  True Branch:
    Step 1: Insert to Database
      INSERT INTO orders (order_id, customer_email, status)
      VALUES ({{validated_order.order_id}}, '{{validated_order.email}}', 'paid')
    
    Step 2: Send Confirmation (Notification)
      Email to: {{validated_order.email}}
      Subject: Order #{{validated_order.order_id}} Confirmed
      
  False Branch:
    Step 1: Send Failure Notification
      Email to: {{validated_order.email}}
      Subject: Payment Failed - Order #{{validated_order.order_id}}

📊 Example 2: Daily Analytics Report

Scheduled workflow (daily at 8 AM) that generates analytics reports and emails them.

Workflow: "Daily Analytics Report"
Schedule: 0 8 * * * (daily at 8 AM)

Step 1: Fetch Yesterday's Data (Database)
  Query:
  SELECT 
    COUNT(*) as total_orders,
    SUM(total) as revenue,
    AVG(total) as avg_order_value,
    COUNT(DISTINCT customer_id) as unique_customers
  FROM orders
  WHERE DATE(created_at) = CURRENT_DATE - 1
  Output: yesterday_stats

Step 2: Compare to 7-Day Average (Database)
  Query:
  SELECT 
    AVG(total) as avg_value,
    COUNT(*) as avg_count
  FROM orders
  WHERE DATE(created_at) BETWEEN 
    CURRENT_DATE - 7 AND CURRENT_DATE - 1
  Output: week_avg

Step 3: Calculate Metrics (Transform)
  # The week query returns an average order value and an order count over 7 days,
  # so multiply them for weekly revenue, then divide by 7 for a daily baseline
  week_daily_avg = week_avg['avg_value'] * week_avg['avg_count'] / 7
  growth_pct = (
    (yesterday_stats['revenue'] - week_daily_avg)
    / week_daily_avg * 100
  )
  
  result = {
    'date': date.today().isoformat(),
    'total_orders': yesterday_stats['total_orders'],
    'revenue': yesterday_stats['revenue'],
    'growth_pct': round(growth_pct, 2),
    'status': 'healthy' if growth_pct > 0 else 'warning'
  }
  Output: report_data

Step 4: Generate HTML Report (Script)
  html = f"""
  <h1>Daily Report - {report_data['date']}</h1>
  <p>Orders: {report_data['total_orders']}</p>
  <p>Revenue: ${report_data['revenue']}</p>
  <p>Growth: {report_data['growth_pct']}%</p>
  """
  Output: html_report

Step 5: Send Email (Notification)
  Connector: Email (SMTP)
  Recipient: {{TEAM_EMAIL}}
  Subject: Daily Analytics Report - {{report_data['date']}}
  Body: {{html_report}}

🔄 Example 3: Bulk User Import with Notifications

Webhook receives array of users, processes each one, sends individual confirmations.

Workflow: "Bulk Import Users"
Webhook Endpoint: POST /api/import-users

Step 1: Validate Input (Script)
  import json
  users_list = webhook_payload.get('users', [])
  if len(users_list) == 0:
      raise ValueError('No users provided')
  print(json.dumps({'users': users_list, 'count': len(users_list)}))
  Output: import_batch

Step 2: Process Each User (Loop)
  Items: {{import_batch.users}}
  Item Variable: current_user
  
  Sub-Step 1: Create User (API)
    URL: https://api.example.com/users
    Method: POST
    Body: {{current_user}}
    Output: created_user
  
  Sub-Step 2: Send Welcome Email (Notification)
    Email to: {{current_user.email}}
    Subject: Welcome {{current_user.name}}
    Message: Your account has been created
  
  Sub-Step 3: Log Creation (Database)
    INSERT INTO user_imports (email, name, status, imported_at)
    VALUES ('{{current_user.email}}', '{{current_user.name}}', 'success', NOW())

Step 3: Send Summary (Notification)
  Email to: {{webhook_payload.admin_email}}
  Subject: Import Complete - {{import_batch.count}} users processed
  Body: All users from the import batch have been successfully created

Enterprise Use Cases

Real-world scenarios where PyExecutor solves critical business problems.

🏢 Use Case 1: Real-Time Compliance Monitoring

Problem: Financial institution must track suspicious transactions in real-time and generate audit logs for regulatory compliance.

Solution with PyExecutor:

Webhook triggers on every transaction:

Step 1: Fetch Risk Score (API to Fraud Detection Service)
  - Check transaction against ML model
  - Get risk_score (0-100)

Step 2: Query Historical Data (Database)
  - Check customer's transaction history
  - Identify unusual patterns

Step 3: Evaluate Risk Rules (Condition)
  if risk_score > 75 OR amount > daily_limit:
    Branch A: BLOCK transaction immediately
    - Notify customer
    - Create regulatory event
    - Alert compliance team
  else:
    Branch B: APPROVE transaction
    - Log to audit database
    - Update customer profile

Step 4: Generate Compliance Report (Transform)
  - Combine all signals
  - Format for regulatory submission

Step 5: Archive to Immutable Log (Database)
  - INSERT into immutable audit table
  - Ensures regulatory compliance

Results: Sub-second decision-making, complete audit trail, automatic regulatory reporting.

🏥 Use Case 2: Health Data Pipeline with AI Analysis

Problem: Healthcare provider ingests patient data from multiple sources and needs real-time AI-powered risk analysis.

Solution with PyExecutor:

Trigger: New patient vitals uploaded

Step 1: Ingest Data (Database)
  - Normalize vitals from different sensors
  - Store raw data for compliance

Step 2: Process with AI (AI Step)
  - Use Gemini/GPT to analyze patient data
  - Generate clinical insights
  - Score risk level

Step 3: Branch on Risk Level (Condition)
  if risk_score == 'critical':
    - Immediately page on-call physician
    - Update EHR with alert
    - Send push notification to patient
  elif risk_score == 'elevated':
    - Schedule follow-up appointment
    - Alert care team
    - Send email to patient

Step 4: Generate Report (Transform)
  - Combine medical history + AI insights
  - Create physician-ready summary

Step 5: Log & Alert (Notification + Database)
  - Store decision log for compliance
  - Alert appropriate parties

Results: Faster diagnoses, reduced patient risk, automated compliance, full audit trail.

📱 Use Case 3: Multi-Channel Customer Engagement

Problem: E-commerce platform must notify customers through preferred channels (email, SMS, push) based on their activity and preferences.

Solution with PyExecutor:

Trigger: Customer abandoned cart (webhook from events service)

Step 1: Fetch Customer Profile (API)
  - Get notification preferences
  - Communication history
  - Preferred channels

Step 2: Check Customer Segments (Database)
  - VIP status
  - Purchase history
  - Engagement level

Step 3: Generate Personalized Message (AI)
  - Use customer data to create custom offer
  - Reference their previous purchases
  - Adjust tone for customer segment

Step 4: Route by Channel Preferences (Condition)
  if customer.prefers_email:
    → Send Email with personalized deal
  if customer.prefers_sms:
    → Send SMS with offer code
  if customer.prefers_push:
    → Send in-app push notification

Step 5: Track Engagement (Database)
  - Log which channels were used
  - Record if customer returned
  - Update engagement metrics for ML models

Step 6: Feed Back to AI (API)
  - Send engagement data to analytics platform
  - Improve future personalization

Results: Higher conversion rates, improved customer satisfaction, data-driven personalization.

🔐 Use Case 4: Infrastructure Automation & Response

Problem: DevOps team needs to automatically respond to infrastructure alerts and failures with consistent, auditable actions.

Solution with PyExecutor:

Trigger: High memory usage alert from monitoring system

Step 1: Collect Diagnostics (API)
  - Query Kubernetes cluster
  - Get detailed pod metrics
  - Check application logs

Step 2: Analyze Severity (Script)
  - Calculate memory pressure
  - Estimate time to OOM
  - Determine response urgency

Step 3: Escalation Condition (Condition)
  if memory_pressure > 90% AND trending_up:
    Branch A: CRITICAL - Immediate action
    - Trigger auto-scaling
    - Page SRE team
    - Start diagnostics collection
    
  elif memory_pressure > 75%:
    Branch B: WARNING - Monitor closely
    - Increase monitoring frequency
    - Send Slack alert to on-call
    - Create incident ticket
    
  else:
    Branch C: INFO - Track for patterns
    - Log to monitoring database
    - Update trends

Step 4: Execute Remediation (Script + API)
  - If auto-scaling enabled, trigger scale command
  - Collect heap dumps for analysis
  - Create diagnostic artifacts

Step 5: Notify & Document (Notification + Database)
  - Alert team via Slack/PagerDuty
  - Create incident record
  - Store full context for post-mortem

Results: Faster incident response, reduced manual work, consistent remediation, audit trail for all actions.

Best Practices

✅ Do This

  • ✓ Use descriptive output variable names (e.g., "user_data" not "x")
  • ✓ Store secrets in Secrets Manager, never hardcode credentials
  • ✓ Add logging to script steps for debugging
  • ✓ Use conditions to handle different scenarios
  • ✓ Test workflows manually before enabling schedules
  • ✓ Set appropriate API key & rate limits on published endpoints
  • ✓ Version your workflows before major changes
  • ✓ Use loops for batch operations instead of multiple similar steps

❌ Avoid This

  • ✗ Don't hardcode API keys or database passwords in steps
  • ✗ Avoid workflows with >20 sequential steps (use sub-workflows if needed)
  • ✗ Don't repeat similar step configurations (use loops instead)
  • ✗ Avoid relying on external services without error handling
  • ✗ Don't publish workflows without API security enabled
  • ✗ Avoid very long script step code (extract to mounted scripts)
  • ✗ Don't forget to test the "false branch" of condition steps

Performance Tips

  • Database Queries: Add WHERE clauses to limit result sets
  • API Calls: Use connection pooling and timeout configuration
  • Large Data: Process in batches rather than loading all at once
  • Caching: Store API responses in context to avoid redundant calls
  • Parallel Workflows: Publish multiple endpoints instead of one mega-workflow

Troubleshooting

❓ "Output Variable Not Found" Error

Cause: Trying to use an output variable from a later step, or step hasn't been executed yet.

Solution: Verify the step that creates the variable has been executed before the step trying to use it.

Correct order:
Step 1: fetch_data (Output: "results")
Step 2: use {{results.field}} in condition
Step 3: use {{results.field}} in next step

Wrong order:
Step 1: use {{results.field}} ← Error! results doesn't exist yet
Step 2: fetch_data (Output: "results")

❓ "Context Variable Not Available" in Script Step

Cause: Trying to use context variable syntax {{var}} in script code (Python doesn't parse {{}} syntax).

Solution: Access context variables directly by name in Python, or pass via script args.

Wrong in Script Step code:
message = "User: {{user_email}}"

Correct way 1 (from args):
# Script Args: --email {{webhook_payload.email}}
import sys
email = sys.argv[sys.argv.index('--email') + 1]

Correct way 2 (via Environment):
import os
email = os.getenv('USER_EMAIL')
# Pass via step config

❓ API Step Returns Empty Response

Cause: API endpoint not accessible, authentication failed, or malformed request.

Solution: Check job logs for actual API error. Verify:

  • URL is correct (no typos, proper interpolation)
  • Authentication headers are included (API key, Bearer token)
  • Request body is valid JSON
  • External API is reachable from your network

❓ Scheduled Workflow Not Running

Cause: Workflow not enabled, cron expression wrong, or scheduler service not running.

Solution:

  • Verify "is_active" is enabled for the workflow
  • Test cron expression at crontab.guru
  • Check scheduler logs: docker logs pyexec-scheduler-1
  • Click "Run Now" to verify workflow itself works

❓ Database Step Fails with Encoding Error

Cause: Non-UTF8 characters in context variable data being inserted into database.

Solution: Use Transform step to clean data before database insert:

Transform Step before Database:
# Clean non-UTF8 characters
cleaned = user_data.get('name', '').encode('utf-8', 'ignore').decode('utf-8')
result = {'clean_name': cleaned}

🔍 How to Debug

  1. Check Job Logs: Click on failed job to see detailed logs with full context
  2. Enable Debug Mode: Add verbose logging to script steps
  3. Test Step Isolation: Create a test workflow with just one step to verify it works
  4. Print Context: Use Transform steps to inspect variable contents before using them
  5. Verify Secrets: Ensure secrets are stored correctly (they don't show their values in logs)
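
Tip 4 can be as simple as a Transform step that prints variable names and shapes instead of raw values, so secrets and large payloads stay out of logs (a sketch; the `context` dict here stands in for whatever variables are in scope in your step):

```python
import json

# Hypothetical snapshot of the variables a Transform step has in scope
context = {"health_data": {"critical_count": 0}, "error_log": [], "customer_count": 87}

# Emit name -> type/size instead of raw values
result = {
    name: {
        "type": type(value).__name__,
        "size": len(value) if hasattr(value, "__len__") else None,
    }
    for name, value in context.items()
}
print(json.dumps(result, indent=2))
```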

Next Steps

Ready to build your first workflow?


🚀 Start Building

Open PyExecutor dashboard and create your first workflow

❓ Have Questions?

Join our community or reach out to support