Markets

analysis
finance
python
Author
Published

March 22, 2024

factors_r = ["SP500", "DTWEXAFEGS"] # "SP500" does not contain dividends; note: "DTWEXM" discontinued as of Jan 2020
factors_d = ["DGS10", "BAMLH0A0HYM2"]

Price momentum

One month reversal and 2-12 month momentum are two ends of the spectrum. The general trend indicates that positive acceleration leads to reversals or negative acceleration leads to rebounds. An unsustainable acceleration leading to reversal can reconcile the one-month reversal and 2-12 month momentum. The key is that it implies that acceleration is not sustainable.

# "Momentum, Acceleration, and Reversal"
def pnl(x):
    return np.nanprod(1 + x) - 1
order = 20
momentum_df = returns_df["returns"].shift(order).rolling(width - order, min_periods = 1).apply(pnl, raw = False).dropna()

Time-series score

Suppose we are looking at \(n\) independent and identically distributed random variables, \(X_{1},X_{2},\ldots,X_{n}\). Since they are iid, each random variable \(X_{i}\) has to have the same mean, which we will call \(\mu\), and variance, which we will call \(\sigma^{2}\):

\[ \begin{aligned} \mathrm{E}\left(X_{i}\right)&=\mu\\ \mathrm{Var}\left(X_{i}\right)&=\sigma^{2} \end{aligned} \]

Let’s suppose we want to look at the average value of our \(n\) random variables:

\[ \begin{aligned} \bar{X}=\frac{X_{1}+X_{2}+\cdots+X_{n}}{n}=\left(\frac{1}{n}\right)\left(X_{1}+X_{2}+\cdots+X_{n}\right) \end{aligned} \]

We want to find the expected value and variance of the average, \(\mathrm{E}\left(\bar{X}\right)\) and \(\mathrm{Var}\left(\bar{X}\right)\).

Expected value

\[ \begin{aligned} \mathrm{E}\left(\bar{X}\right)&=\mathrm{E}\left[\left(\frac{1}{n}\right)\left(X_{1}+X_{2}+\cdots+X_{n}\right)\right]\\ &=\left(\frac{1}{n}\right)\mathrm{E}\left(X_{1}+X_{2}+\cdots+X_{n}\right)\\ &=\left(\frac{1}{n}\right)\left(n\mu\right)\\ &=\mu \end{aligned} \]

Variance

\[ \begin{aligned} \mathrm{Var}\left(\bar{X}\right)&=\mathrm{Var}\left[\left(\frac{1}{n}\right)\left(X_{1}+X_{2}+\cdots+X_{n}\right)\right]\\ &=\left(\frac{1}{n}\right)^{2}\mathrm{Var}\left(X_{1}+X_{2}+\cdots+X_{n}\right)\\ &=\left(\frac{1}{n}\right)^{2}\left(n\sigma^{2}\right)\\ &=\frac{\sigma^{2}}{n} \end{aligned} \]

def sd(x):
    
    n_rows = sum(~np.isnan(x))
        
    if n_rows > 1:
        result = np.sqrt(np.nansum(x ** 2) / (n_rows - 1))
    else:
        result = np.nan
        
    return result
# volatility scale only
score_df = (momentum_df / momentum_df.rolling(width, min_periods = 1).apply(sd, raw = False)).dropna()
# overall_df = score_df.mean(axis = 1)
# overall_df = overall_df / overall_df.rolling(width, min_periods = 1).apply(risk, raw = False)
# score_df.insert(loc = 0, column = "Overall", value = overall_df)
# score_df = score_df.dropna()

Outlier detection

import statsmodels.api as sm

Interquartile range

Outliers are defined as the regression residuals that fall below \(Q_{1}−1.5\times IQR\) or above \(Q_{3}+1.5\times IQR\):

def outliers(z):
  
    n_cols = z.shape[1]
    result_ls = []

    for j in range(n_cols):
      
        y = z.iloc[:, j]

        if (n_cols == 0):
            x = sm.add_constant(range(len(y)))
        else:
            x = sm.add_constant(z.drop(z.columns[j], axis = 1))

        coef = sm.WLS(y, x).fit().params
        predict = coef.iloc[0] + np.dot(x.iloc[:, 1:], coef[1:])
        resid = y - predict

        lower = resid.quantile(0.25)
        upper = resid.quantile(0.75)
        iqr = upper - lower

        total = y[(resid < lower - 1.5 * iqr) | (resid > upper + 1.5 * iqr)]
        
        total = pd.DataFrame({"date": total.index, "symbol": total.name, "values": total})
        result_ls.append(total)

    result = pd.concat(result_ls, ignore_index = True)
    result = result.pivot_table(index = "date", columns = "symbol", values = "values")

    return result
outliers_df = outliers(score_df)

Contour ellipsoid

Granger causality

from scipy.stats import chi2

\[ \begin{aligned} \left(R\hat{\beta}-r\right)^\mathrm{T}\left(R\hat{V}R^\mathrm{T}\right)^{-1}\left(R\hat{\beta}-r\right)\xrightarrow\quad\chi_{Q}^{2} \end{aligned} \]

def granger_test(x, y, order):
    
    # compute lagged observations
    lag_x = x.shift(order)
    lag_y = y.shift(order)
    
    # collect series
    df = pd.DataFrame({"x": x, "y": y, "lag_x": lag_x, "lag_y": lag_y})
    x = sm.add_constant(df[["lag_y", "lag_x"]])
    
    # fit full model
    fit = sm.WLS(df["y"], x, missing = "drop").fit()
    
    R = np.array([0, 0, 1])
    coef = fit.params
    r = 0 # technically a matrix (see Stack Exchange)
    
    matmul = np.dot(R, coef) - r
    matmul_mid = np.linalg.inv(np.atleast_2d(np.dot(R, np.dot(fit.cov_params(), R.T))))
    wald = np.dot(matmul.T, np.dot(matmul_mid, matmul))
    
    result = 1 - chi2.cdf(wald, 1)
    
    return np.ravel(result)
def roll_lead_lag(x, y, width, order, p_value):
  
    n_rows = len(x)
    x_name = x.name
    y_name = y.name
    x_y_ls = []
    y_x_ls = []

    for i in range(width - 1, n_rows):
      
        idx = range(max(i - width + 1, 0), i + 1)

        x_y = granger_test(x.iloc[idx], y.iloc[idx], order)
        y_x = granger_test(y.iloc[idx], x.iloc[idx], order)

        x_y_status = (x_y < p_value) and (y_x > p_value)
        y_x_status = (x_y > p_value) and (y_x < p_value)
        
        x_y_ls.append(x_y_status)
        y_x_ls.append(y_x_status)
    
    result = pd.DataFrame({x_name: x_y_ls, y_name: y_x_ls}, index = x.index[(width - 1):])

    return result
p_value = 0.05
score_x_df = score_df.loc[:, "SP500"]
score_y_df = score_df.loc[:, "DGS10"]
lead_lag_df = roll_lead_lag(score_x_df, score_y_df, width, order, p_value)