Migrating Local Tables to Distributed Cluster Setup

Moving an existing local (single-node) table into a sharded, replicated ClickHouse cluster is one of the trickiest production migrations, because ClickHouse has no automatic resharding — every approach requires deliberate, manual orchestration. The goal is to get historical data spread across the new shards and replicated for fault tolerance, while new inserts keep flowing, ideally without a maintenance window.

This guide covers the migration workflow: choosing a strategy, synchronizing DDL across shards, moving existing partitions efficiently, and cutting over reads and writes. For the underlying cluster configuration (remote_servers, Keeper, macros), see ClickHouse production cluster configuration. If your source is a plain MergeTree that first needs to become replicated, start with converting MergeTree to ReplicatedMergeTree.

The Target Architecture

A production ClickHouse cluster splits a table into two layers:

  • A local replicated table (ReplicatedMergeTree) on every node, which physically holds one shard's worth of data and replicates it to the other replicas of that shard.
  • A Distributed table sitting on top, which fans reads out across all shards and routes inserts to the correct shard using a sharding_key.

The migration problem is therefore: take data that currently lives in a single local table and (1) create the replicated local tables on every shard, (2) create the Distributed table, (3) move the existing data so it lands on the right shards, and (4) point the application at the Distributed table.

-- Local table, created on every node of the cluster
CREATE TABLE db.events_local ON CLUSTER '{cluster}'
(
    event_date Date,
    user_id    UInt64,
    event_type String,
    amount     Decimal(10, 2)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events_local', '{replica}')
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, user_id);

-- Distributed table on top, routing by user_id
CREATE TABLE db.events ON CLUSTER '{cluster}'
(
    event_date Date,
    user_id    UInt64,
    event_type String,
    amount     Decimal(10, 2)
)
ENGINE = Distributed('{cluster}', db, events_local, intHash64(user_id));

ON CLUSTER '{cluster}' is what synchronizes the DDL: the statement is queued as a distributed DDL task and executed on every node, so all shards end up with identical schemas. {shard} and {replica} are macros from each node's config — see production cluster configuration for how they are defined. The sharding_key (intHash64(user_id) here) must be an expression returning an integer; ClickHouse takes its remainder against the total shard weight to pick a shard.

Choosing a Migration Strategy

There is no single best method — the right one depends on table size, whether the source is already replicated, and your downtime tolerance.

Strategy How data moves Reshards across shards? Downtime Best for
INSERT … SELECT through Distributed Reads source, re-routes every row by sharding key Yes (automatic) None (dual-write) Small/medium tables; clean resharding
ATTACH PARTITION FROM (hardlink) Hardlinks parts into the local table on the same node No (data stays on its node) Near-zero Single node → single shard, or same-node reorg
FETCH + ATTACH PARTITION Pulls parts from a remote replica into detached/ Manual placement Near-zero Moving specific partitions between hosts
MOVE PARTITION TO TABLE Moves parts, deleting from source No Near-zero Promoting a shadow table on the same node

A common production pattern combines them: use hardlink-based ATTACH PARTITION to get the local data into the new replicated table cheaply, then let only new writes go through the Distributed table for resharding. INSERT … SELECT is the most flexible (it actually reshards) but the most expensive — the official scaling guide explicitly calls it a "last resort" for large datasets because it rewrites every part and consumes heavy IO and network.

Strategy 1: Shadow Table + ATTACH PARTITION (no reshard)

When you are migrating a single node into a cluster and that node will become one shard, you do not need to reshard the existing data — it can stay where it is and simply be adopted by the new replicated table. This is the cheapest path because ATTACH PARTITION FROM uses filesystem hardlinks: no part data is copied, and the operation is effectively instant regardless of table size.

The source and destination tables must have identical structure, partition key, ORDER BY / primary key, and storage policy — otherwise the attach is rejected.

-- 1. Create the replicated "shadow" table next to the old local table
CREATE TABLE db.events_local ON CLUSTER '{cluster}'
(
    event_date Date,
    user_id    UInt64,
    event_type String,
    amount     Decimal(10, 2)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events_local', '{replica}')
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, user_id);

-- 2. Move each existing partition into it (hardlink, no extra disk)
ALTER TABLE db.events_local
    ATTACH PARTITION '202601' FROM db.events_old;

Generate the per-partition statements directly from system.parts so you do not miss any:

SELECT DISTINCT
    'ALTER TABLE db.events_local ATTACH PARTITION ID ''' ||
    partition_id || ''' FROM db.events_old;'
FROM system.parts
WHERE database = 'db' AND table = 'events_old' AND active;

Once attached, the data is automatically registered in Keeper and replicated to the other replicas of that shard. New inserts then go to the Distributed table, which will spread them across all shards — over time the data rebalances as older partitions age out.

Strategy 2: INSERT … SELECT for True Resharding

When you need existing data physically redistributed across multiple shards (for example, a single node splitting into four shards), insert through the Distributed table. ClickHouse evaluates the sharding_key per row and routes each to the correct shard:

-- Runs on a node that can see both source and the cluster.
-- Distributed table 'events' reshards every row by its sharding_key.
INSERT INTO db.events
SELECT * FROM db.events_old;

If the source lives on a different host or cluster, read it with the remote() (or remoteSecure()) table function instead of a pre-configured cluster entry:

INSERT INTO db.events
SELECT *
FROM remote('old-host:9000', 'db', 'events_old', 'user', 'password');

To avoid a single huge transaction, migrate partition by partition and guard against oversized insert blocks:

INSERT INTO db.events
SELECT * FROM db.events_old
WHERE toYYYYMM(event_date) = 202601
SETTINGS max_partitions_per_insert_block = 1;

Distributed inserts are asynchronous by default: data is buffered locally on the initiator and forwarded to shards in the background. For migrations where you want to confirm each batch actually landed before moving on, force synchronous behavior with distributed_foreground_insert = 1 (also available as its alias insert_distributed_sync). Background sending is tuned by distributed_background_insert_batch and distributed_background_insert_max_sleep_time_ms.

Strategy 3: FETCH + ATTACH Between Hosts

When you need to relocate a specific partition from a replica on one host to a table on another host (for example, manually balancing partitions after adding a shard), use FETCH PARTITION. It pulls the part from a healthy replica into the local detached/ directory; you then ATTACH it:

-- On the destination node: pull a partition from the source shard's Keeper path
ALTER TABLE db.events_local
    FETCH PARTITION '202601'
    FROM '/clickhouse/tables/1/events_local';

ALTER TABLE db.events_local ATTACH PARTITION '202601';

FETCH PARTITION is not replicated — it only places parts in detached/ on the local node, so you run it on the specific node that should own the data. After attaching, verify nothing was left behind:

SELECT database, table, partition_id, count() AS detached_parts
FROM system.detached_parts
GROUP BY database, table, partition_id;

Zero-Downtime Cutover with Dual Writes

To migrate a live cluster without a window, shift writes gradually using a Distributed table plus a materialized view, while backfilling history in parallel. The pattern below moves all new inserts from a source cluster to a destination cluster:

-- On the source: a Distributed table pointing at the destination's local table
CREATE TABLE db.events_to_dest ON CLUSTER 'source'
(
    event_date Date, user_id UInt64, event_type String, amount Decimal(10, 2)
)
ENGINE = Distributed('destination', db, events_local, intHash64(user_id));

-- A materialized view that forwards every new row to the destination
CREATE MATERIALIZED VIEW db.shift_inserts ON CLUSTER 'source'
TO db.events_to_dest AS
SELECT * FROM db.events_source;

New rows now land in both the old and new clusters; meanwhile you backfill historical partitions with INSERT … SELECT or ATTACH PARTITION. When the backfill is verified, repoint the application's reads to the destination Distributed table and drop the view. This keeps the application code unchanged throughout.

For deeper background on how replicas stay in sync once data lands, see ClickHouse replication and the replication queue guide.

Synchronizing DDL Across Shards

The most common migration failure is schema drift between shards. Always apply schema changes with ON CLUSTER so the distributed DDL queue runs them everywhere:

ALTER TABLE db.events_local ON CLUSTER '{cluster}'
    ADD COLUMN session_id UInt64;

If a distributed DDL hangs on "N unfinished hosts", a node is unreachable or its DDL worker is stuck — see DDL unfinished hosts and inconsistent cluster definition. Verify the cluster topology and per-host schema agreement before and after migration:

SELECT cluster, shard_num, replica_num, host_name
FROM system.clusters
WHERE cluster = '{cluster}'
ORDER BY shard_num, replica_num;

Best Practices

  1. Migrate partition by partition. It bounds memory, makes progress resumable, and lets you verify counts incrementally rather than betting on one giant transaction.
  2. Prefer hardlink ATTACH PARTITION over INSERT … SELECT whenever data can stay on its node — it avoids rewriting parts and doubling disk usage.
  3. Always use ON CLUSTER for DDL so every shard and replica gets the identical schema; manual per-node DDL invites drift.
  4. Match structure exactly. ATTACH/MOVE PARTITION require identical schema, partition key, sort/primary key, and storage policy — confirm before you start.
  5. Verify row counts per shard after each batch with clusterAllReplicas (below), not just the aggregate on the Distributed table.
  6. Keep the old table until verified. ATTACH PARTITION FROM does not delete the source, so you retain an instant rollback path.

Verifying the Migration

Compare source and destination counts, and inspect how data actually landed across shards using the _shard_num virtual column on the Distributed table:

-- Per-shard row distribution after resharding
SELECT _shard_num AS shard, count() AS rows
FROM db.events
GROUP BY shard
ORDER BY shard;

-- Per-replica counts to confirm replication caught up
SELECT hostName() AS host, count()
FROM clusterAllReplicas('{cluster}', db.events_local)
GROUP BY host;

Then confirm replication is healthy and no parts are stuck:

SELECT database, table, is_leader, total_replicas, active_replicas,
       queue_size, absolute_delay
FROM system.replicas
WHERE table = 'events_local';

A nonzero, growing queue_size or absolute_delay signals replication is lagging — diagnose with the replication queue guide before cutting over reads.

Common Issues

  • ATTACH PARTITION rejected — structure, partition key, sort key, or storage policy differs between source and destination. They must match exactly.
  • Data skewed onto one shard — a low-cardinality or constant sharding_key sends most rows to a single shard. Use a high-cardinality expression like intHash64(user_id) or rand().
  • Inserts "succeed" but data is missing on shards — asynchronous distributed inserts are still in flight or stuck in the background queue. Use distributed_foreground_insert = 1 during migration, and watch system.distribution_queue.
  • Distributed DDL stuck on unfinished hosts — a node is down or its DDL worker is blocked; see N unfinished hosts.
  • Detached parts left behind after FETCH — check system.detached_parts and attach or drop them explicitly.

If you are consolidating shards rather than expanding, the reverse workflow is covered in merging shards / desharding.

How Pulse Helps

Distributed-to-cluster migrations are where small misconfigurations turn into silent data loss — a skewed sharding key, a schema that drifted on one shard, or a background insert queue that never drained. Pulse continuously monitors ClickHouse clusters for exactly these conditions: it flags schema inconsistencies across shards, replication lag and stuck queues, distribution-queue backlogs, and uneven shard distribution before they become incidents. During a migration, that means you can verify each cutover step against real cluster state instead of hoping the async inserts landed. For teams running self-managed ClickHouse, Pulse provides the expert review and ongoing health checks that make these one-way migrations safe.

Frequently Asked Questions

Q: Does ClickHouse rebalance data automatically when I add a shard?

No. Self-managed ClickHouse has no automatic resharding — adding a shard only affects where new inserts go (subject to shard weights). Existing data must be moved manually via ATTACH/FETCH PARTITION or INSERT … SELECT. ClickHouse Cloud's SharedMergeTree is the exception, since storage is shared and nodes don't own physical shards.

Q: Can I migrate a plain MergeTree table directly into a cluster?

You first convert it to ReplicatedMergeTree, then place it under a Distributed table. The hardlink-based conversion is covered in convert MergeTree to ReplicatedMergeTree; from there the strategies on this page apply.

Q: Why is ATTACH PARTITION FROM so much faster than INSERT … SELECT?

ATTACH PARTITION FROM creates filesystem hardlinks to the existing immutable parts instead of reading and rewriting every row. No data is copied and no extra disk is consumed, so it completes in seconds regardless of table size. The trade-off is that it does not reshard — data stays on the node it was already on.

Q: How do I keep the application running during the migration?

Use dual writes: route new inserts through a Distributed table (often via a materialized view) so they land in the new cluster while you backfill history in parallel. Once row counts are verified per shard, repoint reads to the new Distributed table and remove the view.

Q: My distributed insert succeeded but the data isn't on the shards. Why?

Distributed inserts are asynchronous by default — rows are buffered on the initiator and forwarded in the background. Check system.distribution_queue for pending or errored batches, and set distributed_foreground_insert = 1 during migration so each insert blocks until the shards acknowledge it.

Q: How do I confirm data landed evenly across shards?

Query the Distributed table grouping by the _shard_num virtual column, and cross-check per-replica counts with clusterAllReplicas('{cluster}', db.table_local). A heavily skewed distribution usually points to a poor sharding_key choice.

Subscribe to the Pulse Newsletter

Get early access to new Pulse features, insightful blogs & exclusive events , webinars, and workshops.

We use cookies to provide an optimized user experience and understand our traffic. To learn more, read our use of cookies; otherwise, please choose 'Accept Cookies' to continue using our website.