이전 글에서 Vert.x 이벤트 핸들러를 처리를 위해서 제공하는 인터페이스에 사용 예를 간단히 알아보았다. 하지만 해당 인터페이스의 도움이 있다고 해도 순차적으로 처리되어야 하는 비동기 작업은 어쩔 수 없이 “Callback Hell”은 벗어 날 수 없다. 이번 글에서는 “Callback Hell”의 예를 살펴보고 Vert.x 프레임워크에서 제시하는 해결책을 알아보고자 한다.
Callback Hell이란 순차적으로 처리해야 하는 비동기처리를 위해서 콜백 함수를 연속해서 호출하는 경우를 말한다. 예를 들면 P1라는 비동기함수를 호출하여 비동기처리가 완료되었을 때 C1이라는 콜백 함수를 호출하게 되어 있다고 하자. C1 콜백 함수는 다시 P2라는 비동기함수를 다시 호출하고 P2가 완료되었을 때 C2 콜백을 호출한다고 하자. 이렇게 몇 단계만 거치면 코드가 복잡해지면서 이해하기 힘든 코드가 작성되게 된다. Welcome to The Hell이다.
1 2 3 4
Main -> P1 -> C1 -> P2 -> C2 Main <- P1 <-
이런 실 예는 현업 개발에서 금방 찾을 수 있다. 가장 쉽게 볼 수 있는 부분이 DB CRUD 작업에서 볼 수 있다. Vert.x 에서 제공하는 MySQL 비동기 라이브러리 사용 예를 살펴보자.
// All operations execute on the same connection conn.query("SELECT * FROM users WHERE user_id='walker' AND name='Walker. Lee.'", ar2 -> { if (ar2.succeeded()) { conn.query("INSERT INTO users (user_id, name) VALUES ('walker', 'Walker. Lee.') ", ar3 -> { if (ar3.succeeded()) { conn.query("INSERT INTO audit_log(user_id, log_event, log_date) VALUES ('walker', 'Add User.', '2019.12.23') ", ar4 -> { // Release the connection to the pool if (ar4.succeeded()) { System.out.println("ar4 success: "); } else { System.out.println("ar4 error : " + ar4.cause().getMessage()); } conn.close(); }); } else { System.out.println("ar3 error : " + ar3.cause().getMessage()); conn.close(); } }); } else { // Release the connection to the pool System.out.println("ar2 error : " + ar2.cause().getMessage()); conn.close(); } }); } else { System.out.println("Could not connect: " + ar1.cause().getMessage()); } });
위 코드는 Vert.x 프레임워크 공식 사이트 “Reactive MySQL Client” 문서의 코드를 간단히 수정한 것이다. DB 접속과 3번의 쿼리 요청을 위해서 콜백함수를 4번 호출하고 있다. 예제 코드에서는 로직이 간단해서 복잡도가 낮지만 현업에서는 그렇지 않은 경우가 더 많을 것이다. 코드가 더 복잡해지고 몇 단계 더 콜백 호출이 더해지면 복잡도가 올라가서 유지보수가 힘들어진다. 그래서 타 언어들에서 이런 비동기 처리 시 발생하는 문제를 해결하기 위해서 Reactive 프로그래밍을 사용하기도 한다. (* Vert.x 프레임워크 역시 Reactive Programming을 지원한다.) 그럼 Vert.x 프레임워크에서 어떻게 해결하는지 알아보자.
compose
이전 글에서 Future 인터페이스를 소개하였다. Future 인터페이스의 멤버 중에 아래와 같이 compose라고 하는 default 메쏘드가 선언되어있다.
/** * Compose this future with a {@code mapper} function.<p> * * When this future (the one on which {@code compose} is called) succeeds, the {@code mapper} will be called with * the completed value and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.<p> * * If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p> * * When this future fails, the failure will be propagated to the returned future and the {@code mapper} * will not be called. * * @param mapper the mapper function * @return the composed future */ default <U> Future<U> compose(Function<T, Future<U>> mapper){ return compose(mapper, Future::failedFuture); }
/** * Compose this future with a {@code successMapper} and {@code failureMapper} functions.<p> * * When this future (the one on which {@code compose} is called) succeeds, the {@code successMapper} will be called with * the completed value and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.<p> * * When this future (the one on which {@code compose} is called) fails, the {@code failureMapper} will be called with * the failure and this mapper returns another future object. This returned future completion will complete * the future returned by this method call.<p> * * If any mapper function throws an exception, the returned future will be failed with this exception.<p> * * @param successMapper the function mapping the success * @param failureMapper the function mapping the failure * @return the composed future */ default <U> Future<U> compose(Function<T, Future<U>> successMapper, Function<Throwable, Future<U>> failureMapper){ if (successMapper == null) { thrownew NullPointerException(); } if (failureMapper == null) { thrownew NullPointerException(); } Promise<U> ret = Promise.promise(); setHandler(ar -> { if (ar.succeeded()) { Future<U> apply; try { apply = successMapper.apply(ar.result()); } catch (Throwable e) { ret.fail(e); return; } apply.setHandler(ret); } else { Future<U> apply; try { apply = failureMapper.apply(ar.cause()); } catch (Throwable e) { ret.fail(e); return; } apply.setHandler(ret); } }); return ret.future(); }
코드는 복잡해도 내용은 간단하다. 해당 Future 객체의 핸들러를 인자로 받은 successMapper와 failureMapper로 지정하는데 Future가 성공 시에는 successMapper를, 실패 시에는 failureMapper를 호출하는 것이다. 여기서 가중 중요한 부분이 하나 더 있는데 리턴 객체이다. 최초의 Future객체에 핸들러로 지정된 successMapper/failureMapper 함수형 인터페이스 객체의 실행 결과를 다시 Promise로 받아서 return ret.future()로 Future 객체로 만들어 반환하는 것을 볼 수 있다. 이는 내부에서 호출되는 핸들러의 결과를 가지고 다시 compose()를 호출할 수 있게 해준다. 이렇게 수십 개의 로직을 역어서 평면적인 비동기 로직을 호출할 수 있게 만들 수 있다. 체인 구성을 가능하게 해주는 것이다. 위 DB Query 콜백 코드를. compose()로 변경해보았다.
MySQLConnectOptions connectOptions = new MySQLConnectOptions() .setPort(10100) .setHost("127.0.0.1") .setDatabase("test") .setUser("root") .setPassword("iww2rm");
// Pool options PoolOptions poolOptions = new PoolOptions() .setMaxSize(5);
// Create the pooled client MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
// Get a connection from the pool Future<SqlConnection> fut1 = Future.future(promise -> client.getConnection(ar -> { if (ar.succeeded()) { System.out.println("Connected"); promise.complete(ar.result()); } else { promise.fail(ar.cause()); } }));
fut1.compose(connection->{ Promise<SqlConnection> promise = Promise.promise(); connection.query("SELECT * FROM users WHERE user_id='walker' AND name='Walker. Lee.'", ar -> { if (ar.succeeded()) { promise.complete(connection); } else { promise.fail(ar.cause()); connection.close(); } }); return promise.future(); }).compose(connection-> { Promise<SqlConnection> promise = Promise.promise(); connection.query("INSERT INTO users (user_id, name) VALUES ('walker', 'Walker. Lee.') ", ar -> { if (ar.succeeded()) { promise.complete(connection); } else { promise.fail(ar.cause()); connection.close(); } }); return promise.future(); }).compose(connection-> { Promise<SqlConnection> promise = Promise.promise(); connection.query("INSERT INTO audit_log(user_id, log_event, log_date) VALUES ('walker', 'Add User.', '2019.12.23') ", ar -> { if (ar.succeeded()) { promise.complete(connection); System.out.println("success query."); } else { promise.fail(ar.cause()); connection.close(); } }); return promise.future(); }).setHandler(ar -> { if (ar.succeeded()) { ar.result().close(); } else { System.out.println("error : " + ar.cause().getMessage()); } });
코드를 보면 DB 접속을 진행하면서 Future 객체를 하나 만들었다. 해당 Future 객체는 SqlConnection객체를 가지고 있다. 이 Future 객체에 compose(Function<T, Future<U>> successMapper) 메쏘드를 호출한다. 이 compose() 메쏘드는 Future가 성공했을 때만 전달된 람다 함수가 호출된다(compose 메쏘드 중에 인자가 하나인 메쏘드는 실패 메쏘드가 내부적으로 호출해줌.). 내부에서는 접속 객체로 쿼리가 호출하게 되어있다. 그리고 다시 성공 실패 여부를 Promise 객체를 생성해서 Future <SqlConnection>로 리턴한다. 이렇게 이후 두개의 compose()를 동일하게 정의해서 나머지 두 개의 쿼리를 실행한다. 마지막에는 최종 결과를 setHandler()를 호출하고 핸들러 람다 함수를 선언하여 마무리한다. 정의한 코드량은 조금 늘어난 것 같지만 코드 자체의 복잡도는 매우 낮아졌다. 앞에 compose()가 성공적이면 다음을 진행하고 마지막 결과를 핸들러로 처리하는 식이다. 코드 진행이 한눈에 들어온다.
마무리
이번 글은 compose()로 비동기 다중 콜백 호출을 1차원 체인으로 구성하는 방법을 설명하였다. 초기 코드를 이해하는 난이도가 조금 있지만 이해하고 나면 개발 속도면이나 유지보수면에서 훌륭한 Callback Hell 해결책이다. 이 방법 이외에도 Vert.x Reactive Library를 이용하는 방법도 있다. Reactive에 익숙하다면 한번 도전해보자.