您现在的位置是:亿华云 > 热点
用DataFrame作为容器分析市场行情指标
亿华云2025-10-03 20:30:42【热点】8人已围观
简介----------------------------------------------4/24/2019---------------------------------------------
----------------------------------------------4/24/2019--------------------------------------------------------------------
1. 新增方法addResultBar,作为指标可以在输入参数中定义需要对比的容器的范围,主要当前close和以后close的分析上涨下跌,和差值。市场
2. 新增方法resultOutput,行情根据对比值,作为指标输出检查点之后的容器走势,支持导出csv分析。分析
3. 修复一些比较机械的市场写法
新的代码请下面Github
https://github.com/BillyZhangGuoping/MarketDataAnaylzerbyDataFrame
--------------------------------------------------------------------------------------------------------------------------------------------------
之前在看一篇《基于阻力支撑相对强度(RSRS)的市场择时》(链接 https://mp.weixin.qq.com/s/LehwGXe_6JFZEZ8ekC4dCg)
尝试做为指标用在期货量化交易,效果不怎么理想。行情不过作者用DataFrame作为容器来分析指标,作为指标根据行情数据,容器分析指标效果,分析并进行验证让我学到不少。市场之前做量化交易策略设计时候只是行情靠第三方平台或者交易软件来验证,很不专业。
就尝试着用DataFrame作为容器分析市场行情指标,比如MACD,KDJ,CCI等等行情指标效用。这些行情指标,都可以从金融交易常用talib库获得。调出这样指标,结合行情数据分析,用DataFrame做为容器,充分利用python的云服务器数据分析和图片功能。
借花献佛,东抄西抄,做了一个小tool和示例代码。
定义了一个类DataAnalyzer初始化的时候需要输入生成csv文件导出文件夹地址,和数据格式
方法db2df,输入vnpy使用Mongodb行情数据库信息和读取品种信息collection,程序会读取指定品种的开始时间到结束时间段的一分钟k线数据;按照初始化格式返回生成DataFrame供分析;如果expeort2csv为True的话,会生成一个csv文件到指定地址.
方法csv2df,输入指定路径的csv行情文件;程序读取csv文件;按照格式返回生成DataFrame供分析;如果expeort2csv为True的话,会生成一个csv文件到指定地址。程序把导入的字符串转换datetime格式,此时可能会有warning信息。
方法df2Barmin,输入DataFrame格式1分钟行情数据,和指定输出的多分钟k线,程序整合出对应多分钟k线数据。比如输出1分钟行情数据,要求输出5分钟k先数据;程序输出5分钟K线信息DataFrame供分析。这里有点地方要注意,如果数据中一天开始第一个bar是9点,那么crossmin为1; 如果第一个是9点1分,此处为0。香港云服务器如果expeort2csv为True的话,会生成一个csv文件到指定地址。
方法dfcci,其实是一个示例方法,输入DataFrame格式分钟行情数据,此处可以是1分钟也可以是多分钟的,和参数cciWindows;程序调用talib的cci方法,进行计算。返回带有新的一列cci数据的DataFrame。用来分析。如果expeort2csv为True的话,会生成一个csv文件到指定地址。打开就是这样一个东西。其他talib方法,其实调用原理差不多
通过上面的类,可以获得一个带有指标数据的DataFrame,后面就可以对这个进行分析,后面是一系列演示分析,其实DataFrame对应的分析工具很多多。
通过类的源码库方法,读取一个rb1905行情数据,按照聚合出5分钟K线,在按照cci周期为15条K线计算cci值
画出cci的柱状分布图,CCI(Commodity Channel lndex)顺势指标是测量股价是否已超出常态分布范围的一个指数,波动于正无限大和负无限小之间。如下图x轴是cci值,y轴是出现次数。从图中可以看出cci数据是两个正太分布叠加,波峰在-80和+80两个值,正负200之后,cci出现就变的很少,此时可以用DataFrame的数字分析功能找到更多数据。
计算每个时间点的当前价格,和之后第5根K线结束价格差,和cci的值做成散点图,看看cci值和价格波动是否有联系。如下图,图象上看并没有上面关联。
cci值在正负(100-200)区间,和(200-300)区间算是出现比较少,计算在这个区间出现时,之后第2,第4,和第6根K线结束价格增多还是减少概率。从下图数据来看,似乎在(100,200)区间,第2个k线上涨概率大点点,但是也说明不了什么。
源代码如下,也可以去我的github去下载带测试行情数据的:
https://github.com/BillyZhangGuoping/MarketDataAnaylzerbyDataFrame
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 # encoding: UTF-8 from pymongo import MongoClient, ASCENDING import pandas as pd import numpy as np from datetime import datetime import talib import matplotlib.pyplot as plt import scipy.stats as scs class DataAnalyzer(object): def __init__(self, exportpath="C:\Project\\", datformat=[datetime, high, low, open, close,volume]): self.mongohost = None self.mongoport = None self.db = None self.collection = None self.df = pd.DataFrame() self.exportpath = exportpath self.datformat = datformat def db2df(self, db, collection, start, end, mongohost="localhost", mongoport=27017, export2csv=True): """读取MongoDB数据库行情记录,输出到Dataframe中""" self.mongohost = mongohost self.mongoport = mongoport self.db = db self.collection = collection dbClient = MongoClient(self.mongohost, self.mongoport, connectTimeoutMS=500) db = dbClient[self.db] cursor = db[self.collection].find({ datetime: { $gte: start}, datetime: { $lt: end}}).sort("datetime", ASCENDING) self.df = pd.DataFrame(list(cursor)) self.df = self.df[self.datformat] self.df = self.df.reset_index(drop=True) path = self.exportpath + self.collection + ".csv" if export2csv == True: self.df.to_csv(path, index=True, header=True) return self.df def csv2df(self, csvpath, dataname="csv_data", export2csv=True): """读取csv行情数据,输入到Dataframe中""" csv_df = pd.read_csv(csvpath) self.df = csv_df[self.datformat] self.df["datetime"] = pd.to_datetime(self.df[datetime]) # self.df["high"] = self.df[high].astype(float) # self.df["low"] = self.df[low].astype(float) # self.df["open"] = self.df[open].astype(float) # self.df["close"] = self.df[close].astype(float) # self.df["volume"] = self.df[volume].astype(int) self.df = self.df.reset_index(drop=True) path = self.exportpath + dataname + ".csv" if export2csv == True: self.df.to_csv(path, index=True, header=True) return self.df def df2Barmin(self, inputdf, barmins, crossmin=1, export2csv=True): """输入分钟k线dataframe数据,合并多多种数据,例如三分钟/5分钟等,如果开始时间是9点1分,crossmin = 0;如果是9点1分,crossmin为1""" dfbarmin = pd.DataFrame() highBarMin = 0 lowBarMin = 0 openBarMin = 0 volumeBarmin = 0 datetime = 0 for i in range(0, len(inputdf) - 1): bar = inputdf.iloc[i, :].to_dict() if openBarMin == 0: openBarmin = bar["open"] if highBarMin == 0: highBarMin = bar["high"] else: highBarMin = max(bar["high"], highBarMin) if lowBarMin == 0: lowBarMin = bar["low"] else: lowBarMin = min(bar["low"], lowBarMin) closeBarMin = bar["close"] datetime = bar["datetime"] volumeBarmin += int(bar["volume"]) # X分钟已经走完 if not (bar["datetime"].minute + crossmin) % barmins: # 可以用X整除 # 生成上一X分钟K线的时间戳 barMin = { datetime: datetime, high: highBarMin, low: lowBarMin, open: openBarmin, close: closeBarMin, volume : volumeBarmin} dfbarmin = dfbarmin.append(barMin, ignore_index=True) highBarMin = 0 lowBarMin = 0 openBarMin = 0 volumeBarmin = 0 if export2csv == True: dfbarmin.to_csv(self.exportpath + "bar" + str(barmins) + ".csv", index=True, header=True) return dfbarmin def dfcci(self, inputdf, n, export2csv=True): """调用talib方法计算CCI指标,写入到df并输出""" dfcci = inputdf dfcci["cci"] = None for i in range(n, len(inputdf)): df_ne = inputdf.loc[i - n + 1:i, :] cci = talib.CCI(np.array(df_ne["high"]), np.array(df_ne["low"]), np.array(df_ne["close"]), n) dfcci.loc[i, "cci"] = cci[-1] dfcci = dfcci.fillna(0) dfcci = dfcci.replace(np.inf, 0) if export2csv == True: dfcci.to_csv(self.exportpath + "dfcci" + ".csv", index=True, header=True) return dfcci if __name__ == __main__: DA = DataAnalyzer() #数据库导入 # start = datetime(year=2019, month=3, day=1, hour=0, minute=0, second=0, microsecond=0) # end = datetime.today() # df = DA.db2df(db="VnTrader_1Min_Db", collection="rb1905", start = start, end = end) #csv导入 df = DA.csv2df("rb1905.csv") df10min = DA.df2Barmin(df,5) dfaftercci = DA.dfcci(df10min, 15) dfaftercci = dfaftercci.loc[15:,:] dfaftercci = dfaftercci.reset_index(drop=True) #######################################分析cci分布######################################## plt.figure(figsize=(15,5)) plt.hist(dfaftercci[cci],bins=100,histtype=bar,align=mid,orientation=vertical,color=r) plt.show() sta = scs.describe(dfaftercci.cci) stew = sta[4] kurtosis = sta[5] print(cci的偏度:%s % (stew)) print(cci的峰度:%s % (kurtosis)) #######cci在(100 - 200),(200 -300)后的第2根,第4根,第6根的价格走势###################### dfaftercci["next2BarClose"] = None dfaftercci["next4BarClose"] = None dfaftercci["next6BarClose"] = None dfaftercci["next5BarCloseMakrup"] = None for i in range(1, len(dfaftercci)-6): if dfaftercci.loc[i,"close"] > dfaftercci.loc[i+2,"close"]: dfaftercci.loc[i,"next2BarClose"] = -1 else: dfaftercci.loc[i, "next2BarClose"] =1 if dfaftercci.loc[i,"close"] > dfaftercci.loc[i+4,"close"]: dfaftercci.loc[i, "next4BarClose"] = -1 else: dfaftercci.loc[i, "next4BarClose"] = 1 if dfaftercci.loc[i,"close"] > dfaftercci.loc[i+6,"close"]: dfaftercci.loc[i, "next6BarClose"] = -1 else: dfaftercci.loc[i, "next6BarClose"] = 1 #######计算###################### dfaftercci.loc[i,"next5BarCloseMakrup"] = dfaftercci.loc[i+5,"close"] - dfaftercci.loc[i,"close"] dfaftercci = dfaftercci.fillna(0) plt.figure(figsize=(15, 3)) plt.scatter(dfaftercci["cci"], dfaftercci["next5BarCloseMakrup"]) plt.show() for cciValue in [100,200]: de_anaylsis = dfaftercci.loc[(dfaftercci["cci"]>= cciValue)& (dfaftercci["cci"]< cciValue + 100)] percebtage = de_anaylsis[de_anaylsis["next2BarClose"]>0]["next2BarClose"].count()*100.000/de_anaylsis[cci].count() print(在cci 区间(%s , %s) 时候,第二根K线结束价格上涨概率为 %s%% %(cciValue,cciValue + 100,percebtage)) percebtage = de_anaylsis[de_anaylsis["next2BarClose"]<0]["next2BarClose"].count()*100.000/de_anaylsis[cci].count() print(在cci 区间-(%s , %s) 时候,第二根K线结束价格下跌概率为 %s%% %(cciValue,cciValue + 100,percebtage)) percebtage = de_anaylsis[de_anaylsis["next4BarClose"] > 0]["next2BarClose"].count() * 100.000 / de_anaylsis[ cci].count() print(在cci 区间(%s , %s) 时候,第四根K线结束价格上涨概率为 %s%% % (cciValue, cciValue + 100, percebtage)) percebtage = de_anaylsis[de_anaylsis["next4BarClose"] < 0]["next2BarClose"].count() * 100.000 / de_anaylsis[ cci].count() print(在cci 区间-(%s , %s) 时候,第四根K线结束价格下跌概率为 %s%% % (cciValue, cciValue + 100, percebtage)) percebtage = de_anaylsis[de_anaylsis["next6BarClose"] > 0]["next2BarClose"].count() * 100.000 / de_anaylsis[ cci].count() print(在cci 区间(%s , %s) 时候,第六根K线结束价格上涨概率为 %s%% % (cciValue, cciValue + 100, percebtage)) percebtage = de_anaylsis[de_anaylsis["next6BarClose"] < 0]["next2BarClose"].count() * 100.000 / de_anaylsis[ cci].count() print(在cci 区间-(%s , %s) 时候,第六根K线结束价格下跌概率为 %s%% % (cciValue, cciValue + 100, percebtage))很赞哦!(4)