Pipelines

Data transformation and ingestion workflows

Compute Cluster: Healthy
All pipelines execute on the selected cluster
8 vCPUs
32 GB RAM
Max 6 concurrent pipelines
Utilization: 42%
Market Data ETL: Active
pipelines/pip-001.py
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import lightgbm as lgb
from kraken_sdk import DataSource, Pipeline, Output
from datetime import datetime, timedelta, timezone

pipeline = Pipeline("market-data-etl")

FEATURE_COLS = [
    "lag_1h", "lag_24h", "lag_7d",
    "rolling_mean_24h", "rolling_std_24h",
    "hour", "day_of_week", "is_weekend",
]
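# NOTE: the order here must match both the keys emitted by engineer_features()
# and the feature order the forecasting model was trained with.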


@pipeline.transform
def extract_and_normalize(ctx):
    """Fetch pricing data from all marketplace sources and normalize."""
    amazon = ctx.source("amazon-sp-api")
    shopify = ctx.source("shopify")

    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)

    amazon_prices = amazon.query(
        "SELECT asin, price, currency, marketplace, updated_at "
        "FROM pricing WHERE updated_at > :cutoff",
        params={"cutoff": cutoff},
    )

    shopify_prices = shopify.query(
        "SELECT product_id, variant_id, price, compare_at_price "
        "FROM product_variants WHERE updated_at > :cutoff",
        params={"cutoff": cutoff},
    )

    normalized = []
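    # Every record is coerced to one shape, e.g. (illustrative values):
    #   {"source": "amazon", "product_id": "B0EXAMPLE", "price_usd": 19.99,
    #    "marketplace": "ATVPDKIKX0DER", "timestamp": "2025-06-01T12:00:00+00:00"}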
    for row in amazon_prices:
        ts = row["updated_at"]
        normalized.append({
            "source": "amazon",
            "product_id": row["asin"],
            "price_usd": convert_currency(row["price"], row["currency"]),
            "marketplace": row["marketplace"],
            # Coerce to an ISO string so every record carries one timestamp type.
            "timestamp": ts.isoformat() if hasattr(ts, "isoformat") else str(ts),
        })

    for row in shopify_prices:
        normalized.append({
            "source": "shopify",
            "product_id": row["product_id"],
            "price_usd": float(row["price"]),
            "marketplace": "shopify-direct",
            # The Shopify query returns no update time, so approximate with
            # the window cutoff.
            "timestamp": cutoff.isoformat(),
        })

    return normalized


@pipeline.transform
def deduplicate(ctx, records):
    """Remove duplicate entries, keeping the most recent per product."""
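    # Example (hypothetical): two records for ("amazon", "B0EXAMPLE") stamped
    # 11:00 and 12:00 collapse to the single 12:00 record; ISO-8601 strings
    # compare chronologically as plain strings.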
    seen = {}
    for record in records:
        key = (record["source"], record["product_id"])
        if key not in seen or record["timestamp"] > seen[key]["timestamp"]:
            seen[key] = record

    ctx.log(f"Deduplicated {len(records)} -> {len(seen)} records")
    return list(seen.values())


@pipeline.transform
def engineer_features(ctx, records):
    """Build lag, rolling, and seasonality features for forecasting."""
    history = ctx.source("postgres").query(
        "SELECT product_id, price_usd, timestamp "
        "FROM market_prices "
        "ORDER BY timestamp DESC LIMIT 2000"
    )

    prices = {r["product_id"]: [] for r in history}
    for r in history:
        prices[r["product_id"]].append(r["price_usd"])

    featured = []
    for record in records:
        pid = record["product_id"]
        ts = datetime.fromisoformat(record["timestamp"])
        series = prices.get(pid, [])
        arr = np.array(series, dtype=float) if series else np.array([record["price_usd"]])

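        # Lag indexing assumes hourly snapshots ordered newest-first (per the
        # ORDER BY above): arr[0] is ~1h ago, arr[23] ~24h ago, arr[167] ~7d ago.
        # Products with short histories fall back to the current price.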
        featured.append({
            **record,
            "lag_1h": float(arr[0]) if len(arr) > 0 else record["price_usd"],
            "lag_24h": float(arr[23]) if len(arr) > 23 else record["price_usd"],
            "lag_7d": float(arr[167]) if len(arr) > 167 else record["price_usd"],
            "rolling_mean_24h": float(np.mean(arr[:24])),
            "rolling_std_24h": float(np.std(arr[:24])),
            "hour": ts.hour,
            "day_of_week": ts.weekday(),
            "is_weekend": int(ts.weekday() >= 5),
        })

    ctx.log(f"Engineered {len(FEATURE_COLS)} features for {len(featured)} records")
    return featured


@pipeline.transform
def forecast_prices(ctx, records):
    """Generate 24h price forecasts using LightGBM."""
    model = ctx.model("price-forecast-lgbm")

    forecasts = []
    for record in records:
        features = np.array([[record[f] for f in FEATURE_COLS]])
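        # NOTE: one predict() call per record; stacking all rows into a single
        # 2-D array and predicting once would be much cheaper for large batches.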
        pred = model.predict(features)[0]

        forecasts.append({
            "product_id": record["product_id"],
            "source": record["source"],
            "current_price": record["price_usd"],
            "forecast_24h": round(float(pred), 2),
            "delta_pct": round((pred - record["price_usd"]) / record["price_usd"] * 100, 2),
            "model_version": model.version,
            "timestamp": record["timestamp"],
        })

    ctx.log(f"Generated forecasts for {len(forecasts)} products")
    return {"records": records, "forecasts": forecasts}


@pipeline.output
def write_parquet(ctx, result):
    """Write normalized data and forecasts to separate partitions."""
    pricing = pa.Table.from_pylist(result["records"])
    pricing_path = ctx.storage.path("market-data", partition_by="date")
    pq.write_table(pricing, pricing_path, compression="snappy")

    forecasts = pa.Table.from_pylist(result["forecasts"])
    forecast_path = ctx.storage.path("market-data/forecasts", partition_by="date")
    pq.write_table(forecasts, forecast_path, compression="snappy")

    ctx.log(f"Wrote {len(result['records'])} prices, {len(result['forecasts'])} forecasts")

    return Output(
        records_written=len(result["records"]),
        forecasts_written=len(result["forecasts"]),
        paths={"pricing": pricing_path, "forecasts": forecast_path},
    )


def convert_currency(amount, currency, target="USD"):
    # Static placeholder rates (USD per unit); a real pipeline would pull live FX.
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "CAD": 0.74}
    return round(float(amount) * rates.get(currency, 1.0) / rates.get(target, 1.0), 2)
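

The forecast step above predicts row by row. If the hosted "price-forecast-lgbm" model behaves like a standard LightGBM regressor, the whole batch can go through one predict() call. A minimal, self-contained sketch with a toy model (synthetic data and shapes are illustrative only):

import numpy as np
import lightgbm as lgb

# Toy stand-in for the hosted model: trained on random data purely to show
# the shape contract -- predict() takes (n_rows, n_features) and returns one
# forecast per row, so a batch needs only a single call.
rng = np.random.default_rng(0)
X_train = rng.normal(size=(500, 8))  # 8 = len(FEATURE_COLS)
y_train = 2.0 * X_train[:, 0] + rng.normal(scale=0.1, size=500)
toy_model = lgb.LGBMRegressor(n_estimators=50).fit(X_train, y_train)

batch = rng.normal(size=(4, 8))
print(toy_model.predict(batch))  # four forecasts from one call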
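The Parquet output relies on plain pyarrow, so the write path can be checked locally without the Kraken runtime. A runnable round trip (file name illustrative); the schema is inferred from the dict keys, the same way write_parquet builds its tables:

import pyarrow as pa
import pyarrow.parquet as pq

rows = [
    {"product_id": "B0EXAMPLE", "price_usd": 19.99, "source": "amazon"},
    {"product_id": "1234-5678", "price_usd": 24.00, "source": "shopify"},
]
table = pa.Table.from_pylist(rows)  # schema inferred from the records
pq.write_table(table, "prices.parquet", compression="snappy")
print(pq.read_table("prices.parquet").to_pylist())
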
Python 3.12 · Kraken SDK v0.0.4