V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Nirlan
V2EX  ›  Hadoop

关于 Spark 读取预分区 Hbase 问题

  •  
  •   Nirlan · 2018-09-28 00:26:39 +08:00 · 3188 次点击
    这是一个创建于 2009 天前的主题,其中的信息可能已经有所发展或是发生改变。

    RT.

    我有一张预分区的 Hbase 表, split key 是 000| 001| ... 199|这样,200 个分区.

    我的 rowkey 是这样设计的 001|20180928001122+ 业务 ID + 6 位随机数

    这样设计的话避免了 Spark 读取时数据倾斜啊,插入时数据热点问题.

    但是我想用 Spark 读取某一天的数据,还想用 scan 操作的话,貌似很难实现.

    比如我的 startrow=001|2018092800 + 0000 + 0000 + 000000 endrow=001|2018092899 + 0000 + 0000 +000000

    我想读取完这一天的数据,难道得循环 200 个 region 吗?

    单机多线程的话是可以这么做的,但是我想用 spark 分布式环境来操作.

    我查阅了 TableSnapshotScanner 类,对其 regions 属性不甚理解,望高手给个思路(给个 demo 最好了...

    第 1 条附言  ·  2018-10-09 14:35:06 +08:00
    --国庆净玩了,没有好好想这个问题.周一来了之后整理了一下思路,并参考(抄袭)了网上的一些实现,现在把完整的代码给贴一下. 附上参考的链接 http://www.zhyea.com/2017/06/21/visit-hbase-with-custom-spark-rdd.html

    --感谢 4 楼的兄弟提供思路

    提出这个问题主要是 Hbase 的 rowkey 设计为 B+ tree,Hbase 的 scan 操作性能极高.
    在 Hbase 建表的时候,预分区是必要的,但是 Hbase 的数据插入分区的时候,又是和 rowkey 的初始几位密切相关

    比如,我的 splitkeys 是 Array("0001|","002|","003|","004|","005|","006|","007|","008|")
    我在生成 rowkey 的时候,rowkey 的前缀从上面这个数据里随机取一个,如 006|,那么与这个 rowkey 相关的数据一定会插入 start 006| end 007| 这个分区里,给 scan 操作带来很大便利.

    但是随之产生的问题就是我主楼里提到的.以下代码解决了这个问题

    具体的实现过程主要是两个类,一个重写了 RDD 的实现,一个用于从 hbase 拉取数据

    --重写 RDD
    class QueryRDD(sc: SparkContext, tableName: String, startRow: String, endRow: String, splitKeys: Array[String]) extends RDD[Map[String,String]](sc, Nil)
    {

    #重写该方法用于计算每一个 partition
    override def compute(split: Partition, context: TaskContext): Iterator[Map[String,String]] =
    {
    val part = split.asInstanceOf[QueryPartition]
    val results = query(part)
    new InterruptibleIterator(context, results.iterator)
    }

    #重写该方法用于获取 partition
    override protected def getPartitions: Array[Partition] =
    {
    val partitions = ArrayBuffer[Partition]()
    for (splitKey <- splitKeys)
    {
    partitions += new QueryPartition(splitKey)
    }
    partitions.toArray
    }

    private def query(partition: QueryPartition) =
    {
    val splitKey = partition.split
    val filter = null #该参数可以不为 null,即可在 scan 的同时进行 filter
    val start = splitKey + startRow
    val end = splitKey + endRow
    HBaseClient.scan(tableName, filter, start, end)
    }
    }

    #实现自己的 partition
    class QueryPartition(splitKey: String) extends Partition
    {
    def split: String = splitKey

    override def index: Int = splitKey.substring(0, 3).toInt

    override def hashCode(): Int = index
    }

    以上是重写 RDD,hbase 的具体 scan 操作,在我上面的链接里可以找到,我照搬了过来.但是要注意他的 58 行,要把 startRow 改成 stopRow,不然的话其他代码写得再好都白费啦
    6 条回复    2018-10-09 17:34:49 +08:00
    liprais
        1
    liprais  
       2018-09-28 00:33:45 +08:00 via iPhone
    把你 hbase 的表对应成 spark 的 dataframe,然后让 spark 自己处理就好了
    不过确实是去所有 region 里面扫一遍
    sadhen
        2
    sadhen  
       2018-09-28 00:40:38 +08:00
    HBase 是实时集群,Spark 用在离线的,不要误用哦。

    具体要做什么,要想清楚自己的技术选型。
    Nirlan
        3
    Nirlan  
    OP
       2018-09-28 09:32:10 +08:00
    @sadhen #2 Hbase,是可以用于 OLAP 任务的吧...
    kex0916
        4
    kex0916  
       2018-09-28 09:53:32 +08:00   ❤️ 1
    自己实现个 RDD,partition 按照你预分区的划分 200 个,一个 partition 对应一个预分区,每个分区都是计算该分区对应的预分区的数据,对应的 startrow=该分区对应的预分区号+2018092800 + 0000 + 0000 + 000000,endrow=该分区对应的预分区号+2018092899 + 0000 + 0000 +000000,这样 200 个 region 在资源允许的情况可以并发的读取,也方便后面使用 spark 的算子。
    Nirlan
        5
    Nirlan  
    OP
       2018-10-09 14:38:58 +08:00
    @kex0916 #4 感谢提供思路
    kex0916
        6
    kex0916  
       2018-10-09 17:34:49 +08:00
    @Nirlan ^_^
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2655 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 15:23 · PVG 23:23 · LAX 08:23 · JFK 11:23
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.