AI For Trading: Project 7 Helper Functions (117)
The helper files used in Project 7.
requirements.txt
The requirements.txt file:
alphalens==0.3.2
graphviz==0.10.1
numpy==1.13.3
pandas==0.18.1
python-dateutil==2.6.1
pytz==2017.3
scipy==1.0.0
scikit-learn==0.19.1
six==1.11.0
tables==3.3.0
tqdm==4.19.5
zipline==1.2.0
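These pins mirror the course workspace. To reproduce it locally, install them into a clean virtual environment with pip install -r requirements.txt; note that a zipline release this old generally expects a Python 3.5-era interpreter, so pinning the interpreter version first is the safer route.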
project_helper.py
The project_helper.py file:
import alphalens as al
import graphviz
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import Image
from sklearn.tree import export_graphviz
from zipline.assets._assets import Equity # Required for USEquityPricing
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.classifiers import Classifier
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.utils.numpy_utils import int64_dtype
EOD_BUNDLE_NAME = 'eod-quotemedia'

class PricingLoader(object):
    def __init__(self, bundle_data):
        self.loader = USEquityPricingLoader(
            bundle_data.equity_daily_bar_reader,
            bundle_data.adjustment_reader)

    def get_loader(self, column):
        if column not in USEquityPricing.columns:
            raise Exception('Column not in USEquityPricing')

        return self.loader

class Sector(Classifier):
    dtype = int64_dtype
    window_length = 0
    inputs = ()
    missing_value = -1

    def __init__(self):
        # Pre-computed sector code per asset sid, stored alongside the project data
        self.data = np.load('../../data/project_7_sector/data.npy')

    def _compute(self, arrays, dates, assets, mask):
        return np.where(
            mask,
            self.data[assets],
            self.missing_value,
        )

def build_pipeline_engine(bundle_data, trading_calendar):
    pricing_loader = PricingLoader(bundle_data)

    engine = SimplePipelineEngine(
        get_loader=pricing_loader.get_loader,
        calendar=trading_calendar.all_sessions,
        asset_finder=bundle_data.asset_finder)

    return engine


def plot_tree_classifier(clf, feature_names=None):
    dot_data = export_graphviz(
        clf,
        out_file=None,
        feature_names=feature_names,
        filled=True,
        rounded=True,
        special_characters=True,
        rotate=True)

    return Image(graphviz.Source(dot_data).pipe(format='png'))

def plot(xs, ys, labels, title='', x_label='', y_label=''):
    for x, y, label in zip(xs, ys, labels):
        plt.ylim((0.5, 0.55))
        plt.plot(x, y, label=label)

    plt.title(title)
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.legend(bbox_to_anchor=(1.04, 1), borderaxespad=0)
    plt.show()

def rank_features_by_importance(importances, feature_names):
    indices = np.argsort(importances)[::-1]
    max_feature_name_length = max([len(feature) for feature in feature_names])

    print(' Feature{space: <{padding}} Importance'.format(padding=max_feature_name_length - 8, space=' '))

    for x_train_i in range(len(importances)):
        print('{number:>2}. {feature: <{padding}} ({importance})'.format(
            number=x_train_i + 1,
            padding=max_feature_name_length,
            feature=feature_names[indices[x_train_i]],
            importance=importances[indices[x_train_i]]))


def sharpe_ratio(factor_returns, annualization_factor=np.sqrt(252)):
    return annualization_factor * factor_returns.mean() / factor_returns.std()

def get_factor_returns(factor_data):
    ls_factor_returns = pd.DataFrame()

    for factor_name, data in factor_data.items():
        ls_factor_returns[factor_name] = al.performance.factor_returns(data).iloc[:, 0]

    return ls_factor_returns


def plot_factor_returns(factor_returns):
    (1 + factor_returns).cumprod().plot(ylim=(0.8, 1.2))


def plot_factor_rank_autocorrelation(factor_data):
    ls_FRA = pd.DataFrame()

    unixt_factor_data = {
        factor: data.set_index(pd.MultiIndex.from_tuples(
            [(x.timestamp(), y) for x, y in data.index.values],
            names=['date', 'asset']))
        for factor, data in factor_data.items()}

    for factor, data in unixt_factor_data.items():
        ls_FRA[factor] = al.performance.factor_rank_autocorrelation(data)

    ls_FRA.plot(title="Factor Rank Autocorrelation", ylim=(0.8, 1.0))


def build_factor_data(factor_data, pricing):
    return {factor_name: al.utils.get_clean_factor_and_forward_returns(factor=data, prices=pricing, periods=[1])
            for factor_name, data in factor_data.iteritems()}
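
For context, here is a minimal usage sketch of these helpers. Assumptions: the 'eod-quotemedia' bundle is already registered and ingested as the project notebook does, and all_factors / pricing are the factor and pricing DataFrames built in the notebook, not in this file.

from zipline.data import bundles
from zipline.utils.calendars import get_calendar

import project_helper

# Load the ingested bundle and build a pipeline engine on the NYSE calendar.
bundle_data = bundles.load(project_helper.EOD_BUNDLE_NAME)
engine = project_helper.build_pipeline_engine(bundle_data, get_calendar('NYSE'))

# With factor values and pricing from the notebook, the alphalens helpers chain together:
# factor_data = project_helper.build_factor_data(all_factors, pricing)
# factor_returns = project_helper.get_factor_returns(factor_data)
# print(factor_returns.apply(project_helper.sharpe_ratio))
# project_helper.plot_factor_returns(factor_returns)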
project_tests.py
The project_tests.py file:
from collections import OrderedDict
import numpy as np
import pandas as pd
from sklearn.ensemble import BaggingClassifier, RandomForestRegressor
from sklearn.tree import DecisionTreeClassifier
from unittest.mock import patch
from zipline.data import bundles
from tests import assert_output, project_test, generate_random_dates, assert_structure

def get_assets(ticker_count):
    bundle = bundles.load('eod-quotemedia')
    return bundle.asset_finder.retrieve_all(bundle.asset_finder.sids[:ticker_count])

@project_test
def test_train_valid_test_split(fn):
    columns = ['test column 1', 'test column 2', 'test column 3']
    dates = generate_random_dates(10)
    assets = get_assets(3)
    index = pd.MultiIndex.from_product([dates, assets])
    values = np.arange(len(index) * len(columns)).reshape([len(columns), len(index)]).T
    targets = np.arange(len(index))

    fn_inputs = {
        'all_x': pd.DataFrame(values, index, columns),
        'all_y': pd.Series(targets, index, name='target'),
        'train_size': 0.6,
        'valid_size': 0.2,
        'test_size': 0.2}
    fn_correct_outputs = OrderedDict([
        ('X_train', pd.DataFrame(values[:18], index[:18], columns=columns)),
        ('X_valid', pd.DataFrame(values[18:24], index[18:24], columns=columns)),
        ('X_test', pd.DataFrame(values[24:], index[24:], columns=columns)),
        ('y_train', pd.Series(targets[:18], index[:18])),
        ('y_valid', pd.Series(targets[18:24], index[18:24])),
        ('y_test', pd.Series(targets[24:], index[24:]))])

    assert_output(fn, fn_inputs, fn_correct_outputs, check_parameter_changes=False)

@project_test
def test_non_overlapping_samples(fn):
    columns = ['test column 1', 'test column 2']
    dates = generate_random_dates(8)
    assets = get_assets(3)
    index = pd.MultiIndex.from_product([dates, assets])
    values = np.arange(len(index) * len(columns)).reshape([len(columns), len(index)]).T
    targets = np.arange(len(index))

    fn_inputs = {
        'x': pd.DataFrame(values, index, columns),
        'y': pd.Series(targets, index),
        'n_skip_samples': 2,
        'start_i': 1}

    new_index = pd.MultiIndex.from_product([dates[fn_inputs['start_i']::fn_inputs['n_skip_samples'] + 1], assets])
    fn_correct_outputs = OrderedDict([
        (
            'non_overlapping_x',
            pd.DataFrame(
                [
                    [3, 27],
                    [4, 28],
                    [5, 29],
                    [12, 36],
                    [13, 37],
                    [14, 38],
                    [21, 45],
                    [22, 46],
                    [23, 47]],
                new_index, columns)),
        (
            'non_overlapping_y',
            pd.Series([3, 4, 5, 12, 13, 14, 21, 22, 23], new_index))])

    assert_output(fn, fn_inputs, fn_correct_outputs, check_parameter_changes=False)

@project_test
def test_bagging_classifier(fn):
    n_estimators = 200
    parameters = {
        'criterion': 'entropy',
        'min_samples_leaf': 2500,
        'oob_score': True,
        'n_jobs': -1,
        'random_state': 0}

    fn_inputs = {
        'n_estimators': n_estimators,
        'max_samples': 0.2,
        'max_features': 1.0,
        'parameters': parameters}

    return_value = fn(**fn_inputs)

    assert isinstance(return_value, BaggingClassifier),\
        'Returned object is wrong. It should be a BaggingClassifier.'
    assert return_value.max_samples == fn_inputs['max_samples'],\
        'BaggingClassifier\'s max_samples is the wrong value.'
    assert return_value.max_features == fn_inputs['max_features'],\
        'BaggingClassifier\'s max_features is the wrong value.'
    assert return_value.oob_score == parameters['oob_score'],\
        'BaggingClassifier\'s oob_score is the wrong value.'
    assert return_value.n_jobs == parameters['n_jobs'],\
        'BaggingClassifier\'s n_jobs is the wrong value.'
    assert return_value.random_state == parameters['random_state'],\
        'BaggingClassifier\'s random_state is the wrong value.'
    assert isinstance(return_value.base_estimator, DecisionTreeClassifier),\
        'BaggingClassifier\'s base estimator is the wrong value type. It should be a DecisionTreeClassifier.'
    assert return_value.base_estimator.criterion == parameters['criterion'],\
        'The base estimator\'s criterion is the wrong value.'
    assert return_value.base_estimator.min_samples_leaf == parameters['min_samples_leaf'],\
        'The base estimator\'s min_samples_leaf is the wrong value.'

@project_test
def test_calculate_oob_score(fn):
    n_estimators = 3
    n_features = 2
    n_samples = 1000

    noise = np.random.RandomState(0).random_sample([3, n_samples]) * n_samples
    x = np.arange(n_estimators * n_samples * n_features).reshape([n_estimators, n_samples, n_features])
    y = np.sum(x, axis=-1) + noise

    estimators = [
        RandomForestRegressor(300, oob_score=True, n_jobs=-1, random_state=101).fit(x[estimator_i], y[estimator_i])
        for estimator_i in range(n_estimators)]

    fn_inputs = {
        'classifiers': estimators}
    fn_correct_outputs = OrderedDict([('oob_score', 0.911755651666)])

    assert_output(fn, fn_inputs, fn_correct_outputs, check_parameter_changes=False)

@project_test
def test_non_overlapping_estimators(fn):
    n_estimators = 3
    columns = ['test column 1', 'test column 2']
    dates = generate_random_dates(8)
    assets = get_assets(3)
    index = pd.MultiIndex.from_product([dates, assets])

    noise = np.random.RandomState(0).random_sample([len(index)]) * len(index)
    values = np.arange(len(index) * len(columns)).reshape([len(columns), len(index)]).T
    targets = np.sum(values, axis=-1) + noise

    classifiers = [
        RandomForestRegressor(300, oob_score=True, n_jobs=-1, random_state=101)
        for _ in range(n_estimators)]

    fn_inputs = {
        'x': pd.DataFrame(values, index, columns),
        'y': pd.Series(targets, index),
        'classifiers': classifiers,
        'n_skip_samples': 3}

    # Keep a reference to the real fit so the mocked method still trains the estimators.
    random_forest_regressor_fit = RandomForestRegressor.fit

    with patch.object(RandomForestRegressor, 'fit', autospec=True) as mock_fit:
        mock_fit.side_effect = random_forest_regressor_fit
        fn_return_value = fn(**fn_inputs)

        # Call tracking on each estimator's fit only exists while the patch is active.
        assert_structure(fn_return_value, [RandomForestRegressor for _ in range(n_estimators)], 'non_overlapping_estimators')

        for classifier in fn_return_value:
            try:
                classifier.fit.assert_called()
            except AssertionError:
                raise Exception('Test Failure: RandomForestRegressor.fit not called on all classifiers')
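
Each of these checks is meant to be called from the project notebook with the corresponding implementation as its only argument; the project_test decorator prints 'Tests Passed' when no assertion fires. A minimal sketch follows, assuming the notebook defines a train_valid_test_split function (the name and signature below are an assumption, not part of this file):

import project_tests


def train_valid_test_split(all_x, all_y, train_size, valid_size, test_size):
    # Placeholder only -- the notebook's real implementation goes here.
    raise NotImplementedError


# Prints 'Tests Passed' once a correct implementation is supplied.
project_tests.test_train_valid_test_split(train_valid_test_split)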
tests.py
The tests.py file:
import collections
from collections import OrderedDict
import copy
import pandas as pd
import numpy as np
from datetime import date, timedelta
pd.options.display.float_format = '{:.8f}'.format

def _generate_output_error_msg(fn_name, fn_inputs, fn_outputs, fn_expected_outputs):
    formatted_inputs = []
    formatted_outputs = []
    formatted_expected_outputs = []

    for input_name, input_value in fn_inputs.items():
        formatted_inputs.append('INPUT {}:\n{}\n'.format(
            input_name, str(input_value)))
    for output_name, output_value in fn_outputs.items():
        formatted_outputs.append('OUTPUT {}:\n{}\n'.format(
            output_name, str(output_value)))
    for expected_output_name, expected_output_value in fn_expected_outputs.items():
        formatted_expected_outputs.append('EXPECTED OUTPUT FOR {}:\n{}\n'.format(
            expected_output_name, str(expected_output_value)))

    return 'Wrong value for {}.\n' \
           '{}\n' \
           '{}\n' \
           '{}' \
        .format(
            fn_name,
            '\n'.join(formatted_inputs),
            '\n'.join(formatted_outputs),
            '\n'.join(formatted_expected_outputs))

def _is_equal(x, y):
    is_equal = False

    if isinstance(x, (pd.DataFrame, pd.Series)):
        is_equal = x.equals(y)
    elif isinstance(x, np.ndarray):
        is_equal = np.array_equal(x, y)
    elif isinstance(x, list):
        if len(x) == len(y):
            for x_item, y_item in zip(x, y):
                if not _is_equal(x_item, y_item):
                    break
            else:
                # for-else: only reached when every element matched
                is_equal = True
    else:
        is_equal = x == y

    return is_equal

def project_test(func):
    def func_wrapper(*args):
        result = func(*args)
        print('Tests Passed')
        return result

    return func_wrapper


def generate_random_tickers(n_tickers=None):
    min_ticker_len = 3
    max_ticker_len = 5
    tickers = []

    if not n_tickers:
        n_tickers = np.random.randint(8, 14)

    ticker_symbol_random = np.random.randint(ord('A'), ord('Z')+1, (n_tickers, max_ticker_len))
    ticker_symbol_lengths = np.random.randint(min_ticker_len, max_ticker_len, n_tickers)
    for ticker_symbol_rand, ticker_symbol_length in zip(ticker_symbol_random, ticker_symbol_lengths):
        ticker_symbol = ''.join([chr(c_id) for c_id in ticker_symbol_rand[:ticker_symbol_length]])
        tickers.append(ticker_symbol)

    return tickers

def generate_random_dates(n_days=None):
    if not n_days:
        n_days = np.random.randint(14, 20)

    start_year = np.random.randint(1999, 2017)
    start_month = np.random.randint(1, 12)
    start_day = np.random.randint(1, 29)
    start_date = date(start_year, start_month, start_day)

    dates = []
    for i in range(n_days):
        dates.append(start_date + timedelta(days=i))

    return dates

def assert_structure(received_obj, expected_obj, obj_name):
    assert isinstance(received_obj, type(expected_obj)), \
        'Wrong type for output {}. Got {}, expected {}'.format(obj_name, type(received_obj), type(expected_obj))

    if hasattr(expected_obj, 'shape'):
        assert received_obj.shape == expected_obj.shape, \
            'Wrong shape for output {}. Got {}, expected {}'.format(obj_name, received_obj.shape, expected_obj.shape)
    elif hasattr(expected_obj, '__len__'):
        assert len(received_obj) == len(expected_obj), \
            'Wrong len for output {}. Got {}, expected {}'.format(obj_name, len(received_obj), len(expected_obj))

    if type(expected_obj) == pd.DataFrame:
        assert set(received_obj.columns) == set(expected_obj.columns), \
            'Incorrect columns for output {}\n' \
            'COLUMNS: {}\n' \
            'EXPECTED COLUMNS: {}'.format(obj_name, sorted(received_obj.columns), sorted(expected_obj.columns))

        # This is to catch a case where __equal__ says it's equal between different types
        assert set([type(i) for i in received_obj.columns]) == set([type(i) for i in expected_obj.columns]), \
            'Incorrect types in columns for output {}\n' \
            'COLUMNS: {}\n' \
            'EXPECTED COLUMNS: {}'.format(obj_name, sorted(received_obj.columns), sorted(expected_obj.columns))

        for column in expected_obj.columns:
            assert received_obj[column].dtype == expected_obj[column].dtype, \
                'Incorrect type for output {}, column {}\n' \
                'Type: {}\n' \
                'EXPECTED Type: {}'.format(obj_name, column, received_obj[column].dtype, expected_obj[column].dtype)

    if type(expected_obj) in {pd.DataFrame, pd.Series}:
        assert set(received_obj.index) == set(expected_obj.index), \
            'Incorrect indices for output {}\n' \
            'INDICES: {}\n' \
            'EXPECTED INDICES: {}'.format(obj_name, sorted(received_obj.index), sorted(expected_obj.index))

        # This is to catch a case where __equal__ says it's equal between different types
        assert set([type(i) for i in received_obj.index]) == set([type(i) for i in expected_obj.index]), \
            'Incorrect types in indices for output {}\n' \
            'INDICES: {}\n' \
            'EXPECTED INDICES: {}'.format(obj_name, sorted(received_obj.index), sorted(expected_obj.index))

def does_data_match(obj_a, obj_b):
    if type(obj_a) == pd.DataFrame:
        # Sort Columns
        obj_b = obj_b.sort_index(1)
        obj_a = obj_a.sort_index(1)

    if type(obj_a) in {pd.DataFrame, pd.Series}:
        # Sort Indices
        obj_b = obj_b.sort_index()
        obj_a = obj_a.sort_index()

    try:
        data_is_close = np.isclose(obj_b, obj_a, equal_nan=True)
    except TypeError:
        data_is_close = obj_b == obj_a
    else:
        if isinstance(obj_a, collections.Iterable):
            data_is_close = data_is_close.all()

    return data_is_close

def assert_output(fn, fn_inputs, fn_expected_outputs, check_parameter_changes=True):
    assert type(fn_expected_outputs) == OrderedDict

    if check_parameter_changes:
        fn_inputs_passed_in = copy.deepcopy(fn_inputs)
    else:
        fn_inputs_passed_in = fn_inputs

    fn_raw_out = fn(**fn_inputs_passed_in)

    # Check if inputs have changed
    if check_parameter_changes:
        for input_name, input_value in fn_inputs.items():
            passed_in_unchanged = _is_equal(input_value, fn_inputs_passed_in[input_name])
            assert passed_in_unchanged, 'Input parameter "{}" has been modified inside the function. ' \
                                        'The function shouldn\'t modify the function parameters.'.format(input_name)

    fn_outputs = OrderedDict()
    if len(fn_expected_outputs) == 1:
        fn_outputs[list(fn_expected_outputs)[0]] = fn_raw_out
    elif len(fn_expected_outputs) > 1:
        assert type(fn_raw_out) == tuple,\
            'Expecting function to return tuple, got type {}'.format(type(fn_raw_out))
        assert len(fn_raw_out) == len(fn_expected_outputs),\
            'Expected {} outputs in tuple, only found {} outputs'.format(len(fn_expected_outputs), len(fn_raw_out))
        for key_i, output_key in enumerate(fn_expected_outputs.keys()):
            fn_outputs[output_key] = fn_raw_out[key_i]

    err_message = _generate_output_error_msg(
        fn.__name__,
        fn_inputs,
        fn_outputs,
        fn_expected_outputs)

    for fn_out, (out_name, expected_out) in zip(fn_outputs.values(), fn_expected_outputs.items()):
        assert_structure(fn_out, expected_out, out_name)

        correct_data = does_data_match(expected_out, fn_out)
        assert correct_data, err_message
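
To make the project_test / assert_output flow concrete, here is a small self-contained sketch with a made-up function and check; everything below is illustrative and not part of the course files:

from collections import OrderedDict

import pandas as pd

from tests import assert_output, project_test


@project_test
def test_double_values(fn):
    fn_inputs = {'prices': pd.Series([1.0, 2.0, 3.0])}
    fn_correct_outputs = OrderedDict([('doubled', pd.Series([2.0, 4.0, 6.0]))])

    assert_output(fn, fn_inputs, fn_correct_outputs)


def double_values(prices):
    # Return a new Series instead of mutating the input, so the
    # check_parameter_changes pass in assert_output sees the input untouched.
    return prices * 2


test_double_values(double_values)  # prints 'Tests Passed'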