First, we write a Java class that reads data from HBase:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.DiamondAddressHelper;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
public class HBaseReader {
  private HTableInterface table_;
  private HTablePool pool_;
  // Fetch one column (columnFamily:qualifier) for a batch of row keys.
  public ArrayList<byte[]> fetchFeatures(String columnFamily, String qualifier, ArrayList<String> feature_ids) {
    List<Row> batch = new ArrayList<Row>();
    for (String id : feature_ids) {
      Get get = new Get(id.getBytes());
      get.addColumn(columnFamily.getBytes(), qualifier.getBytes());
      batch.add(get);
    }
    // batch() fills `results` in the same order as the submitted Gets.
    Object[] results = new Object[batch.size()];
    try {
      table_.batch(batch, results);
    } catch (Exception e) {
      System.err.println("Error: " + e);
    }
    ArrayList<byte[]> list = new ArrayList<byte[]>();
    for (Object obj : results) {
      if (obj instanceof Result) {
        Result res = (Result)obj;
        byte[] value = res.getValue(columnFamily.getBytes(), qualifier.getBytes());
        // Missing rows come back as null; return an empty value instead.
        if (value == null) {
          list.add("".getBytes());
        } else {
          list.add(value);
        }
      }
    }
    return list;
  }
  // Initialize the table pool; the cluster address is resolved via Diamond.
  public void init(String dataid, String groupid, String tableName) {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(DiamondAddressHelper.DIMAOND_HBASE_UNITIZED, true);
    conf.set(DiamondAddressHelper.DIAMOND_HBASE_KEY_NEW, dataid);
    conf.set(DiamondAddressHelper.DIAMOND_HBASE_GROUP, groupid);
    try {
      pool_ = new HTablePool(conf, 100);
      table_ = pool_.getTable(tableName);
    } catch (Exception e) {
      System.err.println("Error: " + e);
    }
  }
  public void shutdown() {
    try {
      table_.close();
      pool_.close();
    } catch (Exception e) {
      System.err.println("Error: " + e);
    }
  }
}

Then we use Maven to package it into a single jar that bundles all dependent libraries:


<project>
  ....
  ....
  <artifactId>hbasereader</artifactId>
  <packaging>jar</packaging>
  <version>1.0.0</version>
  <name>Reader for HBase</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase</artifactId>
      <version>....</version>
      <exclusions>
        <exclusion>
          <groupId>jdk.tools</groupId>
          <artifactId>jdk.tools</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>....</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop.thirdparty.guava</groupId>
      <artifactId>guava</artifactId>
      <version>....</version>
    </dependency>
    ....
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.taobao.ad.HBaseReader</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

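With this POM in place, running mvn clean package should produce target/hbasereader-1.0.0-jar-with-dependencies.jar, which is exactly the classpath entry the Python script below passes to the JVM.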
Now we can call this Java class from Python using JPype:

import time
import jpype
import numpy
# Start an embedded JVM with the fat jar built above on its classpath.
jpype.startJVM(jpype.getDefaultJVMPath(), '-ea', '-Djava.class.path=./target/hbasereader-1.0.0-jar-with-dependencies.jar')
Reader = jpype.JClass("com.taobao.ad.HBaseReader")
reader = Reader()
ArrayList = jpype.JClass("java.util.ArrayList")
ids = ArrayList()  # row keys to fetch; avoid shadowing the built-in list
ids.add('1')
ids.add('2')
ids.add('3')
ids.add('4')
ids.add('5')
reader.init('hbase.diamond.dataid.test.hbase', 'hbase-diamond-group-name-test', 'alimama_training_image_table')
# Benchmark: 10000 batched reads of 5 rows each.
begin = time.time()
for i in range(10000):
  res = reader.fetchFeatures('ct', 'image', ids)
period = time.time() - begin
print(period)
print(10000 / period)  # reads per second
# res holds Java byte[] values; view one as a numpy uint8 array.
data = numpy.asarray(res[4], dtype=numpy.uint8)
print(data)
reader.shutdown()
jpype.shutdownJVM()

This Python example runs correctly on its own. But when the same code is invoked inside tf.py_func(), the process core-dumps in libjvm.so, which is very hard to debug. So in the end we chose to write the op in C++ and access HBase through a Thrift server instead, which is better both for stability and for cleanliness of the architecture.
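For completeness, here is a minimal sketch of the tf.py_func() wiring we tried (assuming TensorFlow 1.x graph mode and the reader, ArrayList, and numpy names from the script above; the placeholder and helper names are ours, and it assumes all feature values have the same length). This is the variant that core-dumped inside libjvm.so:

import tensorflow as tf

def _fetch(id_batch):
  # id_batch arrives as a numpy array of bytes objects.
  java_ids = ArrayList()
  for rid in id_batch:
    java_ids.add(rid.decode('utf-8'))
  values = reader.fetchFeatures('ct', 'image', java_ids)
  # Stack the returned Java byte[] values into one uint8 matrix.
  return numpy.stack([numpy.asarray(v, dtype=numpy.uint8) for v in values])

id_ph = tf.placeholder(tf.string, shape=[None])
features = tf.py_func(_fetch, [id_ph], tf.uint8)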