Advanced Workflow Patterns

Expert techniques for building production-grade, resilient, and efficient PyExecutor workflows.

🛡️ Pattern 1: Comprehensive Error Handling

Handle failures gracefully with retries, fallbacks, and error notifications.

Retry with Exponential Backoff

Workflow: "Resilient API Integration"

Step 1: Fetch Data from External API
  Config:
    retries: 3
    error_config: {
      action: "continue"  # Try false branch instead of failing
    }
  
  Script Step Code:
    import requests
    import time
    
    max_retries = 3
    for attempt in range(max_retries):
        try:
            response = requests.get(
              'https://api.example.com/data',
              timeout=10
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # wait 1s, then 2s
                time.sleep(wait_time)
            else:
                raise
  
Step 2: Check if Success (Condition)
  Expression: api_response is not None
  
  True Branch:
    → Process data
    → Log success
  
  False Branch:
    → Use fallback data from cache
    → Send alert to team
    → Log incident

Error Handling Best Practices

  • Set error_config action to "continue" for non-critical steps
  • Add retry logic with exponential backoff in script steps
  • Use conditions to branch on success/failure
  • Log errors to database for monitoring
  • Alert team on critical failures
  • Implement circuit breaker for external dependencies
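The backoff logic from Step 1 can be factored into a reusable helper. The sketch below is a hypothetical utility for use inside a script step, not a built-in PyExecutor feature; the function and parameter names are illustrative.

```python
import random
import time


def retry_with_backoff(func, max_retries=3, base_delay=1.0, jitter=0.1):
    """Call func(), retrying failures with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries: surface the last error
            delay = base_delay * (2 ** attempt) + random.uniform(0, jitter)
            time.sleep(delay)


# Example: a call that fails twice, then succeeds on the third attempt.
calls = {'n': 0}

def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise RuntimeError('transient failure')
    return 'ok'

result = retry_with_backoff(flaky, max_retries=3, base_delay=0.01, jitter=0.0)
```

The small random jitter spreads out retries so that many failing workflow runs do not all hammer the external service at the same instant.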

🔄 Pattern 2: Advanced Loop Operations

Process collections efficiently with batching, filtering, and aggregation.

Batch Processing with Chunking

Workflow: "Bulk Data Import with Batching"

Step 1: Prepare Data (Transform)
  Input: Array of 10,000 users
  
  # Split into batches of 100
  def batch(iterable, n=100):
      for i in range(0, len(iterable), n):
          yield iterable[i:i + n]
  
  batches = list(batch(webhook_payload.users, 100))
  
  result = {
    'batches': batches,
    'total_batches': len(batches),
    'total_users': len(webhook_payload.users)
  }

Step 2: Process Each Batch (Loop)
  Items: {{prepared_data.batches}}
  Item Variable: current_batch
  
  Sub-Step 1: Validate Batch (Script)
    # Check for required fields, format, etc.
    
  Sub-Step 2: Insert to Database (Database)
    INSERT INTO users (email, name, created_at)
    VALUES {{current_batch}} [in loop]
    
  Sub-Step 3: Track Progress (Transform)
    result = {
      'processed': batch_index + 1,
      'total': prepared_data.total_batches,
      'percentage': ((batch_index + 1) / prepared_data.total_batches) * 100
    }

Step 3: Send Summary (Notification)
  Subject: Import Complete
  Body: {{prepared_data.total_users}} users imported in {{prepared_data.total_batches}} batches

Loop Processing Strategies

  • Batch large datasets to avoid memory issues
  • Set maxIterations limit to prevent runaway loops
  • Use item_last variable to capture final iteration output
  • Track progress in database for long-running loops
  • Add break conditions to exit early if needed
  • Collect errors during loop and handle separately
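Chunking and error collection can be combined in one helper. This is a sketch of the strategy list above as plain Python; `handler` stands in for whatever a sub-step does per batch, and the names are illustrative rather than PyExecutor APIs.

```python
def batch(items, n=100):
    """Yield successive chunks of n items."""
    for i in range(0, len(items), n):
        yield items[i:i + n]


def process_batches(items, handler, max_iterations=1000):
    """Process items in chunks, collecting per-batch errors instead of aborting.

    max_iterations guards against runaway loops, mirroring the
    maxIterations limit described above.
    """
    errors = []
    processed = 0
    for index, chunk in enumerate(batch(items)):
        if index >= max_iterations:
            break
        try:
            handler(chunk)
            processed += len(chunk)
        except Exception as exc:
            errors.append({'batch_index': index, 'error': str(exc)})
    return {'processed': processed, 'errors': errors}


# Example: 250 items in batches of 100; the middle batch fails.
def handler(chunk):
    if 150 in chunk:
        raise ValueError('bad batch')

summary = process_batches(list(range(250)), handler)
```

Collecting errors per batch lets the workflow finish the import and then handle the failed batches separately, e.g. in a follow-up notification step.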

📢 Pattern 3: Smart Conditional Alerts

Route notifications based on severity, time of day, and recipient availability.

Tiered Alert System

Workflow: "Intelligent Alert Routing"

Step 1: Analyze Severity (Transform)
  from datetime import datetime
  
  severity = 'critical' if metrics.error_rate > 50 else \
             'warning' if metrics.error_rate > 20 else \
             'info'
  
  is_business_hours = 9 <= datetime.now().hour < 17
  is_weekend = datetime.now().weekday() >= 5
  
  result = {
    'severity': severity,
    'is_urgent': severity == 'critical' and is_business_hours,
    'recipients': {
      'critical': ['on_call@company.com', 'slack:#critical-alerts'],
      'warning': ['team@company.com'],
      'info': ['logs@company.com']
    }[severity]
  }

Step 2: Route Alerts (Condition)
  if severity == 'critical':
    Branch: CRITICAL
      - Send SMS to on-call engineer
      - Page via PagerDuty
      - Post to #critical-alerts Slack
      - Create incident ticket
  
  elif severity == 'warning':
    Branch: WARNING
      - Send email to team
      - Post to #warnings Slack channel
  
  else:
    Branch: INFO
      - Log to database only
      - Add to daily digest

Step 3: Log Alert (Database)
  INSERT INTO alerts (severity, message, recipients, created_at)
  VALUES ('{{alert_data.severity}}', '...', '...', NOW())
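Inside a script step, the alert insert can also be done with bound parameters instead of interpolating values into the SQL string, which avoids quoting bugs and SQL injection. A minimal sketch using sqlite3 as a stand-in for the workflow database; table and column names follow Step 3 above.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(':memory:')  # stand-in for the workflow database
conn.execute(
    'CREATE TABLE alerts (severity TEXT, message TEXT, recipients TEXT, created_at TEXT)'
)

alert = {'severity': 'warning', 'message': 'Error rate at 25%',
         'recipients': 'team@company.com'}

# Parameter binding: the driver escapes values, so no manual quoting.
conn.execute(
    'INSERT INTO alerts (severity, message, recipients, created_at) '
    'VALUES (?, ?, ?, ?)',
    (alert['severity'], alert['message'], alert['recipients'],
     datetime.now(timezone.utc).isoformat()),
)

severity, = conn.execute('SELECT severity FROM alerts').fetchone()
```

The same `?`-placeholder pattern (or `%s` for PostgreSQL drivers) applies regardless of which database backs the workflow.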

🎯 Pattern 4: Multi-Level Sub-Steps

Build complex conditional and looped workflows with nested sub-steps.

Nested Condition & Loop Example

Workflow: "Complex Order Processing"

Step 1: Validate Order (Script)
  Output: order_validation

Step 2: Main Condition: Order Valid? (Condition)
  Expression: order_validation.is_valid == True
  
  True Branch:
    
    Sub-Step 1: Check Stock (Database)
      SELECT * FROM inventory WHERE sku IN (...)
      Output: inventory_data
    
    Sub-Step 2: Stock Available? (Condition)
      Expression: inventory_data.available > 0
      
      True Branch:
        
        Sub-Step A: Process Payment (API)
        
        Sub-Step B: Payment Success? (Condition)
          If yes:
            - Deduct from inventory
            - Send confirmation email
            - Create shipment
          If no:
            - Refund customer
            - Log transaction
      
      False Branch:
        - Check backorder preference
        - If yes: Add to queue
        - If no: Cancel order
  
  False Branch:
    - Send validation error to customer
    - Do not process payment
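The nested branches above can be sketched as one plain-Python decision function. All names here are illustrative, not PyExecutor APIs; `charge` stands in for the payment API sub-step and returns True on success.

```python
def process_order(order, inventory_available, charge, backorder_ok):
    """Collapse the nested condition tree into sequential guard clauses."""
    if not order.get('is_valid'):
        return 'validation_error'   # outer False branch
    if inventory_available <= 0:    # stock check (inner condition)
        return 'backordered' if backorder_ok else 'cancelled'
    if charge(order):               # payment sub-condition
        return 'shipped'
    return 'refunded'


# Example: a valid order with stock and a successful charge.
status = process_order(
    {'is_valid': True}, inventory_available=3,
    charge=lambda order: True, backorder_ok=False,
)
```

Writing the tree as guard clauses first is a useful way to verify the branch logic before spreading it across nested condition steps.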

🔀 Pattern 5: Multi-Source Data Pipeline

Aggregate and merge data from multiple sources (APIs, databases, scripts).

Data Aggregation Workflow

Workflow: "Customer 360 View"

Step 1: Fetch from CRM API
  URL: crm.example.com/customers/{{webhook_payload.customer_id}}
  Output: crm_data

Step 2: Fetch from Database
  Query: SELECT * FROM purchases WHERE customer_id = {{webhook_payload.customer_id}}
  Output: purchase_history

Step 3: Fetch from Analytics API
  URL: analytics.example.com/customer/{{webhook_payload.customer_id}}/metrics
  Output: behavior_metrics

Step 4: Merge Data (Transform)
  customer = {
    'profile': crm_data,
    'purchases': purchase_history,
    'metrics': behavior_metrics,
    'lifetime_value': sum([p['total'] for p in purchase_history]),
    'engagement_score': behavior_metrics['score'],
    'risk_level': 'high' if behavior_metrics['churn_prob'] > 0.7 else 'low'
  }
  Output: customer_360

Step 5: Route by Risk (Condition)
  if customer_360.risk_level == 'high':
    - Alert retention team
    - Offer discount coupon
    - Schedule call
  else:
    - Update marketing profile
    - Continue normal campaign
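The merge in Step 4 can be written as a standalone function for testing outside the workflow. A sketch assuming the field names shown above (`total`, `score`, `churn_prob`); adjust to your actual source schemas.

```python
def build_customer_360(crm_data, purchase_history, behavior_metrics):
    """Merge CRM, purchase, and analytics data into one customer view."""
    return {
        'profile': crm_data,
        'purchases': purchase_history,
        'metrics': behavior_metrics,
        'lifetime_value': sum(p['total'] for p in purchase_history),
        'engagement_score': behavior_metrics['score'],
        # 0.7 churn-probability cutoff, as in Step 4 above
        'risk_level': 'high' if behavior_metrics['churn_prob'] > 0.7 else 'low',
    }


view = build_customer_360(
    {'name': 'Acme'},
    [{'total': 120.0}, {'total': 80.0}],
    {'score': 72, 'churn_prob': 0.81},
)
```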

📤 Pattern 6: Scheduled Data Export & Sync

Automatically export and sync data on a schedule.

Daily Export to S3 Example

Workflow: "Daily Sales Export"
Schedule: 0 2 * * * (2 AM daily)

Step 1: Query Data (Database)
  Query:
  SELECT order_id, customer_id, total, status, created_at
  FROM orders
  WHERE DATE(created_at) = CURRENT_DATE - 1
  Output: daily_orders

Step 2: Format as CSV (Transform)
  import csv
  from datetime import date
  from io import StringIO
  
  output = StringIO()
  writer = csv.DictWriter(
      output,
      fieldnames=['order_id', 'customer_id', 'total', 'status', 'created_at']
  )
  writer.writeheader()
  for row in daily_orders:
      writer.writerow(row)
  
  csv_content = output.getvalue()
  
  result = {
    'csv': csv_content,
    'filename': f"sales_{date.today().isoformat()}.csv",
    'row_count': len(daily_orders)
  }

Step 3: Upload to S3 (API)
  URL: s3.amazonaws.com/my-bucket/exports/
  Method: PUT
  Headers:
    Authorization: AWS4-HMAC-SHA256 ...
  Body: {{export_data.csv}}

Step 4: Notify (Notification)
  Email to: data-team@company.com
  Subject: Daily Sales Export - {{export_data.row_count}} orders

⚡ Pattern 7: Smart Retry Mechanism

Implement sophisticated retry strategies for unreliable operations.

Circuit Breaker Pattern

Workflow: "Circuit Breaker for External API"

Step 1: Check Circuit State (Database)
  Query:
  SELECT * FROM circuit_breaker
  WHERE service = 'payment_api'
  ORDER BY updated_at DESC LIMIT 1
  Output: circuit_state

Step 2: Is Circuit Open? (Condition)
  Expression: circuit_state.status == 'open' and circuit_state.since < datetime.now() - timedelta(minutes=5)
  
  If Open & Timeout Passed:
    → Reset circuit to 'half_open'
  
  If Closed:
    → Attempt API call
    → If failure: Open circuit
    → If success: Keep closed
  
  If Open & Recently Opened:
    → Use cached/fallback data
    → Skip API call
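The state machine above can be sketched as a small class. This is a minimal in-memory version for illustration; the workflow variant persists the same state in the `circuit_breaker` table, and the threshold and cooldown values here are assumptions.

```python
import time


class CircuitBreaker:
    """Minimal circuit breaker: opens after N failures, allows a trial
    call (half-open) once the cooldown has elapsed."""

    def __init__(self, failure_threshold=3, cooldown=300.0):
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow_request(self, now=None):
        now = time.monotonic() if now is None else now
        if self.opened_at is None:
            return True   # closed: call the service normally
        if now - self.opened_at >= self.cooldown:
            return True   # half-open: allow one trial call
        return False      # open: use cached/fallback data instead

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self, now=None):
        now = time.monotonic() if now is None else now
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = now


breaker = CircuitBreaker(failure_threshold=2, cooldown=300.0)
breaker.record_failure(now=0.0)
breaker.record_failure(now=1.0)  # threshold reached: circuit opens
```

The `now` parameter exists so the state transitions can be tested deterministically; in a workflow you would simply omit it.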

💾 Pattern 8: Smart Caching Strategy

Reduce API calls and database load with intelligent caching.

Cache-First with Fallback

Workflow: "Product Data with Cache"

Step 1: Check Cache (Database)
  Query:
  SELECT * FROM cache
  WHERE key = 'products_list'
  AND expires_at > NOW()
  Output: cached_data

Step 2: Use Cache or Fetch (Condition)
  Expression: cached_data is not None
  
  True Branch (Cache Hit):
    → Use {{cached_data.value}}
    → Log cache hit
  
  False Branch (Cache Miss):
    → Fetch from API
    → Parse response
    → Store in cache with 1 hour TTL
    → Use fresh data

Step 3: Return Data
  Output: final_products
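Steps 1–2 can be sketched as a single get-or-fetch function. A plain dict stands in for the cache table here; key name, TTL, and the tuple return shape are illustrative assumptions.

```python
import time


def get_products(cache, fetch, ttl=3600, now=None):
    """Cache-first read with TTL: return the cached value when fresh,
    otherwise call fetch() and store the result for ttl seconds."""
    now = time.time() if now is None else now
    entry = cache.get('products_list')
    if entry is not None and entry['expires_at'] > now:
        return entry['value'], 'hit'
    value = fetch()  # cache miss: go to the API
    cache['products_list'] = {'value': value, 'expires_at': now + ttl}
    return value, 'miss'


# Example: miss, then hit within the TTL, then miss after expiry.
fetch_calls = {'n': 0}

def fetch():
    fetch_calls['n'] += 1
    return ['widget', 'gadget']

cache = {}
first = get_products(cache, fetch, ttl=3600, now=0)
second = get_products(cache, fetch, ttl=3600, now=100)
third = get_products(cache, fetch, ttl=3600, now=4000)
```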

🌐 Pattern 9: Multi-Language Pipeline

Leverage the Best Language for Each Task

Chain script steps in different languages within a single workflow. Use Python for data processing, Bash for system commands, JavaScript for API integrations, and PowerShell for Windows admin tasks.

Workflow: "Cross-Language Infrastructure Audit"

Step 1: Script (Bash)
  → Collect system metrics: disk, memory, CPU
  → Output: system_metrics

Step 2: Script (Python)
  → Analyze metrics with pandas & numpy
  → Classify health status
  → Output: health_analysis

Step 3: Script (JavaScript)
  → Format results as HTML report using Node.js
  → Output: html_report

Step 4: Script (PowerShell)
  → Push report to SharePoint / Exchange
  → Send Teams notification via Graph API
  → Output: delivery_status

Step 5: Condition
  → Expression: health_analysis.critical_count > 0
  → True: Send PagerDuty alert (Notification step)
  → False: Log success to database

🔄 Pattern 10: Subflow Composition

Reusable Workflow Components

Build small, focused workflows and compose them into larger pipelines using Subflow steps. Each sub-workflow runs in an isolated context with mapped inputs and returns results to the parent.

Workflow: "Customer Onboarding Pipeline"

Step 1: API Step — Fetch new customer from CRM
  → Output: customer_data

Step 2: Subflow — "data-enrichment"
  → Input Mapping: {"customer_id": "{{customer_data.id}}"}
  → Runs: 4 enrichment steps internally
  → Output merged back: subflow_enrich

Step 3: Subflow — "compliance-check"
  → Input Mapping: {"email": "{{customer_data.email}}"}
  → Runs: KYC checks, sanctions screening
  → Output merged back: subflow_compliance

Step 4: Condition
  → Expression: subflow_compliance.approved == true
  → True: Subflow "provision-account"
  → False: Notification — manual review needed

Step 5: Output Step
  → Map: {"account_id": "subflow_provision.id", "status": "subflow_compliance.status"}
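Input mapping resolution could look like the sketch below: `{{path.to.value}}` placeholders are looked up against the parent's step outputs before the sub-workflow starts. This is an illustration of the idea; PyExecutor's actual resolution rules may differ.

```python
import re

PLACEHOLDER = re.compile(r'\{\{\s*([\w.]+)\s*\}\}')


def resolve_mapping(mapping, context):
    """Resolve '{{a.b.c}}' placeholders against prior step outputs."""
    def lookup(path):
        value = context
        for part in path.split('.'):
            value = value[part]
        return value

    resolved = {}
    for key, template in mapping.items():
        match = PLACEHOLDER.fullmatch(template.strip())
        if match:
            # Whole-value placeholder: keep the original type (int, dict, ...)
            resolved[key] = lookup(match.group(1))
        else:
            # Placeholder embedded in a larger string: interpolate as text
            resolved[key] = PLACEHOLDER.sub(
                lambda m: str(lookup(m.group(1))), template)
    return resolved


context = {'customer_data': {'id': 42, 'email': 'new@example.com'}}
resolved = resolve_mapping(
    {'customer_id': '{{customer_data.id}}',
     'note': 'email={{customer_data.email}}'},
    context,
)
```

Preserving the original type for whole-value placeholders matters when a subflow expects, say, an integer ID rather than its string form.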

🤖 Pattern 11: Multi-Provider AI Routing

Route to the Best LLM for the Job

Use different AI providers for different tasks. PyExecutor supports Gemini, OpenAI, Azure OpenAI, LM Studio, and Ollama — each configurable per connector with JSON output mode and token tracking.

Workflow: "Smart Document Processing"

Step 1: AI Step (Gemini connector)
  → Classify document type: invoice, contract, report
  → JSON output mode for structured result
  → Output: classification

Step 2: Condition
  → Route by classification.type

Step 3a: AI Step (OpenAI GPT-4 connector)
  → Extract line items from invoice
  → Structured JSON output
  → Output: invoice_data

Step 3b: AI Step (Local LLM / Ollama)
  → Summarize contract — keep data on-premises
  → 120s timeout for large documents
  → Output: contract_summary

Step 4: Token Tracking
  → AI token usage is tracked automatically per job via _record_ai_usage
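The routing in Steps 2–3 amounts to a small lookup table from document type to connector. A sketch with hypothetical connector names; configure them to match the connectors defined in your PyExecutor instance.

```python
def pick_connector(doc_type, on_premises_only=False):
    """Route a classified document to an AI connector.

    Connector names ('openai-gpt4', 'ollama-local', 'gemini-flash')
    are placeholders, not defaults shipped with PyExecutor.
    """
    if on_premises_only:
        return 'ollama-local'  # keep sensitive data off external APIs
    routes = {
        'invoice': 'openai-gpt4',    # structured extraction
        'contract': 'ollama-local',  # on-premises summarization
        'report': 'gemini-flash',    # fast classification/summaries
    }
    return routes.get(doc_type, 'gemini-flash')  # sensible fallback
```

Centralizing the route table makes it easy to swap providers per task without touching the rest of the workflow.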