Runtime / Agent Integration

Use Lexega to check SQL queries before execution. Pass the SQL string, get a decision.

This guide covers runtime integration for agents and applications. For CI/CD integration (PR reviews, semantic diff), see Integration Options.

Performance: Single-statement analysis typically completes in < 20ms. Multiple semicolon-delimited statements are analyzed together in one call (~75ms for 10 simple statements).
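These figures depend on hardware and on process startup cost, so it can be worth measuring them in your own environment. The sketch below is a minimal timing harness, not part of Lexega: `measure_latency_ms` is a hypothetical helper, and `check` is any callable that performs one analysis (for example, the `check_sql` function from the Python example in this guide).

```python
import statistics
import time
from typing import Callable


def measure_latency_ms(check: Callable[[str], object], sql: str, runs: int = 20) -> dict:
    """Time repeated calls to check(sql) and summarize them in milliseconds.

    Hypothetical helper: `check` is supplied by the caller, so the timings
    include whatever that callable does (e.g., subprocess startup).
    """
    if runs < 1:
        raise ValueError("runs must be at least 1")
    samples = []
    for _ in range(runs):
        start = time.perf_counter()
        check(sql)
        samples.append((time.perf_counter() - start) * 1000.0)
    return {
        "runs": runs,
        "median_ms": statistics.median(samples),
        "max_ms": max(samples),
    }
```

The median is reported rather than the mean so that a single slow first call (cold caches, binary load) does not dominate the summary.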

Setup

1. Install the binary

# Download (Linux x86_64 example)
curl -L -o lexega-sql https://github.com/Lexega/releases/releases/latest/download/lexega-sql-linux-x86_64
chmod +x lexega-sql

# Move to PATH (or keep in project directory)
sudo mv lexega-sql /usr/local/bin/

2. Set your license key

export LEXEGA_LICENSE_KEY="your-license-key-here"

Alternatively, set it in your application's environment configuration.

3. Create a policy file

Save as policy.yaml:

schema_version: 1
policy_id: runtime-agent-policy
policy_version: "1.0.0"
policies: []

severity_actions:
  - critical: block
    high: warn

default_action: allow

This blocks any critical-severity signal (such as DELETE without WHERE) and warns on high-severity signals. For more control, add explicit policies:

schema_version: 1
policy_id: runtime-agent-policy
policy_version: "1.0.0"

policies:
  - rule_id: C020              # DELETE without WHERE
    action: block

  - rule_id: GRT-ALL-PRIV      # GRANT ALL PRIVILEGES
    action: block

severity_actions:
  - critical: block
    high: warn

default_action: allow
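Before wiring the check into an agent, it can help to confirm that the three setup steps above are in place. The following is a minimal preflight sketch under stated assumptions: `preflight` is a hypothetical helper (not a Lexega API), it assumes the binary was moved onto PATH rather than kept in the project directory, and it only checks that the policy file exists, not that it is valid.

```python
import os
import shutil
from typing import Callable, List, Mapping, Optional


def preflight(
    policy_path: str = "policy.yaml",
    environ: Mapping[str, str] = os.environ,
    which: Callable[[str], Optional[str]] = shutil.which,
    exists: Callable[[str], bool] = os.path.exists,
) -> List[str]:
    """Return a list of setup problems; an empty list means setup looks complete.

    The environ/which/exists parameters are injectable only to make the
    function easy to test; the defaults inspect the real environment.
    """
    problems = []
    if which("lexega-sql") is None:  # step 1: binary on PATH (assumption)
        problems.append("lexega-sql not found on PATH")
    if not environ.get("LEXEGA_LICENSE_KEY"):  # step 2: license key
        problems.append("LEXEGA_LICENSE_KEY is not set")
    if not exists(policy_path):  # step 3: policy file
        problems.append(f"policy file not found: {policy_path}")
    return problems
```

Calling `preflight()` once at application startup surfaces configuration mistakes early instead of at the first query.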

Python Example

import subprocess
import json
import os

def check_sql(
    sql: str,
    policy_path: str = "policy.yaml",
    env: str = "prod",
    dialect: str = "snowflake",  # e.g. "snowflake", "postgresql", "bigquery", "databricks"
) -> dict:
    """
    Check SQL against policy. Returns decision dict.
    Raises if lexega-sql not found or license invalid.
    """
    result = subprocess.run(
        [
            "lexega-sql", "analyze", 
            "--stdin",
            "-q",
            "--dialect", dialect,
            "--policy", policy_path,
            "--env", env,
            "--mode", "runtime",
            "--decision-out", "-"
        ],
        input=sql,
        capture_output=True,
        text=True,
        env={**os.environ}  # Inherit LEXEGA_LICENSE_KEY
    )
    
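    if result.returncode not in (0, 2):
        # Any other exit code means the check itself failed (license error, etc.).
        # A missing binary raises FileNotFoundError from subprocess.run() before this point.
        raise RuntimeError(f"lexega-sql failed: {result.stderr}")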
    
    decision = json.loads(result.stdout)
    return {
        "allowed": result.returncode == 0,
        "outcome": decision.get("outcome"),
        "matched_rules": decision.get("analysis", {}).get("matched_rules", []),
    }


# Usage
sql = "DELETE FROM users"  # No WHERE clause
result = check_sql(sql)

if not result["allowed"]:
    print(f"Blocked: {result['matched_rules']}")
    # Don't execute
else:
    # Safe to execute; execute() stands in for your own database call
    execute(sql)

Node.js Example

const { spawn } = require('child_process');

function checkSql(sql, policyPath = 'policy.yaml', env = 'prod', dialect = 'snowflake') {
  return new Promise((resolve, reject) => {
    const proc = spawn('lexega-sql', [
      'analyze', '--stdin', '-q',
      '--dialect', dialect,
      '--policy', policyPath,
      '--env', env,
      '--mode', 'runtime',
      '--decision-out', '-'
    ], {
      env: process.env  // Inherit LEXEGA_LICENSE_KEY
    });

    let stdout = '';
    let stderr = '';

    proc.stdout.on('data', (data) => { stdout += data; });
    proc.stderr.on('data', (data) => { stderr += data; });

    // Spawn failures (e.g., binary not found) arrive as an 'error' event
    proc.on('error', reject);

    proc.stdin.write(sql);
    proc.stdin.end();

    proc.on('close', (code) => {
      if (code !== 0 && code !== 2) {
        reject(new Error(`lexega-sql failed: ${stderr}`));
        return;
      }
      
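      // Reject instead of throwing inside the event handler if stdout is not valid JSON
      let decision;
      try {
        decision = JSON.parse(stdout);
      } catch (err) {
        reject(new Error(`lexega-sql returned invalid JSON: ${err.message}`));
        return;
      }
      resolve({
        allowed: code === 0,
        outcome: decision.outcome,
        matchedRules: decision.analysis?.matched_rules || [],
      });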
    });
  });
}

// Usage
async function handleQuery(sql) {
  const result = await checkSql(sql);
  
  if (!result.allowed) {
    console.log('Blocked:', result.matchedRules);
    return;
  }
  
  await execute(sql);
}

Reference

Batching

Pass multiple semicolon-delimited statements in a single call; all are analyzed together:

sql = """
SELECT * FROM users WHERE id = 1;
INSERT INTO audit_log (action) VALUES ('query');
UPDATE users SET last_seen = NOW() WHERE id = 1;
"""
result = check_sql(sql)  # One call, all statements analyzed

No separate batch API needed. If any statement triggers a block, the entire batch is blocked.
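Because a single blocking statement rejects the whole batch, an agent may want to find out which statement was responsible. One possible approach, sketched below, is to re-check each statement on its own after a batch is blocked. Both functions are hypothetical helpers, and `check` is any callable returning a dict with an `allowed` key (such as `check_sql` above). The splitter is deliberately naive: it assumes no SQL comments, dollar-quoting, or other dialect-specific quoting. It is also an assumption that a statement blocked within a batch is blocked when checked alone.

```python
from typing import Callable, List


def split_statements(sql: str) -> List[str]:
    """Naively split on semicolons that fall outside single-quoted strings."""
    statements, current, in_string = [], [], False
    for ch in sql:
        if ch == "'":
            # An escaped quote ('') toggles twice, so the state is preserved.
            in_string = not in_string
        if ch == ";" and not in_string:
            statements.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    statements.append("".join(current).strip())
    return [s for s in statements if s]


def find_blocked_statements(sql: str, check: Callable[[str], dict]) -> List[str]:
    """Return the statements that are blocked when checked one at a time."""
    if check(sql)["allowed"]:
        return []  # whole batch allowed; nothing to locate
    return [s for s in split_statements(sql) if not check(s)["allowed"]]
```

The extra per-statement calls only happen on the blocked path, so the common allowed case still costs a single analysis.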

Flags

Flag              Purpose
--stdin           Read SQL from stdin
-q / --quiet      Suppress all stderr messages (clean stdout-only output)
--mode runtime    Only output decision JSON (no report)
--decision-out -  Write decision to stdout
--policy <path>   Policy file
--env <name>      Environment (e.g., prod, dev)

Exit Codes

Code  Meaning
0     Allowed
2     Blocked by policy
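In application code, the two documented exit codes can be mapped to an explicit type so that callers never compare raw integers. This is a small sketch; `Verdict` and `verdict_from_exit_code` are hypothetical names, and treating every other exit code as an error mirrors the Python example above rather than any documented contract.

```python
from enum import Enum


class Verdict(Enum):
    ALLOWED = "allowed"
    BLOCKED = "blocked"


def verdict_from_exit_code(code: int) -> Verdict:
    """Translate a lexega-sql exit code into a Verdict."""
    if code == 0:
        return Verdict.ALLOWED
    if code == 2:
        return Verdict.BLOCKED
    # Assumption: anything else means the check itself did not complete.
    raise RuntimeError(f"unexpected lexega-sql exit code: {code}")
```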

Decision JSON

The decision JSON contains the full analysis. Key fields for agent integration:

{
  "decision_schema_version": 1,
  "decision_id": "e458603a656094e856aac0003fb04bbf...",
  "timestamp": "2026-02-04T19:34:40.353Z",
  "inputs": {
    "sql_sha256": "c3f483a8bb391787b4515f9afd8f3ede...",
    "policy_id": "baseline",
    "env_context": { "env": "prod" }
  },
  "analysis": {
    "risk_summary": {
      "critical_count": 1,
      "tables_written": 1
    },
    "matched_rules": [
      {
        "rule_id": "_severity_critical",
        "action": "block",
        "rule_description": "unbounded write operation detected..."
      }
    ]
  },
  "outcome": {
    "allowed": false,
    "blocked_reason": "Blocked by severity_actions.critical (signal severity = critical)"
  }
}

Most agents only need outcome.allowed. The exit code (0 = allowed, 2 = blocked) is the simplest check.
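When an agent needs more than the exit code, for instance to log why a query was blocked, the fields shown above can be pulled into a small typed structure. The sketch below relies only on the field names in the sample JSON; `DecisionSummary` and `summarize_decision` are hypothetical names, and treating a missing `outcome.allowed` as blocked is a conservative assumption.

```python
import json
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class DecisionSummary:
    allowed: bool
    blocked_reason: Optional[str] = None
    rule_ids: List[str] = field(default_factory=list)


def summarize_decision(decision_json: str) -> DecisionSummary:
    """Extract the fields most agents need from the decision JSON text."""
    decision = json.loads(decision_json)
    outcome = decision.get("outcome", {})
    rules = decision.get("analysis", {}).get("matched_rules", [])
    return DecisionSummary(
        # Assumption: a missing "allowed" field is treated as blocked.
        allowed=bool(outcome.get("allowed", False)),
        blocked_reason=outcome.get("blocked_reason"),
        rule_ids=[rule.get("rule_id", "") for rule in rules],
    )
```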

Handling Parse Failures

If the SQL contains syntax the parser doesn't recognize (e.g., dialect-specific features), check analysis.risk_summary.statements_skipped:

# Inside check_sql(), after the exit-code check:
decision = json.loads(result.stdout)
skipped = decision.get("analysis", {}).get("risk_summary", {}).get("statements_skipped", 0)

if skipped > 0:
    # Parser couldn't fully analyze this SQL
    # Conservative option: block unknown syntax
    return {"allowed": False, "reason": "Unrecognized SQL syntax"}

This prevents fail-open behavior where unknown SQL slips through unanalyzed.
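The same fail-closed reasoning can be applied to failures of the check itself, such as a missing binary, a license error, or malformed output. A minimal sketch follows, assuming `check` is a callable like `check_sql` that either returns a dict with an `allowed` key or raises; `check_fail_closed` is a hypothetical wrapper, not part of Lexega.

```python
from typing import Callable


def check_fail_closed(sql: str, check: Callable[[str], dict]) -> dict:
    """Run check(sql), treating any failure to obtain a decision as a block."""
    try:
        result = check(sql)
    except Exception as exc:  # missing binary, license error, invalid JSON, ...
        return {"allowed": False, "reason": f"Check failed: {exc}"}
    if not isinstance(result, dict) or "allowed" not in result:
        # No usable decision came back, so do not let the SQL through.
        return {"allowed": False, "reason": "Check returned no decision"}
    return result
```

Whether to fail closed is a policy choice: it favors safety over availability, so an outage of the checker stops query execution rather than bypassing analysis.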

Read-Only Agent Example

Block all write operations (INSERT/UPDATE/DELETE/MERGE/TRUNCATE) in production and allow only SELECT:

1. Create agent_rules.yaml:

rules:
  - id: AGENT-READONLY
    name: Block all write operations
    risk_level: Critical
    message: "Write operations not allowed in production"
    triggers:
      - statement_type: DmlWriteStatement   # INSERT, UPDATE, DELETE, MERGE
      - statement_type: TruncateStatement

2. Create policy.yaml:

schema_version: 1
policy_id: agent-readonly
policy_version: "1.0.0"
policies: []
severity_actions:
  - critical: block
    scope:
      envs: [prod]
default_action: allow

3. Run with both files:

lexega-sql analyze --stdin -q \
  --dialect snowflake \
  --custom-rules agent_rules.yaml \
  --policy policy.yaml \
  --env prod \
  --mode runtime \
  --decision-out -

For PostgreSQL/BigQuery/Databricks agents, change --dialect snowflake to --dialect postgresql, --dialect bigquery, or --dialect databricks.

Result:

  • SELECT * FROM users: allowed
  • INSERT INTO users ...: blocked
  • UPDATE users SET ...: blocked
  • DELETE FROM users ...: blocked
  • MERGE INTO users ...: blocked
  • TRUNCATE TABLE users: blocked
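To drive the read-only configuration from Python, the command shown in step 3 can be assembled as an argument list for `subprocess.run`. The sketch below uses only the flags that appear in this guide; `build_command` is a hypothetical helper, and its defaults simply mirror the examples above.

```python
from typing import List, Optional


def build_command(
    dialect: str = "snowflake",
    policy_path: str = "policy.yaml",
    env: str = "prod",
    custom_rules: Optional[str] = None,
) -> List[str]:
    """Build the lexega-sql argument list, adding --custom-rules when given."""
    cmd = ["lexega-sql", "analyze", "--stdin", "-q", "--dialect", dialect]
    if custom_rules:
        cmd += ["--custom-rules", custom_rules]
    cmd += [
        "--policy", policy_path,
        "--env", env,
        "--mode", "runtime",
        "--decision-out", "-",
    ]
    return cmd
```

Passing the result to `subprocess.run(build_command(custom_rules="agent_rules.yaml"), input=sql, capture_output=True, text=True)` corresponds to the shell invocation in step 3.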

More Policy Examples

# Strict policy: block critical, warn high
severity_actions:
  - critical: block
    high: warn

# Or be explicit about specific rules:
policies:
  - rule_id: C020              # DELETE without WHERE
    action: block

  - rule_id: C021              # UPDATE without WHERE
    action: block

  - rule_id: GRT-ALL-PRIV      # GRANT ALL PRIVILEGES
    action: block

  - rule_id: MASK-DROP         # DROP MASKING POLICY
    action: block

  - rule_id: C074              # DROP ROW ACCESS POLICY
    action: block

See Policy Reference for full syntax.
