如何使用 Shopify Bulk Operations 批量获取全部商品数据

本文介绍如何通过 shopify graphql bulk operations 高效获取店铺中全部商品(如 1300+ 条),替代易出错的 rest 分页轮询,避免手动解析 link 头、死循环或重复请求等问题。

在 Shopify 开发中,当需要拉取大量商品(例如 1300+ 条)时,直接依赖 REST API 的分页机制(如 page_info 或 since_id)极易引发逻辑错误:链接解析失败、状态未更新、递归失控、重复请求同一页面等——正如你在代码中遇到的 nextLink 恒定不变的问题。根本原因在于:你混合使用了两种不兼容的分页策略——既调用 shopify.rest.Product.all()(该方法内部已封装分页逻辑),又试图手动提取并拼接 Link 响应头,导致上下文错乱;同时 since_id 未随每次响应动态更新,使后续请求始终从初始 ID 开始,造成数据重叠与停滞。

✅ 正确解法:采用 Shopify 官方推荐的 GraphQL Bulk Operations(批量查询)。它专为海量数据导出设计,具备以下核心优势:

  • 单次声明式查询:无需手动管理翻页、链接解析或递归调用;
  • 异步执行 + 状态轮询:提交任务后由 Shopify 后台处理,支持百万级数据;
  • 结构化结果输出:返回标准化 JSONL(每行一个 JSON 对象),便于流式解析;
  • 字段精准控制:可指定仅获取 id, title, images, variants 等必要字段,减少传输开销。

? 快速实现步骤(Node.js 示例)

import { shopify } from './testingShopifyApp.js';
import { session } from './testingShopifyApp.js';

const client = new shopify.clients.Graphql({ session });

// 1. 构建 Bulk Query(支持分页自动处理)
const bulkQuery = `
  query {
    products(first: 250) {
      edges {
        node {
          id
          title
          images(first: 3) {
            edges {
              node {
                url
                altText
              }
            }
          }
        }
      }
      pageInfo {
        hasNextPage
        endCursor
      }
    }
  }
`;

// 2. 创建 Bulk Operation(异步任务)
async function createBulkOperation() {
  try {
    const response = await client.query({
      data: {
        mutation: `
          mutation bulkOperationRunQuery($query: String!) {
            bulkOperationRunQuery(query: $query) {
              bulkOperation {
                id
                status
              }
              userErrors {
                field
                message
              }
            }
          }
        `,
        variables: { query: bulkQuery.replace(/\s+/g, ' ').trim() },
      },
    });

    const bulkId = response.body.data.bulkOperationRunQuery.bulkOperation.id;
    console.log('✅ Bulk operation created:', bulkId);
    return bulkId;
  } catch (err) {
    console.error('❌ Failed to create bulk operation:', err);
    throw err;
  }
}

// 3. 轮询任务状态并下载结果
async function pollAndDownload(bulkId) {
  let status = 'CREATED';
  let url = null;

  while (['CREATED', 'RUNNING', 'CANCELING'].includes(status)) {
    const res = await client.query({
      data: {
        query: `
          query getBulkOperation($id: ID!) {
            node(id: $id) {
              ... on BulkOperation {
                id
                status
                errorCode
                createdAt
                objectCount
                fileSize
                url
                partialDataUrl
              }
            }
          }
        `,
        variables: { id: bulkId },
      },
    });

    const op = res.body.data.node;
    status = op.status;
    url = op.url || url;

    console.log(`⏳ Status: ${status} | Objects: ${op.objectCount || 0}`);

    if (status === 'COMPLETED' && url) {
      // 4. 下载 JSONL 结果并转为数组
      const resultRes = await fetch(url);
      const jsonlText = await resultRes.text();
      const allProducts = jsonlText
        .split('\n')
        .filter(line => line.trim())
        .map(line => JSON.parse(line).data.products.edges.map(e => e.node))
        .flat();

      console.log(`✅ Successfully fetched ${allProducts.length} products`);
      return allProducts;
    }

    await new Promise(r => setTimeout(r, 2000)); // 2s 间隔轮询
  }

  throw new Error(`Bulk operation failed with status: ${status}`);
}

// ? 执行全流程
async function getAllProducts() {
  try {
    const bulkId = await createBulkOperation();
    const products = await pollAndDownload(bulkId);
    return products;
  } catch (err) {
    console.error('? Bulk fetch failed:', err);
    throw err;
  }
}

// 使用示例
getAllProducts().then(products => {
  console.log('? Final product array length:', products.length);
  // → [ { id: "gid://shopify/Product/81226235887", title: "title", ... }, ... ]
});

⚠ 注意事项

  • ? 权限要求:确保 App 具备 read_products(及 read_product_images 等所需字段)访问权限;
  • ? API 版本:Bulk Operations 仅支持 2025-07 及更高版本的 Admin API;
  • ? 结果格式:返回的是 JSONL(换行分隔的 JSON),不是普通 JSON 数组,需按行解析;
  • 时效性:大任务可能需数秒至数分钟完成,切勿同步阻塞主流程;
  • ? 进阶建议:生产环境应添加重试机制、超时控制、错误日志及 S3 临时存储。

✅ 总结

放弃手工维护 REST 分页逻辑——它脆弱、难调试、不可扩展。Shopify Bulk Operations 是处理千级乃至百万级商品数据的事实标准。它将“获取全部商品”这一复杂操作简化为三步:声明查询 → 提交任务 → 下载结果。不仅代码更简洁健壮,还能显著提升稳定性与可维护性。立即迁移,告别 Link 正则匹配和无限递归陷阱。