Skip to content
MACHHUB MACHHUB MACHHUB
Contribute to this page

Python processes

A Python process is wrapped by MACHHUB in an execute function — you write only the body. The inputs and trigger locals are made available for convenience.

def execute(context):
inputs = context['inputs']
trigger = context['trigger']
# YOUR CODE HERE

context is a dict:

context = {
'inputs': { ... }, # resolved inputs (tag reads, SQL results, HTTP body fields)
'trigger': {
'type': 'cron', # 'cron' | 'interval' | 'tag_change' | 'http' | 'manual'
'config': { ... }, # cron_expression / interval_* / tag / endpoint
'data': { ... }, # runtime-only: tag_change or http request details
},
'timestamp': '2026-06-03T...', # ISO 8601
'domain_id': 'domains:xxx',
'process_name': 'myProcess',
}

Access an input by the name you configured it with:

sensor_value = inputs['temperature'] # from a tag input named "temperature"
latest = inputs.get('latestReading') # from an sql input named "latestReading"

If you need to read records, add a SQL input with a query; if you need to write results, add a SQL output with {{output.field}} placeholders or a tag_write output. See The Process model.

A scheduled aggregation: a cron trigger runs the process hourly, a SQL input named hourlyReadings fetches the latest rows, the code computes summary statistics, and a SQL output persists them.

from datetime import datetime
def execute(context):
# inputs is a local var (same as context['inputs'])
readings = inputs.get('hourlyReadings', [])
if not readings:
return {'count': 0, 'average': None}
values = [r['value'] for r in readings]
average = sum(values) / len(values)
print(f"Aggregated {len(readings)} readings. avg={average:.2f}")
return {
'count': len(readings),
'average': round(average, 4),
'min': min(values),
'max': max(values),
'timestamp': datetime.utcnow().isoformat()
}

Configure it as:

  • Triggercron, expression 0 * * * * (every hour).
  • Inputsql, name hourlyReadings, query SELECT * FROM myapp.readings WHERE created_dt > time::now() - 1h;
  • Outputsql, query INSERT INTO myapp.hourly_summary { count: {{output.count}}, average: {{output.average}} };

The returned count and average fill the {{output.*}} placeholders. Standard library modules are always available; add third-party packages via per-domain package management.

When a process fires on tag_change, remember to add a tag input so the value lands in inputs (the trigger does not inject it):

def execute(context):
THRESHOLD = 80
new_value = inputs['temperature'] # from a tag input named "temperature"
status = 'alert' if new_value > THRESHOLD else 'normal'
print(f"Temperature: {new_value} status: {status}")
return {'status': status, 'value': new_value}

Pair it with a tag_change trigger on the sensor tag, a tag input for the same path, and a tag_write output with field: "status" to publish the result.