2013년 12월 29일 일요일

Region

Region (=Tablet in Bigtable)

l  확장성 및 로드밸런싱의 기본 단위.

l  기본적으로 함께 저장된 인접한 범위의 로우들.

 regionkey range를 가지고 있고, 해당 key rangerow들은 해당 region에 저장되게 된다.

Mapreduce 환경에서 mapper의 개수는 region의 개수와 동일하다.(Column Family의 개수와는 상관없음.)

Region의 개수가 적을 경우, hot region이 생기게 됨. 일정 region에만 많은 access가 오게 되고 로드 밸런싱이 잘 이루어지지 못한다.

Region의 개수가 많을 경우, region은 확장성 및 로드밸런싱의 기본 단위이기 때문에 region의 개수가 너무 많으면 region을 관리하는데 많은 cost가 들어가게 됨. 예를 들어 master가 주로 죽은 region을 확인하고 이동시키는 역할을 하지만 이런 region이 많으면 부하가 크게 됨. 이외에도 region이 많아지면 여러 이유로 performance문제를 일으킨다고 함.

현재 Apache에서 제시하는 권장설정은 10 – 1000개 사이이다.

Regionsplit
하지만 regionauto split을 통해서 region을 나눠지게 놔둘 경우 해당 regionsplit시간동안 접근을 못하기 때문에 성능에 큰 영향을 주게 됨.
따라서 manually split해야함.
Manually split : Splitauto로 일어나지 않게 하고 사용자가 직접 큰 regionsplit하는 방법.

split문제의 해결 방법으로 pre-split도 있음. 미리 region의 개수를 여러 개로 설정해 놓은 후 돌아가게 됨. 본래는 1개의 region이 켜지게 되고 그 region이 커질 때 마다 split을 실행함.



현재 HBasecolumnwrite performance 문제가 Hot spot region 문제인지 확인해 볼 것. 하지만 skew도 있지만 전체적으로 느려지는 것을 보면 hot spot region문제만 있는 것은 아닌 것으로 예상.

2013년 9월 2일 월요일

reverse lookup & authority problem in HBase.

dnsmasq 설치문제
http://www.joinc.co.kr/modules/moniwiki/wiki.php/Site/System_management/Dnsmasq
설치된 dns 서버로 각 클러스마다 네임노드 설정을 해줘야함.


권한문제
http://develop.sunshiny.co.kr/922

scanner timeout문제
configure에서 timeout period를 늘림.

2013년 7월 13일 토요일

HBase에서 Column Family 개수

HBase안에 Column Family를 2개나 3개 이상만들경우 비효율적인 Compaction으로 인해 좋지않음.

정확한 이유는 나중에 분석해봐야 할듯하다.

왜 pagerank에서 map skew가 생기는가 ?


왜 pagerank에서 map skew가 생기는가 ?
Map에서 data를 읽을때 일정크기의 block으로 잘라서 나눠주게된다.
그러면 line의 크기가 달라도 연산이 비슷할 경우 skew가 안 일어날 것 같다.
왜냐면 크기가 큰곳이 배정된 곳은 적은 수의 line이 할당 될 것이기때문이다.
하지만 skew가 일어난다.
아마 output하는 과정에서 동일한 id를 가지게하면서 출력을 해야한다.
따라서 file size가 비례해보이지만 write할때
file size차이만큼 key를 더 출력해야하는 것 그래서 map skew가 일어나는 것 같다.

pagerank 1 2 3 4 5 6     8+12  20

pagerank 1             8+2     10

pagerank 4             8+2     10

split은 같은 size로 이루어지기에
2+3 = 1 과같게본다
하지만 실제적으로는
2 < 6이다. 3배나 크다.

즉 이게 엄청나게 커지면 별로 차이가 없지만
Real Graph에서는 대다수의 Graph가 작고 몇몇의 Graph만 엄청크기때문에
Skew가 일어난 것으로 예상한다.



http://lintool.github.io/Cloud9/docs/content/Lin_Schatz_MLG2010.pdf

2013년 7월 12일 금요일

When is the earliest point at which the reduce method

When is the earliest point at which the reduce method…

When is the earliest point at which the reduce method of a given Reducer can be called?
  • As soon as at least one mapper has finished processing its input split.
  • As soon as a mapper has emitted at least one record.
  • Not until all mappers have finished processing all records.
  • It depends on the InputFormat used for the job.

Answer

  • Not until all mappers have finished processing all records.

Explanation

In a MapReduce job reducers do not start executing the reduce method until the all Mapjobs have completed. Reducers start copying intermediate key-value pairs from themappers as soon as they are available. The programmer defined reduce method is calledonly after all the mappers have finished.Note:The reduce phase has 3 steps: shuffle, sort, reduce. Shuffle is where the data iscollected by the reducer from each mapper. This can happen while mappers aregenerating data since it is only a data transfer. On the other hand, sort and reduce canonly start once all the mappers are done.Why is starting the reducers early a good thing? Because it spreads out the data transferfrom the mappers to the reducers over time, which is a good thing if your network is thebottleneck.Why is starting the reducers early a bad thing? Because they "hog up" reduce slots whileonly copying data. Another job that starts later that will actually use the reduce slotsnow can't use them.You can customize when the reducers startup by changing the default value ofmapred.reduce.slowstart.completed.maps in mapred-site.xml. A value of 1.00 will waitfor all the mappers to finish before starting the reducers. A value of 0.0 will start thereducers right away. A value of 0.5 will start the reducers when half of the mappers arecomplete. You can also change mapred.reduce.slowstart.completed.maps on a job-by-job basis.Typically, keep mapred.reduce.slowstart.completed.maps above 0.9 if the system everhas multiple jobs running at once. This way the job doesn't hog up reducers when theyaren't doing anything but copying data. If you only ever have one job running at a time,doing 0.1 would probably be appropriate.Reference:24 Interview Questions & Answers for Hadoop MapReduce developers,When is thereducers are started in a MapReduce job?


http://certificationpath.com/view/ccd-410--cloudera-certified-developer-for-apache-hadoopccdh/questions/when-is-the-earliest-point-at-which-the-reduce-method-of-a-q69077

2013년 6월 4일 화요일

Secondary Sort] 오늘의 삽질

나는 key value에 key와 value값을 섞어서 secondary sort값을 유도했으나 나의 실수인지 아니면 secondary sort의 과정에서 key값을 결국에 같게 맞추는건지까지는 정확히 파악 못했으나 Hadoop책의 예제로 응용해봤던 코드는 이 결과로 삽질했다.

Sorting이 되긴하나 key값은 바뀐다.

2013년 5월 11일 토요일

hbase read cache | hbase pool


block cache
I/O가 일어날때 읽은 Block을 cache에 올려서 다음에 같은 Block 접근하는 중복된 I/O 접근을 막음. 중복되게 row를 자주 사용하지 않는 경우 예로들어 순차적으로 row를 접근하는 경우 효율적이지 못하다.
LRU사용 : 최근 가장 오래동안 참조되지 않은 페이지를 교체

in-memory
block cache를 in-memory가 지정된 columnFamily에 한하여 우선순위를 더 높게함.
메모리에 로드된 column family의 모든 block을 계속 유지하도록 보장.
memory안에 table이 다 올라간다는 보장은 없음.

scan cache
한번에 여러개를 읽어들이고 한번에 여러개를 씀.

batch mode ( scan cache와 연결되어있음)
Result 하나에 들어가 있는 column(identifier)의 갯수를 조절함.
node:t = 1 node:k = 2 면 batch가 1이면 node:t만 읽고 그다음 node:k를 읽음.

bloom filter(엄연히 따지면 cache라고 하기는 좀 그렇다.)
block을 찾아서 scan을 하면서 원하는 것을 찾아야하는데 block에 data가 있는지 없는지를 판단하여 필요없는 block에 접근하는 것을 방지하는 것.





------------
read와는 상관없지만 performance향상에 영향을 줄만한 것 중 하나

hbase pool 
htable에 지속적으로 연결해줘야할 경우 pool을 만들어서
지속적인 htable 객체의 생성을 막는다.

2013년 5월 2일 목요일


Consistent Hashing

NoSQL 관련 기술 중에 하나인 Consistent Hashing의 개념은 1997년에 MIT의 karger가 웹서버의 숫자가 수시로 변경되는 중에 분산 요청을 처리하기 위해 처음 고안했다고 하는데 그 내용을 살펴보고자 한다.
아래의 내용은 'Consistent Hashing' 이라는 아티클을 많이 참조해서 작성했다.

왜 필요한가?

N개의 캐시 시스템(노드)이 있다고 하고 이때 부하 분산에 사용하는 일반적인 방법은 Object o를 hash(o) mod n 번째 캐시 시스템에 저장하는 방식이다. 이런 방식은 캐시 시스템이 추가되거나 제거되기 전까지는 잘 운영된다.
그러나 캐시 시스템이 추가되거나 장애로 제거 되었을 경우, n이 빠뀌게 되면 모든 Object는 새로운 위치에 모두 재할당을 해야하는데 그러기엔 부하 부담이 크다.
이 때 Consistent Hashing 방법을 사용하게 되면 캐시 시스템이 추가되거나 작동이 중단되어도 모든 Object를 할당을 하는 것이 아니라 추가되면 인접한 다른 캐시 시스템에서 적정한 양의 Object 를 받게되고 마찬가지로 제거된다면 남은 캐시 시스템들이 나누는 형태가 되어 일관되게 일부 Object에 대해서 재 할당을 수행하게 된다. 그러므로 기존에 저장된 대부분의 캐시를 사용할 수 있으며 시스템 변동이 히트율에 영향을 미치지 않게 된다.
기본 원리는 Consistent hashing 알고리즘은 Object와 캐시 시스템 둘다 동일한 Hash 함수를 사용해서 해싱하는 것이다. 캐시 시스템은 구간을 정하고 그 구간에는 많은 Object의 해시값을 가지고 있는다. 캐시 시스템이 제거도면 인접한 구간의 캐시 시스템이 제거된 구간을 맞게되고 다른 캐시 시스템은 영향을 받지 않는다.
Performance는 O(log n).

작동 방식은?

Hash 함수는 Object와 캐시 시스템이 일정 구간을 정하게 한다. Java 언어의 예를 보면 Object의 hashCode() 함수는 리턴을 int로 하고 리턴값 int의 구간은 -2^31에서 2^31-1을 가진다.
다음 그림은 네 개의 Object(1,2,3,4)와 3개의 캐시 시스템(A, B, C)가 링에 배치되어 있다.


위의 그림에서 1,4는 캐시 시스템 A에 들어가고 2는 B, 3은 C에 들어 간다. 이때 C가 제거되었다고 가정하면 Object 3은 A에 들어간다. 기타 지원은 변하지 않는다. 다음 그림처럼 캐시 D가 추가되면, 3, 4는 D로 옮긴다. 그리고 1 만 A에 남아있다.


이렇게 함으로써 각 캐시 시스템에 할당된 구간의 사이즈에 예외가 발생해도 잘 동작하게 된다. 한가지 고민해야할 점은 무작위로 분포되기 때문에 캐시 시스템 사이의 Object의 분포는 균일하지 않을 수 있다. 이를 위한 해결책으로 '가상 노드'를 사용하는 방법이 있다. 가상 노드는 캐시 시스템의 링안에서 복제하는데 이는 캐시 시스템 하나 추가할때마다 링에 여러개 배치되게 되는 방식이다.
가상 노드의 효과는 다음 그래프와 같은데 10,000개의 Object를 10개의 캐시 시스템으로 시뮬레이션한 결과이다. X축이 캐시 시스템의 복제수가 되고 Y축은 표준 편차.
복제 수가 작은 Object들의 분산도가 언발란스한데 이는 캐시 시스템의 Object수의 평균의 표준 편차가 크기 때문일 것이다. 이 실험에서 복제를 100이나 200으로 했을 경우 합리적인 균형을 실현할 수 있다고 할 수 있었다고 한다. (표준 편차가 평균 5-10% 정도가 적당함)


시스템마다 다수의 Virtual Nodes(가상 노드)를 만들어서 로드발란싱을 좋게 한 예는 libketema이다.

구현 방법은?

Consistent Hashing 방법이 효력이 발휘하기 위해서는 해싱 함수가 잘 동작해야하는데 Object의 hashCode로는 부족하고 MD5를 추천한다.
public class ConsistentHash {

  private final HashFunction hashFunction;
  private final int numberOfReplicas;
  private final SortedMap circle =
    new TreeMap();

  public ConsistentHash(HashFunction hashFunction,
    int numberOfReplicas, Collection nodes) {

    this.hashFunction = hashFunction;
    this.numberOfReplicas = numberOfReplicas;

    for (T node : nodes) {
      add(node);
    }
  }

  public void add(T node) {
    for (int i = 0; i < numberOfReplicas; i++) {
      circle.put(hashFunction.hash(node.toString() + i),
        node);
    }
  }

  public void remove(T node) {
    for (int i = 0; i < numberOfReplicas; i++) {
      circle.remove(hashFunction.hash(node.toString() + i));
    }
  }

  public T get(Object key) {
    if (circle.isEmpty()) {
      return null;
    }
    int hash = hashFunction.hash(key);
    if (!circle.containsKey(hash)) {
      SortedMap tailMap =
        circle.tailMap(hash);
      hash = tailMap.isEmpty() ?
             circle.firstKey() : tailMap.firstKey();
    }
    return circle.get(hash);
  } 

}

사용 사례는?

- NoSQL
- 오픈 소스

[참조 사이트]

2013년 4월 29일 월요일

hbase table size 측정, mapreduce hadoop 에서 output 없애기


hbase를 종료하고

hadoop fs -dus /hbase/TableName 을 한다.


mapreduce에서 hadoop output 없애기

conf.setOutputFormat(NullOutputFormat.class);

2013년 4월 19일 금요일


Passing parameters to Mappers and Reducers

There might be a requirement to pass additional parameters to the mapper and reducers, besides the the inputs which they process. Lets say we are interested in Matrix multiplication and there are multiple ways/algorithms of doing it. We could send an input parameter to the mapper and reducers, based on which the appropriate way/algorithm is picked. There are multiple ways of doing this

Setting the parameter:

1. Use the -D command line option to set the parameter while running the job.

2. Before launching the job using the old MR API

?
1
2
JobConf job = (JobConf) getConf();
job.set("test", "123");

3. Before launching the job using the new MR API

?
1
2
3
Configuration conf = new Configuration();
conf.set("test", "123");
Job job = new Job(conf);

Getting the parameter:

1. Using the old API in the Mapper and Reducer. The JobConfigurable#configure has to be implemented in the Mapper and Reducer class.

?
1
2
3
4
private static Long N;
public void configure(JobConf job) {
    N = Long.parseLong(job.get("test"));
}

The variable N can then be used with the map and reduce functions.

2. Using the new API in the Mapper and Reducer. The context is passed to the setup, map, reduce and cleanup functions.

?
1
2
Configuration conf = context.getConfiguration();
String param = conf.get("test");