Expert techniques for building production-grade, resilient, and efficient PyExecutor workflows.
Handle failures gracefully with retries, fallbacks, and error notifications.
Workflow: "Resilient API Integration"
Step 1: Fetch Data from External API
Config:
retries: 3
error_config: {
action: "continue" # Try false branch instead of failing
}
Script Step Code:
import requests
import time

max_retries = 3
for attempt in range(max_retries):
    try:
        response = requests.get(
            'https://api.example.com/data',
            timeout=10
        )
        response.raise_for_status()
        return response.json()
    except Exception as e:
        if attempt < max_retries - 1:
            wait_time = 2 ** attempt  # 1s, then 2s before the final attempt
            time.sleep(wait_time)
        else:
            raise
Step 2: Check if Success (Condition)
Expression: api_response is not None
True Branch:
→ Process data
→ Log success
False Branch:
→ Use fallback data from cache
→ Send alert to team
→ Log incident
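The False branch's cache fallback can be kept explicit so downstream steps know they are working with stale data. A minimal sketch, assuming a dict-like cache whose entries carry a `stored_at` timestamp (both names are illustrative, not part of PyExecutor):

```python
import time

def load_fallback(cache, key, max_age_seconds=3600):
    """Return a cached entry if present, flagging whether it is stale."""
    entry = cache.get(key)
    if entry is None:
        return {'data': None, 'source': 'none'}
    age = time.time() - entry['stored_at']
    return {
        'data': entry['value'],
        'source': 'cache',
        'stale': age > max_age_seconds,
    }

# Example: an entry cached 10 seconds ago is served and marked fresh
cache = {'api_data': {'value': [1, 2, 3], 'stored_at': time.time() - 10}}
fallback = load_fallback(cache, 'api_data')
```

Returning a `source` field lets the alert and incident-log steps record whether live or cached data was used.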
Process collections efficiently with batching, filtering, and aggregation.
Workflow: "Bulk Data Import with Batching"
Step 1: Prepare Data (Transform)
Input: Array of 10,000 users
# Split into batches of 100
def batch(iterable, n=100):
    for i in range(0, len(iterable), n):
        yield iterable[i:i + n]

batches = list(batch(webhook_payload.users, 100))
result = {
    'batches': batches,
    'total_batches': len(batches),
    'total_users': len(webhook_payload.users)
}
Step 2: Process Each Batch (Loop)
Items: {{prepared_data.batches}}
Item Variable: current_batch
Sub-Step 1: Validate Batch (Script)
# Check for required fields, format, etc.
Sub-Step 2: Insert to Database (Database)
INSERT INTO users (email, name, created_at)
VALUES {{batch_item}} [in loop]
Sub-Step 3: Track Progress (Transform)
result = {
    'processed': batch_index + 1,
    'total': prepared_data.total_batches,
    'percentage': ((batch_index + 1) / prepared_data.total_batches) * 100
}
Step 3: Send Summary (Notification)
Subject: Import Complete
Body: {{prepared_data.total_users}} users imported in {{prepared_data.total_batches}} batches
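Sub-Step 1 above only hints at what validation means. A sketch of a per-batch validator, assuming user rows are dicts with `email` and `name` (field names and the regex are illustrative):

```python
import re

REQUIRED_FIELDS = {'email', 'name'}
EMAIL_RE = re.compile(r'^[^@\s]+@[^@\s]+\.[^@\s]+$')

def validate_batch(users):
    """Split a batch into valid rows and per-row error records."""
    valid, errors = [], []
    for i, user in enumerate(users):
        missing = REQUIRED_FIELDS - user.keys()
        if missing:
            errors.append({'index': i, 'error': f'missing: {sorted(missing)}'})
        elif not EMAIL_RE.match(user['email']):
            errors.append({'index': i, 'error': 'invalid email'})
        else:
            valid.append(user)
    return {'valid': valid, 'errors': errors}

report = validate_batch([
    {'email': 'a@example.com', 'name': 'Ada'},
    {'email': 'not-an-email', 'name': 'Bob'},
    {'name': 'Eve'},
])
```

Passing only `report['valid']` to the database sub-step keeps one bad row from failing the whole INSERT.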
Route notifications based on severity, time of day, and recipient availability.
Workflow: "Intelligent Alert Routing"
Step 1: Analyze Severity (Transform)
from datetime import datetime

severity = 'critical' if metrics.error_rate > 50 else \
           'warning' if metrics.error_rate > 20 else \
           'info'

now = datetime.now()
is_business_hours = 9 <= now.hour < 17
is_weekend = now.weekday() >= 5

result = {
    'severity': severity,
    'is_urgent': severity == 'critical' and is_business_hours,
    'recipients': {
        'critical': ['on_call@company.com', 'slack:#critical-alerts'],
        'warning': ['team@company.com'],
        'info': ['logs@company.com']
    }[severity]
}
Step 2: Route Alerts (Condition)
if severity == 'critical':
Branch: CRITICAL
- Send SMS to on-call engineer
- Page via PagerDuty
- Post to #critical-alerts Slack
- Create incident ticket
elif severity == 'warning':
Branch: WARNING
- Send email to team
- Post to #warnings Slack channel
else:
Branch: INFO
- Log to database only
- Add to daily digest
Step 3: Log Alert (Database)
INSERT INTO alerts (severity, message, recipients, created_at)
VALUES ('{{alert_data.severity}}', '...', '...', NOW())
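Because Step 1 already resolves the recipient list, the routing branches can share one dispatch helper. A sketch, where `send` stands in for the real notification connectors (the `slack:` prefix convention comes from the recipients map above):

```python
def route_alert(alert_data, send):
    """Fan the alert out to each configured channel."""
    deliveries = []
    for recipient in alert_data['recipients']:
        # Recipients prefixed with 'slack:' go to Slack; the rest get email
        channel = 'slack' if recipient.startswith('slack:') else 'email'
        target = recipient.split(':', 1)[1] if channel == 'slack' else recipient
        send(channel, target, alert_data['severity'])
        deliveries.append((channel, target))
    return deliveries

sent = []
deliveries = route_alert(
    {'severity': 'critical',
     'recipients': ['on_call@company.com', 'slack:#critical-alerts']},
    lambda channel, target, severity: sent.append((channel, target, severity)),
)
```

The returned `deliveries` list is useful as the `recipients` value in the Step 3 alert-log INSERT.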
Build complex conditional and looped workflows with nested sub-steps.
Workflow: "Complex Order Processing"
Step 1: Validate Order (Script)
Output: order_validation
Step 2: Main Condition: Order Valid? (Condition)
Expression: order_validation.is_valid == True
True Branch:
Sub-Step 1: Check Stock (Database)
SELECT * FROM inventory WHERE sku IN (...)
Output: inventory_data
Sub-Step 2: Stock Available? (Condition)
Expression: inventory_data.available > 0
True Branch:
Sub-Step A: Process Payment (API)
Sub-Step B: Payment Success? (Condition)
If yes:
- Deduct from inventory
- Send confirmation email
- Create shipment
If no:
- Refund customer
- Log transaction
False Branch:
- Check backorder preference
- If yes: Add to queue
- If no: Cancel order
False Branch:
- Send validation error to customer
- Do not process payment
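Step 1's validation script determines which branch of the main condition runs. A sketch of a validator producing the `order_validation` payload the condition expects (the order fields shown are assumptions):

```python
def validate_order(order):
    """Return the order_validation payload consumed by the main condition."""
    errors = []
    if not order.get('items'):
        errors.append('order has no line items')
    for item in order.get('items', []):
        if item.get('quantity', 0) <= 0:
            errors.append(f"bad quantity for sku {item.get('sku')}")
    if order.get('total', 0) <= 0:
        errors.append('total must be positive')
    return {'is_valid': not errors, 'errors': errors}

order_validation = validate_order({
    'items': [{'sku': 'A-1', 'quantity': 2}],
    'total': 49.90,
})
```

Collecting all errors (rather than failing on the first) gives the False branch a complete message to send back to the customer.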
Aggregate and merge data from multiple sources (APIs, databases, scripts).
Workflow: "Customer 360 View"
Step 1: Fetch from CRM API
URL: crm.example.com/customers/{{webhook_payload.customer_id}}
Output: crm_data
Step 2: Fetch from Database
Query: SELECT * FROM purchases WHERE customer_id = {{webhook_payload.customer_id}}
Output: purchase_history
Step 3: Fetch from Analytics API
URL: analytics.example.com/customer/{{webhook_payload.customer_id}}/metrics
Output: behavior_metrics
Step 4: Merge Data (Transform)
customer = {
    'profile': crm_data,
    'purchases': purchase_history,
    'metrics': behavior_metrics,
    'lifetime_value': sum(p['total'] for p in purchase_history),
    'engagement_score': behavior_metrics['score'],
    'risk_level': 'high' if behavior_metrics['churn_prob'] > 0.7 else 'low'
}
Output: customer_360
Step 5: Route by Risk (Condition)
if customer_360.risk_level == 'high':
- Alert retention team
- Offer discount coupon
- Schedule call
else:
- Update marketing profile
- Continue normal campaign
Automatically export and sync data on a schedule.
Workflow: "Daily Sales Export"
Schedule: 0 2 * * * (2 AM daily)
Step 1: Query Data (Database)
Query:
SELECT order_id, customer_id, total, status, created_at
FROM orders
WHERE DATE(created_at) = CURRENT_DATE - 1
Output: daily_orders
Step 2: Format as CSV (Transform)
import csv
from datetime import date
from io import StringIO

output = StringIO()
writer = csv.DictWriter(
    output,
    fieldnames=['order_id', 'customer_id', 'total', 'status', 'created_at']
)
writer.writeheader()
for row in daily_orders:
    writer.writerow(row)
csv_content = output.getvalue()

result = {
    'csv': csv_content,
    'filename': f"sales_{date.today().isoformat()}.csv",
    'row_count': len(daily_orders)
}
Step 3: Upload to S3 (API)
URL: s3.amazonaws.com/my-bucket/exports/
Method: PUT
Headers:
Authorization: AWS4-HMAC-SHA256 ...
Body: {{export_data.csv}}
Step 4: Notify (Notification)
Email to: data-team@company.com
Subject: Daily Sales Export - {{export_data.row_count}} orders
Implement sophisticated retry strategies for unreliable operations.
Workflow: "Circuit Breaker for External API"
Step 1: Check Circuit State (Database)
Query:
SELECT * FROM circuit_breaker
WHERE service = 'payment_api'
ORDER BY updated_at DESC LIMIT 1
Output: circuit_state
Step 2: Is Circuit Open? (Condition)
Expression: circuit_state.status == 'open' and circuit_state.since < datetime.now() - timedelta(minutes=5)
If Open & Timeout Passed:
→ Reset circuit to 'half_open'
If Closed:
→ Attempt API call
→ If failure: Open circuit
→ If success: Keep closed
If Open & Recently Opened:
→ Use cached/fallback data
→ Skip API call
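The three branches above map onto a small state machine. A minimal in-memory sketch of the same logic (the workflow persists this state in the `circuit_breaker` table instead; thresholds are illustrative):

```python
import time

class CircuitBreaker:
    """Closed → open after repeated failures → half-open after a timeout."""

    def __init__(self, failure_threshold=3, reset_timeout=300):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def state(self):
        if self.opened_at is None:
            return 'closed'
        if time.time() - self.opened_at >= self.reset_timeout:
            return 'half_open'  # timeout passed: allow one trial call
        return 'open'

    def call(self, fn, fallback):
        if self.state() == 'open':
            return fallback()  # recently opened: skip the API entirely
        try:
            result = fn()      # closed or half-open: attempt the call
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()  # too many failures: open
            return fallback()
        self.failures = 0      # success closes the circuit again
        self.opened_at = None
        return result

def flaky_api():
    raise RuntimeError('payment API down')

breaker = CircuitBreaker(failure_threshold=1, reset_timeout=300)
first = breaker.call(flaky_api, lambda: 'cached')        # failure opens circuit
second = breaker.call(lambda: 'live', lambda: 'cached')  # skipped while open
```

A half-open circuit lets exactly one trial call through; its success re-closes the circuit, its failure re-opens it.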
Reduce API calls and database load with intelligent caching.
Workflow: "Product Data with Cache"
Step 1: Check Cache (Database)
Query:
SELECT * FROM cache
WHERE key = 'products_list'
AND expires_at > NOW()
Output: cached_data
Step 2: Use Cache or Fetch (Condition)
Expression: cached_data is not None
True Branch (Cache Hit):
→ Use {{cached_data.value}}
→ Log cache hit
False Branch (Cache Miss):
→ Fetch from API
→ Parse response
→ Store in cache with 1 hour TTL
→ Use fresh data
Step 3: Return Data
Output: final_products
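Steps 1–3 implement the cache-aside pattern. A sketch with a dict standing in for the cache table and `fetch` for the API step (both are placeholders):

```python
import time

def get_products(cache, fetch, ttl_seconds=3600, key='products_list'):
    """Cache-aside read: serve a fresh entry, otherwise fetch and store."""
    entry = cache.get(key)
    if entry and entry['expires_at'] > time.time():
        return {'value': entry['value'], 'cache_hit': True}
    value = fetch()  # cache miss or expired: fetch from the API
    cache[key] = {'value': value, 'expires_at': time.time() + ttl_seconds}
    return {'value': value, 'cache_hit': False}

cache = {}
first = get_products(cache, lambda: ['widget', 'gadget'])   # miss, fetches
second = get_products(cache, lambda: ['widget', 'gadget'])  # hit within TTL
```

Storing `expires_at` alongside the value mirrors the `expires_at > NOW()` check in the Step 1 query.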
Chain script steps in different languages within a single workflow. Use Python for data processing, Bash for system commands, JavaScript for API integrations, and PowerShell for Windows admin tasks.
Workflow: "Cross-Language Infrastructure Audit"
Step 1: Script (Bash)
→ Collect system metrics: disk, memory, CPU
→ Output: system_metrics
Step 2: Script (Python)
→ Analyze metrics with pandas & numpy
→ Classify health status
→ Output: health_analysis
Step 3: Script (JavaScript)
→ Format results as HTML report using Node.js
→ Output: html_report
Step 4: Script (PowerShell)
→ Push report to SharePoint / Exchange
→ Send Teams notification via Graph API
→ Output: delivery_status
Step 5: Condition
→ Expression: health_analysis.critical_count > 0
→ True: Send PagerDuty alert (Notification step)
→ False: Log success to database
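Step 2's classification can be sketched in plain Python (the real step may use pandas/numpy for larger datasets; the metric names and thresholds here are assumptions):

```python
# Critical thresholds per metric (illustrative values)
THRESHOLDS = {'disk_pct': 90, 'memory_pct': 85, 'cpu_pct': 95}

def classify_health(system_metrics):
    """Flag every host whose metrics breach a critical threshold."""
    critical = []
    for host, metrics in system_metrics.items():
        breached = [m for m, limit in THRESHOLDS.items()
                    if metrics.get(m, 0) > limit]
        if breached:
            critical.append({'host': host, 'breached': breached})
    return {'critical_count': len(critical), 'critical': critical}

health_analysis = classify_health({
    'web-1': {'disk_pct': 40, 'memory_pct': 60, 'cpu_pct': 30},
    'db-1': {'disk_pct': 96, 'memory_pct': 70, 'cpu_pct': 20},
})
```

The `critical_count` field is exactly what the Step 5 condition (`health_analysis.critical_count > 0`) evaluates.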
Build small, focused workflows and compose them into larger pipelines using Subflow steps. Each sub-workflow runs in an isolated context with mapped inputs and returns results to the parent.
Workflow: "Customer Onboarding Pipeline"
Step 1: API Step — Fetch new customer from CRM
→ Output: customer_data
Step 2: Subflow — "data-enrichment"
→ Input Mapping: {"customer_id": "{{customer_data.id}}"}
→ Runs: 4 enrichment steps internally
→ Output merged back: subflow_enrich
Step 3: Subflow — "compliance-check"
→ Input Mapping: {"email": "{{customer_data.email}}"}
→ Runs: KYC checks, sanctions screening
→ Output merged back: subflow_compliance
Step 4: Condition
→ Expression: subflow_compliance.approved == true
→ True: Subflow "provision-account"
→ False: Notification — manual review needed
Step 5: Output Step
→ Map: {"account_id": "subflow_provision.id",
"status": "subflow_compliance.status"}
Use different AI providers for different tasks. PyExecutor supports Gemini, OpenAI, Azure OpenAI, LM Studio, and Ollama — each configurable per connector with JSON output mode and token tracking.
Workflow: "Smart Document Processing"
Step 1: AI Step (Gemini connector)
→ Classify document type: invoice, contract, report
→ JSON output mode for structured result
→ Output: classification
Step 2: Condition
→ Route by classification.type
Step 3a: AI Step (OpenAI GPT-4 connector)
→ Extract line items from invoice
→ Structured JSON output
→ Output: invoice_data
Step 3b: AI Step (Local LLM / Ollama)
→ Summarize contract — keep data on-premises
→ 120s timeout for large documents
→ Output: contract_summary
Step 4: Token Tracking
→ AI token usage is recorded automatically per job via _record_ai_usage
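Even with JSON output mode enabled, model responses are worth parsing defensively before routing on them. A generic sketch (not PyExecutor's internal API) that tolerates a stray code fence and checks required keys:

```python
import json

def parse_ai_json(raw, required_keys=('type',)):
    """Parse a model response that should be JSON, tolerating code fences."""
    text = raw.strip()
    if text.startswith('```'):
        # Drop a ```json ... ``` wrapper if the model added one
        text = text.split('```')[1]
        if text.startswith('json'):
            text = text[len('json'):]
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return {'ok': False, 'error': 'not valid JSON', 'raw': raw}
    missing = [k for k in required_keys if k not in data]
    if missing:
        return {'ok': False, 'error': f'missing keys: {missing}', 'raw': raw}
    return {'ok': True, 'data': data}

classification = parse_ai_json('```json\n{"type": "invoice"}\n```')
```

Routing only on `result['ok']` keeps a malformed model reply from crashing the Step 2 condition.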