AI For Trading:Project 7: helper function (117)

Project 7 用到的辅助文件。

requirements.txt

requirements.txt 文件

alphalens==0.3.2
graphviz==0.10.1
numpy==1.13.3
pandas==0.18.1
python-dateutil==2.6.1
pytz==2017.3
scipy==1.0.0
scikit-learn==0.19.1
six==1.11.0
tables==3.3.0
tqdm==4.19.5
zipline===1.2.0

project_helper.py

project_helper.py 文件

import alphalens as al
import graphviz
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from IPython.display import Image
from sklearn.tree import export_graphviz
from zipline.assets._assets import Equity  # Required for USEquityPricing
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.classifiers import Classifier
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.utils.numpy_utils import int64_dtype

EOD_BUNDLE_NAME = 'eod-quotemedia'

class PricingLoader(object):
    """Serve one shared ``USEquityPricingLoader`` for USEquityPricing columns.

    The loader is created once from the bundle's daily bar reader and
    adjustment reader and is handed back for every valid column.
    """

    def __init__(self, bundle_data):
        bar_reader = bundle_data.equity_daily_bar_reader
        adjustment_reader = bundle_data.adjustment_reader
        self.loader = USEquityPricingLoader(bar_reader, adjustment_reader)

    def get_loader(self, column):
        """Return the shared loader for ``column``.

        Raises:
            Exception: if ``column`` is not one of ``USEquityPricing.columns``.
        """
        if column in USEquityPricing.columns:
            return self.loader

        raise Exception('Column not in USEquityPricing')

class Sector(Classifier):
    """Pipeline classifier whose labels are read from a saved NumPy array.

    The array is loaded from ``../../data/project_7_sector/data.npy`` when an
    instance is initialised and is looked up by ``assets`` in ``_compute``.
    """
    # dtype declared for the classifier's output.
    dtype = int64_dtype
    # No trailing window of input data is requested.
    window_length = 0
    # The classifier declares no pipeline input columns.
    inputs = ()
    # Value emitted wherever ``mask`` is False in ``_compute``.
    missing_value = -1

    def __init__(self):
        # NOTE(review): the path is relative, so loading depends on the
        # current working directory of the calling process.
        # Presumably the array is indexed by asset sid -- confirm against the
        # data file.
        self.data = np.load('../../data/project_7_sector/data.npy')

    def _compute(self, arrays, dates, assets, mask):
        """Return ``self.data[assets]`` where ``mask`` is true, else ``missing_value``.

        ``arrays`` and ``dates`` are accepted but not used here.
        """
        return np.where(
            mask,
            self.data[assets],
            self.missing_value,
        )

def build_pipeline_engine(bundle_data, trading_calendar):
    """Create a ``SimplePipelineEngine`` backed by the given bundle.

    Pricing columns are resolved through a ``PricingLoader`` built from
    ``bundle_data``; the engine uses ``trading_calendar.all_sessions`` as its
    calendar and the bundle's asset finder.
    """
    loader_lookup = PricingLoader(bundle_data).get_loader

    return SimplePipelineEngine(
        get_loader=loader_lookup,
        calendar=trading_calendar.all_sessions,
        asset_finder=bundle_data.asset_finder,
    )

def plot_tree_classifier(clf, feature_names=None):
    """Render a fitted tree as a PNG ``IPython.display.Image``.

    The tree is exported to DOT text with ``export_graphviz`` (filled, rounded,
    rotated, special characters enabled) and then piped through graphviz.
    """
    export_options = {
        'out_file': None,  # return the DOT text instead of writing a file
        'feature_names': feature_names,
        'filled': True,
        'rounded': True,
        'special_characters': True,
        'rotate': True,
    }
    dot_source = export_graphviz(clf, **export_options)
    png_bytes = graphviz.Source(dot_source).pipe(format='png')

    return Image(png_bytes)

def plot(xs, ys, labels, title='', x_label='', y_label='', y_lim=(0.5, 0.55)):
    """Draw several labelled line series on one matplotlib figure and show it.

    Args:
        xs: iterable of x-value sequences, one per series.
        ys: iterable of y-value sequences, one per series.
        labels: iterable of legend labels, one per series.
        title: figure title.
        x_label: x-axis label.
        y_label: y-axis label.
        y_lim: ``(low, high)`` y-axis limits; the default keeps the previously
            hard-coded range. Pass ``None`` to let matplotlib autoscale.
    """
    # The limits do not depend on the series, so set them once up front
    # instead of on every loop iteration.
    if y_lim is not None:
        plt.ylim(y_lim)

    for x, y, label in zip(xs, ys, labels):
        plt.plot(x, y, label=label)

    plt.title(title)
    plt.xlabel(x_label)
    plt.ylabel(y_label)

    # Anchor the legend just outside the right edge of the axes.
    plt.legend(bbox_to_anchor=(1.04, 1), borderaxespad=0)
    plt.show()

def rank_features_by_importance(importances, feature_names):
    """Print features ordered from most to least important.

    Args:
        importances: sequence of importance scores, one per feature.
        feature_names: sequence of feature names aligned with ``importances``.

    The output is a header line followed by one numbered line per feature in
    the form ``" 1. name (score)"``, with names left-padded to a common width.
    """
    order = np.argsort(importances)[::-1]
    # ``default=0`` keeps an empty feature list from raising in ``max``.
    name_width = max((len(name) for name in feature_names), default=0)

    # A negative width is rejected by ``str.format`` ("sign not allowed"), which
    # used to crash whenever every feature name was shorter than 8 characters.
    header_padding = max(name_width - 8, 0)
    print('      Feature{space: <{padding}}      Importance'.format(padding=header_padding, space=' '))

    for rank, feature_i in enumerate(order, start=1):
        print('{number:>2}. {feature: <{padding}} ({importance})'.format(
            number=rank,
            padding=name_width,
            feature=feature_names[feature_i],
            importance=importances[feature_i]))

def sharpe_ratio(factor_returns, annualization_factor=np.sqrt(252)):
    """Return the annualised mean-to-standard-deviation ratio of the returns.

    ``factor_returns`` must provide ``mean()`` and ``std()``; the ratio is
    scaled by ``annualization_factor`` (square root of 252 by default).
    """
    average_return = factor_returns.mean()
    volatility = factor_returns.std()
    return annualization_factor * average_return / volatility

def get_factor_returns(factor_data):
    """Collect one column of factor returns per factor.

    ``factor_data`` maps factor names to the data passed to
    ``al.performance.factor_returns``; the first column of each result becomes
    that factor's column in the returned DataFrame.
    """
    returns_by_factor = pd.DataFrame()

    for factor_name, clean_data in factor_data.items():
        all_period_returns = al.performance.factor_returns(clean_data)
        returns_by_factor[factor_name] = all_period_returns.iloc[:, 0]

    return returns_by_factor

def plot_factor_returns(factor_returns):
    """Plot the cumulative product of ``1 + factor_returns`` on a 0.8-1.2 y-axis."""
    growth_factors = 1 + factor_returns
    cumulative_growth = growth_factors.cumprod()
    cumulative_growth.plot(ylim=(0.8, 1.2))

def plot_factor_rank_autocorrelation(factor_data):
    """Plot the factor rank autocorrelation of every factor on one chart.

    Each factor's frame is first re-indexed so the date level holds
    ``timestamp()`` values instead of the original date objects, then passed to
    ``al.performance.factor_rank_autocorrelation``.
    """
    # Phase 1: rebuild every index with (timestamp, asset) tuples.
    reindexed = {}
    for name, frame in factor_data.items():
        stamped_pairs = [(when.timestamp(), asset) for when, asset in frame.index.values]
        unix_index = pd.MultiIndex.from_tuples(stamped_pairs, names=['date', 'asset'])
        reindexed[name] = frame.set_index(unix_index)

    # Phase 2: one autocorrelation column per factor.
    autocorrelations = pd.DataFrame()
    for name, frame in reindexed.items():
        autocorrelations[name] = al.performance.factor_rank_autocorrelation(frame)

    autocorrelations.plot(title="Factor Rank Autocorrelation", ylim=(0.8, 1.0))

def build_factor_data(factor_data, pricing):
    """Build alphalens clean factor data for every factor.

    Args:
        factor_data: a dict or a DataFrame; iterated with ``.items()`` to get
            ``(factor_name, factor_values)`` pairs.
        pricing: prices passed to alphalens as ``prices``.

    Returns:
        dict mapping each factor name to the result of
        ``al.utils.get_clean_factor_and_forward_returns`` with ``periods=[1]``.
    """
    # ``.items()`` is used instead of ``.iteritems()``: dicts have no
    # ``iteritems`` in Python 3 and pandas 2.0 removed it from DataFrame.
    return {
        factor_name: al.utils.get_clean_factor_and_forward_returns(
            factor=data, prices=pricing, periods=[1])
        for factor_name, data in factor_data.items()}

project_tests.py

project_tests.py 文件

from collections import OrderedDict
import numpy as np
import pandas as pd

from sklearn.ensemble import BaggingClassifier, RandomForestRegressor
from sklearn.tree import DecisionTreeClassifier
from unittest.mock import patch
from zipline.data import bundles

from tests import assert_output, project_test, generate_random_dates, assert_structure

def get_assets(ticker_count):
    """Return the assets for the first ``ticker_count`` sids of the 'eod-quotemedia' bundle."""
    asset_finder = bundles.load('eod-quotemedia').asset_finder
    leading_sids = asset_finder.sids[:ticker_count]
    return asset_finder.retrieve_all(leading_sids)

@project_test
def test_train_valid_test_split(fn):
    """Check that ``fn`` splits 30 rows into 18 train / 6 valid / 6 test rows.

    The inputs are a 3-column frame and a target series over a
    (10 dates x 3 assets) MultiIndex, split 0.6 / 0.2 / 0.2 in row order.
    """
    columns = ['test column 1', 'test column 2', 'test column 3']
    index = pd.MultiIndex.from_product([generate_random_dates(10), get_assets(3)])
    n_rows = len(index)
    n_columns = len(columns)
    # Fill column by column so every column is a consecutive integer run.
    values = np.arange(n_rows * n_columns).reshape([n_columns, n_rows]).T
    targets = np.arange(n_rows)

    fn_inputs = {
        'all_x': pd.DataFrame(values, index, columns),
        'all_y': pd.Series(targets, index, name='target'),
        'train_size': 0.6,
        'valid_size': 0.2,
        'test_size': 0.2}

    train_rows = slice(None, 18)
    valid_rows = slice(18, 24)
    test_rows = slice(24, None)

    fn_correct_outputs = OrderedDict()
    fn_correct_outputs['X_train'] = pd.DataFrame(values[train_rows], index[train_rows], columns=columns)
    fn_correct_outputs['X_valid'] = pd.DataFrame(values[valid_rows], index[valid_rows], columns=columns)
    fn_correct_outputs['X_test'] = pd.DataFrame(values[test_rows], index[test_rows], columns=columns)
    fn_correct_outputs['y_train'] = pd.Series(targets[train_rows], index[train_rows])
    fn_correct_outputs['y_valid'] = pd.Series(targets[valid_rows], index[valid_rows])
    fn_correct_outputs['y_test'] = pd.Series(targets[test_rows], index[test_rows])

    assert_output(fn, fn_inputs, fn_correct_outputs, check_parameter_changes=False)

@project_test
def test_non_overlapping_samples(fn):
    """Check that ``fn`` keeps every third date starting at the second date.

    With 8 dates x 3 assets, ``start_i=1`` and ``n_skip_samples=2``, the kept
    dates are positions 1, 4 and 7, i.e. rows 3-5, 12-14 and 21-23.
    """
    columns = ['test column 1', 'test column 2']
    dates = generate_random_dates(8)
    assets = get_assets(3)
    index = pd.MultiIndex.from_product([dates, assets])
    n_rows = len(index)
    n_columns = len(columns)
    values = np.arange(n_rows * n_columns).reshape([n_columns, n_rows]).T
    targets = np.arange(n_rows)

    start_i = 1
    n_skip_samples = 2
    fn_inputs = {
        'x': pd.DataFrame(values, index, columns),
        'y': pd.Series(targets, index),
        'n_skip_samples': n_skip_samples,
        'start_i': start_i}

    kept_dates = dates[start_i::n_skip_samples + 1]
    new_index = pd.MultiIndex.from_product([kept_dates, assets])

    expected_x_rows = [
        [3, 27],
        [4, 28],
        [5, 29],
        [12, 36],
        [13, 37],
        [14, 38],
        [21, 45],
        [22, 46],
        [23, 47]]
    expected_y_values = [3, 4, 5, 12, 13, 14, 21, 22, 23]

    fn_correct_outputs = OrderedDict()
    fn_correct_outputs['non_overlapping_x'] = pd.DataFrame(expected_x_rows, new_index, columns)
    fn_correct_outputs['non_overlapping_y'] = pd.Series(expected_y_values, new_index)

    assert_output(fn, fn_inputs, fn_correct_outputs, check_parameter_changes=False)

@project_test
def test_bagging_classifier(fn):
    """Check that ``fn`` builds a ``BaggingClassifier`` from the given settings.

    ``fn`` is called with ``n_estimators``, ``max_samples``, ``max_features``
    and a ``parameters`` dict. The returned ensemble must carry the ensemble
    level settings, and its base estimator must be a ``DecisionTreeClassifier``
    carrying the tree-level settings.
    """
    n_estimators = 200
    parameters = {
            'criterion': 'entropy',
            'min_samples_leaf': 2500,
            'oob_score': True,
            'n_jobs': -1,
            'random_state': 0}

    fn_inputs = {
        'n_estimators': n_estimators,
        'max_samples': 0.2,
        'max_features': 1.0,
        'parameters': parameters}

    return_value = fn(**fn_inputs)

    assert isinstance(return_value, BaggingClassifier),\
        'Returned object is wrong. It should be a BaggingClassifier.'
    # n_estimators is an input of the function under test, so it has to be
    # verified like the other ensemble settings.
    assert return_value.n_estimators == n_estimators,\
        'BaggingClassifier\'s n_estimators is the wrong value.'
    assert return_value.max_samples == fn_inputs['max_samples'],\
        'BaggingClassifier\'s max_samples is the wrong value.'
    assert return_value.max_features == fn_inputs['max_features'],\
        'BaggingClassifier\'s max_features is the wrong value.'
    assert return_value.oob_score == parameters['oob_score'],\
        'BaggingClassifier\'s oob_score is the wrong value.'
    assert return_value.n_jobs == parameters['n_jobs'],\
        'BaggingClassifier\'s n_jobs is the wrong value.'
    assert return_value.random_state == parameters['random_state'],\
        'BaggingClassifier\'s random_state is the wrong value.'

    assert isinstance(return_value.base_estimator, DecisionTreeClassifier),\
        'BaggingClassifier\'s base estimator is the wrong value type. It should be a DecisionTreeClassifier.'
    assert return_value.base_estimator.criterion == parameters['criterion'],\
        'The base estimator\'s criterion is the wrong value.'
    assert return_value.base_estimator.min_samples_leaf == parameters['min_samples_leaf'],\
        'The base estimator\'s min_samples_leaf is the wrong value.'

@project_test
def test_calculate_oob_score(fn):
    """Check ``fn`` against a fixed expected score for three fitted forests.

    Each forest is fitted on its own slice of a deterministic feature grid
    with seeded noise added to the target.
    """
    n_estimators = 3
    n_features = 2
    n_samples = 1000

    noise = np.random.RandomState(0).random_sample([3, n_samples]) * n_samples
    feature_grid = np.arange(n_estimators * n_samples * n_features)
    x = feature_grid.reshape([n_estimators, n_samples, n_features])
    y = np.sum(x, axis=-1) + noise

    estimators = []
    for estimator_i in range(n_estimators):
        forest = RandomForestRegressor(300, oob_score=True, n_jobs=-1, random_state=101)
        estimators.append(forest.fit(x[estimator_i], y[estimator_i]))

    fn_inputs = {'classifiers': estimators}
    fn_correct_outputs = OrderedDict([('oob_score', 0.911755651666)])

    assert_output(fn, fn_inputs, fn_correct_outputs, check_parameter_changes=False)

@project_test
def test_non_overlapping_estimators(fn):
    """Check that ``fn`` returns the classifiers and fits every one of them.

    ``RandomForestRegressor.fit`` is patched with an autospecced mock that
    still runs the real ``fit``, so the recorded calls show which instances
    were fitted.
    """
    n_estimators = 3
    columns = ['test column 1', 'test column 2']
    dates = generate_random_dates(8)
    assets = get_assets(3)
    index = pd.MultiIndex.from_product([dates, assets])
    noise = np.random.RandomState(0).random_sample([len(index)]) * len(index)
    values = np.arange(len(index) * len(columns)).reshape([len(columns), len(index)]).T
    targets = np.sum(values, axis=-1) + noise

    classifiers = [
        RandomForestRegressor(300, oob_score=True, n_jobs=-1, random_state=101)
        for _ in range(n_estimators)]

    fn_inputs = {
        'x': pd.DataFrame(values, index, columns),
        'y': pd.Series(targets, index),
        'classifiers': classifiers,
        'n_skip_samples': 3}

    random_forest_regressor_fit = RandomForestRegressor.fit
    with patch.object(RandomForestRegressor, 'fit', autospec=True) as mock_fit:
        mock_fit.side_effect = random_forest_regressor_fit
        fn_return_value = fn(**fn_inputs)

        # The label is only used in failure messages; it previously read 'PCA',
        # which does not describe this output.
        assert_structure(fn_return_value, [RandomForestRegressor for _ in range(n_estimators)], 'classifiers')

        # The patch replaces the class attribute, so call tracking is shared by
        # all instances. The first positional argument of each recorded call
        # is the instance that was fitted; compare by identity.
        fitted_ids = {
            id(fit_call[0][0])
            for fit_call in mock_fit.call_args_list
            if fit_call[0]}

        for classifier in fn_return_value:
            if id(classifier) not in fitted_ids:
                raise Exception('Test Failure: RandomForestRegressor.fit not called on all classifiers')

tests.py

tests.py 文件

import collections
from collections import OrderedDict
import copy
import pandas as pd
import numpy as np
from datetime import date, timedelta

pd.options.display.float_format = '{:.8f}'.format

def _generate_output_error_msg(fn_name, fn_inputs, fn_outputs, fn_expected_outputs):
    formatted_inputs = []
    formatted_outputs = []
    formatted_expected_outputs = []

    for input_name, input_value in fn_inputs.items():
        formatted_outputs.append('INPUT {}:\n{}\n'.format(
            input_name, str(input_value)))
    for output_name, output_value in fn_outputs.items():
        formatted_outputs.append('OUTPUT {}:\n{}\n'.format(
            output_name, str(output_value)))
    for expected_output_name, expected_output_value in fn_expected_outputs.items():
        formatted_expected_outputs.append('EXPECTED OUTPUT FOR {}:\n{}\n'.format(
            expected_output_name, str(expected_output_value)))

    return 'Wrong value for {}.\n' \
           '{}\n' \
           '{}\n' \
           '{}' \
        .format(
            fn_name,
            '\n'.join(formatted_inputs),
            '\n'.join(formatted_outputs),
            '\n'.join(formatted_expected_outputs))

def _is_equal(x, y):
    is_equal = False

    if isinstance(x, pd.DataFrame) or isinstance(y, pd.Series):
        is_equal = x.equals(y)
    elif isinstance(x, np.ndarray):
        is_equal = np.array_equal(x, y)
    elif isinstance(x, list):
        if len(x) == len(y):
            for x_item, y_item in zip(x, y):
                if not _is_equal(x_item, y_item):
                    break
            else:
                is_equal = True
    else:
        is_equal = x == y

    return is_equal

def project_test(func):
    """Decorate a test so it prints 'Tests Passed' after finishing without error.

    The wrapper forwards positional and keyword arguments to ``func`` and
    returns its result. If ``func`` raises, the exception propagates and
    nothing is printed.
    """
    # Imported locally so the decorator does not depend on a module-level import.
    import functools

    # ``wraps`` keeps the wrapped test's name and docstring on the wrapper.
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        print('Tests Passed')
        return result

    return func_wrapper

def generate_random_tickers(n_tickers=None):
    """Return a list of random upper-case ticker symbols.

    Args:
        n_tickers: number of tickers to create. When falsy (``None`` or 0), a
            random count from 8 to 13 is used.

    Returns:
        list of strings made of the letters A-Z, each 3 to 5 characters long.
    """
    min_ticker_len = 3
    max_ticker_len = 5

    if not n_tickers:
        n_tickers = np.random.randint(8, 14)

    letter_codes = np.random.randint(ord('A'), ord('Z') + 1, (n_tickers, max_ticker_len))
    # randint's upper bound is exclusive; without the +1 no ticker could ever
    # reach max_ticker_len characters.
    ticker_lengths = np.random.randint(min_ticker_len, max_ticker_len + 1, n_tickers)

    return [
        ''.join(chr(code) for code in codes[:length])
        for codes, length in zip(letter_codes, ticker_lengths)]

def generate_random_dates(n_days=None):
    """Return ``n_days`` consecutive calendar dates from a random start date.

    When ``n_days`` is falsy, a random count from 14 to 19 is used. The start
    year is drawn from 1999-2016, the month from 1-11 and the day from 1-28
    (``randint`` excludes its upper bound).
    """
    if not n_days:
        n_days = np.random.randint(14, 20)

    year = np.random.randint(1999, 2017)
    month = np.random.randint(1, 12)
    day = np.random.randint(1, 29)
    first_day = date(year, month, day)

    return [first_day + timedelta(days=offset) for offset in range(n_days)]

def assert_structure(received_obj, expected_obj, obj_name):
    """Assert that ``received_obj`` has the same structure as ``expected_obj``.

    Checked in order: type, then shape (or length when there is no ``shape``),
    then for DataFrames the column labels, column label types and column
    dtypes, and for DataFrames and Series the index labels and their types.
    ``obj_name`` only appears in the failure messages.
    """
    expected_type = type(expected_obj)

    assert isinstance(received_obj, expected_type), \
        'Wrong type for output {}. Got {}, expected {}'.format(obj_name, type(received_obj), type(expected_obj))

    if hasattr(expected_obj, 'shape'):
        assert received_obj.shape == expected_obj.shape, \
            'Wrong shape for output {}. Got {}, expected {}'.format(obj_name, received_obj.shape, expected_obj.shape)
    elif hasattr(expected_obj, '__len__'):
        assert len(received_obj) == len(expected_obj), \
            'Wrong len for output {}. Got {}, expected {}'.format(obj_name, len(received_obj), len(expected_obj))

    if expected_type == pd.DataFrame:
        assert set(received_obj.columns) == set(expected_obj.columns), \
            'Incorrect columns for output {}\n' \
            'COLUMNS:          {}\n' \
            'EXPECTED COLUMNS: {}'.format(obj_name, sorted(received_obj.columns), sorted(expected_obj.columns))

        # Labels of different types can still compare equal, so the label
        # types are compared separately.
        received_column_types = {type(label) for label in received_obj.columns}
        expected_column_types = {type(label) for label in expected_obj.columns}
        assert received_column_types == expected_column_types, \
            'Incorrect types in columns for output {}\n' \
            'COLUMNS:          {}\n' \
            'EXPECTED COLUMNS: {}'.format(obj_name, sorted(received_obj.columns), sorted(expected_obj.columns))

        for column in expected_obj.columns:
            assert received_obj[column].dtype == expected_obj[column].dtype, \
                'Incorrect type for output {}, column {}\n' \
                'Type:          {}\n' \
                'EXPECTED Type: {}'.format(obj_name, column, received_obj[column].dtype, expected_obj[column].dtype)

    if expected_type in (pd.DataFrame, pd.Series):
        assert set(received_obj.index) == set(expected_obj.index), \
            'Incorrect indices for output {}\n' \
            'INDICES:          {}\n' \
            'EXPECTED INDICES: {}'.format(obj_name, sorted(received_obj.index), sorted(expected_obj.index))

        # Same reasoning as for the columns: equal labels may differ in type.
        received_index_types = {type(label) for label in received_obj.index}
        expected_index_types = {type(label) for label in expected_obj.index}
        assert received_index_types == expected_index_types, \
            'Incorrect types in indices for output {}\n' \
            'INDICES:          {}\n' \
            'EXPECTED INDICES: {}'.format(obj_name, sorted(received_obj.index), sorted(expected_obj.index))

def does_data_match(obj_a, obj_b):
    """Return whether ``obj_a`` and ``obj_b`` contain matching data.

    DataFrames are sorted by column label and DataFrames/Series by index
    label before comparing, so ordering differences are ignored. Values are
    compared with ``np.isclose`` (NaNs count as equal); when ``np.isclose``
    raises ``TypeError``, plain ``==`` is used instead.

    Returns:
        A truthy/falsy value: the reduced ``np.isclose`` result for iterable
        inputs, the element result for scalars, or the ``==`` result on the
        fallback path.
    """
    # collections.Iterable was removed in Python 3.10; the ABC lives in
    # collections.abc. Imported locally so the function is self-contained.
    from collections.abc import Iterable

    if isinstance(obj_a, pd.DataFrame):
        # Sort Columns (axis is passed by keyword: pandas 2.x rejects the
        # positional form).
        obj_b = obj_b.sort_index(axis=1)
        obj_a = obj_a.sort_index(axis=1)

    if isinstance(obj_a, (pd.DataFrame, pd.Series)):
        # Sort Indices
        obj_b = obj_b.sort_index()
        obj_a = obj_a.sort_index()

    try:
        data_is_close = np.isclose(obj_b, obj_a, equal_nan=True)
    except TypeError:
        data_is_close = obj_b == obj_a
    else:
        # Collapse the element-wise result to a single truth value.
        if isinstance(obj_a, Iterable):
            data_is_close = data_is_close.all()

    return data_is_close

def assert_output(fn, fn_inputs, fn_expected_outputs, check_parameter_changes=True):
    """Call ``fn`` with ``fn_inputs`` and assert it returns the expected outputs.

    Args:
        fn: function under test, called as ``fn(**fn_inputs)``.
        fn_inputs: mapping of keyword argument name to value.
        fn_expected_outputs: ``OrderedDict`` of output name to expected value.
            One entry means ``fn`` returns that value directly; several
            entries mean ``fn`` returns a tuple in the same order.
        check_parameter_changes: when true, ``fn`` receives a deep copy of the
            inputs and the copy is compared with the originals afterwards to
            detect in-place modification.

    Raises:
        AssertionError: on modified inputs, a wrong return arity, a
            structural mismatch or mismatching data.
    """
    assert type(fn_expected_outputs) == OrderedDict

    fn_inputs_passed_in = copy.deepcopy(fn_inputs) if check_parameter_changes else fn_inputs
    fn_raw_out = fn(**fn_inputs_passed_in)

    # Check if inputs have changed
    if check_parameter_changes:
        for input_name, original_value in fn_inputs.items():
            assert _is_equal(original_value, fn_inputs_passed_in[input_name]), \
                'Input parameter "{}" has been modified inside the function. ' \
                'The function shouldn\'t modify the function parameters.'.format(input_name)

    # Pair the raw return value(s) with the expected output names.
    expected_names = list(fn_expected_outputs)
    n_expected = len(expected_names)
    fn_outputs = OrderedDict()
    if n_expected == 1:
        fn_outputs[expected_names[0]] = fn_raw_out
    elif n_expected > 1:
        assert type(fn_raw_out) == tuple,\
            'Expecting function to return tuple, got type {}'.format(type(fn_raw_out))
        assert len(fn_raw_out) == n_expected,\
            'Expected {} outputs in tuple, only found {} outputs'.format(len(fn_expected_outputs), len(fn_raw_out))
        fn_outputs.update(zip(expected_names, fn_raw_out))

    err_message = _generate_output_error_msg(
        fn.__name__,
        fn_inputs,
        fn_outputs,
        fn_expected_outputs)

    for out_name, expected_out in fn_expected_outputs.items():
        fn_out = fn_outputs[out_name]
        assert_structure(fn_out, expected_out, out_name)

        assert does_data_match(expected_out, fn_out), err_message

为者常成,行者常至