天天看點

spring hadoop系列(五)---spring hadoop hbase之HbaseSynchronizationManager

一、源碼如下

public abstract class HbaseSynchronizationManager {

private static final Log logger = LogFactory.getLog(HbaseSynchronizationManager.class);

// 建立ThreadLocal 用來存放tableName與Hbase table之間的映射内容

private static final ThreadLocal<Map<String, HTableInterface>> resources = new NamedThreadLocal<Map<String, HTableInterface>>("Bound resources");

public static boolean hasResource(Object key) {

Object value = doGetResource(key);

return (value != null);

}

public static HTableInterface getResource(Object key) {

return doGetResource(key);

}

private static HTableInterface doGetResource(Object actualKey) {

Map<String, HTableInterface> tables = resources.get();

if (tables == null) {

return null;

}

return tables.get(actualKey);

}

public static void bindResource(String key, HTableInterface value) throws IllegalStateException {

Assert.notNull(value, "Value must not be null");

// 判斷目前線程中是否包含映射内容

// 若是存在 直接添加新内容

// 若不存在 則先建立map 再将對應的東西 再将map綁定到目前的線程 添加具體内容

Map<String, HTableInterface> map = resources.get();

// set ThreadLocal Map if none found

if (map == null) {

map = new LinkedHashMap<String, HTableInterface>();

resources.set(map);

}

HTableInterface oldValue = map.put(key, value);

if (oldValue != null) {

throw new IllegalStateException("Already value [" + oldValue + "] for key [" + key

+ "] bound to thread [" + Thread.currentThread().getName() + "]");

}

if (logger.isTraceEnabled()) {

logger.trace("Bound value [" + value + "] for key [" + key + "] to thread ["

+ Thread.currentThread().getName() + "]");

}

}

public static HTableInterface unbindResource(String key) throws IllegalStateException {

HTableInterface value = doUnbindResource(key);

if (value == null) {

throw new IllegalStateException("No value for key [" + key + "] bound to thread ["

+ Thread.currentThread().getName() + "]");

}

return value;

}

public static Object unbindResourceIfPossible(Object key) {

return doUnbindResource(key);

}

private static HTableInterface doUnbindResource(Object actualKey) {

Map<String, HTableInterface> map = resources.get();

if (map == null) {

return null;

}

HTableInterface value = map.remove(actualKey);

// 在解除指定的table name所關聯的資源後,防止對應的map不能為空 進行map判斷

// Remove entire ThreadLocal if empty...

if (map.isEmpty()) {

resources.remove();

}

if (value != null && logger.isTraceEnabled()) {

logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread ["

+ Thread.currentThread().getName() + "]");

}

return value;

}

public static Set<String> getTableNames() {

Map<String, HTableInterface> map = resources.get();

if (map != null && !map.isEmpty()) {

return Collections.unmodifiableSet(map.keySet());

}

return Collections.emptySet();

}

}