Development/DevLife

[인프콘 2022후기] 10만 connection 그까이꺼, Armeria 서버 한 대면 끝!

📝 작성 : 2022.08.27  ⏱ 수정 : 
반응형

전체적인 INFCON2022에 대한 후기는 INFCON 2022 후기 글을 참고 해 주세요.

IMG_8174 Medium

개인 코멘트

얼마전에 Armeria 튜토리얼 따라해보기 를 해보면서 까지 준비 할 만큼 기대를 많이 했습니다.

심지어 40명 선착순 신청으로만 참여 할 수 있는 핸드온 세션 이기 때문에 목요일 오후 4시 30분 땡 되자 마자 바로 신청을 했고 그 덕에 참여 할 수 있었습니다. 매번 동기 서버만 작성 해 보았기에 비동기 서버가 어떤 원리로 동작하는지 그리고 어떻게 구현하는지 궁금 했는데 오랜만에 화면을 보며 코드를 따라 치는 경험도 할 수 있었고, 영상으로만 보던 Armeria 팀원분들을 뵐 수 있어 좋았습니다. 정말 재밌었고 참여하길 잘했단 생각이 듭니다.

발표는 민우님이 해 주셨고, 익훈님과 한남님도 함께 자리 해 주셨습니다.

IMG_8175 Medium

개인별로 넓직한 자리에서 랩탑을 놓고 쓸 수 있었고, 인터넷 접속을 위한 아이디,비밀번호가 적힌 종이를 지급 해 주었습니다.

준비사항

참여 전 아래의 준비를 해야 합니다.

$ git clone https://github.com/minwoox/infcon-armeria.git
$ ./gradlew build
  • Armeria는 비동기 마이크로소프트 서비스를 쉽게 at your face로 구현할 수 있게 해줌

  • 비동기 서버 개발이 처음이신분? 나 포함 겨우 5명,,

  • Tomcat의 기본 쓰레드는 200개. http response는 한번에 오지 않는다.

  • 비동기 서버는 이 개념을 이해하기가 처음에 어려울 뿐 이것만 이해하면 그 다음부터는 쉽다.

핸드온을 통해 구현할 최종 목표

image-20220827181440718

https://github.com/minwoox/infcon-armeria

Hello Armeria

image-20220827181517622

Main.java

public final class Main {

    public static void main(String[] args) {
        ServerBuilder serverBuilder = Server.builder();
        Server server = serverBuilder.http(8080)
                .service("/infcon", new MyService())
                .build();

        CompletableFuture<Void> future = server.start();
        future.join();
    }
}

MyService.java

public final class MyService implements HttpService {

    @Override
    public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exception {
        return HttpResponse.of("Hello, Armeria!");
    }
}

천천히 응답하는 백엔드

image-20220827181622171

1. 동기 backend

Backend.java

public final class Backend {

    private final Server server;

    private Backend(String name, int port) {
        server = Server.builder()
                .http(port)
                .service("/foo", ((ctx, req) -> {
                    HttpResponse response = HttpResponse.of("response from/: " + name);
                    return HttpResponse.delayed(response, Duration.ofSeconds(3));
                })).build();
    }

    public static Backend of(String name, int port) {
        return new Backend(name, port);
    }

    public void start() {
        server.start().join();
    }

}

BackendTest.java

class BackendTest {

    @Test
    void backend() {
        final Backend foo = Backend.of("foo", 9000);
        foo.start();

        final WebClient webClient = WebClient.of("http://127.0.0.1:9000");

        // 이 시점에 httpResponse는 응답을 가지고 있지 않음. 3초 후에 응답을 return 하게끔 했기 때문에
        // 응답을 가지고 있지 않은 껍데기.
        HttpResponse httpResponse = webClient.get("/foo");
        System.err.println("Thread name: " + Thread.currentThread().getName());
        // 동기서버는 3초를 기다리지만, 비동기서버에서는 기다리지 않음.

        // response는 한번에 오지 않는다.
        // aggregate 를 이용 하면 header 와 body가 따로따로 오는걸 잘 모아서 하나의 aggregated된 Response로 만들어준다.
        CompletableFuture<AggregatedHttpResponse> future = httpResponse.aggregate();
        // 지금 이 future는 body를 가지고 있지 않음. 또 다른 껍데기. 껍데기에서 껍데기를 만든 상태.

        // 비동기서버에서는 join을 절대로 사용 하면 안됨.
        AggregatedHttpResponse aggregatedHttpResponse = future.join();
        String content = aggregatedHttpResponse.contentUtf8();
        System.err.println(content);


    }
}

httpResponse는 단지 wrapper임. 생성 만으로는 무엇을 가지고 있지 않은 껍데기에 불과.

그렇지 않다면 쓰레드는 response가 도착하는 3초 동안 기다려야 하는데 그 경우에는 비동기가 아님. 쓰레드는 200개만 있는데 동시에 200개가 넘는 요청이 온다면 그걸 어떻게 처리?

2. 비동기 Backend (call back)

HTTP 응답은 Header frame과 Data frame을 가지고 있는데 한번에 오지 않는다. 프레임을 하나씩 다루거나 aggregate 해야함. 이벤트 루프를 막지 않기 위해 콜백을 활용

BackendTest.java

class BackendTest {

    @Test
    void backend() throws InterruptedException {
        final Backend foo = Backend.of("foo", 9000);
        foo.start();

        final WebClient webClient = WebClient.of("http://127.0.0.1:9000");

        // 이 시점에 httpResponse는 응답을 가지고 있지 않음. 3초 후에 응답을 return 하게끔 했기 때문에
        // 응답을 가지고 있지 않은 껍데기.
        HttpResponse httpResponse = webClient.get("/foo");
        System.err.println("Thread name: " + Thread.currentThread().getName());
        // 동기서버는 3초를 기다리지만, 비동기서버에서는 기다리지 않음.

        // response는 한번에 오지 않는다.
        // aggregate 를 이용 하면 header 와 body가 따로따로 오는걸 잘 모아서 하나의 aggregated된 Response로 만들어준다.
        CompletableFuture<AggregatedHttpResponse> future = httpResponse.aggregate();
        // 지금 이 future는 body를 가지고 있지 않음. 또 다른 껍데기. 껍데기에서 껍데기를 만든 상태.

        // join 이 아닌 callback 을 사용 해야 한다.
        // future에 callback을 등록 해서 3초 후에 응답이 발생하면 알맹이가 채워지고, 오리지널 클라이언트에게 요청
        future.thenAccept(aggregatedHttpResponse -> {
            // 이벤트 루프. 요청을 받아서 futre 껍데기 에다가 알맹이를 넣어주는 쓰레드
            System.err.println("In callback. Thread name: " + Thread.currentThread().getName());
            sendBackToTheOriginalClient(aggregatedHttpResponse);
        });

        Thread.sleep(Long.MAX_VALUE);
    }

    private void sendBackToTheOriginalClient(AggregatedHttpResponse aggregatedHttpResponse) {
    }
}

두개의 백엔드

image-20220827183244340

두개의 백엔드를 위해 Backend.java 변경

Backend.java

public final class Backend {

    private final Server server;

    private Backend(String name, int port) {
        server = Server.builder()
                .http(port)
                .service("/" + name, ((ctx, req) -> {
                    HttpResponse response = HttpResponse.of("response from: " + name);
                    return HttpResponse.delayed(response, Duration.ofSeconds(3));
                })).build();
    }

    public static Backend of(String name, int port) {
        return new Backend(name, port);
    }

    public void start() {
        server.start().join();
    }

}

메인도 변경

Main.java

public final class Main {

    public static void main(String[] args) {
        final Backend foo = Backend.of("foo", 9000);
        foo.start();
        final WebClient fooClient = WebClient.of("http://127.0.0.1:9000");

        final Backend bar = Backend.of("bar", 9001);
        bar.start();
        final WebClient barClient = WebClient.of("http://127.0.0.1:9001");

        ServerBuilder serverBuilder = Server.builder();
        Server server = serverBuilder.http(8080)
                .service("/infcon", new MyService(fooClient, barClient))
                .build();

        CompletableFuture<Void> future = server.start();
        future.join();
    }
}

서비스 변경. 두개의 클라이언트를 콜백 호출

Service.java

public final class MyService implements HttpService {

    private final WebClient fooClient;
    private final WebClient barClient;

    public MyService(WebClient fooClient, WebClient barClient) {
        this.fooClient = fooClient;
        this.barClient = barClient;
    }

    @Override
    public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exception {
        CompletableFuture<HttpResponse> future = new CompletableFuture<>();

        fooClient.get("/foo").aggregate().thenAccept(fooResponse -> {
            barClient.get("/bar").aggregate().thenAccept(barResponse -> {
                HttpResponse response = HttpResponse.of(fooResponse.contentUtf8() + '\n' + barResponse.contentUtf8());
                future.complete(response);
            });
        });

        return HttpResponse.from(future);
    }
}

데코레이터

시간 관계상 데코레이터 시작하는 부분에서 발표가 끝났고 이후 부터는 각자 알아서 주말동안 해보기로 하였습니다.

image-20220827183741266

데코레이터를 통해 MyService를 감싸는 interceptor에서 로깅을 남긴다거나 인증이 되었는지를 확인 할 수 있다.

AuthDecorator.java

public final class AuthDecorator implements DecoratingHttpServiceFunction {

    private final WebClient authClient;

    public AuthDecorator(WebClient authClient) {
        this.authClient = authClient;
    }

    @Override
    public HttpResponse serve(HttpService delegate, ServiceRequestContext ctx, HttpRequest req) {
        CompletableFuture<HttpResponse> future = new CompletableFuture<>();
        authClient.get("/auth").aggregate().thenAccept(aggregatedHttpResponse -> {
            if ("authorized".equals(aggregatedHttpResponse.contentUtf8())) {
                try {
                    future.complete(delegate.serve(ctx, req));
                } catch (Exception e) {
                    future.completeExceptionally(e);
                }
            } else {
                future.complete(HttpResponse.of(401));
            }
        });
        return HttpResponse.from(future);
    }

}

AuthServer.java

public class AuthServer {

    private final Server server;

    public static AuthServer of(int port) {
        return new AuthServer(port);
    }

    private AuthServer(int port) {
        server = Server.builder()
                .http(port)
                .service(
                        "/auth",
                        (ctx, req) -> {
                            System.err.println(this.getClass() + ": 요청 처리");
                            return HttpResponse.of("authorized");
                        })
                .build();
    }

    public void start() {
        server.start().join();
    }

}

Main.java

public final class Main {

    public static void main(String[] args) {

        final AuthServer authServer = AuthServer.of(8999);
        authServer.start();
        WebClient authClient = WebClient.of("http://127.0.0.1:8999");

        final Backend foo = Backend.of("foo", 9000);
        foo.start();
        final WebClient fooClient = WebClient.of("http://127.0.0.1:9000");

        final Backend bar = Backend.of("bar", 9001);
        bar.start();
        final WebClient barClient = WebClient.of("http://127.0.0.1:9001");

        ServerBuilder serverBuilder = Server.builder();
        Server server = serverBuilder.http(8080)
                .decorator(new AuthDecorator(authClient))
                .service("/infcon", new MyService(fooClient, barClient))
                .build();

        CompletableFuture<Void> future = server.start();
        future.join();
    }
}

테스트

multipleRequests()

    // Run main() before run this test.
    @Test
    void multipleRequests() throws InterruptedException {
        final int TARGET = 100_000;

        final long start = System.nanoTime();
        final CountDownLatch latch = new CountDownLatch(10000);
        for (int i = 0; i < TARGET; i++) {
            final WebClient webClient = WebClient.of("http://127.0.0.1:8080");
            webClient.get("/infcon").aggregate().handle((aggregatedHttpResponse, throwable) -> {
                latch.countDown();
                return null;
            });
        }
        latch.await();
        System.err.println("Elapsed time: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) + " ms");
    }

  • 10만커넥션 그까이꺼~ 답게 10만 요청을 금방 처리 해 냈습니다.

  • 이 테스트코드에서 10만보다 숫자를 더 높이며 테스트 했을 때에는 서버는 요청은 오는대로 처리를 하려고는 하는데.. 힙메모리 부족으로 서버가 아닌 테스트 코드가 중간에 죽어버렸습니다.

반응형