본문 바로가기
Spring Batch

[Spring Batch] CursorItemReader 알아보기

by 대우니 2024. 4. 1.
728x90
반응형

서론

Spring Batch에서 사용되는 CursorItemReader의 코드를 확인해보면서 동작 방식과 특징에 대해 알아보고자 한다. 그리고, 대용량 데이터를 처리할 때 cursor 사용이 빠른 이유와 DB 커넥션이 길어질 때 유의 사항을 알아보고자 한다.

Cursor와 Paging

 Cursor Item Reader

대용량 데이터를 가져올 때 빠른 이유

cursor는 커넥션 연결 중 한번의 SQL 문을 요청 후, 결과값을 DB 메모리에 저장하고, fetch size만큼 서버메모리에 가져오는 방식이므로 한번 정렬 쿼리하므로 커넥션 time out이 충분히 길다면 성능 측면에서 빠르다. 그러나, 몇가지 유의 사항이 있다. 

대용량 데이터를 가져올 때 유의 사항

connection timeout이 길어야 한다. cursorItemReader는 모든 데이터를 처리할 때까지 커넥션 유지해야한다. 커넥션이 끊어지면 데이터에 접근하는 커서를 조회할 수 없다.

그러나, DB 커넥션이 길어진다면 몇가지 유의해야할 점이 있다.

 

리소스 소모 증가

  • DB 리소스 고갈
    커넥션이 길어지기 때문에 DB에서 사용하는 메모리라던지, CPU, 소켓 등 장기간 점유로 인해 다른 애플리케이션에 영향을 줄 수 있는지 체크해야한다.

네트워크 비용 증가

  • 지속적으로 유지되는 커넥션은 네트워크 비용을 증가시킬 수 있다. 특히, 클라우드 환경에서는 네트워크 트래픽 비용이 증가할 수 있으므로 유의해야 한다.

 

cursor의 특징

  • cursorItemReader는 모든 데이터를 처리할 때까지 커넥션을 유지해야 한다.
  •  fetchSize만큼 메모리에 로드하는 작업을 반복하다가 chunk 사이즈만큼 차게 되면, processor와 writer를 수행한다. 그리고 모든 결과를 다 읽을 때까지 반복한다.
  • 멀티스레드로 실행 시, 커서와 resultSet에 여러 스레드가 접근하여 같은 데이터의 동기화 문제가 발생한다. 따라서 동기화 처리를 하려면 SynchronizedItemStreamReader로 감싸면 된다.
    아래처럼 read해올 때 synchronized를 선언하여 멀티스레드에서 안전하게 cursor를 사용할 수 있다. 참고로 Synchronized 키워드는 여러개의 스레드가 한개의 자원을 사용하고자 할 때현재 데이터를 사용하고 있는 해당 스레드를 제외하고 나머지 스레드들은 데이터에 접근 할 수 없도록 막는다.

출처: https://ojt90902.tistory.com/805

 

  • 커서는 재처리가 제공되지 않는다. 따라서 재처리를 위해서는 직접 로직을 구현해야 한다.
    커서는 실행 상태가 실패했을 때 재처리를 위해 order by로 정렬하면, 과거에 executionContext에 기록된 커서 순서대로 실행시킬 수 있다. 커서의 방향은 default가 forward로 정해져있기 때문에 기록된 커서 순서대로 실행시킬 수 있으며, 재처리 사이에 데이터가 insert가 되지 않는 이상, 이전과 동일하게 실행 가능하다.
  • Fetch size와 chunk size는 동일하게 설정하는 것이 좋다.
    만일 fetch 사이즈를 10으로, 청크 사이즈를 50으로 할 경우, 한번의 트랜잭션 처리를 위해 DB로부터 10만큼가져와 메모리에 로드하는 작업을 5번 후 procesor와 writer를 실행하므로, 오버헤드가 늘어난다. 메모리 여유가 된다면 fetch와 청크 사이즈를 일치 시키는 것이 성능 상 좋다. 이는 페이징도 마찬가지로 page와 chunk size를 동일하게 설정하는 것이 좋다.

cursor의 동작 방식

위 그림은 cursor의 동작방식이라 할 수 있다. fetch 요청을 통해 데이터를 fetchSize만큼 가져온 후, currentItemCount를 하나씩 증가시키면서 가져온 데이터를 다 반환할 때까지 순회한다. 그리고 cursor를 증가시켜 위를 반복한다. 실제로 코드를 통해 살펴보자.

그리고 Hive 쿼리를 사용하고 있기에 ResultSet은 HiveQueryResultSet을, JpaCursorItemReader는 data를 한번에 가져오는 것으로 알고 있어서 대용량 데이터의 경우 메모리 감당이 되지 않는다면 OOM 발생하면서 서버가 중단될 수 있으므로, JdbcCursorItemReader 쪽을 살펴봤다.

 

대강 이러한 흐름으로 코드를 살펴보고자 한다.

  1. AbstractCursorItemReader.doOpen()
  2. AbstractCursorItemReader.initializeConnection()
  3. HiveConnection.HiveConnection(String uri, Properties info) & HiveConnection.executeInitSql()
  4. JdbcCursorItemReader.openCursor(Connection con)
  5. AbstractItemCountingItemStreamItemReader.read()
  6. AbstractCursorItemReader.doRead() & JdbcCursorItemReader.readCursor()
  7. HiveQueryResultSet.next()
  8. RowMapper<T>.maRow()

AbstractCursorItemReader.doOpen()

커넥션을 초기화하고, openCursor를 호출한다.

public abstract class AbstractCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
implements InitializingBean {
	...
	@Override
	protected void doOpen() throws Exception {

		Assert.state(!initialized, "Stream is already initialized.  Close before re-opening.");
		Assert.isNull(rs, "ResultSet still open!  Close before re-opening.");

		initializeConnection();
		openCursor(con);
		initialized = true;

	}
    ...
}

 

AbstractCursorItemReader.initializeConnection()

  • getConnection을 호출하여 con 객체에 HiveConnection 생성자를 호출해서 초기화한다.
public abstract class AbstractCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
implements InitializingBean {

	protected void initializeConnection() {
		....
			if (useSharedExtendedConnection) {
				if (!(getDataSource() instanceof ExtendedConnectionDataSourceProxy)) {
					throw new InvalidDataAccessApiUsageException(
							"You must use a ExtendedConnectionDataSourceProxy for the dataSource when " +
							"useSharedExtendedConnection is set to true.");
				}
				this.con = DataSourceUtils.getConnection(dataSource);
				((ExtendedConnectionDataSourceProxy)dataSource).startCloseSuppression(this.con);
			}
			else {
				this.con = dataSource.getConnection();
			}

			this.initialConnectionAutoCommit = this.con.getAutoCommit();

			if (this.connectionAutoCommit != null && this.con.getAutoCommit() != this.connectionAutoCommit) {
				this.con.setAutoCommit(this.connectionAutoCommit);
			}
		}
		...
	}
    ...
}

 

HiveConnection.HiveConnection(String uri, Properties info) & HiveConnection.executeInitSql()

  • session 연결해두고, 쿼리 실행 후, 해당 커넥션을 계속 유지한다. 쿼리가 실행 완료되면, hdfs에서 데이터가 로드 유지한 상태에서 resultSet을 통해 was로 fetchSize만큼 읽어와야하기 때문이다.
  public class HiveConnection implements java.sql.Connection {
  	public HiveConnection(String uri, Properties info) throws SQLException {
  		...
        openSession();
        executeInitSql();
  		...
  	}
    private void executeInitSql() throws SQLException {
    	List<String> sqlList = parseInitFile(initFile);
        Statement st = createStatement();
        for(String sql : sqlList) {
          boolean hasResult = st.execute(sql);
          if (hasResult) {
            ResultSet rs = st.getResultSet();
            while (rs.next()) {
              System.out.println(rs.getString(1));
            }
          }
        }
        ...
    }
}

 

JdbcCursorItemReader.openCursor(Connection con)

  • 설정했던 fetchSize, fetchDirection과 queryTimeOut, maxRows 등을 설정한다. fetchSize는 DB cursor가 가리키는 데이터를 가져와 메모리에 로드하는 개수이므로, 힙메모리 최대 사이즈를 고려하여 설정해야한다. 보통은 DB에서 가져올 데이터 샘플 * fetchSize로 고려한다.
  • 그리고 executeQuery를 통해 resultSet 객체를 생성한다. 참고로 resultSet은 DB 결과값을 로컬에 저장하는 장소이며, fetch size만큼의 DB 결과값은 rowsItr(Iterator)에 담겨진다. 하단에서 좀 더 자세하게 살펴볼 예정이다.
public class JdbcCursorItemReader<T> extends AbstractCursorItemReader<T> {
    private PreparedStatement preparedStatement;    
	...
    
	@Override
	protected void openCursor(Connection con) {
		try {
			if (isUseSharedExtendedConnection()) {
				preparedStatement = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
						ResultSet.HOLD_CURSORS_OVER_COMMIT);
			}
			else {
				preparedStatement = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
			}
			applyStatementSettings(preparedStatement);
			if (this.preparedStatementSetter != null) {
				preparedStatementSetter.setValues(preparedStatement);
			}
            // AbstractCursorItemReader 에 선언된 ResultSet rs;
			this.rs = preparedStatement.executeQuery();
			handleWarnings(preparedStatement);
		}
		catch (SQLException se) {
			close();
			throw translateSqlException("Executing query", getSql(), se);
		}

	}
}

 

AbstractItemCountingItemStreamItemReader.read()

  • 위에서 언급했던 resultSet에서 rowsItr(Iterator)에 담긴 객체는 array 형태로, 인덱스를 필요로 한다. 따라서 currentItemCount는 인덱스이다.
  • doRead()를 통해 resultSet에 rowsItr에서 currentItemCount인덱스에 위치한 데이터를 가져온다.
public abstract class AbstractItemCountingItemStreamItemReader<T> extends AbstractItemStreamItemReader<T> {
	...
    @Nullable
	@Override
	public T read() throws Exception, UnexpectedInputException, ParseException {
		if (currentItemCount >= maxItemCount) {
			return null;
		}
		currentItemCount++;
		T item = doRead();
		if(item instanceof ItemCountAware) {
			((ItemCountAware) item).setItemCount(currentItemCount);
		}
		return item;
	}
}

 

AbstractCursorItemReader.doRead() & JdbcCursorItemReader.readCursor()

  • readCursor는 resultSet에 담긴 currentItemCount 인덱스에 위치한 데이터를 mapRow에서 설정한 타입 별로 매핑하여 데이터를 가져와서 반환한다.
public abstract class AbstractCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
implements InitializingBean {
	...
    
    	@Nullable
	@Override
	protected T doRead() throws Exception {
		if (rs == null) {
			throw new ReaderNotOpenException("Reader must be open before it can be read.");
		}

		try {
			if (!rs.next()) {
				return null;
			}
			int currentRow = getCurrentItemCount();
			T item = readCursor(rs, currentRow);
			verifyCursorPosition(currentRow);
			return item;
		}
		catch (SQLException se) {
			throw translateSqlException("Attempt to process next row failed", getSql(), se);
		}
	}
}
public class JdbcCursorItemReader<T> extends AbstractCursorItemReader<T> {
	...
    @Nullable
	@Override
	protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
		return rowMapper.mapRow(rs, currentRow);
	}
    ..
}

HiveQueryResultSet.next()

  • 가장 처음 fetch하거나 fetchSize만큼 가져온 데이터가 다 읽어졌을 경우, hiveConnection 생성 시, HDFS로부터 로드된 데이터에서 설정된 fetchSize만큼  rpc로 가져와서 fetchedRows에 저장한다. 해당 값은 순회하면서 커서에 따라 값을 반환해야하므로 Iterator에 감싼다.
  • 조회가능할 경우, 조회하면서(Iterator.next()) row에 로드한다.
public class HiveQueryResultSet extends HiveBaseResultSet {
  
  public boolean next() throws SQLException {
    private Iterator<Object[]> fetchedRowsItr;
    ...
      if (fetchFirst) {
        // If we are asked to start from begining, clear the current fetched resultset
        orientation = TFetchOrientation.FETCH_FIRST;
        fetchedRows = null;
        fetchedRowsItr = null;
        fetchFirst = false;
      }
      if (fetchedRows == null || !fetchedRowsItr.hasNext()) {
        TFetchResultsReq fetchReq = new TFetchResultsReq(stmtHandle,
            orientation, fetchSize);
        TFetchResultsResp fetchResp;
        fetchResp = client.FetchResults(fetchReq);
        Utils.verifySuccessWithInfo(fetchResp.getStatus());

        TRowSet results = fetchResp.getResults();
        fetchedRows = RowSetFactory.create(results, protocol);
        fetchedRowsItr = fetchedRows.iterator();
      }

      if (fetchedRowsItr.hasNext()) {
        row = fetchedRowsItr.next();
      } else {
        return false;
      }

      rowsFetched++;
    ...
    return true;
  }

 

RowMapper<T>.maRow()

  • ResultSet의 rowNum에 위치한 데이터를 해당 데이터 타입으로 매핑하도록 정의한다.
    private static class PersonRowMapper implements RowMapper<Person> {
            @Override
        public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
            return Person.builder()
            			 .personNumber(rs.getLong("person_number"))
                         ...
                         .build();
        }
    }

paging

 

이미지 출처: https://junuuu.tistory.com/611

  • 페이지 사이즈 만큼 데이터를 한 번에 처리
  • 페이지 사이즈 만큼 커넥션을 맺고 끊음
  • 멀티 스레드 환경에서 스레드 안정성을 보장하기에 별도 동기화 처리 불필요

정리

  •  paging은 page사이즈 만큼 매번 커넥션 연결하면서 order by 후 limit만큼 가져온다 , 결국 이 작업을 데이터 총 수 / 페이지 사이즈 만큼 해야한다. 따라서 훨씬 비효율적이다.
  • cursor는 커넥션 연결 중 한번의 SQL 문을 요청 후, 결과값을 DB 메모리에 저장하고, fetch size만큼 서버메모리에 가져오는 방식이므로 한번 정렬 쿼리하므로 커넥션 time out이 충분히 길다면 성능 측면에서 빠르다.

 

 

 

반응형