We investigate the results in [1], whereby holding-period returns are predictable using fundamental data. The fundamental and price data come from the Sharadar data sets available through Quandl. We use Apache Spark for data ingestion and regression model fitting, followed by Quantopian's Zipline for strategy back-testing.
[1] Lyle, M. R. and C. C. Wang (2015). The cross section of expected holding period returns and their dynamics: A present value approach. Journal of Financial Economics 116 (3), 505–525.
from datetime import date, datetime, timedelta
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import abs, lead, lag, log, min, max, row_number, when, ntile
from pyspark.sql.types import *
from pyspark.ml.linalg import DenseVector
from pyspark.ml.regression import LinearRegression
import pandas as pd
import numpy as np
import pytz
from zipline.api import symbol, order, record, set_benchmark, get_datetime, order_target_percent
from zipline.utils.run_algo import run_algorithm
from zipline.finance.trading import TradingEnvironment
from itertools import izip, tee
from functools import partial
import matplotlib.pyplot as plt
%matplotlib inline
DATE_FORMAT = '%Y-%m-%d'
SHARADAR_PRICE_SCHEMA = StructType([
StructField('ticker', StringType()),
StructField('date', DateType()),
StructField('open', FloatType()),
StructField('high', FloatType()),
StructField('low', FloatType()),
    StructField('close', FloatType()),   # actually the mid-quote at close; renamed to 'mid' below
StructField('volume', FloatType()),
StructField('dividends', FloatType()),
StructField('lastupdated', DateType()),
])
The sizes of the data sets we are working with are as follows.
%ls -oah '/Users/unclechu/Dropbox/Datasets/Sharadar/SHARADAR_Equity_Prices.csv'
%ls -oah '/Users/unclechu/Dropbox/Datasets/2F_Fundamental_Model_Data/SHARADAR_SF1_ARQ.csv'
%ls -oah '/Users/unclechu/Dropbox/Datasets/2F_Fundamental_Model_Data/FF_48_industry_tickers.csv'
holdingDays = 21
required_cols = ['ticker', 'datekey', 'eps', 'bvps', 'pb', 'marketcap' ]
showCols = ['ticker', 'datekey', 'eps', 'bvps', 'prev_qtr_bvps',
'roe', 'log_roe', 'pb', 'bm', 'log_bm']
min_date = '2007-01-03'
max_date = '2013-12-31'
priceCSVPath = '/Users/unclechu/Dropbox/Datasets/Sharadar/SHARADAR_Equity_Prices.csv'
fundaCSVPath = '/Users/unclechu/Dropbox/Datasets/2F_Fundamental_Model_Data/SHARADAR_SF1_ARQ.csv'
sicDataPath = '/Users/unclechu/Dropbox/Datasets/2F_Fundamental_Model_Data/FF_48_industry_tickers.csv'
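The code below assumes a SparkSession bound to the name spark, which a PySpark shell or PySpark-enabled notebook provides automatically. If one is not already available, it can be created along these lines (a minimal sketch; the app name is an arbitrary choice):
# Minimal sketch: create the `spark` session used throughout this notebook.
# A PySpark shell provides `spark` out of the box; the app name is arbitrary.
spark = SparkSession.builder \
    .appName('holding-period-returns') \
    .getOrCreate()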
First we load the industry membership data for the Fama-French 48-industry classification.
sic_df = spark.read.csv(sicDataPath, header=True)
sic_df.show( truncate=50 )
Next we load the price data.
price_df = spark.read.csv(priceCSVPath, schema=SHARADAR_PRICE_SCHEMA, header=True, dateFormat='yyyy-MM-dd')
price_df = price_df.select( 'date', 'ticker', 'close').withColumnRenamed('close', 'mid')
windowSpec = Window.partitionBy('ticker').orderBy('date')
price_df = price_df.withColumn('prc_next', lead(price_df['mid'], 1).over(windowSpec))
price_df = price_df.withColumn('prc_fwd', lead(price_df['mid'], holdingDays + 1).over(windowSpec))
price_df = price_df.withColumn('ret', ( price_df['prc_fwd'] - price_df['prc_next'] ) / price_df['prc_next'] )
price_df = price_df.withColumn('log_ret', log(price_df['prc_fwd']) - log(price_df['prc_next'] ) )
We batch compute, for each date $d$, the return (the ret column) from buying at the mid-quote at the close of the next trading day $d+1$ (the prc_next column) and closing out the position 21 trading days later (the prc_fwd column, which leads the row for $d$ by 22 trading days). The log_ret column holds the equivalent log return, $\log(1 + ret)$.
For example, as we can see below, for the date 2007-01-03 the return from buying on 2007-01-04 (at a price of 9.041552) and closing out on 2007-02-05 (at a price of 9.555308) is 0.056821.
With this data ready, we can merge it with the fundamentals data prepared in the next step.
price_df.filter(price_df.ticker=='BREW').show(10)
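As a quick sanity check on the example above, we can reproduce the simple and log returns directly from the two quoted prices:
# Sanity check of the BREW example: recompute the simple and log returns
# from the 2007-01-04 and 2007-02-05 prices quoted above.
prc_next, prc_fwd = 9.041552, 9.555308
print (prc_fwd - prc_next) / prc_next        # ~0.056821
print np.log(prc_fwd) - np.log(prc_next)     # ~0.055268, i.e. log(1 + ret)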
We compute the ROE for each stock as

$$ ROE_t = 1 + \frac{EPS_t}{BVPS_{t-1}} $$

meaning the ROE for this quarter is 1 plus this quarter's EPS (earnings per share) divided by the previous quarter's BVPS (book value per share).
For book-to-market, we infer it from the price-to-book data from the fundamentals data set.
We also filter out rows with non-positive values of ROE and BM, since we take logs of both below.
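As a minimal numerical sketch of these definitions (with made-up figures), a stock earning EPS of 0.50 this quarter against a prior-quarter BVPS of 10.0 and trading at a price-to-book of 2.0 yields:
# Hypothetical numbers illustrating the ROE and BM definitions used below.
eps, prev_qtr_bvps, pb = 0.50, 10.0, 2.0
roe = 1. + eps / prev_qtr_bvps             # 1.05
bm = 1. / pb                               # 0.50
log_roe, log_bm = np.log(roe), np.log(bm)  # ~0.0488 and ~-0.693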
df = spark.read.csv(fundaCSVPath, header=True).select(required_cols)
df = df.withColumn( 'datekey', df.datekey.cast('date'))
df = df.withColumnRenamed('datekey', 'date')
df = df.filter( df.date.between(min_date, max_date) )
df = df.withColumn('prev_qtr_bvps', lag(df['bvps']).over(windowSpec) )
df = df.withColumn('roe', 1. + df['eps']/df['prev_qtr_bvps'])
df = df.withColumn('bm', 1./df.pb)
# keep only strictly positive roe and bm (required for the logs below)
df = df.filter((df.roe > 0.0) & (df.bm > 0.0) )
df = df.withColumn('log_bm', log(df['bm']))
df = df.withColumn('log_roe', log(df['roe']))
We show a sample of the computed fundamentals (ROE, BM, and their logs).
df.filter(df.ticker=='BREW').show(10, 10)
We next join the ROE and BM data with the 21-day holding-period return data, so that for each earnings date we have the $\log{BM}$ and $\log{ROE}$ values and the corresponding return. Our thesis is that the two factors $\log{BM}$ and $\log{ROE}$ predict the holding-period return. We will use these two factors in a linear regression model, train the model, and use its prediction as a signal for our trading strategy.
input_df = df.join(price_df, on=['date', 'ticker'], how='left')
input_df = input_df.dropna()
input_df = input_df.withColumn( 'rn', ntile(2).over(windowSpec) )
input_df.filter(input_df.ticker=='BREW').dropna()\
.select('date', 'ticker', 'log_bm', 'log_roe', 'ret', 'rn').show(10, 10)
In the above, we used ntile(2) over the per-ticker window (ordered by date) to assign each row to one of two halves: rn = 1 for the earlier half of a ticker's history and rn = 2 for the later half. We use the first half as the training set and the second half as the test set.
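To make the split concrete, here is a minimal sketch of what ntile(2) does on a toy single-ticker frame (the ticker and dates are made up):
# Toy illustration of ntile(2): four dated rows for one (hypothetical) ticker
# are split into tile 1 (earlier half) and tile 2 (later half).
toy = spark.createDataFrame(
    [('XYZ', date(2007, 1, d)) for d in (3, 4, 5, 8)],
    ['ticker', 'date'] )
toy.withColumn('rn', ntile(2).over(windowSpec)).show()
# the rows dated Jan 3 and 4 get rn=1; those dated Jan 5 and 8 get rn=2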
We next train the model using the first half of the data. There is a separate regression model for each industry group in the Fama-French 48-industry classification.
def fit_1f_model(train_df):
    # one-factor variant: regress log_ret on log_roe only (not used in the back-test below)
    def f(row):
        x1, x2, y = row    # row is (log_bm, log_roe, log_ret)
        return ( DenseVector([x2]), y )
regression_df = spark.createDataFrame( train_df.rdd.map( f ),
['features', 'label'] )
lr = LinearRegression(labelCol='label')
model = lr.fit(regression_df)
return model
def winsorize_data( df ):
    # winsorize the top and bottom 0.5% of the regressors
    q_bm = df.approxQuantile('log_bm', [0.005, 0.995], 0.0)
    q_roe = df.approxQuantile('log_roe', [0.005, 0.995], 0.0)
df = df.withColumn('log_bm', when(df['log_bm'] < q_bm[0], q_bm[0]) \
.otherwise(df['log_bm']) )
df = df.withColumn('log_bm', when(df['log_bm'] > q_bm[1], q_bm[1]) \
.otherwise(df['log_bm']) )
df = df.withColumn('log_roe', when(df['log_roe'] < q_roe[0], q_roe[0]) \
.otherwise(df['log_roe']) )
df = df.withColumn('log_roe', when(df['log_roe'] > q_roe[1], q_roe[1]) \
.otherwise(df['log_roe']) )
    # also clip extreme holding-period log returns at +/-0.3
    df = df.withColumn('log_ret', when(df['log_ret'] > 0.3, 0.3) \
        .otherwise(df['log_ret']) )
df = df.withColumn('log_ret', when(df['log_ret'] < -0.3, -0.3) \
.otherwise(df['log_ret']) )
return df
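The when/otherwise pairs above implement a clip at the quantile bounds; on a plain numpy array the same winsorization is a one-liner, shown here only to make the semantics concrete (made-up data):
# Equivalent clipping semantics on a toy numpy array (illustration only).
x = np.array([-5.0, -0.1, 0.0, 0.2, 4.0])
lo, hi = np.percentile(x, [0.5, 99.5])
print np.clip(x, lo, hi)   # values outside [lo, hi] are pinned to the bounds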
def fit_2f_model(train_df):
    # two-factor model: regress log_ret on (log_bm, log_roe), after winsorizing
    train_df = winsorize_data( train_df )
    def f(row):
        x1, x2, x3 = row    # row is (log_bm, log_roe, log_ret)
        return ( DenseVector([x1, x2]), x3 )
regression_df = spark.createDataFrame( train_df.rdd.map( f ),
['features', 'label'] )
lr = LinearRegression(labelCol='label')
model = lr.fit(regression_df)
return model
modelsByIndustry = {}
for row in sic_df.collect():
industry, _, tickers = row
tickers = tickers.split('|')
ind_train_df = input_df.filter( ( input_df.ticker.isin(tickers) ) & (input_df.rn==1 ) ) \
.select( 'log_bm', 'log_roe', 'log_ret')
modelsByIndustry[industry] = fit_2f_model( ind_train_df )
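Each fitted LinearRegressionModel exposes coefficients and intercept, which is all the trading logic below needs. A minimal sketch of scoring a made-up (log_bm, log_roe) pair with one industry's model, exactly as buyNextDay will do later:
# Hypothetical usage: score made-up factor values with a fitted model.
model = modelsByIndustry['Beer']
beta = np.append(model.coefficients.array, model.intercept)   # (b_bm, b_roe, a)
print np.array([-0.7, 0.05, 1.]).dot(beta)   # > 0 suggests long, < 0 short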
def summarize_models( modelsByIndustry ):
data = []
countByInd = {}
for row in sic_df.collect():
industry, _, tickers = row
countByInd[industry] = len(tickers.split('|'))
for ind, model in modelsByIndustry.items():
data.append( (ind, countByInd[ind], model.coefficients, model.intercept,
model.summary.rootMeanSquaredError, model.summary.r2 ) )
return pd.DataFrame( data, columns=['industry', 'size', 'coeffs', 'intercept', 'RMSE', '$R^2$'] )
summarize_models(modelsByIndustry).sort_values('$R^2$', ascending=False)
Before back-testing, we visualize the training data for one industry: for each of the two factors we scatter the winsorized factor values against the holding-period log returns.
def showData(ind):
tickers = sic_df.filter( sic_df.industry==ind ).select( 'tickers' ).head().tickers
tickers = tickers.split('|')
data = input_df.filter( ( input_df.ticker.isin(tickers) ) & (input_df.rn == 1 ) )\
.select( 'log_bm', 'log_roe', 'log_ret')
data = winsorize_data(data)
x1, x2, y = [], [], []
for lbm, lroe, lret in data.collect():
x1.append(lbm)
x2.append(lroe)
y.append(lret)
    # top panel: log_bm vs. log_ret; bottom panel: log_roe vs. log_ret
    plt.subplot(211)
    plt.scatter(x1, y)
    plt.subplot(212)
    plt.scatter(x2, y)
showData('Recreation')
Next we pick the Beer industry at random and back-test the model on the test data set (i.e. the rows with industry=='Beer' and rn==2).
INDUSTRY = 'Beer'
TICKERS = sic_df.filter( sic_df.industry==INDUSTRY ).select( 'tickers' ).head().tickers.split('|')
print 'List of tickers =', TICKERS
Now we prepare the data we need for back-testing. For every ticker in the Beer industry, we extract the daily close prices into a pandas dataframe.
priceBySym = price_df.filter( price_df.ticker.isin(TICKERS) ) \
.rdd.map(lambda r:(r.ticker, [(r.date, r.mid)]))\
.reduceByKey( lambda x, y : x + y )\
.collectAsMap()
data = {}
for k, v in priceBySym.items():
idx, vals = zip(*v)
    # sort by date: list concatenation under reduceByKey gives no ordering guarantee
    close_prices = pd.DataFrame( { 'close' : vals }, pd.DatetimeIndex(idx) ).sort_index()
    data[k] = close_prices
We also group the tickers by their report date, and for each report date we collect the log_bm and log_roe values, so that we can apply the model and decide whether to go long or short on the next trading day after the earnings announcement.
# prepare the relevant tickers for each report date
g = lambda r : ( r.date, [(r.ticker, r.log_bm, r.log_roe)] )
# create factors by date by ticker
res = input_df.filter( input_df.ticker.isin(TICKERS) ) \
.rdd.map( g ) \
.reduceByKey( lambda x, y : x + y ) \
.collectAsMap()
factorsByDateBySym = {}
for k, v in res.items():
d = factorsByDateBySym.setdefault(k, {})
for ticker, log_bm, log_roe in v:
d[ticker] = (log_bm, log_roe)
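The resulting factorsByDateBySym is a nested mapping from report date to ticker to factor pair; schematically (the dates and values here are made up):
# Schematic shape of factorsByDateBySym (made-up dates and values):
# { date(2007, 5, 10): { 'BREW': (-0.71, 0.048) },   # {date: {ticker: (log_bm, log_roe)}}
#   date(2007, 8, 9):  { 'BREW': (-0.69, 0.051) } }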
Now we implement the buyNextDay and closeOut logic to be invoked by the zipline back-test engine.
def buyNextDay( context, data ):
today = get_datetime().date()
model = context.model
    # if any company reports today, look up this ticker's factors
    if today in context.factorsByDateBySym:
        factorsBySym = context.factorsByDateBySym[today]
        if context.ticker in factorsBySym:
            factors = factorsBySym[context.ticker]
            # predicted log return: b_bm * log_bm + b_roe * log_roe + intercept
            signal = np.array(factors+(1,)).dot(np.append(model.coefficients.array, model.intercept))
            price = data[symbol(context.ticker)].price
# note, we don't have to use symsToBuyNextDay next day logic below because
# for daily simulation in zipline, any order issued at t will execute with t+1 price
# see https://www.quantopian.com/posts/at-jessica-stauth-question-about-trade-at-specified-time
if signal > 0.0:
order(symbol(context.ticker), context.portfolio.cash/price)
#context.symsToBuyNextDay.append((sym, 1.))
elif signal < 0.0:
order(symbol(context.ticker), -context.portfolio.cash/price)
#context.symsToBuyNextDay.append((sym, -1.))
# track age of position, so that we can liquidate after 21 days
context.posAgeBySym[context.ticker] = 0
def closeOut(context):
    # close out positions once they reach 21 days; first update position ages
    for sym in context.posAgeBySym:
        context.posAgeBySym[sym] += 1
today = get_datetime().date()
for asset, pos in context.portfolio.positions.items():
# use holdingDays - 1 below to work around the next day order issue again
if context.posAgeBySym[asset.symbol] == holdingDays-1:
order(asset, -pos.amount)
Below we have the logic for the back test helper class.
class BackTest(object):
def __init__(self, industry, closeBySym, modelsByIndustry, sic_df, input_df):
self.industry = industry
tickers = sic_df.filter( sic_df.industry==industry ).select( 'tickers' ).head().tickers
self.tickers = tickers.split('|')
self.model = modelsByIndustry[industry]
self.closeBySym = dict( ( k, v ) for k, v in closeBySym.items() if k in self.tickers )
g = lambda r : ( r.date, [(r.ticker, r.log_bm, r.log_roe)] )
temp_df = input_df.filter( input_df.ticker.isin(self.tickers) & (input_df.rn==2) )
res = temp_df.rdd.map(g).reduceByKey( lambda x, y : x + y ).collectAsMap()
self.minMaxDatesForSym = {}
self.factorsByDateBySym = {}
for idate, v in res.items():
for t, bm, roe in v:
d = self.factorsByDateBySym.setdefault(idate, {})
d[t] = (bm, roe)
dates = self.minMaxDatesForSym.setdefault(t, [date.max, date.min])
if idate < dates[0]:
dates[0] = idate
if idate > dates[1]:
dates[1] = idate
for sym, (l, h) in self.minMaxDatesForSym.items():
print sym, l, h
def initialize(self, context, ticker):
context.model = self.model
context.factorsByDateBySym = self.factorsByDateBySym
context.posAgeBySym={}
context.ticker = ticker
context.sym = symbol(ticker)
        set_benchmark(context.sym)  # benchmark against the asset itself, sidestepping zipline's default SPY benchmark
def handle_data(self, context, data):
# this is the main strategy logic, nice and simple :]
closeOut(context)
buyNextDay(context, data)
def run( self ):
self.resultsBySym = {}
self.noPriceList = []
self.exceptionList = []
self.noTradesList = []
for ticker in self.tickers:
            if (ticker not in self.closeBySym) or (self.closeBySym[ticker].dropna().size == 0):
self.noPriceList.append(ticker)
continue
else:
sim_panel = pd.Panel( { ticker:self.closeBySym[ticker] } )
try:
start, end = self.minMaxDatesForSym[ticker]
start = pd.Timestamp(start, tz=pytz.UTC)
end = pd.Timestamp(end, tz=pytz.UTC)
sim_results = run_algorithm(start=start, end=end,
initialize=partial( self.initialize, ticker=ticker ),
handle_data=self.handle_data,
capital_base=100000.0,
data=sim_panel )
trades = [ t for t in sim_results.transactions if t ]
if len(trades) == 0:
self.noTradesList.append(ticker)
self.resultsBySym[ticker] = sim_results
except Exception as e:
self.exceptionList.append(ticker)
def plot_results( self ):
valid_res = dict( (k, v) for k, v in self.resultsBySym.items() if k not in self.noTradesList )
numCharts = len(valid_res)
        numRows = ( numCharts + 1 ) // 2   # ceil of numCharts / 2
f, axarr = plt.subplots(numRows, 2, figsize=(12, 20))
f.tight_layout()
for i, (sym, res) in enumerate( valid_res.items()):
            r = i // 2
c = i % 2
res.portfolio_value.plot(ax=axarr[r,c], title='ticker = %s' % sym)
plt.tight_layout()
Run an individual back-test for each ticker. This is similar to an equal-weight portfolio across the tickers.
bt = BackTest( 'Beer', data, modelsByIndustry, sic_df, input_df )
bt.run()
bt.plot_results()
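Finally we annualize the average whole-period return into a compound annual growth rate. Writing $R$ for the average whole-period return across the per-ticker back-tests and $n$ for the number of calendar days between the earliest start and latest end dates, the function below computes

$$ CAGR = (1 + R)^{365/n} - 1 $$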
def cagr(bt):
avg_return = np.mean([res.algorithm_period_return[-1] for res in bt.resultsBySym.values()])
# find the overall min, max dates for all our above back-test periods for various tickers
min_date, max_date = date.max, date.min
days = 0
for sym, (l, h) in bt.minMaxDatesForSym.items():
        if sym in bt.resultsBySym and sym != 'BUD':
            # exclude BUD: the charts show it has only a few months of
            # data in 2008, which would skew the CAGR computation too much
if l < min_date:
min_date = l
if h > max_date:
max_date = h
cagr = (1+avg_return) ** (1/((max_date-min_date).days/365.)) - 1
print "Start date = %s, End date = %s" % (min_date, max_date)
print "Average return for whole period = %s" % avg_return
print "CAGR = %s" % cagr
cagr(bt)