Two-factor Model for Holding-period Return Prediction

We investigate the results in [1], whereby holding-period returns are predictable using fundamental data. The fundamental data and the price data come from the Sharadar data sets available on Quandl. We use Apache Spark for data ingestion and regression-model fitting, followed by Quantopian's Zipline for strategy back-testing.

References

[1] Lyle, M. R. and C. C. Wang (2015). The cross section of expected holding period returns and their dynamics: A present value approach. Journal of Financial Economics 116 (3), 505–525.

In [1]:
from datetime import date, datetime, timedelta
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import abs, lead, lag, log, min, max, row_number, when, ntile
from pyspark.sql.types import *
from pyspark.ml.linalg import DenseVector
from pyspark.ml.regression import LinearRegression
import pandas as pd
import numpy as np
import pytz
from zipline.api import symbol, order, record, set_benchmark, get_datetime, order_target_percent
from zipline.utils.run_algo import run_algorithm
from zipline.finance.trading import TradingEnvironment
from itertools import izip, tee
from functools import partial
import matplotlib.pyplot as plt
%matplotlib inline

DATE_FORMAT = '%Y-%m-%d'

SHARADAR_PRICE_SCHEMA = StructType([
                           StructField('ticker', StringType()),
                           StructField('date', DateType()),
                           StructField('open', FloatType()),
                           StructField('high', FloatType()),
                           StructField('low', FloatType()),
                           StructField('close', FloatType()),
                           StructField('volume', FloatType()),
                           StructField('dividends', FloatType()),
                           StructField('lastupdated', DateType()),
                           # price field is actually mid at close
                          ])

The sizes of the datasets we are working with are as follows.

In [127]:
%ls -oah '/Users/unclechu/Dropbox/Datasets/Sharadar/SHARADAR_Equity_Prices.csv'
%ls -oah '/Users/unclechu/Dropbox/Datasets/2F_Fundamental_Model_Data/SHARADAR_SF1_ARQ.csv'
%ls -oah '/Users/unclechu/Dropbox/Datasets/2F_Fundamental_Model_Data/FF_48_industry_tickers.csv'
-rw-r--r--@ 1 unclechu   1.6G Feb 17 04:55 /Users/unclechu/Dropbox/Datasets/Sharadar/SHARADAR_Equity_Prices.csv
-rw-r--r--@ 1 unclechu   242M Jan  9 23:36 /Users/unclechu/Dropbox/Datasets/2F_Fundamental_Model_Data/SHARADAR_SF1_ARQ.csv
-rw-r--r--@ 1 unclechu    38K Feb 16 13:40 /Users/unclechu/Dropbox/Datasets/2F_Fundamental_Model_Data/FF_48_industry_tickers.csv
In [2]:
holdingDays = 21
required_cols = ['ticker', 'datekey', 'eps', 'bvps', 'pb', 'marketcap' ]
showCols      = ['ticker', 'datekey', 'eps', 'bvps', 'prev_qtr_bvps',
                 'roe', 'log_roe', 'pb', 'bm', 'log_bm']

min_date = '2007-01-03'
max_date = '2013-12-31'

priceCSVPath  = '/Users/unclechu/Dropbox/Datasets/Sharadar/SHARADAR_Equity_Prices.csv'
fundaCSVPath  = '/Users/unclechu/Dropbox/Datasets/2F_Fundamental_Model_Data/SHARADAR_SF1_ARQ.csv'
sicDataPath   = '/Users/unclechu/Dropbox/Datasets/2F_Fundamental_Model_Data/FF_48_industry_tickers.csv'

First we load the industry membership data for the Fama/French 48-industry classification.

In [3]:
sic_df = spark.read.csv(sicDataPath, header=True)
sic_df.show( truncate=50 )
+-------------------------------+--------------+--------------------------------------------------+
|                       industry|num_of_tickers|                                           tickers|
+-------------------------------+--------------+--------------------------------------------------+
|              Personal Services|           440|ABLYF|ACBFF|ACGBY|ADRZY|AEXAY|AFAM|AGESY|AGODY|...|
|           Electrical Equipment|            85|ABB|AERN|AETI|AME|AMSC|ARGKF|ARTX|ATKR|AXPW|AYI...|
|                  Entertainment|            81|AMC|AMMA|ARHN|AVPI|BLIAQ|BLIBQ|BTOP|BUKS|BWL.A|...|
|         Automobiles and Trucks|           105|ADFS|ADNT|ADOM|ALSN|ALV|AMIN|AMTY|APTV|ATCV|AXL...|
|              Business Supplies|            43|ALGI|AVY|BMS|BOTX|BPMI|BZLFY|CLW|CVO|CYYNF|EBF|...|
|                      Utilities|           181|AEE|AEP|AGR|ALE|AM|AMGP|AMID|APLP|APSI|APTL|AQN...|
|                           Beer|            35|ABEV|AHNR|BF.B|BORN|BRBMF|BREW|BUD|CABGY|CCLAY|...|
|                 Almost Nothing|            32|ADSW|AES|ALAN|AWX|BEVFF|CIWT|CLH|CVA|CWST|CXIA|...|
|        Printing and Publishing|            40|ABCD|ACCO|ACNI|AHC|AXR|BEEN|BSTK|COMP|CSS|DJCO|...|
|                Steel Works Etc|            82|AA|ACH|AKS|AMGY|AMSIY|ANGGY|APEMY|APNI|APWC|AQM...|
|              Medical Equipment|           242|ABMD|ABMT|ADLI|ADMT|AHPI|AIPT|AITB|AKSY|ALGN|AL...|
|                  Food Products|            82|ADM|ALN|AMNF|ANFI|ATLT|BABB|BG|BGS|BRFH|BRFS|BR...|
|Measuring and Control Equipment|           108|A|ABAX|AEHR|AEMD|ALOG|AMOT|ARYC|AXDX|BFNH|BIO|B...|
|        Pharmaceutical Products|           706|AAAP|ABBV|ABEO|ABIO|ABMC|ABT|ABUS|ABVC|ACAD|ACE...|
|                            Oil|           344|AAV|AEGG|AETUF|ALOD|ALTX|ANDV|ANFC|AOGN|AOIFF|A...|
|                 Transportation|           203|AAIR|AAL|AAWW|AFLYY|AIRT|ALGT|ALK|ANDX|ARCB|ASC...|
|                      Chemicals|           145|ABCZY|ADES|AEPT|AHKSY|AIQUY|AKZOY|ALB|AMRS|AMTX...|
|                       Aircraft|            28|AAIIQ|AIR|AIRI|ATRO|AVAV|BA|BBAVY|BRSI|COL|CVU|...|
|                   Construction|            82|ABCE|AEGN|AGX|AMRC|AVHI|BBU|BLD|BZH|CAA|CADC|CB...|
|    Rubber and Plastic Products|            44|ABLT|ADSV|AFI|ATPL|ATR|AWI|BERY|BSRC|CMT|CSL|CT...|
+-------------------------------+--------------+--------------------------------------------------+
only showing top 20 rows

Next we load the price data.

In [4]:
price_df = spark.read.csv(priceCSVPath, schema=SHARADAR_PRICE_SCHEMA, header=True, dateFormat='yyyy-MM-dd')
price_df = price_df.select( 'date', 'ticker', 'close').withColumnRenamed('close', 'mid')

windowSpec = Window.partitionBy('ticker').orderBy('date')
In [5]:
price_df = price_df.withColumn('prc_next', lead(price_df['mid'], 1).over(windowSpec))
price_df = price_df.withColumn('prc_fwd', lead(price_df['mid'], holdingDays + 1).over(windowSpec))
price_df = price_df.withColumn('ret', ( price_df['prc_fwd'] - price_df['prc_next'] ) / price_df['prc_next'] )
price_df = price_df.withColumn('log_ret', log(price_df['prc_fwd']) - log(price_df['prc_next'] ) )

Computing Returns After the Earnings Date

We batch compute, for each date $d$, the return (the ret column) from buying at the closing mid-quote on $d+1$ (the prc_next column) and closing out the position 21 trading days later (the prc_fwd column).

For example, for the date 2007-01-03, the return from buying on 2007-01-04 (at a price of 9.041552) and closing out on 2007-02-05 (at a price of 9.555308) is 0.056821.

With this data ready, we can merge it with the fundamentals data prepared in the next step.
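
A quick sanity check of that example, using the prices quoted above (note we call np.log explicitly, since log is shadowed by the pyspark import):

prc_next, prc_fwd = 9.041552, 9.555308
ret = (prc_fwd - prc_next) / prc_next          # 0.056821
log_ret = np.log(prc_fwd) - np.log(prc_next)   # ~0.05527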

In [6]:
price_df.filter(price_df.ticker=='BREW').show(10)
+----------+------+-----+--------+-------+--------------------+--------------------+
|      date|ticker|  mid|prc_next|prc_fwd|                 ret|             log_ret|
+----------+------+-----+--------+-------+--------------------+--------------------+
|1998-12-31|  BREW|  4.5|     4.5|    5.5|  0.2222222222222222|  0.2006706954621511|
|1999-01-04|  BREW|  4.5|     4.5|  5.406| 0.20133336385091147| 0.18343207648107085|
|1999-01-05|  BREW|  4.5|     4.5|    5.0|  0.1111111111111111| 0.10536051565782611|
|1999-01-06|  BREW|  4.5|     4.5|    5.5|  0.2222222222222222|  0.2006706954621511|
|1999-01-07|  BREW|  4.5|   4.563|  5.563| 0.21915405559755907| 0.19815722118306267|
|1999-01-08|  BREW|4.563|   4.563|  5.438|  0.1917597986478642| 0.17543103713559183|
|1999-01-11|  BREW|4.563|   4.813|  5.438| 0.12985663281648072| 0.12209075106137757|
|1999-01-12|  BREW|4.813|     5.0|  5.375|               0.075| 0.07232066157962613|
|1999-01-13|  BREW|  5.0|   5.188|   5.25|0.011950615922297972|0.011879771178835652|
|1999-01-14|  BREW|5.188|    5.25|  5.188|-0.01180948529924...|-0.01187977117883...|
+----------+------+-----+--------+-------+--------------------+--------------------+
only showing top 10 rows

Preparing the Return-On-Equity and Book-to-Market Data

We compute the ROE for each stock as

$$ ROE_t = 1 + \frac{EPS_t}{BVPS_{t-1}} $$

meaning the ROE for this quarter is 1 plus the EPS (earnings per share) of this quarter divided by the BVPS (book value per share) of the previous quarter.

For book-to-market, we infer it as the reciprocal of the price-to-book ratio in the fundamentals data set.

We also filter out rows with non-positive values of ROE and BM, since we take logs of both.
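
As a worked example, take the BREW row for 2007-08-10 from the sample shown below (eps = 0.06, previous-quarter bvps = 7.269, pb = 0.858):

roe = 1. + 0.06 / 7.269   # 1.008254
bm  = 1. / 0.858          # 1.165501
log_roe = np.log(roe)     # 0.008220
log_bm  = np.log(bm)      # 0.153154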

In [35]:
df = spark.read.csv(fundaCSVPath, header=True).select(required_cols)

df = df.withColumn( 'datekey', df.datekey.cast('date'))
df = df.withColumnRenamed('datekey', 'date')
df = df.filter( df.date.between(min_date, max_date) )

df = df.withColumn('prev_qtr_bvps', lag(df['bvps']).over(windowSpec) )
df = df.withColumn('roe', 1. + df['eps']/df['prev_qtr_bvps'])
df = df.withColumn('bm', 1./df.pb)

# filter negative values in the roe and bm columns
df = df.filter((df.roe > 0.0) & (df.bm > 0.0) )
df = df.withColumn('log_bm', log(df['bm']))
df = df.withColumn('log_roe', log(df['roe']))

We show a sample of the resulting fundamentals data (ROE, BM, and the derived log columns).

In [9]:
df.filter(df.ticker=='BREW').show(10, 10)
+------+----------+-----+-----+-----+---------+-------------+----------+----------+----------+----------+
|ticker|      date|  eps| bvps|   pb|marketcap|prev_qtr_bvps|       roe|        bm|    log_bm|   log_roe|
+------+----------+-----+-----+-----+---------+-------------+----------+----------+----------+----------+
|  BREW|2007-05-09|  0.0|7.308| 0.92| 55807174|        7.308|       1.0|1.08695...|0.08338...|       0.0|
|  BREW|2007-05-11|-0.04|7.269|0.933| 56377659|        7.308|0.99452...|1.07181...|0.06935...|-0.0054...|
|  BREW|2007-08-10| 0.06|7.328|0.858| 52520587|        7.269|1.00825...|1.16550...|0.15315...|0.00822...|
|  BREW|2007-11-13|-0.03|7.294|0.836| 50960857|        7.328|0.99590...|1.19617...|0.17912...|-0.0041...|
|  BREW|2008-03-26| -0.1|7.192| 0.62| 37259905|        7.294|0.98629...|1.61290...|0.47803...|-0.0138...|
|  BREW|2008-05-02| -0.1|7.192|0.556| 33416956|        7.192|0.98609...|1.79856...|0.58698...|-0.0140...|
|  BREW|2008-05-12| -0.1|7.192|0.553| 33249871|        7.192|0.98609...|1.80831...|0.59239...|-0.0140...|
|  BREW|2008-05-15|-0.07|7.102|0.562| 33461093|        7.192|0.99026...|1.77935...|0.57625...|-0.0097...|
|  BREW|2008-08-14|-0.16| 3.47|1.156| 67440833|        7.102|0.97747...|0.86505...|-0.1449...|-0.0227...|
|  BREW|2008-11-14|-0.07|6.476|0.402| 44090963|         3.47|0.97982...|2.48756...|0.91130...|-0.0203...|
+------+----------+-----+-----+-----+---------+-------------+----------+----------+----------+----------+
only showing top 10 rows

We next join the ROE and BM data with the 21-day holding-period return data so that for each earnings date we have the $\log{BM}$ and $\log{ROE}$ values and the corresponding return. Our thesis is that the two factors $\log{BM}$ and $\log{ROE}$ predict the holding-period return. We will use these two factors in a linear regression model, train the model, and use its predictions as a signal for our trading strategy.
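
Concretely, for each industry we fit the two-factor regression

$$ \log r_{d+1,\,d+22} = \beta_0 + \beta_1 \log{BM_d} + \beta_2 \log{ROE_d} + \epsilon_d $$

where $\log r_{d+1,\,d+22}$ is the 21-day log holding-period return (the log_ret column) following the earnings date $d$.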

In [36]:
input_df = df.join(price_df, on=['date', 'ticker'], how='left')
input_df = input_df.dropna()
input_df = input_df.withColumn( 'rn', ntile(2).over(windowSpec) )
In [37]:
input_df.filter(input_df.ticker=='BREW').dropna()\
    .select('date', 'ticker', 'log_bm', 'log_roe', 'ret', 'rn').show(10, 10)
+----------+------+----------+----------+----------+---+
|      date|ticker|    log_bm|   log_roe|       ret| rn|
+----------+------+----------+----------+----------+---+
|2007-05-09|  BREW|0.08338...|       0.0|0.03834...|  1|
|2007-05-11|  BREW|0.06935...|-0.0054...|0.02623...|  1|
|2007-08-10|  BREW|0.15315...|0.00822...|0.05182...|  1|
|2007-11-13|  BREW|0.17912...|-0.0041...|-0.0362...|  1|
|2008-03-26|  BREW|0.47803...|-0.0138...|-0.1189...|  1|
|2008-05-02|  BREW|0.58698...|-0.0140...|0.11970...|  1|
|2008-05-12|  BREW|0.59239...|-0.0140...|0.11083...|  1|
|2008-05-15|  BREW|0.57625...|-0.0097...|0.15724...|  1|
|2008-08-14|  BREW|-0.1449...|-0.0227...|-0.2014...|  1|
|2008-11-14|  BREW|0.91130...|-0.0203...|-0.4174...|  1|
+----------+------+----------+----------+----------+---+
only showing top 10 rows

In the above, ntile(2) over a window partitioned by ticker and ordered by date assigns each ticker's rows to one of two equal, chronologically ordered buckets, so we can use the first half (rn == 1) as the training set and the second half (rn == 2) as the test set.
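
A minimal sketch of the split on toy data (the ticker and dates are made up):

toy = spark.createDataFrame(
    [('AAA', '2007-01-01'), ('AAA', '2007-02-01'),
     ('AAA', '2007-03-01'), ('AAA', '2007-04-01')],
    ['ticker', 'date'])
# the two earliest rows get rn == 1 (train), the two latest rn == 2 (test)
toy.withColumn('rn', ntile(2).over(Window.partitionBy('ticker').orderBy('date'))).show()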

We next train the model using the first half of the data. There is a separate regression model for each industry group in Fama and French's 48-industry classification.

In [12]:
def fit_1f_model(train_df):
    
    def f(row):
        # keep only the second factor (log_roe) as the single feature
        x1, x2, y = row
        return ( DenseVector([x2]), y )
    
    regression_df = spark.createDataFrame( train_df.rdd.map( f ),
                                           ['features', 'label'] )

    lr = LinearRegression(labelCol='label')
    model = lr.fit(regression_df)
    return model
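
Before fitting the two-factor model we winsorize the regressors at their 0.5% and 99.5% quantiles and cap the 21-day log returns at ±0.3, limiting the influence of outliers. The capping rule, sketched on a single value with made-up quantile bounds (the builtin min/max are shadowed by the pyspark imports above, so we use a conditional expression instead):

q_lo, q_hi = -2.5, 1.8   # hypothetical quantile bounds
x = 3.1
x_capped = q_hi if x > q_hi else (q_lo if x < q_lo else x)   # -> 1.8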
In [13]:
def winsorize_data( df ):
    
    # winsorize the top and bottom 0.5% of the regressors
    q_bm  = df.approxQuantile('log_bm', [0.005, 0.995], 0.0)
    q_roe = df.approxQuantile('log_roe', [0.005, 0.995], 0.0)
    
    df = df.withColumn('log_bm', when(df['log_bm'] < q_bm[0], q_bm[0]) \
                                 .otherwise(df['log_bm']) )
    df = df.withColumn('log_bm', when(df['log_bm'] > q_bm[1], q_bm[1]) \
                                 .otherwise(df['log_bm']) )    
    df = df.withColumn('log_roe', when(df['log_roe'] < q_roe[0], q_roe[0]) \
                                  .otherwise(df['log_roe']) )
    df = df.withColumn('log_roe', when(df['log_roe'] > q_roe[1], q_roe[1]) \
                                  .otherwise(df['log_roe']) )    
    # also cap extreme 21-day log returns at +/-0.3
    df = df.withColumn('log_ret', when(df['log_ret'] > 0.3, 0.3) \
                                  .otherwise(df['log_ret']) )
    df = df.withColumn('log_ret', when(df['log_ret'] < -0.3, -0.3) \
                                  .otherwise(df['log_ret']) )
    
    return df

def fit_2f_model(train_df):
    
    train_df = winsorize_data( train_df )
    
    def f(row):
        # row = (log_bm, log_roe, log_ret): two features and the label
        x1, x2, x3 = row
        return ( DenseVector([x1, x2]), x3 )

    regression_df = spark.createDataFrame( train_df.rdd.map( f ),
                                           ['features', 'label'] )
    lr = LinearRegression(labelCol='label')
    model = lr.fit(regression_df)
    return model
In [ ]:
modelsByIndustry = {}

for row in sic_df.collect():
    industry, _, tickers = row 
    tickers = tickers.split('|')
    ind_train_df = input_df.filter( ( input_df.ticker.isin(tickers) ) & (input_df.rn==1 ) ) \
                           .select( 'log_bm', 'log_roe', 'log_ret')
    modelsByIndustry[industry] = fit_2f_model( ind_train_df )
    

Model Calibration by Industry Group

In [22]:
def summarize_models( modelsByIndustry ):
    data = []
    countByInd = {}
    for row in sic_df.collect():
        industry, _, tickers = row
        countByInd[industry] = len(tickers.split('|'))
    
    for ind, model in modelsByIndustry.items():
        data.append( (ind, countByInd[ind], model.coefficients, model.intercept, 
                      model.summary.rootMeanSquaredError, model.summary.r2 ) ) 
    
    return pd.DataFrame( data, columns=['industry', 'size', 'coeffs', 'intercept', 'RMSE', '$R^2$'] )

summarize_models(modelsByIndustry).sort_values(by='$R^2$', ascending=False)
Out[22]:
industry size coeffs intercept RMSE $R^2$
44 Candy & Soda 15 [0.0337905589719, 0.275079631159] 0.044654 0.148815 0.060023
27 Textiles 12 [0.0398895623822, 0.0947009092804] 0.004806 0.135964 0.059896
4 Recreation 40 [0.0342648778059, -0.0638419667855] -0.002016 0.119006 0.058238
32 Shipbuilding, Railroad Equipment 12 [0.0386215090006, -0.336726448009] 0.018463 0.130521 0.055762
36 Tobacco Products 18 [-0.0910288338332, -0.0697010340344] -0.214898 0.085998 0.038160
20 Rubber and Plastic Products 44 [0.0152945261987, 0.0464764203173] 0.014073 0.129537 0.026394
33 Construction Materials 99 [-0.00423950915667, -0.205864266552] -0.007755 0.128077 0.023239
23 Business Supplies 43 [0.0206325724668, -0.0281954727311] 0.015789 0.126419 0.023201
37 Steel Works Etc 82 [0.0191713482051, -0.14423897045] 0.014195 0.140086 0.021659
45 Insurance 173 [0.0255400420054, 0.108633189816] 0.008052 0.109395 0.017682
6 Beer 35 [0.0226151939498, 0.0328919373213] 0.023970 0.132709 0.017493
19 Construction 82 [0.0160183618541, 0.176389430816] 0.001967 0.146898 0.017044
21 Non-Metallic and Industrial Metal Mining 123 [0.0118703832757, -0.0552089124937] 0.000511 0.157760 0.016036
17 Aircraft 28 [0.0181910208234, -0.0462463077567] 0.004431 0.122441 0.013949
35 Communication 229 [0.0130430308352, -0.00882923747458] 0.009973 0.132672 0.013340
12 Measuring and Control Equipment 108 [0.0112468223333, 0.0911315887735] 0.003764 0.131277 0.013195
46 Restaraunts, Hotels, Motels 112 [0.00574985755826, -0.0626621866183] 0.007135 0.128022 0.012521
1 Electrical Equipment 85 [0.00990139907638, -0.109998932394] -0.005459 0.136847 0.012269
3 Automobiles and Trucks 105 [0.0124053731173, -0.0397362110278] -0.007868 0.135629 0.011580
7 Almost Nothing 32 [0.0120293315836, -0.0571389318696] 0.012861 0.113489 0.010925
30 Healthcare 99 [0.0144644753308, 0.00474239750744] -0.001003 0.125919 0.009441
31 Apparel 58 [0.00861006403232, -0.0497797623101] -0.000135 0.138435 0.009106
42 Consumer Goods 97 [0.0114946641582, 0.00563070560872] 0.003432 0.126777 0.008961
39 Trading 708 [0.0104682510634, -0.011724840714] 0.006536 0.114267 0.008175
40 Shipping Containers 15 [0.000533103995562, -0.0416586235948] 0.009286 0.104350 0.006848
29 Machinery 164 [0.0119656121923, -0.0231203010886] 0.010536 0.132385 0.006505
8 Printing and Publishing 40 [0.00806900341395, 0.0424679970781] -0.019574 0.118754 0.006324
41 Real Estate 103 [0.00660130776234, 0.0254643522689] -0.009593 0.115261 0.006106
38 Defense 12 [0.00111954969501, -0.0639300700979] -0.007780 0.121678 0.005996
43 Retail 252 [0.00789106501788, -0.0436637985996] 0.003304 0.131885 0.005685
18 Chemicals 145 [0.0086277043715, 0.0474427818794] 0.007086 0.134075 0.005223
28 Banking 735 [0.0140486663945, 0.0707164133311] -0.012542 0.113233 0.004967
9 Electronic Equipment 338 [0.0117501437686, -0.0108824070764] 0.008151 0.140756 0.004742
34 Coal 24 [0.00283452549572, 0.156798796143] 0.000653 0.131869 0.003920
14 Pharmaceutical Products 706 [0.00749498658282, 0.00344113153373] 0.001501 0.155959 0.003128
24 Business Services 826 [0.00652764584098, -0.0044389463956] 0.003448 0.129167 0.002775
13 Agriculture 24 [0.00685133356268, 0.0717692127791] -0.010135 0.123078 0.002539
2 Entertainment 81 [0.0010573824155, 0.0256519316455] -0.005456 0.128983 0.002314
15 Oil 344 [0.00671804741735, -0.0204423824484] -0.001552 0.137968 0.002239
0 Personal Services 440 [0.00555779258184, 0.0198012046218] 0.004365 0.119355 0.002215
16 Transportation 203 [-0.00166954953013, 0.0425859985442] -0.009781 0.118347 0.002188
25 Wholesale 189 [0.00705687387603, 0.0294003694101] -0.003189 0.126913 0.002070
22 Fabricated Products 16 [0.00565321835893, 0.00511257407008] 0.020088 0.137740 0.001677
10 Medical Equipment 242 [0.0058932109189, -0.00485420409897] -0.007225 0.141686 0.001611
11 Food Products 82 [-0.00511637450076, 0.00125565537008] -0.005978 0.105307 0.001475
26 Computers 147 [-0.000401431814889, -0.0290825089358] -0.006711 0.127641 0.000976
47 Precious Metals 89 [-0.00104133192154, 0.0153678208671] -0.027618 0.167794 0.000668
5 Utilities 181 [0.000482446262707, -0.00456846330988] 0.002183 0.080767 0.000029

Before back-testing, we visually inspect the winsorized training data for a given industry, plotting the 21-day log returns against each of the two factors.

In [24]:
def showData(ind):
    
    tickers = sic_df.filter( sic_df.industry==ind ).select( 'tickers' ).head().tickers
    tickers = tickers.split('|')
    data = input_df.filter( ( input_df.ticker.isin(tickers) ) & (input_df.rn == 1 ) )\
                   .select( 'log_bm', 'log_roe', 'log_ret')
        
    data = winsorize_data(data)
    
    x1, x2, y = [], [], []
    for lbm, lroe, lret in data.collect():
        x1.append(lbm)
        x2.append(lroe)
        y.append(lret)
    
    plt.subplot(211)
    plt.scatter(x1, y)   # log_ret vs. log_bm
    plt.subplot(212)
    plt.scatter(x2, y)   # log_ret vs. log_roe
    
showData('Recreation')

Next we choose an industry at random, Beer, and back-test the model on the held-out test data (i.e. the rows with industry=='Beer' and rn==2).

In [25]:
INDUSTRY = 'Beer'
TICKERS = sic_df.filter( sic_df.industry==INDUSTRY ).select( 'tickers' ).head().tickers.split('|')
print 'List of tickers =', TICKERS
List of tickers = [u'ABEV', u'AHNR', u'BF.B', u'BORN', u'BRBMF', u'BREW', u'BUD', u'CABGY', u'CCLAY', u'CCU', u'CWGL', u'DEO', u'DKAM', u'DPS', u'DVDCY', u'EAST', u'GMNI', u'JSDA', u'KNBWY', u'KO', u'LBCC', u'LVMUY', u'MENB', u'NBEV', u'PACV', u'PEP', u'ROX', u'SAM', u'SMGBY', u'STZ', u'STZ.B', u'TAP', u'THST', u'TSGTY', u'WVVI']

Now we prepare the data we need for back-testing. For every ticker in the Beer industry we extract the daily close prices into a pandas DataFrame.

In [26]:
priceBySym = price_df.filter( price_df.ticker.isin(TICKERS) ) \
                     .rdd.map(lambda r:(r.ticker, [(r.date, r.mid)]))\
                         .reduceByKey( lambda x, y : x + y )\
                         .collectAsMap()

data = {}
for k, v in priceBySym.items():
    idx, vals = zip(*v)
    close_prices = pd.DataFrame( { 'close' : vals }, pd.DatetimeIndex(idx) )
    data[k]=close_prices

We also group the tickers by their report date, and for each report date we collect the log_bm and log_roe values, so that we can apply the model to decide whether to go long or short on the next trading day after the earnings announcement.

In [38]:
# prepare the relevant tickers for each report date
g = lambda r : ( r.date, [(r.ticker, r.log_bm, r.log_roe)] )

# create factors by date by ticker
res = input_df.filter( input_df.ticker.isin(TICKERS) ) \
              .rdd.map( g ) \
                  .reduceByKey( lambda x, y : x + y ) \
                  .collectAsMap()
        
factorsByDateBySym = {}
for k, v in res.items():
    d = factorsByDateBySym.setdefault(k, {})
    for ticker, log_bm, log_roe in v:
        d[ticker] = (log_bm, log_roe)

Now we implement the buyNextDay and closeOut logic to be invoked by the zipline back-test engine.

In [29]:
def buyNextDay( context, data ):  
    today = get_datetime().date()
    model = context.model
    # if there are companies reporting today, get the factors keyed by symbol
    if today in context.factorsByDateBySym:
        factorsBySym = context.factorsByDateBySym[today]
        
        if context.ticker in factorsBySym:
            factors = factorsBySym[context.ticker]

            # predicted log return: [log_bm, log_roe, 1] . [beta_1, beta_2, intercept]
            signal = np.array(factors+(1,)).dot(np.append(model.coefficients.array, model.intercept))
            price = data.current(symbol(context.ticker), 'price')

            # note, we don't need explicit buy-next-day logic here because
            # for daily simulation in zipline, any order issued at t will execute at the t+1 price
            # see https://www.quantopian.com/posts/at-jessica-stauth-question-about-trade-at-specified-time
            if signal > 0.0:
                order(symbol(context.ticker), context.portfolio.cash/price)
            elif signal < 0.0:
                order(symbol(context.ticker), -context.portfolio.cash/price)

            # track age of position, so that we can liquidate after 21 days
            context.posAgeBySym[context.ticker] = 0

def closeOut(context):
    # age every open position, then close out those that have reached 21 days
    for sym in context.posAgeBySym:
        context.posAgeBySym[sym] += 1

    for asset, pos in context.portfolio.positions.items():
        # use holdingDays - 1 below to work around the next-day order execution again
        if context.posAgeBySym[asset.symbol] == holdingDays-1:
            order(asset, -pos.amount)

Below is the logic for the back-test helper class.

In [96]:
class BackTest(object):
    
    def __init__(self, industry, closeBySym, modelsByIndustry, sic_df, input_df):
        self.industry = industry
        tickers       = sic_df.filter( sic_df.industry==industry ).select( 'tickers' ).head().tickers
        self.tickers  = tickers.split('|')
        self.model    = modelsByIndustry[industry]
        self.closeBySym = dict( ( k, v ) for k, v in closeBySym.items() if k in self.tickers )
        
        g = lambda r : ( r.date, [(r.ticker, r.log_bm, r.log_roe)] )
        temp_df = input_df.filter( input_df.ticker.isin(self.tickers) & (input_df.rn==2) )
        res = temp_df.rdd.map(g).reduceByKey( lambda x, y : x + y ).collectAsMap()

        self.minMaxDatesForSym = {}
        self.factorsByDateBySym = {}
        for idate, v in res.items():
            for t, bm, roe in v:
                d = self.factorsByDateBySym.setdefault(idate, {})
                d[t] = (bm, roe)

                dates = self.minMaxDatesForSym.setdefault(t, [date.max, date.min])
                if idate < dates[0]:
                    dates[0] = idate
                if idate > dates[1]:
                    dates[1] = idate

        
        for sym, (l, h) in self.minMaxDatesForSym.items():
            print sym, l, h
        
    def initialize(self, context, ticker):
        context.model = self.model
        context.factorsByDateBySym = self.factorsByDateBySym    
        context.posAgeBySym={}
        context.ticker = ticker
        context.sym = symbol(ticker)
        set_benchmark(context.sym) # overcome SPY issue

    def handle_data(self, context, data):   
        # this is the main strategy logic, nice and simple :]
        closeOut(context)      
        buyNextDay(context, data)
    
    def run( self ):
        self.resultsBySym  = {}
        self.noPriceList   = []
        self.exceptionList = []
        self.noTradesList  = []
        
        for ticker in self.tickers:
            
            if (ticker not in self.closeBySym) or (self.closeBySym[ticker].dropna().size == 0):
                self.noPriceList.append(ticker)
                continue
            else:
                sim_panel = pd.Panel( { ticker:self.closeBySym[ticker] } )
            try:
                start, end = self.minMaxDatesForSym[ticker]
                start = pd.Timestamp(start, tz=pytz.UTC)
                end = pd.Timestamp(end, tz=pytz.UTC)
                sim_results = run_algorithm(start=start, end=end, 
                                            initialize=partial( self.initialize, ticker=ticker ), 
                                            handle_data=self.handle_data, 
                                            capital_base=100000.0, 
                                            data=sim_panel )

                trades = [ t for t in sim_results.transactions if t ]
                if len(trades) == 0:
                    self.noTradesList.append(ticker)

                self.resultsBySym[ticker] = sim_results


            except Exception as e:

                self.exceptionList.append(ticker)

    def plot_results( self ):
        valid_res = dict( (k, v) for k, v in self.resultsBySym.items() if k not in self.noTradesList )

        numCharts = len(valid_res)
        if numCharts % 2 == 1:
            numRows = ( numCharts + 1 ) / 2
        else:
            numRows = numCharts /2

        f, axarr = plt.subplots(numRows, 2, figsize=(12, 20))
        f.tight_layout()
        for i, (sym, res) in enumerate( valid_res.items()):
            r = i / 2
            c = i % 2

            res.portfolio_value.plot(ax=axarr[r,c], title='ticker = %s' % sym)

        plt.tight_layout()

Run an individual back test for each ticker. This is similar to an equal-weight portfolio across the tickers.

In [97]:
bt = BackTest( 'Beer', data, modelsByIndustry, sic_df, input_df )
bt.run()
WVVI 2010-11-15 2013-11-12
TAP 2010-11-04 2013-11-06
SAM 2010-11-04 2013-10-30
JSDA 2010-11-15 2013-11-08
PEP 2010-10-07 2013-10-16
MENB 2010-11-15 2013-11-14
ROX 2010-11-15 2013-11-13
KO 2010-10-29 2013-10-24
STZ 2010-10-12 2013-10-10
BREW 2010-05-14 2013-11-06
DPS 2011-04-27 2013-10-24
BUD 2008-04-25 2008-11-06
CWGL 2013-11-08 2013-11-08
In [98]:
bt.plot_results()
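
Finally we compute a rough compound annual growth rate (CAGR) from the average per-ticker return over the overall back-test window:

$$ CAGR = \left(1 + \bar{R}\right)^{365/D} - 1 $$

where $\bar{R}$ is the average whole-period return across the tickers and $D$ is the number of calendar days between the earliest and latest back-test dates.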
In [116]:
def cagr(bt):
    
    avg_return = np.mean([res.algorithm_period_return[-1] for res in bt.resultsBySym.values()])
    
    # find the overall min, max dates for all our above back-test periods for various tickers
    
    min_date, max_date = date.max, date.min
    
    days = 0
    for sym, (l, h) in bt.minMaxDatesForSym.items():
        if sym in bt.resultsBySym and sym != 'BUD':
            # exclude BUD since we can see from the charts that
            # there are only a few months of data in 2008, which would
            # skew the CAGR computation too much
            if l < min_date:
                min_date = l
            if h > max_date:
                max_date = h
    
    cagr = (1+avg_return) ** (1/((max_date-min_date).days/365.)) - 1
    print "Start date = %s, End date = %s" % (min_date, max_date)
    print "Average return for whole period = %s" % avg_return
    print "CAGR = %s" % cagr

cagr(bt)
Start date = 2010-05-14, End date = 2013-11-14
Average return for whole period = 0.202412361772
CAGR = 0.0539687555913