// src/server/workflows/syncEnrichedOrders.ts
import type { JobDefinition } from '@synthetiq/workflows';
/**
 * Workflow definition: sync Shopify orders enriched with CRM customer data.
 *
 * Steps, in order:
 *  1. `shopify.listOrders`  -> staging table `raw_orders` (cursor-paginated).
 *  2. `crm.listCustomers`   -> staging table `customers`.
 *  3. SQL join of the two staging tables -> `enriched_orders`.
 *  4. Upsert `enriched_orders` into the `EnrichedOrder` table, keyed on `order_id`.
 *
 * Variables:
 *  - `orgId`     caller-supplied param, passed to the CRM customer listing.
 *  - `appId`     caller-supplied param, identifies the target app for the persist step.
 *  - `startDate` date offset of -30 days, used as Shopify's `created_at_min`.
 *                NOTE(review): presumably relative to the run time; confirm with the
 *                workflow runtime's `dateOffset` semantics.
 *
 * All staged columns are declared as VARCHAR, including `total_price` and `created_at`.
 */
export const syncEnrichedOrders: JobDefinition = {
  name: "Sync Enriched Orders",
  description: "Fetch Shopify orders, enrich with CRM customer data, and persist",
  variables: {
    orgId: { type: "param" },
    // Referenced by the persist step as {{var.appId}}; it must be declared here
    // like every other interpolated variable so the template can be resolved.
    appId: { type: "param" },
    startDate: { type: "dateOffset", days: -30 },
  },
  steps: [
    // Step 1: pull orders of any status created on/after startDate.
    {
      service: "shopify",
      operation: "listOrders",
      params: { status: "any", created_at_min: "{{var.startDate}}" },
      outputTable: "raw_orders",
      outputSchema: {
        id: "VARCHAR",
        customer_id: "VARCHAR",
        total_price: "VARCHAR",
        created_at: "VARCHAR",
      },
      // Cursor pagination: the value read from `nextPageCursor` is sent back
      // as the `page_info` request parameter.
      paginate: {
        type: "cursor",
        cursorField: "nextPageCursor",
        cursorParam: "page_info",
      },
    },
    // Step 2: pull CRM customers for the organisation.
    // NOTE(review): no `paginate` config here; confirm listCustomers returns
    // the full customer set in a single call.
    {
      service: "crm",
      operation: "listCustomers",
      params: { orgId: "{{var.orgId}}" },
      outputTable: "customers",
      outputSchema: {
        id: "VARCHAR",
        name: "VARCHAR",
        email: "VARCHAR",
        segment: "VARCHAR",
      },
    },
    // Step 3: join orders to customers on customer id.
    // NOTE(review): this is an inner join, so orders whose customer_id has no
    // match in `customers` are excluded from `enriched_orders`; confirm that
    // is intended.
    {
      sql: `
SELECT
o.id AS order_id,
o.total_price,
o.created_at,
c.name AS customer_name,
c.email AS customer_email,
c.segment AS customer_segment
FROM raw_orders o
JOIN customers c ON o.customer_id = c.id
`,
      outputTable: "enriched_orders",
      outputSchema: {
        order_id: "VARCHAR",
        total_price: "VARCHAR",
        created_at: "VARCHAR",
        customer_name: "VARCHAR",
        customer_email: "VARCHAR",
        customer_segment: "VARCHAR",
      },
    },
    // Step 4: upsert the enriched rows into the application table.
    {
      persistToDatabase: {
        appId: "{{var.appId}}",
        tableName: "EnrichedOrder",
        sourceTable: "enriched_orders",
        mode: "upsert",
        upsertKey: ["order_id"],
      },
    },
  ],
};