ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Elasticssearch BulkProcessor를 사용하자
    elasticsearch 2021. 3. 13. 00:24
    반응형

    cdc(change data capture) api를 구축해서 사용하고 있었다 webflux를 사용했고

    클라이언트에서도 데이터 셋은 최대한 중복처리 해서 보내주지만 요청 당 es로 bulk 하는 방식이었다.

    즉 client에서 1,2,3,4~n의 변경된 테이블 기본키를 몇 초 정도 수집하고 데이터를 전송하는데

    많을 때는 백건 이상도 있지만 적을 땐 몇 건 밖에 없을 때도 많았다 그래서 es로 전송하는 요청 수도 상당히 많아지고 요청 오는 즉시 색인(대부분 수정)을 하니 잦은 요청으로 인한 세그먼트 삭제와 추가가 발생하였고 그로 인해 잦은 머지로 퍼포먼스 문제가 있어서 bulk processor를 도입하기로 하였다.

     

    기본적으로 es에서는 bulk요청 시 바로 요청을 해주는 것도 있지만

    restHighLevelClient.bulkAsync

     

    bulkprocessor도 내부적으로는 위와 같이 사용하지만 요청 수, 용량, 시간에 따라서 대량 작업을 flush(refresh, flush 아님)하는 api를 제공한다

     

    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
    
        }
    
        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              BulkResponse response) {
    
        }
    
        @Override
        public void afterBulk(long executionId, BulkRequest request,
                              Throwable failure) {
    
        }
    };
    
    BulkProcessor.builder(
      (request, bulkListener) -> {
          highLevelSearch.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
      },
      listener
    )
      .setBulkActions(5000)//5000건이 요청되면 플러쉬
      .setBulkSize(new ByteSizeValue(7L, ByteSizeUnit.MB))//용량이 7메가가 되면 플러쉬
      .setFlushInterval(TimeValue.timeValueSeconds(10L))//10초가 되면 플러쉬
      .setBackoffPolicy(
          BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(5L), 1)
      )//retry 정책 요청 후 5초이상 응답이 없을 경우 1회 재시도
      .build()
    
    bulkProcessor.add(
        new IndexRequest("index")
            .id(key)
            .source(bytes, XContentType.JSON)
    );
    

    위의 방법을 도입하였고 인덱스 리프레쉬 주기는 그대로 가져가되 클라이언트에서 주는 데이터 주기는 좀 더 길게 가져갔다 물론 엄청난 실시간으로 제공해줘야 하는 곳에서는 문제가 있을 수도 있지만 보통 10~20초 내에 색인이 되도록 해도 문제가 없다면 하는 게 좋다 그리고 10~20초 아니어도 5초 정도여도 충분히 이점이 있을 것이다 

    사용 방법은 워낙 간단해 이 정도로 하고 그리고 난 또 한 가지가 궁금했다 기존에는 bulkAsync를 상용해서 색인했는데 멀티스레드에 안전할까였다 검색해 봤더니 https://discuss.elastic.co/t/bulkprocessor-usage-is-safe/12625 해당 링크에서 안전하다고 하였고 궁금해서 소스를 좀 찾아보았다

     

    bulkProcessor.add()
    
    private void internalAdd(DocWriteRequest<?> request) {
        //bulkRequest and instance swapping is not threadsafe, so execute the mutations under a lock.
        //once the bulk request is ready to be shipped swap the instance reference unlock and send the local reference to the handler.
        Tuple<BulkRequest, Long> bulkRequestToExecute = null;
        lock.lock();
        try {
            ensureOpen();
            bulkRequest.add(request);
            bulkRequestToExecute = newBulkRequestIfNeeded();
        } finally {
            lock.unlock();
        }
        //execute sending the local reference outside the lock to allow handler to control the concurrency via it's configuration.
    		//newBulkRequestIfNeeded 결과에 따라 실행 결정
        if (bulkRequestToExecute != null) {
            execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
        }
    }

    그럼 왜 이 코드에 저런 lock을 걸었을까?? 물론 멀티 스레드에서 안전하다는 글이 있지만 여러 가지 이유가 있을 수 있겠지만 bulkactions 때문이다

    bulkRequest.add 되는 부분에서 멀티스레드 환경은 동시에 add를 할 수도 있다

    5000건이 되었을 때 요청(es로)을 함으로 제한을 해야 하는데 동시성에 대한 처리가 없다면 5001건이 될 수도 있고 스레드에 따라 더해질 수도 있다 if bulkRequstSize < 5000 이런 조건으로 제한을 해서 처리한다 해도

    bulkRequestSize가 4999일 때 동시에 스레드가 접근해서 조건을 통과해 add를 한다면 5001건이 되어 버린다 그래서 동시성에 대한 처리가 필요하다 설명을 잘 못하는 감이 있는데 자바 멀티 쓰레드 배울 때 계좌 예제를 생각하면 쉬울 것이다

     

    bulkRequestToExecute = newBulkRequestIfNeeded(); 함수를 살펴보면
    
    private Tuple<BulkRequest,Long> newBulkRequestIfNeeded(){
        ensureOpen();
    		
    		사이즈와 용량이 설정보다 크다면 true로 반환되어 부정하니 false가 됨
        if (!isOverTheLimit()) {
            return null;
        }
    		
    		현재 멤버변수인 bulkRequest를 지역 변수에 할당하고 bulkRequestSupplier.get()으로 새로운 
    		bulkRequest를 할당한다
        final BulkRequest bulkRequest = this.bulkRequest;
        this.bulkRequest = bulkRequestSupplier.get();
        return new Tuple<>(bulkRequest,executionIdGen.incrementAndGet()) ;
    }
    
    private boolean isOverTheLimit() {
    		사이즈 체크
        if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
            return true;
        }
    		용량 체크
        if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
            return true;
        }
        return false;
    }

    그리고 마지막으로 BulkProcessor를 close 할 때 코드를 보자 여기도 마찬가지로 동시성이 처리되어있다 여러 번 종료될 수 있기 때문이다

    종료 될 때 처리할 요청이 있다면 요청 처리하고 우아하게 종료된다

    lock.lock();
    try {
        if (closed) {
            return true;
        }
        closed = true;
    
        this.cancellableFlushTask.cancel();
    
        if (bulkRequest.numberOfActions() > 0) {
            execute();
        }
        try {
            return this.bulkRequestHandler.awaitClose(timeout, unit);
        } finally {
            onClose.run();
        }
    } finally {
        lock.unlock();
    }

     

    반응형

    댓글

Designed by Tistory.