Pipelines
Data transformation and ingestion workflows
Compute Cluster: Healthy
All pipelines execute on the selected cluster.
8 vCPUs · 32 GB RAM · Max 6 concurrent pipelines · Utilization: 42%
Market Data ETL (Active)
pipelines/pip-001.py
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
from kraken_sdk import Pipeline, Output
from datetime import datetime, timedelta, timezone

pipeline = Pipeline("market-data-etl")

# Model inputs, in the column order the forecasting model expects.
FEATURE_COLS = [
    "lag_1h", "lag_24h", "lag_7d",
    "rolling_mean_24h", "rolling_std_24h",
    "hour", "day_of_week", "is_weekend",
]


@pipeline.transform
def extract_and_normalize(ctx):
    """Fetch pricing data from each marketplace source and normalize."""
    amazon = ctx.source("amazon-sp-api")
    shopify = ctx.source("shopify")

    # datetime.utcnow() is deprecated as of Python 3.12; use an aware UTC time.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=1)

    amazon_prices = amazon.query(
        "SELECT asin, price, currency, marketplace, updated_at "
        "FROM pricing WHERE updated_at > :cutoff",
        params={"cutoff": cutoff},
    )

    shopify_prices = shopify.query(
        "SELECT product_id, variant_id, price, compare_at_price "
        "FROM product_variants WHERE updated_at > :cutoff",
        params={"cutoff": cutoff},
    )

    normalized = []
    for row in amazon_prices:
        normalized.append({
            "source": "amazon",
            "product_id": row["asin"],
            "price_usd": convert_currency(row["price"], row["currency"]),
            "marketplace": row["marketplace"],
            # updated_at is assumed to come back as a datetime; normalize to an
            # ISO-8601 string so dedup comparisons and fromisoformat() always
            # see one consistent type.
            "timestamp": row["updated_at"].isoformat(),
        })

    for row in shopify_prices:
        normalized.append({
            "source": "shopify",
            "product_id": row["product_id"],
            "price_usd": float(row["price"]),
            "marketplace": "shopify-direct",
            "timestamp": cutoff.isoformat(),
        })

    return normalized


@pipeline.transform
def deduplicate(ctx, records):
    """Remove duplicate entries, keeping the most recent per product."""
    seen = {}
    for record in records:
        key = (record["source"], record["product_id"])
        # ISO-8601 strings sort chronologically, so string comparison is safe.
        if key not in seen or record["timestamp"] > seen[key]["timestamp"]:
            seen[key] = record

    ctx.log(f"Deduplicated {len(records)} -> {len(seen)} records")
    return list(seen.values())


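# Worked example of the rule above (hypothetical values): two records sharing
# the key ("amazon", "B01") with timestamps "2024-06-01T10:00:00+00:00" and
# "2024-06-01T11:00:00+00:00" collapse to the 11:00 record.

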
@pipeline.transform
def engineer_features(ctx, records):
    """Build lag, rolling, and seasonality features for forecasting."""
    # Newest rows first: index 0 is the latest observed price, index 23 is
    # ~24 hours back, and index 167 is ~7 days back, assuming one stored row
    # per product per hour.
    history = ctx.source("postgres").query(
        "SELECT product_id, price_usd, timestamp "
        "FROM market_prices "
        "ORDER BY timestamp DESC LIMIT 2000"
    )

    prices = {r["product_id"]: [] for r in history}
    for r in history:
        prices[r["product_id"]].append(r["price_usd"])

    featured = []
    for record in records:
        pid = record["product_id"]
        ts = datetime.fromisoformat(record["timestamp"])
        series = prices.get(pid, [])
        # Fall back to the current price when a product has no history yet,
        # so arr is never empty.
        arr = np.array(series, dtype=float) if series else np.array([record["price_usd"]])

        featured.append({
            **record,
            "lag_1h": float(arr[0]),
            "lag_24h": float(arr[23]) if len(arr) > 23 else record["price_usd"],
            "lag_7d": float(arr[167]) if len(arr) > 167 else record["price_usd"],
            "rolling_mean_24h": float(np.mean(arr[:24])),
            "rolling_std_24h": float(np.std(arr[:24])),
            "hour": ts.hour,
            "day_of_week": ts.weekday(),
            "is_weekend": int(ts.weekday() >= 5),
        })

    ctx.log(f"Engineered {len(FEATURE_COLS)} features for {len(featured)} records")
    return featured


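# For reference, the same lag/rolling features in pandas (a sketch, assuming
# a DataFrame `df` with one row per product per hour, oldest first):
#
#   df["lag_1h"] = df.groupby("product_id")["price_usd"].shift(1)
#   df["lag_24h"] = df.groupby("product_id")["price_usd"].shift(24)
#   df["rolling_mean_24h"] = (
#       df.groupby("product_id")["price_usd"]
#         .transform(lambda s: s.rolling(24, min_periods=1).mean())
#   )

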
@pipeline.transform
def forecast_prices(ctx, records):
    """Generate 24h price forecasts using LightGBM."""
    model = ctx.model("price-forecast-lgbm")

    forecasts = []
    for record in records:
        # Assemble a single-row feature matrix in the training column order.
        features = np.array([[record[f] for f in FEATURE_COLS]])
        pred = float(model.predict(features)[0])

        forecasts.append({
            "product_id": record["product_id"],
            "source": record["source"],
            "current_price": record["price_usd"],
            "forecast_24h": round(pred, 2),
            "delta_pct": round((pred - record["price_usd"]) / record["price_usd"] * 100, 2),
            "model_version": model.version,
            "timestamp": record["timestamp"],
        })

    ctx.log(f"Generated forecasts for {len(forecasts)} products")
    return {"records": records, "forecasts": forecasts}


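# Illustration: how "price-forecast-lgbm" might have been trained offline.
# A minimal sketch, not part of this pipeline; it assumes a historical frame
# `df` holding FEATURE_COLS plus a `target_24h` label (the price 24h later):
#
#   import lightgbm as lgb
#   train_set = lgb.Dataset(df[FEATURE_COLS], label=df["target_24h"])
#   booster = lgb.train({"objective": "regression", "metric": "mae"},
#                       train_set, num_boost_round=200)
#   booster.save_model("price-forecast-lgbm.txt")

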
@pipeline.output
def write_parquet(ctx, result):
    """Write normalized data and forecasts to separate partitions."""
    # Date partitioning is delegated to ctx.storage.path; each write_table
    # call emits a single Parquet file at the resolved path.
    pricing = pa.Table.from_pylist(result["records"])
    pricing_path = ctx.storage.path("market-data", partition_by="date")
    pq.write_table(pricing, pricing_path, compression="snappy")

    forecasts = pa.Table.from_pylist(result["forecasts"])
    forecast_path = ctx.storage.path("market-data/forecasts", partition_by="date")
    pq.write_table(forecasts, forecast_path, compression="snappy")

    ctx.log(f"Wrote {len(result['records'])} prices, {len(result['forecasts'])} forecasts")

    return Output(
        records_written=len(result["records"]),
        forecasts_written=len(result["forecasts"]),
        paths={"pricing": pricing_path, "forecasts": forecast_path},
    )


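# Note: with pyarrow alone, a date-partitioned layout would typically use
# pq.write_to_dataset(table, root_path, partition_cols=["date"]) instead of
# write_table; here the partition path appears to be resolved by
# ctx.storage.path(partition_by="date").

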
def convert_currency(amount, currency, target="USD"):
    """Convert to USD using static spot rates; unknown currencies pass through at 1.0."""
    # Static indicative rates to USD; targets other than "USD" are not supported.
    rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "CAD": 0.74}
    return round(float(amount) * rates.get(currency, 1.0), 2)
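
# For local testing one might invoke the pipeline directly. The Kraken SDK's
# execution API is not shown in this file, so `pipeline.run()` below is an
# assumption for illustration rather than a documented call:
#
#   if __name__ == "__main__":
#       print(pipeline.run())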
Python 3.12 · Kraken SDK v0.0.4