Cloudera Data Platform — Custom Data Migration Scenario (with distcp and HiveQL commands)

Sometimes specific situations call for custom solutions, and those custom solutions evolve into better ones. The example we describe below is one such case.

Problem Statement: One of our customers wanted to move the Hive tables in their CDH (Cloudera's Distribution including Apache Hadoop, the predecessor of Cloudera Data Platform) 5.15 environment to their newly established CDP 7.1.8 cluster.

Normally, this could be handled with Cloudera's built-in tool, Replication Manager, but the source and destination versions did not support each other, and our customer did not want to lose time with an in-place upgrade (see: Support matrix for Replication Manager on CDP Private Cloud Base).

So what could the custom solution be? The first thing that came to mind (and what we implemented) was to copy the HDFS data of these tables to the destination cluster and recreate the Hive tables there.

We had determined our high-level solution. So what kind of challenges were waiting for us?

Challenges:

  • Preserve HDFS data permissions and copy ~25 TB of data efficiently
  • Export and import DDLs for almost 20000 managed Hive tables
  • Since it is a production environment, perform these operations with minimum downtime (within 24 hours)

We were able to move HDFS data effectively with distcp.

distcp, short for Distributed Copy, is a tool used in the Hadoop ecosystem for efficiently copying large amounts of data between clusters. It works by dividing the data into chunks and parallelizing the copy process across multiple nodes (as a MapReduce job), which enhances performance. distcp preserves file attributes such as permissions, ownership, and timestamps during the copy operation. It also supports copying data between clusters with different configurations and versions, ensuring compatibility.

Apache Hadoop Distributed Copy — DistCp Guide

After some research, our sample syntax was as follows:


hadoop distcp -D ipc.client.fallback-to-simple-auth-allowed=true -pbugp \
  hdfs://<source_namenode>:8020/user/hive/warehouse/<db_name>.db/<table_name> \
  hdfs://<destination_namenode>:8020/warehouse/tablespace/managed/hive/<db_name>.db

Note that we use /warehouse/tablespace/managed/hive instead of /user/hive/warehouse as the destination for managed tables; we will come back to this. (In newer versions of CDP, the directory previously used as /user/hive/warehouse is replaced by /warehouse/tablespace/managed/hive for managed tables and /warehouse/tablespace/external/hive for external tables.)

The -pbugp flag stands for preserve: block size (b), user (u), group (g), and permissions (p).

hadoop copy preserving the ownership/permissions

Since the network between the clusters was quite fast (in our benchmark, approximately 1 TB of data was copied in 20 minutes), it looked like we had solved the distcp part.

Let’s move on to the second challenge: how can we export and import 20000 managed Hive tables?

First, of course, we looked at our old friend Stack Overflow. In Hive, the DDL of a table can be exported with a command like SHOW CREATE TABLE mydatabase.myTable. We wrote and tested a Bash script similar to the one in the link below:

How to get/generate the create statement for an existing hive table?

Our script successfully pulled the DDLs of the tables, but there was a problem: it wrote the DDL of a single table to a txt file in about 6–7 seconds. That was a terrible figure for 20000 tables (about 38 hours even in the scenario where everything goes well).

After we saw that it took approximately 20–30 seconds to create one table on the destination side, we agreed that additional development was required!

Hive — Create Table

For that very reason, we decided to employ a multiprocessing strategy. We created a Python script that makes use of Python's built-in subprocess module: each SHOW CREATE TABLE command is run through the Hive CLI, with the proper arguments, in a separate child process. Since the Queue and logging module interfaces are thread-safe, we used them to capture the stdout and stderr data from the multiple processes and to write them into dedicated files. We also put a BoundedSemaphore into action to cap the number of subprocesses allowed to run simultaneously; a value of 16 or 32 usually does the trick, depending on the hardware specifications and the capacity and configuration of your CDH/CDP cluster. A rough sketch of this pattern follows the command example below.

An example Hive command is built and passed as follows (note that Hive's Beeline shell would require slightly different CLI arguments for the same purpose):


result = subprocess.check_output([
  "hive",
  "--hiveconf",
  "hive.cli.print.header=false",
  "--hiveconf",
  "hive.cli.print.current.db=false",
  "--hiveconf",
  "hive.cli.errors.ignore=true",
  "--silent",
  "-e",
  "\"show create table {}.{}\"".format(database_name, table_name)
])
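
Putting these pieces together, the overall pattern might look roughly like the minimal sketch below. This is an illustration under assumptions, not our production script: names, file paths, and error handling are placeholders, and a real run would add retries and per-database output files. The idea is that a BoundedSemaphore caps the number of live child processes, each worker thread drives one Hive CLI invocation via subprocess, and a thread-safe Queue together with the logging module collects the output.


import logging
import queue
import subprocess
import threading

logging.basicConfig(filename="ddl_export.log", level=logging.INFO)

MAX_CONCURRENCY = 16                       # tune to your cluster's capacity
slots = threading.BoundedSemaphore(MAX_CONCURRENCY)
results = queue.Queue()                    # thread-safe buffer for captured DDLs

def export_ddl(database_name, table_name):
    """Run SHOW CREATE TABLE in a child Hive CLI process and queue its stdout."""
    try:
        ddl = subprocess.check_output(
            ["hive", "--silent", "-e",
             "show create table {}.{}".format(database_name, table_name)],
            stderr=subprocess.STDOUT,
        )
        results.put(ddl.decode())
        logging.info("exported %s.%s", database_name, table_name)
    except subprocess.CalledProcessError as err:
        logging.error("failed %s.%s: %s", database_name, table_name, err.output)
    finally:
        slots.release()                    # free a slot for the next table

def export_all(tables, output_path="ddl_export.hql"):
    """tables: an iterable of (database, table) pairs."""
    workers = []
    for db, tbl in tables:
        slots.acquire()                    # blocks while MAX_CONCURRENCY children are running
        worker = threading.Thread(target=export_ddl, args=(db, tbl))
        worker.start()
        workers.append(worker)
    for worker in workers:
        worker.join()
    with open(output_path, "w") as out:    # add trailing ';' so the file can be replayed later
        while not results.empty():
            out.write(results.get().rstrip() + ";\n\n")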

Before the import stage, we had to modify the LOCATION path in each table's exported DDL, for the aforementioned reason: newer CDP versions expect separate HDFS paths for managed and external tables. A simple Bash script with sed commands and proper logging was more than enough to get the job done.
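
The substitution itself is trivial. Our pass was a Bash script built around sed, but purely as an illustration, the same rewrite could be sketched in Python like this (the prefixes below are the default CDH and CDP warehouse paths, and the NameNode host inside each LOCATION URI may also need to be swapped for the new cluster's):


# Rewrite the LOCATION clause of every exported DDL so that managed tables
# point at the new CDP warehouse path. Prefixes are the platform defaults.
OLD_PREFIX = "/user/hive/warehouse"
NEW_PREFIX = "/warehouse/tablespace/managed/hive"

def rewrite_locations(in_path, out_path):
    with open(in_path) as src, open(out_path, "w") as dst:
        for line in src:
            if "LOCATION" in line:
                line = line.replace(OLD_PREFIX, NEW_PREFIX)
            dst.write(line)

rewrite_locations("ddl_export.hql", "ddl_export_cdp.hql")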

Thereafter, we moved on to the actual DDL import stage. We created all the databases beforehand, then followed a similar multiprocessing strategy to create the tables in batches on the new cluster. The DDL exports from the former cluster had already been split into chunks as separate HQL files so that they could easily be processed in batches.
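
The import side can reuse the same semaphore-throttled pattern, this time feeding each chunked HQL file to the client with -f. Again, this is only a sketch with assumed file names; on the CDP side, Beeline with a JDBC URL is the more usual client, so "hive" below stands in for whichever client your cluster provides.


import glob
import subprocess
import threading

MAX_CONCURRENCY = 16
slots = threading.BoundedSemaphore(MAX_CONCURRENCY)

def run_chunk(hql_path):
    """Create one batch of tables by feeding a chunked HQL file to the Hive client."""
    try:
        subprocess.check_call(["hive", "--silent", "-f", hql_path])
    finally:
        slots.release()

workers = []
for hql_path in sorted(glob.glob("ddl_chunks/*.hql")):
    slots.acquire()                        # throttle concurrent Hive sessions
    worker = threading.Thread(target=run_chunk, args=(hql_path,))
    worker.start()
    workers.append(worker)
for worker in workers:
    worker.join()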

Final Migration: As a result of all these improvements:

  • We copied 25 TB of data to the destination cluster with distcp in approximately 6 hours.
  • We exported the DDL of nearly 20000 tables in approximately 3.5 hours.
  • Thanks to the DDLs we exported and edited, we created nearly 20000 tables in approximately 6 hours.

And we completed the entire migration within one day, as requested by our customer.

Authors: Mert Yiğit Aladağ, Platform Engineering Team Lead, Oredata & Baturalp Kızıltan, Platform Engineer, Oredata

Contact us