Bringing Spring Boot into Netty

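Netty is a high-concurrency networking framework for Java. It is often claimed that a single physical machine running Netty can sustain one million concurrent connections, which makes it a common first choice for high-performance concurrent services in Java. Compared with Spring Boot, however, Netty development is lower-level: the code is less pleasant to write and development is slower.
One of my projects had high-concurrency requirements that the Tomcat embedded in Spring Boot could not meet, so I chose Netty as the underlying framework. To keep development efficient, I tried bringing Spring Boot into the project. On reflection, all of Spring is built on IoC and AOP, so once those two foundational components are in place, the rest of the Spring Boot components should be usable as well.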

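I am not sure how spring-web should be brought in, though; the other components are no problem. From my point of view, Netty is itself a networking framework, so there is little need to add spring-web on top of it.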

The project uses Maven for build management. Below is the pom.xml; I kept only the Spring Boot and Netty parts:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>netty.spring-boot</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <name>cdn-router</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring.version>4.3.10.RELEASE</spring.version>
        <build-tool.version>1.0.0</build-tool.version>
        <cdn-opentsdb.version>1.0.0-SNAPSHOT</cdn-opentsdb.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.13.Final</version>
        </dependency>
        <!-- Logging: Log4j2 + SLF4J -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <!--<version>2.8.2</version>-->
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <!--<version>2.8.2</version>-->
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <!--<version>1.7.25</version>-->
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <!--<version>2.8.2</version>-->
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.31</version>
        </dependency>

    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <!-- Import dependency management from Spring Boot -->
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>1.5.6.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <finalName>${finalName}</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.test.CDNRouterServer</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>
</project>

Next, I implement a handler for HTTP messages and register it as an IoC bean so that Spring Boot manages it.

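import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.CharsetUtil;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * @Author Derek.
 * @Date 2017/7/18 9:24.
 */
@Component
@Scope("prototype") // a new handler instance for every bean lookup
public class OpsHttpMessageHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        System.out.println("OK");
        // HttpServerCodec rejects a bare ByteBuf with no response head, so send a full HTTP response.
        ByteBuf body = Unpooled.copiedBuffer("Channel Test", CharsetUtil.UTF_8);
        FullHttpResponse response =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, body);
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes());
        ctx.channel().writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}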

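Note the @Scope("prototype") annotation. Netty builds a separate handler pipeline for every channel (connection), so by default handler instances are not shared between channels. This avoids critical-section access, and with it thread contention and context switching, which improves concurrency. Spring Boot beans, on the other hand, are singletons by default: only one instance of the bean exists, which clearly does not fit Netty's default assumption about handlers. We therefore switch the bean to prototype scope with @Scope("prototype"), so that a new instance is created each time the bean is requested from the container. Netty also supports shared handlers: annotate the handler with @Sharable (and keep it free of per-connection state), and Spring Boot's default singleton bean can be used as is. See the code below: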

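import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.stereotype.Component;

/**
 * @Author Derek.
 * @Date 2017/5/18 10:58.
 */
@Component
@Sharable // one singleton instance may be added to many pipelines
public class Http2MessageHandler extends ChannelDuplexHandler {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("OK");
        // Pass the message on to the next inbound handler.
        super.channelRead(ctx, msg);
    }
}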
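
Sharing one handler instance across channels is only safe when the handler keeps no per-connection state and any state it does keep is thread-safe. The following is a minimal, Netty-free sketch of that rule. `SharedCounterHandler`, `onMessage` and `SharedHandlerDemo` are hypothetical names: the first two stand in for a @Sharable handler and its channelRead, and the worker threads stand in for event loops.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a @Sharable handler: one instance, many threads.
final class SharedCounterHandler {
    // LongAdder tolerates concurrent updates; a plain long field could lose increments.
    private final LongAdder handled = new LongAdder();

    // Plays the role of channelRead(): may be invoked from several threads at once.
    void onMessage(String msg) {
        handled.increment();
    }

    long handledCount() {
        return handled.sum();
    }
}

final class SharedHandlerDemo {
    // Drives a single shared handler from `threads` workers, `perThread` messages each.
    static long run(int threads, int perThread) {
        SharedCounterHandler handler = new SharedCounterHandler();
        ExecutorService loops = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            loops.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    handler.onMessage("ping");
                }
                done.countDown();
            });
        }
        try {
            done.await(); // wait until every worker has delivered all its messages
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loops.shutdown();
        }
        return handler.handledCount();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10_000)); // prints 40000
    }
}
```

If the handler needed per-connection state instead (a partially decoded message, say), the prototype-scoped variant shown earlier would be the safer choice.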

Next, I use the handler created above to build the HTTP server, which is also registered as a Spring bean.

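import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

@Component(value = "opsHttpServer")
public class OpsHttpServer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(OpsHttpServer.class);

    private static final int MAX_CONTENT_LENGTH = 1024 * 100;

    // Injected fields must not be final: Spring assigns them after construction.
    @Autowired
    private HttpProperty httpProperty;

    @Autowired
    private ServerBootstrapFactory factory;

    @Autowired
    private ApplicationContext applicationContext;

    // Runs the business handler off the I/O threads.
    private final EventExecutorGroup pool =
            new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors() * 2);

    @Override
    public void run() {
        ServerBootstrap tcpBootStrap = factory.newServerBootstrap(0);
        tcpBootStrap.handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new HttpServerCodec())
                                .addLast(new HttpContentDecompressor())
                                .addLast(new ChunkedWriteHandler())
                                .addLast(new HttpContentCompressor())
                                .addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH))
                                // Prototype bean: each lookup returns a fresh handler for this channel.
                                .addLast(pool, applicationContext.getBean("opsHttpMessageHandler", OpsHttpMessageHandler.class));
                    }
                });
        try {
            // Bind, then block until the server channel is closed.
            ChannelFuture cf = tcpBootStrap.bind(httpProperty.getBackport()).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            factory.shutdownGracefully(false);
            pool.shutdownGracefully();
        }
    }
}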

The ServerBootstrapFactory injected above uses epoll when it is available and falls back to NIO otherwise:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component
@Scope("prototype") // each server gets its own factory and event loop groups
public class ServerBootstrapFactory {
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    /**
     * Creates a server bootstrap, preferring the native epoll transport.
     *
     * @param ioThreadCount the I/O thread count; 0 or less means Netty's default
     * @return the server bootstrap
     */
    public ServerBootstrap newServerBootstrap(int ioThreadCount) {
        if (Epoll.isAvailable()) {
            return newEpollServerBootstrap(ioThreadCount);
        }

        return newNioServerBootstrap(ioThreadCount);
    }

    /**
     * Shuts down both event loop groups gracefully.
     *
     * @param shouldWait whether to block until both groups have terminated
     */
    public void shutdownGracefully(boolean shouldWait) {
        Future<?> workerFuture = workerGroup.shutdownGracefully();
        Future<?> bossFuture = bossGroup.shutdownGracefully();

        if (shouldWait) {
            workerFuture.awaitUninterruptibly();
            bossFuture.awaitUninterruptibly();
        }
    }

    private ServerBootstrap newNioServerBootstrap(int ioThreadCount) {
        if (ioThreadCount > 0) {
            bossGroup = new NioEventLoopGroup(ioThreadCount);
            workerGroup = new NioEventLoopGroup(ioThreadCount);
        } else {
            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
        }

        return new ServerBootstrap().group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class);
    }

    private ServerBootstrap newEpollServerBootstrap(int ioThreadCount) {
        if (ioThreadCount > 0) {
            bossGroup = new EpollEventLoopGroup(ioThreadCount);
            workerGroup = new EpollEventLoopGroup(ioThreadCount);
        } else {
            bossGroup = new EpollEventLoopGroup();
            workerGroup = new EpollEventLoopGroup();
        }

        return new ServerBootstrap().group(bossGroup, workerGroup)
                .channel(EpollServerSocketChannel.class);
    }
}

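Note this line: addLast(pool, applicationContext.getBean("opsHttpMessageHandler", OpsHttpMessageHandler.class)). I use applicationContext to obtain the opsHttpMessageHandler bean because this code sits inside a ChannelInitializer, and the ChannelInitializer itself is not a Spring-managed bean, so the handler can only be fetched through applicationContext. Because the bean is prototype-scoped, every call returns a new instance, so each channel gets its own handler.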
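
To make the difference between the two scopes concrete, here is a small sketch that mimics a per-channel bean lookup with a hand-rolled registry rather than Spring itself. `MiniContainer`, `registerSingleton`, `registerPrototype`, `StatefulHandler` and `ScopeDemo` are hypothetical names used only for illustration; they are not Spring or Netty APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a handler that keeps per-connection state.
final class StatefulHandler {
    private int messagesSeen; // not thread-safe, so it must not be shared between channels

    int onMessage() {
        return ++messagesSeen;
    }
}

// Tiny registry imitating singleton and prototype scope; this is not Spring's real API.
final class MiniContainer {
    private final Map<String, Supplier<?>> beans = new HashMap<>();

    // Singleton scope: create once, hand out the same instance on every lookup.
    <T> void registerSingleton(String name, Supplier<T> factory) {
        T instance = factory.get();
        beans.put(name, () -> instance);
    }

    // Prototype scope: run the factory again on every lookup.
    <T> void registerPrototype(String name, Supplier<T> factory) {
        beans.put(name, factory);
    }

    <T> T getBean(String name, Class<T> type) {
        return type.cast(beans.get(name).get());
    }
}

final class ScopeDemo {
    public static void main(String[] args) {
        MiniContainer ctx = new MiniContainer();
        ctx.registerPrototype("perChannel", StatefulHandler::new);
        ctx.registerSingleton("shared", StatefulHandler::new);

        // Two "channels" each look up their handler, as initChannel does per connection.
        StatefulHandler a = ctx.getBean("perChannel", StatefulHandler.class);
        StatefulHandler b = ctx.getBean("perChannel", StatefulHandler.class);
        a.onMessage();
        System.out.println(a != b);         // prints true: separate instances
        System.out.println(b.onMessage());  // prints 1: b never saw a's message

        StatefulHandler s1 = ctx.getBean("shared", StatefulHandler.class);
        StatefulHandler s2 = ctx.getBean("shared", StatefulHandler.class);
        s1.onMessage();
        System.out.println(s2.onMessage()); // prints 2: both lookups return one instance
    }
}
```

The same reasoning explains why the lookup happens inside initChannel instead of once in a field: a field would capture a single handler instance and reuse it for every connection.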

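In the code above the server is a Runnable so that a single program can listen on several ports at once, one server thread per port. Last comes the entry point of the program: it uses ApplicationContext to obtain the server bean and an ExecutorService to start the server's thread.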

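import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

@SpringBootApplication
public class CDNRouterServer {

    private static Logger logger = LoggerFactory.getLogger(CDNRouterServer.class);

    public static void main(String[] args) throws Exception {
        ApplicationContext ctx = SpringApplication.run(CDNRouterServer.class, args);
        ExecutorService service = Executors.newCachedThreadPool();
        OpsHttpServer server = ctx.getBean("opsHttpServer", OpsHttpServer.class);
        service.execute(server);
        // Accept no further tasks; the server task that is already running keeps going.
        service.shutdown();
    }
}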
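
Since run() blocks until the server channel closes, each server needs a thread of its own if several ports are to be served from one process. The following Netty-free sketch shows that shape under stated assumptions: `BlockingServer`, `MultiPortDemo`, the port numbers and the latch-based "close" are all hypothetical stand-ins (a real server would bind a socket and block on closeFuture().sync()).

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for OpsHttpServer: run() blocks until the server is closed.
final class BlockingServer implements Runnable {
    private final Integer port;
    private final Set<Integer> listening;
    private final CountDownLatch started;
    private final CountDownLatch closeSignal;

    BlockingServer(int port, Set<Integer> listening, CountDownLatch started, CountDownLatch closeSignal) {
        this.port = port;
        this.listening = listening;
        this.started = started;
        this.closeSignal = closeSignal;
    }

    @Override
    public void run() {
        listening.add(port);     // stands in for bind(port).sync()
        started.countDown();
        try {
            closeSignal.await(); // stands in for closeFuture().sync()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            listening.remove(port);
        }
    }
}

final class MultiPortDemo {
    // Starts one blocking server per port and returns the ports that were up at the same time.
    static Set<Integer> startAll(int... ports) {
        Set<Integer> listening = ConcurrentHashMap.newKeySet();
        CountDownLatch started = new CountDownLatch(ports.length);
        CountDownLatch closeSignal = new CountDownLatch(1);
        ExecutorService service = Executors.newCachedThreadPool();
        for (int port : ports) {
            service.execute(new BlockingServer(port, listening, started, closeSignal));
        }
        service.shutdown(); // no new tasks; the submitted servers keep running
        try {
            started.await();                                  // every server is now "bound"
            Set<Integer> snapshot = new TreeSet<>(listening); // all of them are up concurrently
            closeSignal.countDown();                          // "close" every server
            service.awaitTermination(5, TimeUnit.SECONDS);
            return snapshot;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(startAll(8080, 8081)); // prints [8080, 8081]
    }
}
```

A cached thread pool suits this pattern because it grows to one thread per submitted server; a fixed pool smaller than the number of servers would leave the extra servers queued behind tasks that never return.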

With Spring Boot in place, it is easy to bring in spring-data-jpa for database access instead of hand-writing JDBC code, which greatly simplifies working with the database.

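P.S. I came to Java from Python and still miss SQLAlchemy and the Django ORM; I really cannot stand writing SQL statements by hand with JDBC. Thankfully there is JPA. I do not think JPA can compare with SQLAlchemy, but it is better than nothing, and it is fairly convenient to write. Still, it always feels as if it is missing an arm or a leg...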