Data Analysis/Python for DA

[Invest Class] 5. Relative Momentum 전략 함수 생성

Jiyeon's Desk 2025. 4. 1. 14:26

1. Relative Momentum 전략 

매달 모든 종목의 수익률을 계산하여 수익률 기준 상위 n개 종목을 고르고, 그 상위권에 들어온 종목은 신호 발생이라고 보고 매수 대상이 된다. 다음 달에도 그 종목이 상위권에 계속 포함되면 매수를 유지하고 상위권에서 탈락하면 매도한다.

 

*

os.listdir('./DATA')
files = glob('./DATA/*.csv')

첫쨰줄 : 폴더 안에 있는 모든 파일과 폴더 이름을 리스트 형태로 출력 ex) ['AAPL.csv', 'TSLA.csv', 'MSFT.csv']

둘째줄 : glob()특정 패턴에 맞는 파일 이름들을 리스트로 반환하는 함수 ex) ['./DATA/AAPL.csv', './DATA/TSLA.csv', './DATA/MSFT.csv']

for file in files:
    # file : 주식데이터의 경로와 파일명
    # 경로와 파일명을 나눠준다. 
    folder, name = os.path.split(file)
    # name : AAPL.csv --> AAPL.csv 나눈다. 
    head, tail = os.path.splitext(name)
    # head 변수는 create_1m_rtn() 함수에서 _ticker 사용

 

경로와 파일명 분리-> 파일명과 확장자 분리

 

* 각 월별로 종목들의 수익률 순위를 백분율(%)로 변환하는 작업

month_rtn_df = month_rtn_df.rank(
    axis=1, 
    ascending=False, 
    method='max', 
    pct=True
)

pct =  True 를 사용하여 이 순위를 전체 개수로 나눠서 0~1사이의 값으로 바꿈. 이것을 수행하는 이유는 모멘텀 상위 n개 종목을 고르기 위해서임

 

* 매수할 종목 선별하기 위한 과정

month_rtn_df = month_rtn_df.where(month_rtn_df < 0.40, 0)

where(조건, 조건을 만족하지 않을 경우의 값)

상위 n% 종목만 투자 대상으로 선택하고 나머지는 0 값을 넣어서 매수하지 않는다는 신호.

 

*trading 함수 

def trading(df, stock_codes):
    std_ym = ''
    buy_phase = False
    book =df.copy()
    
    for code in stock_codes:
        for idx in book.index:
            if (book.loc[idx, f"position_{code}"]=="") & (book.shift().loc[idx, f"position_{code}"]==f"ready_{code}"):
                std_ym = book.loc[idx, 'STD-YM']
                buy_phase =True
            if (book.loc[idx, f"position_{code}"]=="") & (book.loc[idx, 'STD-YM']==std_ym) & (buy_phase):
                book.loc[idx, f"position_{code}"] = f"buy_{code}"
            if book.loc[idx, f"position_{code}"] == '':
                std_ym = ''
                buy_phase = False
    return book

매수 신호(ready)가 생기면 매수 기록하고, 그 신호가 유효한 동안 매수 상태를 유지하는 로직

신호가 발생하면 해당 월에 매수를 시작하고, 같은 월에 계속 매수를 하기 위해 buy_종목을 유지시킨다. 신호가 끝났거나 다음 달로 넘어가면 매수 상태를 종료시킨다.

 

 

 

2. Relative Momentum 전략 함수 생성

1. 월별 수익률을 계산하기 위해 데이터를 정제하는 함수

def create_1m_rtn(_df, ticker, start='2010-01-01', col = 'Adj Close'):
    df = _df.copy()
    if 'Date' in df.columns:
        df.set_index('Date', inplace = True)
    df.index = pd.to_datetime(df.index)
    
    df = df.loc[start:,[col]]
    
    df['STD-YM'] = df.index.strftime('%Y-%m')
    df['1m_rtn'] = 0
    df['CODE'] = ticker
    
    ym_list = df['STD-YM'].unique()
    return df, ym_list

2. 데이터를 로드하고 월별 수익률을 계산하여 데이터프레임을 결합하는 함수 

def data_load(path = "./data", 
              end = 'csv', 
              start = '2010-01-01', 
              col = 'Adj Close'):
    
    files = glob(f"{_path}/*.{_end}")
   
    stock_df = pd.DataFrame()
    month_last_df = pd.DataFrame()

    for file in files:
        folder , name = os.path.split(file)
        head, tail = os.path.splitext(name)
        read_df = pd.read_csv(file)

        
        price_df, ym_list = create_1m_rtn(read_df, head, _start, _col)
        stock_df = pd.concat( [stock_df, price_df] )

        for ym in ym_list:
            flag = price_df['STD-YM'] == ym
            buy = price_df.loc[flag].iloc[0, 0]
            sell = price_df.loc[flag].iloc[-1, 0]
   
            m_rtn = sell / buy
            price_df.loc[flag, '1m_rtn'] = m_rtn
     
            data = price_df.loc[flag, ['CODE', '1m_rtn']].tail(1)
            month_last_df = pd.concat( [month_last_df, data] )
            
    return stock_df, month_last_df

3. 월별 수익률을 기준으로 랭크를 설정하는 함수

def create_position(df, pct = 0.15):
    result = df.reset_index()
 
    if pct >= 1:
        pct = pct / 100
 
    result = result.pivot_table(
        index = 'Date', 
        columns = 'CODE', 
        values = '1m_rtn'
    )
  
    result = result.rank(
        axis=1, 
        ascending = False, 
        method = 'max', 
        pct = True
    )
   
    result = result.where(result < pct, 0)
    result[result != 0] = 1

    # 거래 신호를 딕셔너리로 생성하기 위한 빈 딕셔너리 생성 
    sig_dict = dict()
    for date in result.index:
        ticker_list = list(result.loc[date, result.loc[date] == 1].index)
        sig_dict[date] = ticker_list
        # { 2010-01-28 : [AAPL, BND] }

    stock_code = list(result.columns)
    return sig_dict, stock_code

4. 거래 내역컬럼을 생성하는 함수

def create_trade_book(df, codes, signal):
    book = df.reset_index()
    book = book.pivot_table(
        index = 'Date', 
        columns = 'CODE', 
        values = book.columns[1]
    )
    book['STD-YM'] = book.index.strftime('%Y-%m')
    
    for code in _codes:
        book[f"p_{code}"] = ""
        book[f"r_{code}"] = ""
   
    for date, values in _signal.items():
        for value in values:
            book.loc[date, f"p_{value}"] = f"ready_{value}"
    return book

5. 거래 내역을 추가하는 함수

def trading(df, stock_codes):
    std_ym = ''
    buy_phase = False
    book =df.copy()
    
    for code in stock_codes:
        for idx in book.index:
            if (book.loc[idx, f"p_{code}"]=="") & (book.shift().loc[idx, f"p_{code}"]==f"ready_{code}"):
                std_ym = book.loc[idx, 'STD-YM']
                buy_phase =True
            if (book.loc[idx, f"p_{code}"]=="") & (book.loc[idx, 'STD-YM']==std_ym) & (buy_phase):
                book.loc[idx, f"p_{code}"] = f"buy_{code}"
            if book.loc[idx, f"p_{code}"] == '':
                std_ym = ''
                buy_phase = False
    return book

6. 수익률을 계산하는 함수

def multi_return(df, stock_codes):
    book = df.copy()
    rtn = 1
    buy_dict = dict()
    sell_dict = dict()
    
    for idx in book.index:
        for code in stock_codes:
            if (book.shift(2).loc[idx, f"p_{code}"]=='') &\
                (book.shift(1).loc[idx,f"p_{code}"]==f"ready_{code}") &\
                    (book.loc[idx, f"p_{code}"]==f"buy_{code}"):
                        buy_dict[code] = book.loc[idx, code]
                        print(f"매수일 : {idx}, 매수가 : {buy_dict[code]}, 종목코드 : {code}")
            elif (book.shift(1).loc[idx, f"p_{code}"]==f"buy_{code}")&  \
                (book.loc[idx, f"p_{code}"] == ""):
                    sell_dict[code] = book.loc[idx, code]
                    rtn = sell_dict[code]/ buy_dict[code]
                    book.loc[idx, f"r_{code}"] = rtn
                    print(f"매도일 : {idx}, 매도가 : {sell_dict[code]}, 종목코드 : {code}, 수익율 : {rtn}")
            if book.loc[idx, f"p_{code}"] =="":
                buy_dict[code] = 0
                sell_dict[code] = 0
    return book

7. 누적 수익률을 계산하는 함수

def multi_acc_return(df, stock_codes):
    book = df.copy()
    acc_rtn = 1
    for idx in book.index:
        count =0
        rtn = 0
        for code in stock_codes:
            if book.loc[idx, f"r_{code}"]:
                count +=1
                rtn += book.loc[idx, f"r_{code}"]
        if(rtn !=0)& (count != 0):
            acc_rtn *= (rtn/count)
            print(f"누적 - 매도일 : {idx}, 매도 종목수 : {count}, 수익율 : {round( rtn / count, 2 )}")
        book.loc[idx, 'acc_rtn'] = acc_rtn
    print(acc_rtn)
    return book, acc_rtn

 

 

 

 

흐름을 따라가기 너무 어려웠다..!!