Netty+webSocket 获取火币交易所数据项目
先附上项目项目 GitHub 地址
项目简介
本项目使用 SpringBoot+Netty 来开发 WebSocket 服务器, 与火币交易所 Websocket 建立连接, 时时获取火币网交易所推送过来的交易对最新数据.
该项目可以直接运用于实际开发中, 做为获取各大交易所最新交易对相关数据的项目.
项目本身也是我在之前公司为了获取各大交易所数据所开发的项目, 现在只是重新整理了下代码, 现在它更像一个脚手架项目, 可以在此基础上很方便的添加其它交易所.
技术架构
SpringBoot2.1.5 +Netty4.1.25 + Maven3.5.4 + lombok(插件)
项目测试
直接启动 Springboot 启动类 Application.java, 就可以时时获取火币网推送过来交易对的数据了.
如图
一, 项目概述
1, 项目启动入口
在项目启动的时候就开始去连接火币交易所 Websocket 订阅数据.
- /**
- * 首次启动并订阅火币 websocket 数据
- */
- @PostConstruct
- public void firstSub() {
- try {
- huobiProMainService.start();
- } catch (Exception e) {
- log.error("huobi 首次启动订阅异常", e);
- }
- }
2, 获取交易对数据
我们是先要获取火币交易所所有的交易对数据, 然后告诉火币交易所我需要订阅哪些交易对数据.
是订阅所有交易对数据还是订阅部分交易对数据.
- @Override
- public synchronized List<String> getChannelCache() {
- // 假设这里是从远处拉取交易对数据
- List<String> list = Lists.newArrayList("btcusdt");
- return list;
- }
3, 连接火币交易所 Websocket, 并订阅指定的交易对.
先与火币网 WebSocket 建立连接, 连接成功后再告诉它我要订阅哪些交易对, 哪种主题, 成功后, 火币交易所就会根据我们所订阅的主题和交易对, 给我们时时推送消息.
- /**
- * 首次订阅交易对数据
- *
- * @param channelList 交易对列表
- * @param topicFormat 交易对订阅主题格式
- */
- private void firstSub(List<String> channelList, String topicFormat) {
- // 封装 huoBiProWebSocketService 对象
- klineClient = new HuoBiProWebSocketClient(huoBiProWebSocketService);
- // 启动连接火币网 websocket
- klineClient.start();
- for (String channel : channelList) {
- // 订阅具体交易对
- klineClient.addSub(formatChannel(topicFormat, channel));
- }
- }
启动连接火币网 websocket 核心代码
很明显我们我们是作为客户端去获取服务端的数据, 所以这里的 Bootstrap 来与服务端进行数据交互, 而不是用 ServerBootstrap.
还有一点就是作为客户端, 我们是要获取服务端所推送来的消息, 所以我们自定义的 handler 是入站 Handler, 所以这里选择的是 SimpleChannelInboundHandler.
- /**
- * 连接 WebSocket,
- *
- * @param uri url 构造出 URI
- * @param handler 处理消息
- */
- protected void connectWebSocket(final URI uri, SimpleChannelInboundHandler handler) {
- try {
- String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
- final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
- final int port;
- if (uri.getPort() == -1) {
- if ("http".equalsIgnoreCase(scheme) || "ws".equalsIgnoreCase(scheme)) {
- port = 80;
- } else if ("wss".equalsIgnoreCase(scheme)) {
- port = 443;
- } else {
- port = -1;
- }
- } else {
- port = uri.getPort();
- }
- if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
- System.out.println("Only WS(S) is supported");
- throw new UnsupportedAddressTypeException();
- }
- final boolean ssl = "wss".equalsIgnoreCase(scheme);
- final SslContext sslCtx;
- if (ssl) {
- sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
- } else {
- sslCtx = null;
- }
- group = new NioEventLoopGroup(2);
- // 构建客户端 Bootstrap
- Bootstrap Bootstrap = new Bootstrap();
- Bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- if (sslCtx != null) {
- pipeline.addLast(sslCtx.newHandler(ch.alloc(), host, port));
- }
- //pipeline 可以同时放入多个 handler, 最后一个为自定义 hanler
- pipeline.addLast(new HttpClientCodec(), new HttpObjectAggregator(8192), handler);
- }
- });
- channel = Bootstrap.connect(host, port).sync().channel();
- } catch (Exception e) {
- log.error("webSocketClient start error.", e);
- if (group != null) {
- group.shutdownGracefully();
- }
- }
- }
4, 自定义 handler
自定义 Handler 才是核心, 作为数据的入站这里选择继承 SimpleChannelInboundHandler, 继承它必须要实现一个方法就是 channelRead0, 通过该方法的 msg, 就可以获取火币交易所时时推送过来的消息了.
- /**
- * @Description: 火币网 WebSocket 消息处理类
- * 自定义入站的 handler 这个也是核心类
- */
- @Slf4j
- public class HuoBiProWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
- private WebSocketClientHandshaker handshaker;
- private HuoBiProWebSocketClient client;
- /**
- * 该 handel 获取消息的方法
- */
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- Channel channel = ctx.channel();
- WebSocketFrame frame = (WebSocketFrame) msg;
- if (frame instanceof BinaryWebSocketFrame) {
- // 火币网的数据是压缩过的, 所以需要我们进行解压
- BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
- // 获取数据, 保存数据
- client.onReceive(decodeByteBuf(binaryFrame.content()));
- } else if (frame instanceof TextWebSocketFrame) {
- TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
- client.onReceive(textWebSocketFrame.text());
- }
- }
- }
二, 项目注意点
1, 服务器问题
一般交易所的服务器都在国外, 所以我们本地是无法建立 Websocket 连接的, 除非本地 FQ.
同样项目也不能部署到阿里云等国内服务器, 你只能选择香港或者国外服务器部署项目.
这里是火币网专门为我们提供的国内测试地址, 所以本地可以获取数据.
2, 获取交易所最新交易对数据问题
我们在向交易所 Websocket 订阅交易对的时候, 首先就是要知道该交易所有哪些交易对, 这份数据是需要我们单独去获取的, 而且不是一次获取就好了.
因为该交易所可能新增或者删除交易对. 所以需要我们通过定时任务去获取更新最新的交易对数据.
我这边只是模拟了一个交易对 btcusdt, 并没有提供获取最新交易对数据的服务.
3, 数据存储问题
这也是最值得思考的一个问题, 数据我们是获取了, 但如果保存!
正常合理的开发应该获取数据是一个微服务, 处理获取的数据是一个微服务. 那么只需要获取数据后去调处理数据微服务就可以保存数据了.
但在这里, 如果只是这样是行不通的.
因为火币网向我们推送的消息的速度会比我们调其它服务保存的数据要快, 这就会存在数据丢失的情况发生.
这里仅仅是输出一个 btcusdt 交易对, 并且只是订阅一个 k 线主题, 而实际上交易所会有上百个交易对和几种订阅主题,
这样的消息推送速度是上面的几百倍. 所以你会发现如果你不做任何改动, 对于一些大的交易所而言, 你的数据是来不及存储的.
补充
这边之前也写过有关 Netty 和 Websocket 相关的博客文章, 可以做个参考
1,Netty 专题 (共 9 篇)
2,Websocket 专题 (共 5 篇)
只要自己变优秀了, 其他的事情才会跟着好起来 (上将 11)
posted on 2019-08-01 21:18 雨点的名字 阅读 (...) 评论 (...) 编辑 收藏
来源: https://www.cnblogs.com/qdhxhz/p/11280533.html