对商业智能 BI, 大数据分析挖掘, 机器学习, python,R 等数据领域感兴趣的同学加微信: tstoutiao, 邀请你进入数据爱好者交流群, 数据爱好者们都在这儿.
作者: 量化小白 H
个人公众号: 量化小白上分记
本文是对报告《20100210 - 华泰证券 - 数量化策略: 大小盘轮动》,《20161218 - 中金公司 - 价量视角下的市值风格轮动策略》部分内容的复制, 文章为个人理解, 不保证正确性, 请理性看待, 欢迎指正.
大小盘轮动
A 股市场上存在着明显的大小盘轮动的现象, 一段时间大盘表现强势, 一段时间小盘表现强势, 所谓二八轮动. 这种现象提供了构建大小盘轮动策略的可能, 目前常见的两种构建大小盘轮动策略的方式分别为
宏观层面出发, 分析影响市场价格变动的政策, 利率, GDP, 供需数据等, 以此构建模型分析市场当前的大小盘风格;
技术角度出发, 通过量价数据构建指标, 分析市场当前的大小盘风格.
两种方式各有优劣, 本文复制的两篇报告均属于第二种方式.
左侧交易 & 右侧交易
以交易时点划分, 可以将市场上的交易行为划分为两类: 左侧交易与右侧交易.
左侧交易: 在价格即将达到某个支撑点时逆向进入市场, 做反转.
右侧交易: 在价格走出趋势之后进入市场, 做动量, 也就是常说的追涨杀跌.
本文选取的两种策略分别属于这两种策略, 比较说明在同一指标下, 这两种交易行为的优劣, 但不具有普遍性.
数据提取
大盘指数: HS300
小盘指数: ZZ500
回测区间: 2015 年 1 月 1 日 - 2018 年 12 月 2 日
- import pandas as pd
- import numpy as np
- from matplotlib import pyplot as plt
- from matplotlib.font_manager import FontProperties
- font = FontProperties(fname=r'c:\windows\fonts\simsun.ttc',size = 20)
- from WindPy import *
- w.start()
- dateStart = date(2005,1,1)
- dateEnd = date(2018,12,2)
- IndexClose = w.wsd("000300.SH,000905.SH", "close", "{}".format(dateStart), "{}".format(dateEnd), "")
- Indexprice = pd.DataFrame(np.array(IndexClose.Data).T,columns=['hs300','ZZ500'],index = IndexClose.Times)
- Indexprice['P'] = np.log(Indexprice.ZZ500) - np.log(Indexprice.hs300)
- X = np.arange(Indexprice.shape[0])
- xticklabel = Indexprice.index
- xticks = np.arange(0,Indexprice.shape[0]+1,np.int((Indexprice.shape[0]+1)/5))
- plt.figure(figsize = [20,4])
- SP = plt.axes()
- P1 = SP.plot(X,Indexprice.hs300,linewidth = 2,label = 'HS300',color = 'darkred')
- P2 = SP.plot(X,Indexprice.ZZ500,linewidth = 2,label = 'ZZ500',color = 'cornflowerblue')
- SP1 = SP.twinx()
- P3 = SP1.plot(X,Indexprice['P'],color = 'orange',linewidth = 2,label = u'相对强弱指数')
- SP.set_xticks(xticks)
- SP.set_xticklabels(xticklabel[xticks],size = 20)
- p = P1+P2+P3
- lns = [l.get_label() for l in p]
- plt.legend(p,lns,prop=font)
- plt.show()
- def updownbound(data,n,c):
- data['upbound'] = data['P'].rolling(Windows = n, center = False).mean() + c * data['P'].rolling(Windows = n, center = False).std()
- data['upbound'] = data['upbound'].shift(1)
- data['downbound'] = data['P'].rolling(Windows = n, center = False).mean() - c * data['P'].rolling(Windows = n, center = False).std()
- data['downbound'] = data['downbound'].shift(1)
- for i in range(2,n):
- data.loc[i,'upbound'] = data.loc[0:i - 1,'P'].mean() + c * data.loc[0:i - 1,'P'].std()
- data.loc[i,'downbound'] = data.loc[0:i - 1,'P'].mean() - c * data.loc[0:i - 1,'P'].std()
- data.loc[0,'upbound'] = 0
- data.loc[0,'downbound'] = 0
- data.loc[1,'upbound'] = data.loc[0,'P']
- data.loc[1,'downbound'] = data.loc[0,'P']
- return(data)
- def strategy(data):
- data['flag_hs'] = 0
- data['flag_zz'] = 0
- data['ret_strategy'] = 0
- data['ret_zz'] = data['ZZ500'].pct_change(1)
- data['ret_hs'] = data['hs300'].pct_change(1)
- data.loc[0,'ret_zz'] = 0
- data.loc[0,'ret_hs'] = 0
- for i in range(1,data.shape[0] - 1):
- # 上穿下轨, 做多小盘, 卖出大盘
- if data.ma10[i - 1] <data.downbound[i - 1] and data.ma10[i]> data.downbound[i]:
- data.loc[i + 1 ,'flag_zz'] = 1
- data.loc[i + 1 ,'flag_hs'] = 0
- # data.loc[i + 1,'net_zz'] = data.loc[i ,'net_zz']*(1 + data.loc[i + 1,'ret_zz'])
- # data.loc[i + 1,'net_hs'] = data.loc[i ,'net_hs']*(1 - data.loc[i + 1,'ret_hs'])
- # 下穿上轨, 做多大盘, 卖出小盘
- elif data.ma10[i - 1]> data.upbound[i - 1] and data.ma10[i] <data.upbound[i]:
- data.loc[i + 1 ,'flag_zz'] = 0
- data.loc[i + 1 ,'flag_hs'] = 1
- # data.loc[i + 1,'net_zz'] = data.loc[i ,'net_zz']*(1 - data.loc[i + 1,'ret_zz'])
- # data.loc[i + 1,'net_hs'] = data.loc[i ,'net_hs']*(1 + data.loc[i + 1,'ret_hs'])
- else:
- data.loc[i + 1 ,'flag_zz'] = data.loc[i ,'flag_zz']
- data.loc[i + 1 ,'flag_hs'] = data.loc[i ,'flag_hs']
- data.loc[data['flag_zz'] ==1,'ret_strategy'] = data.loc[data['flag_zz'] ==1,'ret_zz']
- data.loc[data['flag_hs'] ==1,'ret_strategy'] = data.loc[data['flag_hs'] ==1,'ret_hs']
- data['net_strategy_zz'] = (1 + data['flag_zz']*data['ret_zz']).cumprod()
- data['net_strategy_hs'] = (1 + data['flag_hs']*data['ret_hs']).cumprod()
- data['net_strategy'] = (1 + data['ret_strategy']).cumprod()
- return data
- def strategy1(data,N1,N2,fee_ratio = 0):
- data['flag_hs'] = 0
- data['flag_zz'] = 0
- data['ret_strategy'] = 0
- data['buysell'] = 0
- data['ret_zz'] = data['ZZ500'].pct_change(1).fillna(0)
- data['ret_hs'] = data['hs300'].pct_change(1).fillna(0)
- for i in range(max(N1,N2),data.shape[0] - 1):
- # 创新高, 做多小盘
- if data.P[i]>= np.max(data.P[i - N1:i]):
- data.loc[i + 1 ,'flag_zz'] = 1
- data.loc[i + 1 ,'flag_hs'] = 0
- # data.loc[i + 1,'net_zz'] = data.loc[i ,'net_zz']*(1 + data.loc[i + 1,'ret_zz'])
- # data.loc[i + 1,'net_hs'] = data.loc[i ,'net_hs']*(1 - data.loc[i + 1,'ret_hs'])
- # 创新低, 做多大盘
- elif data.P[i] <= np.min(data.P[i - N2:i]):
- data.loc[i + 1 ,'flag_zz'] = 0
- data.loc[i + 1 ,'flag_hs'] = 1
- # data.loc[i + 1,'net_zz'] = data.loc[i ,'net_zz']*(1 - data.loc[i + 1,'ret_zz'])
- # data.loc[i + 1,'net_hs'] = data.loc[i ,'net_hs']*(1 + data.loc[i + 1,'ret_hs'])
- else:
- data.loc[i + 1 ,'flag_zz'] = data.loc[i ,'flag_zz']
- data.loc[i + 1 ,'flag_hs'] = data.loc[i ,'flag_hs']
- data.loc[data['flag_zz'] ==1,'ret_strategy'] = data.loc[data['flag_zz'] ==1,'ret_zz']
- data.loc[data['flag_hs'] ==1,'ret_strategy'] = data.loc[data['flag_hs'] ==1,'ret_hs']
- data.loc[data['flag_hs'] != data.flag_hs.shift(1),'buysell'] = 1
- data.loc[0,'buysell'] = 0
- data['ret_strategy'] = data['ret_strategy'] - fee_ratio*data['buysell']
- #data['net_strategy_zz'] = (1 + data['flag_zz']*data['ret_zz']).cumprod()
- #data['net_strategy_hs'] = (1 + data['flag_hs']*data['ret_hs']).cumprod()
- data['net_strategy'] = (1 + data['ret_strategy']).cumprod()
- return data
来源: http://www.jianshu.com/p/4bc693394e60