Migrating HBase Data Between Hadoop Clusters


In day-to-day operation, you may often need to migrate or copy HBase data from one cluster to another, and many problems can come up along the way.

Below are the steps and workarounds I used when handling this.

Prerequisite: the two clusters must run the same HBase version. Otherwise unpredictable problems may occur and the migration may fail.
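It is worth verifying this prerequisite explicitly before copying anything. A minimal sketch of the check, in pure shell; in practice you would fill the two variables by running `hbase version | head -1` on each cluster, and the version strings below are made-up examples:

```shell
# Compare the HBase version strings reported by the two clusters.
# Prints "ok" when they match, "mismatch" when they do not.
check_versions() {
  if [ "$1" = "$2" ]; then
    echo "ok"
  else
    echo "mismatch"
  fi
}

# In practice: src/dst values come from `hbase version | head -1` on each cluster.
check_versions "0.90.4" "0.90.4"   # prints: ok
check_versions "0.90.4" "0.94.1"   # prints: mismatch -- do not migrate
```

Only proceed with the file-level copy below when the check prints "ok".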

When the two clusters cannot communicate with each other, you can first copy the HBase data files from the source cluster to a local directory.

The steps are as follows:

In the Hadoop directory, run the following command to copy the table to the local filesystem.

bin/hadoop fs -copyToLocal /hbase/tab_keywordflow /home/test/xiaochenbak

Then, as you'd expect, ship those files over to the destination cluster, into that table's directory. If the destination cluster already has files for the table, delete them first, then copy:

bin/hadoop fs -rmr /hbase/tab_keywordflow

bin/hadoop fs -copyFromLocal /home/other/xiaochenbak /hbase/tab_keywordflow

Here /home/other/xiaochenbak is the local copy of the data, now sitting on the destination cluster.
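The copy-out / copy-in sequence above can be wrapped into one small script. A dry-run sketch using the same example paths from this post; `RUN=echo` just prints each command so you can inspect it, and on a real cluster you would set `RUN=` (empty) to execute them:

```shell
#!/bin/sh
# Dry-run sketch of the migration steps above.
# RUN=echo prints each command instead of executing it;
# set RUN= (empty) on a real cluster to actually run them.
RUN=echo
TABLE=/hbase/tab_keywordflow
LOCAL_DIR=/home/test/xiaochenbak

# Step 1 (source cluster): dump the table directory to local disk.
$RUN hadoop fs -copyToLocal "$TABLE" "$LOCAL_DIR"

# ...ship LOCAL_DIR to a destination host (e.g. with scp), then:

# Step 2 (destination cluster): remove any stale copy of the table.
$RUN hadoop fs -rmr "$TABLE"

# Step 3 (destination cluster): load the copied files into place.
$RUN hadoop fs -copyFromLocal "$LOCAL_DIR" "$TABLE"
```

Note that `-rmr` is the old-style HDFS shell syntax matching the commands in this post; newer Hadoop releases spell it `-rm -r`.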

Next, reset the table's region information in the .META. table:

bin/hbase org.jruby.Main /home/other/hbase/bin/add_table.rb /hbase/tab_keywordflow

/home/other/hbase/bin/add_table.rb is a Ruby script. Save the following content as add_table.rb and it is ready to run:

#
# Copyright 2009 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script adds a table back to a running hbase.
# Currently only works if table data is in place.
#
# To see usage for this script, run:
#
#   ${HBASE_HOME}/bin/hbase org.jruby.Main addtable.rb
#
include Java
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.regionserver.HRegion
import org.apache.hadoop.hbase.HRegionInfo
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Delete
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HTableDescriptor
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.util.FSUtils
import org.apache.hadoop.hbase.util.Writables
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.commons.logging.LogFactory

# Name of this script
NAME = "add_table"

# Print usage for this script
def usage
  puts 'Usage: %s.rb TABLE_DIR [alternate_tablename]' % NAME
  exit!
end

# Get configuration to use.
c = HBaseConfiguration.new()

# Set hadoop filesystem configuration using the hbase.rootdir.
# Otherwise, we'll always use localhost though the hbase.rootdir
# might be pointing at hdfs location.
c.set("fs.default.name", c.get(HConstants::HBASE_DIR))
fs = FileSystem.get(c)

# Get a logger and a metautils instance.
LOG = LogFactory.getLog(NAME)

# Check arguments
if ARGV.size < 1 || ARGV.size > 2
  usage
end

# Get cmdline args.
srcdir = fs.makeQualified(Path.new(java.lang.String.new(ARGV[0])))

if not fs.exists(srcdir)
  raise IOError.new("src dir " + srcdir.toString() + " doesn't exist!")
end

# Get table name
tableName = nil
if ARGV.size > 1
  tableName = ARGV[1]
  raise IOError.new("Not supported yet")
else
  # If none provided use dirname
  tableName = srcdir.getName()
end
HTableDescriptor.isLegalTableName(tableName.to_java_bytes)

# Figure locations under hbase.rootdir
# Move directories into place; be careful not to overwrite.
rootdir = FSUtils.getRootDir(c)
tableDir = fs.makeQualified(Path.new(rootdir, tableName))

# If a directory currently in place, move it aside.
if srcdir.equals(tableDir)
  LOG.info("Source directory is in place under hbase.rootdir: " + srcdir.toString())
elsif fs.exists(tableDir)
  movedTableName = tableName + "." + java.lang.System.currentTimeMillis().to_s
  movedTableDir = Path.new(rootdir, java.lang.String.new(movedTableName))
  LOG.warn("Moving " + tableDir.toString() + " aside as " + movedTableDir.toString())
  raise IOError.new("Failed move of " + tableDir.toString()) unless fs.rename(tableDir, movedTableDir)
  LOG.info("Moving " + srcdir.toString() + " to " + tableDir.toString())
  raise IOError.new("Failed move of " + srcdir.toString()) unless fs.rename(srcdir, tableDir)
end

# Clean mentions of table from .META.
# Scan the .META. and remove all lines that begin with tablename
LOG.info("Deleting mention of " + tableName + " from .META.")
metaTable = HTable.new(c, HConstants::META_TABLE_NAME)
tableNameMetaPrefix = tableName + HConstants::META_ROW_DELIMITER.chr
scan = Scan.new((tableNameMetaPrefix + HConstants::META_ROW_DELIMITER.chr).to_java_bytes)
scanner = metaTable.getScanner(scan)
# Use java.lang.String doing compares.  Ruby String is a bit odd.
tableNameStr = java.lang.String.new(tableName)
while (result = scanner.next())
  rowid = Bytes.toString(result.getRow())
  rowidStr = java.lang.String.new(rowid)
  if not rowidStr.startsWith(tableNameMetaPrefix)
    # Gone too far, break
    break
  end
  LOG.info("Deleting row from catalog: " + rowid)
  d = Delete.new(result.getRow())
  metaTable.delete(d)
end
scanner.close()

# Now, walk the table and per region, add an entry
LOG.info("Walking " + srcdir.toString() + " adding regions to catalog table")
statuses = fs.listStatus(srcdir)
for status in statuses
  next unless status.isDir()
  next if status.getPath().getName() == "compaction.dir"
  regioninfofile = Path.new(status.getPath(), HRegion::REGIONINFO_FILE)
  unless fs.exists(regioninfofile)
    LOG.warn("Missing .regioninfo: " + regioninfofile.toString())
    next
  end
  is = fs.open(regioninfofile)
  hri = HRegionInfo.new()
  hri.readFields(is)
  is.close()
  # TODO: Need to redo table descriptor with passed table name and then recalculate the region encoded names.
  p = Put.new(hri.getRegionName())
  p.add(HConstants::CATALOG_FAMILY, HConstants::REGIONINFO_QUALIFIER, Writables.getBytes(hri))
  metaTable.put(p)
  LOG.info("Added to catalog: " + hri.toString())
end
That's it for my approach. If the two clusters can communicate with each other, things are even easier; I'm sure you know what to do: scp the files across directly.
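Besides scp, when both clusters' HDFS instances can reach each other you can skip the local copy entirely and let `hadoop distcp` copy the table directory between them, then run the same add_table.rb step on the destination. A sketch only; the NameNode hostnames and port below are hypothetical, so substitute your own, and the command is built as a string and echoed here purely so it can be inspected:

```shell
# Hypothetical NameNode addresses -- replace with your clusters' own.
SRC="hdfs://src-namenode:9000/hbase/tab_keywordflow"
DST="hdfs://dst-namenode:9000/hbase/tab_keywordflow"

# distcp runs a MapReduce job that copies SRC to DST in parallel.
# Echoed for inspection here; on a real cluster, run the command directly.
echo "hadoop distcp $SRC $DST"
```

As with the copyFromLocal route, stop HBase access to the table (or the whole destination cluster) while the files land, then rebuild the .META. entries with add_table.rb.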
