함수형 언어의 가장 중요한 기능중 하나는 고차원함수라는 것이다(higher-order function) 이는 함수가 파라미터로 넘겨 질 수 있고 결과로 함수를 리턴 할 수 있다는 뜻인데 파이썬은 이런 기능을 다양한 방법으로 지원을 합니다.


1. Higher-order function의 3가지 성질


  • Functions that accept a function as one of its arguments
  • Functions that return a function
  • Functions that accept a function and return function 


위 조건을 만족 해야만 고차원 함수 또는 함수가 1급객체라고 표현합니다.


그럼 첫번째 Functions that accept a function as one of its arguments 를 Python에서 언제 사용하는지 알아보겠습니다.


보통 이런 함수는 우리가 자주 보는 max(), min(), sorted() 함수가 higher-order function 기능을 key=라는 파라미터로 제공하는 대표적인 higher-order function입니다. map(), filter()는 첫번째 파라미터로 받기도 합니다.


max() 함수의 key 파라미터를 활용하는 예를 한번 만들어 보겠습니다.

some_tuples = (
    (1, 2),
    (3, 5),
    (1, 1),
    (10, 9)
)

def get_first(d):
    return d[0]

print(max(some_tuples, key=get_first))

위 코드에서 처럼 첫번째 값을 기준으로 가장 큰 튜플을 찾고자 할때 get_first라는 함수를 임의로 만들어서 파라미터를 입력받아 0번째 값을 리턴하도록 만들고 이 함수를 key에 넘겨서 결과에 반영하도록 하였습니다.


프로그래밍을 하다보면 이러한 higher-order function을 이용하여 함수를 작성하는 일이 자주 있는데 예전엔 이를 higher-order function이라고 하는줄 몰랐네요.


그럼 이번엔 이걸 이용해서 실무에서 활용하는 예제를 만들어 보겠습니다.


2. 실전 연습


Customer과 Customers라는 클래스 두개를 만들고 Customers 객체가 Enabled된 Customer을 불러오는 간단한 로직을 예로 들어 보겠습니다. 문제를 점점 해결 하는 과정을 시나리오로 작성 하였기때문에 처음부터 천천히 보시길 권장하며 또한 코드를 직접 작성하셔야지 자기 것이 됩니다.!!


먼저 Customer.py를 만듭니다.  Customer Class는 이름(name), 주소(address), 계약여부(is_contracted), 활성화여부(is_enabled)라는 프로퍼티를 가지도록 만들었습니다.

class Customer:

    def __init__(self, name, address, is_contracted, is_enabled):
        self._name = name
        self._address = address
        self._is_contract = is_contracted
        self._is_enable = is_enabled

    @property
    def name(self):
        return self._name

    @property
    def address(self):
        return self._address

    @property
    def is_contract(self):
        return self._is_contract

    @property
    def is_enabled(self):
        return self._is_enable

Customer 객체의 프로퍼티를 잘 불러 오는지 테스트 코드를 작성 해 보도록 하겠습니다.

from unittest import TestCase
from ch_02.customer import Customer

class TestCustomers(TestCase):

    def test_making_customer(self):
        name = '최명규'
        address = '대치동'
        is_contracted = True
        is_enabled = False

        customer = Customer(name, address, is_contracted, is_enabled)

        self.assertEqual(customer.name, name)
        self.assertEqual(customer.address, address)
        self.assertEqual(customer.is_contract, is_contracted)
        self.assertEqual(customer.is_enabled, is_enabled)

Python Property란?

객체안의 변수를 캡슐화 하고 싶을 때 사용합니다. 흔히 아는 자바에서 private 변수가 있고 이를 접근 하는 메서드를 따로 두는 것이라고 생각 하시면됩니다. @property를 쓰면 .name 이렇게 내부 변수를 건들이지 않고 접글 할 수 있습니다.


여기 까지 작성 하시고 테스트 코드를 실행 해보면 Success 되시는 걸 볼 수 있을 겁니다.

$ python -m unittest ch_02.tests.test_customer

-------------------------------------------
Ran 1 test in 0.000s

OK


그럼 이제 부턴 Customers 라는 컬렉션역할을 할 클래스를 만들어서 Customer를 필터링 해서 가져 오는 함수를 추가 해 보겠습니다.


지금 부턴 TDD로 진행 해 보겠습니다.


우선 Customers 라는 클래스가 Customer List를 입력 받아서 생성되도록 가정 하고 코드를 작성 하였습니다. 


아래는 test_customers.py 예제 코드

from unittest import TestCase

from ch_02.customer import Customer
from ch_02.customers import Customers

class TestCustomers(TestCase):

    def setUp(self):
        enable = True
        disable = False

        enabled_customers = [
            Customer('최명규', '대치동', True, enable),
            Customer('김홍민', '대치동', True, enable),
            Customer('천준희', '신림동', True, enable),
        ]

        disabled_customers = [
            Customer('이명규', '대치동', True, disable),
            Customer('최홍민', '대치동', True, disable),
            Customer('김준희', '신림동', True, disable),
            Customer('황성현', '신림동', True, disable),
        ]
        self.customer_list = enabled_customers + disabled_customers

        self.customers = Customers(self.customer_list)

Customers Class의 모습은 어떨까요? 테스트를 기반으로 간단하게 작성 해 보겠습니다.

class Customers:

    def __init__(self, customers):
        self.customers = customers

단순히 customers라는 변수로 리스트를 입력 받아서 대입합니다. 그럼 정말 잘 동작하는지 확인 하기 위해서 customers에 원소 갯수가 우리가 넣은 것과 일치 하는지 테스트를 해 봅니다.


length를 비교 하기 위해서 테스트 코드 추가

    def test_get_length(self):
        result_length = self.customers.get_length()
        expected_length = len(self.customer_list)
        self.assertEqual(result_length, expected_length)

Customers Class엔 get_length() 함수가 없으므로 추가 해 줍니다. 추가 하기전엔 테스트가 실패 할 겁니다.

class Customers:

    def __init__(self, customers):
        self.customers = customers

    def get_length(self):
        return len(self.customers)

이렇게 코드를 작성 후 test_customers.py를 실행 시키면 테스트가 성공 합니다.


그러나 python 보통 get_length()이런 함수보단 len()함수가 더 익숙 하므로 한번 리펙토링을 해 줍니다.

class Customers:

    def __init__(self, customers):
        self.customers = customers

    def __len__(self):
        return len(self.customers)

테스트 코드도 len() 함수를 호출 하여 length를 구하도록 수정 합니다.

def test_get_length(self):
    result_length = len(self.customers)
    expected_length = len(self.customer_list)
    self.assertEqual(result_length, expected_length)

리펙토링 후 꼭 테스트가 성공하는지 확인 해주세요.


이번엔 본격적으로 Higher-order function 활용을 위한 함수를 작성을 할 차례입니다. 우선 Higher-order function을 쓰지 않는다면 어떤 코드가 작성되는지 부터 보겠습니다.


enabled된 Customer의 이름 리스트를 가져와서 테스트

def test_get_enabled_customer_names(self):
        result_names = self.customers.get_enabled_customer_names()
        expected_names = list(
            map(lambda x: x.name,
                filter(lambda x: x.is_enabled, self.customer_list))
        )
        self.assertCountEqual(result_names, expected_names)

테스트 코드를 작성 후 테스를 실행 하면 당연히 get_enabled_customer_names가 없으니실패 하겠죠?

    def get_enabled_customer_names(self):
        ret_list = []
        for customer in self.customers:
            if customer.is_enabled:
                ret_list.append(customer.name)
        return ret_list

get_enabled_customer_names() 함수를 만들라고 하면 명령형 프로그래밍에 익숙한 대부분의 개발자들은 위와 같이 코드를 작성 할 것이고 문제도 없어 보입니다. 그럼 이번엔 이와 비슷한 일을 하는 활성화된 고객에 대한 주소리스트를 가져오는걸 만들어 보겠습니다.


주소를 가져오는 테스트 코드 작성

    def test_get_enabled_customer_addresses(self):
        result_names = self.customers.get_enabled_customer_addresses()
        expected_names = list(
            map(lambda x: x.address,
                filter(lambda x: x.is_enabled, self.customer_list))
        )
        self.assertCountEqual(result_names, expected_names)

get_enabled_customer_addresses() 함수 작성

    def get_enabled_customer_addresses(self):
        ret_list = []
        for customer in self.customers:
            if customer.is_enabled:
                ret_list.append(customer.address)
        return ret_list

코드를 추가 한 후 테스트 코드 실행 해서 Success 되면 잘 따라 오신 겁니다.

그러나 슬슬 로직상 중복된 코드가 등장하는 문제가 보이기 시작 합니다. 우리들이 흔히 카피엔 페이스트를 하면 자주 볼 수 있는 장면입니다.


이름, 주소 말고 다른 정보가 더 추가 된다면 똑같을 일을 하는 함수를 계속 작성 해야 될까요? 이 문제를 어떻게 함수형으로 해결 할 수 있는지 이제부터 소개 하겠습니다.


함수를 인자로 전달 하여 해결 해보기


Customers.py에 get_enabled_customer_field() 함수 추가로 추상화 하기

    def get_enabled_customer_field(self, field):
        ret_list = []
        for customer in self.customers:
            if customer.is_enabled:
                if field == 'name':
                    ret_list.append(customer.name)
                elif field == 'address':
                    ret_list.append(customer.address)
        return ret_list

    def get_enabled_customer_names(self):
        return self.get_enabled_customer_field('name')

    def get_enabled_customer_addresses(self):
        return self.get_enabled_customer_field('address')

또 명령형 코딩이 익숙한 개발자들은 위 처럼 코드를 작성 할 것입니다. 위 코드는 언뜻 보기에 추상화 된 것처럼 보이지만 if else 문으로 도배가 될 수 밖에 없는 코드 입니다.


if else를 사용하지 않고 필드를 반환하도록 하는 함수 추가 하여 추상화 한번더 하기


ConversionFunction이라는 인터페이스를 둔다고 가정 하고 안에  call 함수를 만들어 줍니다. 그리고 이 인터페이스를 상속받아서 해당 이름을 리턴하거나 주소를 리턴하는 클래스를 만듭니다. 코드는 아래와 같습니다.

class ConversionFunction:

    def call(self, customer):
        raise NotImplementedError

class CustomerName(ConversionFunction):
    
    def call(self, customer):
        return customer.name

class CustomerAddress(ConversionFunction):
    
    def call(self, customer):
        return customer.address

이젠 이 클래스들을 활용하여 if else 문을 제거 해 보도록 하겠습니다.

class Customers:

    def __init__(self, customers):
        self.customers = customers

    def __len__(self):
        return len(self.customers)

    def get_enabled_customer_names(self):
        return self.get_enabled_customer_field(CustomerName())

    def get_enabled_customer_addresses(self):
        return self.get_enabled_customer_field(CustomerAddress())

    def get_enabled_customer_field(self, func):
        ret_list = []
        for customer in self.customers:
            if customer.is_enabled:
                ret_list.append(func.call(customer))
        return ret_list

테스트 코드를 실행해도 Success 되는 것을 볼 수 있습니다. 이렇게 함수를 넘겨서 추상화를 함으로 코드를 아주 깔끔하게 만들 수 있다는 것을 알게 되었습니다. 그럼 좀더 파이썬 내장함수를 사용하여 리펙토링 할 수 없을까요? 아래는 get_enabled_customer_field함수를 파이썬 내장 함수를 사용하여 더 단순하게 만드는 예입니다.

    def get_enabled_customer_field(self, func):
        enabled_customer = filter(lambda x: x.is_enabled, self.customers)
        return list(map(func.call, enabled_customer))

정말 코드량이 줄어 드는 것을 볼 수 있죠? 명령형에 익숙한 우리들이 조금만 고민하고 추상화 하면 아주 멋진 코드로 바뀔 수 있다는 것을 보여주는 한 예 인것 같습니다.


지금부턴 최종 결과물 코드를 그대로 보여 드리겠습니다.

customer.py

class Customer:

    def __init__(self, name, address, is_contracted, is_enabled):
        self._name = name
        self._address = address
        self._is_contract = is_contracted
        self._is_enable = is_enabled

    @property
    def name(self):
        return self._name

    @property
    def address(self):
        return self._address

    @property
    def is_contract(self):
        return self._is_contract

    @property
    def is_enabled(self):
        return self._is_enable

customers.py

class ConversionFunction:

    def call(self, customer):
        raise NotImplementedError

class CustomerName(ConversionFunction):

    def call(self, customer):
        return customer.name

class CustomerAddress(ConversionFunction):

    def call(self, customer):
        return customer.address

class Customers:

    def __init__(self, customers):
        self.customers = customers

    def __len__(self):
        return len(self.customers)

    def get_enabled_customer_names(self):
        return self.get_enabled_customer_field(CustomerName())

    def get_enabled_customer_addresses(self):
        return self.get_enabled_customer_field(CustomerAddress())

    def get_enabled_customer_field(self, func):
        enabled_customer = filter(lambda x: x.is_enabled, self.customers)
        return list(map(func.call, enabled_customer))


test_customer.py

from unittest import TestCase
from ch_02.customer import Customer

class TestCustomers(TestCase):

    def test_making_customer(self):
        name = '최명규'
        address = '대치동'
        is_contracted = True
        is_enabled = False

        customer = Customer(name, address, is_contracted, is_enabled)

        self.assertEqual(customer.name, name)
        self.assertEqual(customer.address, address)
        self.assertEqual(customer.is_contract, is_contracted)
        self.assertEqual(customer.is_enabled, is_enabled)

test_customers.py

from unittest import TestCase

from ch_02.customer import Customer
from ch_02.customers import Customers

class TestCustomers(TestCase):

    def setUp(self):
        enable = True
        disable = False

        enabled_customers = [
            Customer('최명규', '대치동', True, enable),
            Customer('김홍민', '대치동', True, enable),
            Customer('천준희', '신림동', True, enable),
        ]

        disabled_customers = [
            Customer('이명규', '대치동', True, disable),
            Customer('최홍민', '대치동', True, disable),
            Customer('김준희', '신림동', True, disable),
            Customer('황성현', '신림동', True, disable),
        ]
        self.customer_list = enabled_customers + disabled_customers

        self.customers = Customers(self.customer_list)

    def test_get_length(self):
        result_length = len(self.customers)
        expected_length = len(self.customer_list)
        self.assertEqual(result_length, expected_length)

    def test_get_enabled_customer_names(self):
        result_names = self.customers.get_enabled_customer_names()
        expected_names = list(
            map(lambda x: x.name,
                filter(lambda x: x.is_enabled, self.customer_list))
        )
        self.assertCountEqual(result_names, expected_names)

    def test_get_enabled_customer_addresses(self):
        result_names = self.customers.get_enabled_customer_addresses()
        expected_names = list(
            map(lambda x: x.address,
                filter(lambda x: x.is_enabled, self.customer_list))
        )
        self.assertCountEqual(result_names, expected_names)

위 포스팅은 becoming functional, functional python programming 책을 보고 공부 한 것을 바탕으로 한 것입니다.

'Python' 카테고리의 다른 글

Python Higher-order function  (0) 2015.09.07
Python 함수형으로 짜는 1차원 배열 그룹하기  (0) 2015.09.07
Posted by bench87

가끔 파이썬으로 코딩을 하다 보면 1차원 리스트를 특정 숫자의 갯수로 묶어야 할 때가 있습니다.


예를들어 [1, 2, 3, 4, 5] 이런 리스트를 두개씩 묶는다면 [[1, 2], [3, 4], [5]] 이렇게 하고 싶을때가 있는데요


위처럼 그룹핑을 하는 여러가지 방법을 소개 하고자 합니다. 그리고 이 방법을 소개하는 방법은 첨엔 그냥 단순하게 파이썬스럽게 짜는 법을 소개 하고 점점 함수형 스타일의 단순함으로 접근 하는 방법으로 글을 작성 하겠습니다.


많은 개발자들이 아래와 같이 코드를 작성 할 것입니다.


def group_by_seq( n, sequence ):
    flat_iter=iter(sequence)
    full_sized_items = list( tuple(next(flat_iter)
        for i in range(n))
            for row in range(len(sequence)//n))
    trailer = tuple(flat_iter)
    if trailer:
        return full_sized_items + [trailer]
    else:
        return full_sized_items


입력 받은 n의 숫자에 따라 그룹을 하고 마지막에 남은 엘리먼트가 있다면 뒤에 붙이고 리턴하고 그렇지 않으면 결과 그대로 리턴하는걸 볼 수 있습니다.


하지만 위 방법은 먼가 직관적으로 쉽게 작성 하기 쉬워 보이나 사실 단순하거나 일반적으로 알고 있던 함수형과는 조금 다른 것 같습니다. 그럼 좀더 단순하게 코드를 작성 해 보도록 하겠습니다.

def group_by_iter( n, iterable ):
    row= tuple(next(iterable) for i in range(n))
    while row:
        yield row
        row= tuple(next(iterable) for i in range(n))

조금 단순해 졌습니다. 처음 입력받은 n의 숫자 만큼 row를 만들고 while을 돌면서 같은 일을 반복하는 것을 알 수 있습니다. tuple(next(iterable) for i in range(n)) 이 코드에서 리스트의 끝에 도달 했을 튜플의 길이가 0인 튜플을 리턴하므로 while문이 중단되면서 우리가 원하던 값을 얻을 수 있게 됩니다.

위 방법으로 while문을 나오도록 하는 것은 제귀적으로 코드를 짜던 방식과 유사 합니다.


하지만 위 방법 보다 파이썬은 더 쉽게 할 수 있습니다. 아래와 같이 zip함수를 이용한다면 아주 간단한 코드로 그룹핑을 할 수 있는 것처럼 보이기도 하는데요...


zip(some_list[0::2], some_list[1::2])


위 코드를 이용하면 정말 함수형 스타일로 우리가 원하던 결과를 얻을 수 있는 것 처럼 보이지만 그룹하고자 하는 짝이 맞지 않으면 그 맞지 않은 엘리먼트는 제외된 결과를 얻게 됩니다.


일단 위 방법을 좀더 유연하게 처리 하는 방법은 아래와 같습니다.


zip(*(some_list[i::n] for i in range(n)))

위 방법으로 하면 n의 갯수를 좀더 유연하게 처리 할 수 있습니다.


그러나 (len(some_list) % n !==0) 인 경우 처리를 하려면 itertools.zip_longest()함수를 사용하여 해결 합니다.


from itertools import zip_longest

def group_by_slice( n, sequence ):
    return zip_longest(*(sequence[i::n] for i in range(n)))


Python이 전통 함수형 언어는 아니지만 함수형을 많이 지원하고 또한 함수형으로 사고하기 위해 훈련하기 좋은 언어라고 생각합니다.


끝으로 위 포스팅은 Functional Python Programming책을 공부 하며 작성한 것입니다.


'Python' 카테고리의 다른 글

Python Higher-order function  (0) 2015.09.07
Python 함수형으로 짜는 1차원 배열 그룹하기  (0) 2015.09.07
Posted by bench87



첫번째 챕터는 기계학습이 어떤건지 맛보기를 하는 건데 그 주제는 다음과 같다 가상 창업 회사인 MLAAS라는 회사가 웹상에서 기계 학습 알고리즘을 제공하는 서비스를 한다고 한다. 이 회사는 점차 번창해서 웹 요청을 충분히 처리하고자 기반 시설을 늘리려고 하는데 비싼 장비를 무턱대고 증설 할 수 없으니 시간당 100,000요청이 있다고 추정하고 현재 장비가 언제 최대치가 되어서 클라우드에 서버를 준비 할지를 파악하려 한다.


1. 데이터를 읽자

기계학습을 위한 데이터는 Building Machine Learning System with Python 책 출판사 홈페이지에 가서 받을 수 있다. 이 파일에는 시간당  요청 수가 저장되어 있는데, 각 줄은 연속적으로 나열된 시간이며 그 시간의 요청수를 나타낸다.


head 명령어로 첫 몇줄을 출력 해 보았다.


1 2272

2 nan

3 1386

4 1365

5 1488

6 1337

7 1883

8 2283

9 1335

10 1025

11 1139

12 1477

13 1203

14 1311

15 1299

16 1494

   17     1159


이 텍스트 파일은 SciPy의 genfrom.txt()를 사용해서 쉽게 데이터를 읽어 SciPy 2차원 array로 변환 할 수 있다.

 
import scipy as sp
data = sp.genfromtxt('data/web_traffic.tsv', delimiter="\t")

데이터가 올바르게 읽혔는지 바로 확인 해보자

 
>>> print data[:10]
[[  1.00000000e+00   2.27200000e+03]
 [  2.00000000e+00              nan]
 [  3.00000000e+00   1.38600000e+03]
 [  4.00000000e+00   1.36500000e+03]
 [  5.00000000e+00   1.48800000e+03]
 [  6.00000000e+00   1.33700000e+03]
 [  7.00000000e+00   1.88300000e+03]
 [  8.00000000e+00   2.28300000e+03]
 [  9.00000000e+00   1.33500000e+03]
 [  1.00000000e+01   1.02500000e+03]]

데이터 정리와 전처리

SciPy에서 743개의 데이터를 2개의 벡터로 나누면 좀더 편리하니깐 아래와 같이 하여 x, y에 담자

x = data[:,0]
y = data[:,1]

요청 횟수열을 보면 nan이라는 값이 있는데 그 값이 제거 하는 작업이 필요 하다.

x = x[~sp.isnan(y)]
y = y[~sp.isnan(y)]

SciPy의 배열을 또 다른 배열로 인덱싱 할 수 있는데 sp.isnan(y)는 엔트리가 수치인지 아닌지를 boolean으로 표시된 배열로 반환하다. y의 유효값이 있는 엔트리를 기준으로 x 에서 유효한 엔트리를 선별하기 위해 ~를 사용한다.


그럼 이번엔 데이터의 형태를 파악하기 위해 matplotlib의 점도표를 그려보자.

import matplotlib.pyplot as plt
plt.scatter(x,y)
plt.title("Web traffic over the last month")
plt.xlabel("Time")
plt.ylabel("Hits/hour")
plt.xticks([w*7*24 for w in range(10)],
           ['week %i'%w for w in range(10)])
plt.autoscale(tight=True)
plt.grid()
plt.show()



적절한 모델 학습 알고리즘 선택

기본적인 모델이 직선이라고 가정해보고, 어떻게 하면 근사치 오차가 가장 작도록 차트에 직선을 잘 그을 수 있을까를 고민해보자. Scipy의 polyfit() 함수가 정확히 이 문제를 해결하는데 도움을 주는 함수이다. 이 함수는 x, 우리가 원하는 다항 함수의 차수를 고려해 이전에 정의했던 오차 함수를 최소로 만드는 모델 함수를 찾는다.

fp1, residuals, rank, sv, rcond = sp.polyfit(x, y, 1, full=True)

polyfit() 함수는 적합화된 모델 함수  fp1의 매개변수를 반환한다. full을 True로 주면 적합화하는 과정의 추가적인 정보를 얻을 수 있다. fp1의 값을 들여다 보면 array([   2.59619213,  989.02487106]) 있는데 가장 적합한 직선은 다음 함수로 볼 수 있다. 


f(x) = 2.59619213 * x + 989.0248716


모델 매개 변수로 부터 모델을 생성하기 위해선 poly1d()함수를 사용 한다.

이 함수를 활용해서 직선을 그어 보자.



f1 = sp.poly1d(fp1)
plt.scatter(x,y)
plt.title("Web traffic over the last month")
plt.xlabel("Time")
plt.ylabel("Hits/hour")
plt.xticks([w*7*24 for w in range(10)],
           ['week %i'%w for w in range(10)])
plt.autoscale(tight=True)

fx = sp.linspace(0, x[-1], 1000) 
plt.plot(fx, f1(fx), linewidth=4)
plt.legend(["d=%i" % f1.order], loc="upper left")

plt.grid()
plt.show()



이 모델의 오차는 얼마 일까? 오차는 모델이 예측한 예상 값과 실제 값 사이의 거리 제곱으로 계산을 한다. 에러를 계산하는 함수는 아래와 같다.

def error(f, x, y):
    return sp.sum((f(x) - y) ** 2)
print error(f1, x, y)

에러를 출력해보면 317389767.34이 나오는데 썩 좋아 보이지 않은 모델 처럼 보인다.


좀더 복잡한 모델을 선택 해 보자.


새로운 모델이 데이터를 더 잘 이해하는지 살펴보기 위해 좀 더 복잡한 3차 다항식 모델로 fitting 해보자.

f2p = sp.polyfit(x, y, 2)
f2p
f2 = sp.poly1d(f2p)
plt.scatter(x,y)
plt.title("Web traffic over the last month")
plt.xlabel("Time")
plt.ylabel("Hits/hour")
plt.xticks([w*7*24 for w in range(10)],
           ['week %i'%w for w in range(10)])
plt.autoscale(tight=True)

fx = sp.linspace(0, x[-1], 1000)

plt.plot(fx, f1(fx), linewidth=4)
#plt.legend(["d=%i" % f1.order],loc="upper left")

plt.plot(fx, f2(fx), linewidth=4)
plt.legend([["d=%i" % f1.order],["d=%i" % f2.order]], loc="upper left")

plt.grid()
plt.show()



이 모델의 오차를 출력 해 보면 직선 모델 오차의 거의 절반인 179983507.87817925이다. 괜찮은 결과지만, 여기엔 대가가 따르는데 좀 더 복잡한 모델이고, plotfit()내부에서 조절하기 위해서 매개변수 하나를 더 사용한다는 뜻이다. fitting된 다항식은 다음과 같다.


f(x) = 0.0105322215 * x ** 2 - 5.26545650 * x + 1974.76082


모델이 복잡할 수록 더 나은 결과가 나온다면 복잡성을 한번 많이 증가 시켜 보자. 3, 10, 100으로 증가 시켜보았는데 내 컴퓨터는 53까지 밖에 되지 않았음

f3p = sp.polyfit(x, y, 3)
f3 = sp.poly1d(f3p)

f10p = sp.polyfit(x, y, 10)
f10 = sp.poly1d(f10p)

f100p = sp.polyfit(x, y, 53)
f100 = sp.poly1d(f100p)


plt.scatter(x,y)
plt.title("Web traffic over the last month")
plt.xlabel("Time")
plt.ylabel("Hits/hour")
plt.xticks([w*7*24 for w in range(10)],
           ['week %i'%w for w in range(10)])
plt.autoscale(tight=True)

fx = sp.linspace(0, x[-1], 1000)

plt.plot(fx, f1(fx), linewidth=4)
plt.plot(fx, f2(fx), linewidth=4)
plt.plot(fx, f3(fx), linewidth=4)
plt.plot(fx, f10(fx), linewidth=4)
plt.plot(fx, f100(fx), linewidth=4)

plt.legend([["d=%i" % f1.order],
            ["d=%i" % f2.order],
            ["d=%i" % f3.order],
            ["d=%i" % f10.order],
            ["d=%i" % f100.order]
            ], 
           loc="upper left")

plt.grid()
plt.show()


for f in [f1, f2, f3, f10, f100]:
    print("Error d=%i: %f" % (f.order, error(f, x, y)))


Error d=1: 317389767.339778

Error d=2: 179983507.878179

Error d=3: 139350144.031725

Error d=10: 121942326.363589

Error d=53: 109452430.039068


10차와 53차 다항식을 보면 넓은 범위에서 굴곡이 많다. 이는 모델이 너무 많은 데이터에 적합화된 것으로 보인다. 주요 데이터뿐만 아니라 노이즈 까지 반영됐는데, 이를 과적합화라 한다. (Over fitting) 이 맥락을 스터디에서는 여자친구를 사귄 횟수로 비유를 하였는데 여자친구를 많이 사귀어서 많은 여자들의 정보를 다 활용 하는 것 보다 의미있는 여자들의 정보만 보는 것이 더 효과적이다라고 비유를 했었다. 많이 안사귀어 봐서 완전 와 닿지는 않았지만 어느정도 이해는 되었다


이쯤에선 모델을 선택 함에 있어서 다음을 고려 하여야 한다. 적합화된 모델 중 하나를 선택해야 할지, 스플라인 같은 좀 더 복잡한 모델로 바꿔야 할지, 데이터를 다르게 분석하고 다시 시작해야 할지를 고려 해 보아야 한다.


5개의 fitting된 모델에서 1차 다항식 모델은 단순하다. 10차 100차 다항식은 overfitting 됐다. 2, 3차 다항식이 데이터에 잘 맞춰진 듯 하나 2개의 경계선에서 추출한다면 이 또한 엉망이 될 수 있다.


일보후퇴, 일보전진 : 데이터 다시 보기

데이터를 다시 검토 할 필요가 있다. 3주차와 4주차 사이에 변곡점이 있는데 3.5주차를 기준으로 데이터를 둘로 나누고 두 선을 따로 훈련 해 보자. 3주차까지 첫 번째 직선을 훈련하고, 나머지 주차로 부터 두 번째 직선으로 훈련하자.

inflection = 3.5 * 7 * 24
xb = x[inflection:]
yb = y[inflection:]

fb = sp.poly1d(sp.polyfit(xb, yb, 1))

plt.scatter(x,y)
plt.title("Web traffic over the last month")
plt.xlabel("Time")
plt.ylabel("Hits/hour")
plt.xticks([w*7*24 for w in range(10)],
           ['week %i'%w for w in range(10)])

fax = sp.linspace(0, xa[-1], 1000)
fbx = sp.linspace(xb[0], xb[-1], 1000)

plt.plot(fax, fa(fax), linewidth=4)
plt.plot(fbx, fb(fbx), linewidth=4)

plt.legend([["d=%i" % fa.order],
            ["d=%i" % fb.order]
            ], 
           loc="upper left")

plt.autoscale(tight=True)
plt.grid()
plt.show()



두 직선의 결합은 이전 모델보다 overfitting을 피한 것 처럼 보이지만 에러를 보면 여전히 고차원 모델 보다 오차가 높다. 왜 직선 모델을 신용하는 걸까? 그 이유는 직선 모델이 미래의 데이터를 좀 더 잘 예측할 수 있다라고 생각을 하기 때문인데 얼마나 미래를 잘 예측 하는지는 아래 결과를 보면 알 수 있다.

colors = ['g', 'k', 'b', 'm', 'r']
linestyles = ['-', '-.', '--', ':', '-']

def plot_models(x, y, models, mx=None, ymax=None, xmin=None):
    plt.clf()
    plt.scatter(x, y, s=10)
    plt.title("Web traffic over the last month")
    plt.xlabel("Time")
    plt.ylabel("Hits/hour")
    plt.xticks(
        [w * 7 * 24 for w in range(10)], ['week %i' % w for w in range(10)])

    if models:
        if mx is None:
            mx = sp.linspace(0, x[-1], 1000)
        for model, style, color in zip(models, linestyles, colors):
            # print "Model:",model
            # print "Coeffs:",model.coeffs
            plt.plot(mx, model(mx), linestyle=style, linewidth=2, c=color)

        plt.legend(["d=%i" % m.order for m in models], loc="upper left")

    plt.autoscale(tight=True)
    plt.ylim(ymin=0)
    if ymax:
        plt.ylim(ymax=ymax)
    if xmin:
        plt.xlim(xmin=xmin)
    plt.grid(True, linestyle='-', color='0.75')
    plt.show()

#이전 모델들의 예측 그래프를 그려보자.
plot_models(
    x, y, [f1, f2, f3, f10, f100],
    mx=sp.linspace(0 * 7 * 24, 6 * 7 * 24, 100),
    ymax=10000, xmin=0 * 7 * 24)




고차원 모델들은 회사가 망하는 것 처럼 예측한다. 이건 데이터를 너무 잘 반영해서 미래 예측이 쓸모가 없어 진 상황으로 보여 진다. 저차 다항식같은 경우넨 너무 과소 적합하여 썩 좋아 보지지 않는다.

그런데 오차만으로 모델 판단 기준으로 정하면 고차원 모델을 선택해야 되는 것 처럼 보인다!!


훈련과 테스트

가령, 모델의 정확도를 낮추는 미래의 데이터를 갖고 있더라도, 단지 결과인 근사치 오차를 바탕으로 모델을 선택 할 수 있어야 한다.


홀드아웃 데이터(학습 데이터와 테스트 데이터를 분리 한다.) 방식


마지막 주의 데이터에서 테스트용 데이터로 쓰기 위해 일부 데이터를 뺀댜. 그리고 그 나머지 데이터로 학습해서 모델을 만든다. 학습 결과로 나온 모델의 성능을 테스트 데이터로 오차를 측정한다.

frac = 0.3
split_idx = int(frac * len(xb))
shuffled = sp.random.permutation(list(range(len(xb))))
test = sorted(shuffled[:split_idx])
train = sorted(shuffled[split_idx:])
fbt1 = sp.poly1d(sp.polyfit(xb[train], yb[train], 1))
fbt2 = sp.poly1d(sp.polyfit(xb[train], yb[train], 2))
fbt3 = sp.poly1d(sp.polyfit(xb[train], yb[train], 3))
fbt10 = sp.poly1d(sp.polyfit(xb[train], yb[train], 10))
fbt100 = sp.poly1d(sp.polyfit(xb[train], yb[train], 100))


테스트 모델을 바탕으로 오차를 구하면 아래와 같다

Test errors for only the time after inflection point

Error d=1: 6938331.520758

Error d=2: 6108367.785081

Error d=3: 6478197.217511

Error d=10: 6401828.693782

Error d=53: 7141231.365425


위 에러를 보면 2차 다항 모델이 예측력이 제일 좋아 보인 다는 것을 알 수 있다.


최초의 질문에 대답하기

 시간당 100,000요청이 언제 들어올까? 정비를 언제 기준으로 구매를 하면될까?


위에서 구했던 2차 다항식 y 값이 100,000이 되는 x를 찾으면된다.

2차 다항식을 가지고 100,000을 빼고 이 결과 다항식의 근을 찾으면 된다. Scipy optimize모듈의 fsolve 함수로 근을 구할 수 있다. fbt2는 우리의 최종 모델이다.

from scipy.optimize import fsolve
print(fbt2)
print(fbt2 - 100000)
reached_max = fsolve(fbt2 - 100000, 800) / (7 * 24)
print("100,000 hits/hour expected at week %f" % reached_max[0])

위 결과는 100,000 hits/hour expected at week 9.978378


약 10주 후엔 시간당 100,000에 도달 할 것이다라고 예측을 했다.


정리하면 전형적인 기계학습 작업은 

데이터의 정제와 이해에 대부분의 시간을 보낸다. 

올바른 실험 준비가 필요하다. 훈련과 테스트가 섞이지 않아야 한다. 

정도로 정리가 될 듯하다.


책내용이 앞부분부터 흥미로운 걸로 봐서  빨리 보고 싶다는 마음이 앞서네요. 파이썬관련 정보를 여기다가 주기적으로 올려서 공유를 하도록 하겠습니다. 

'machine learning' 카테고리의 다른 글

Building machine learning with python - chap1  (0) 2014.11.22
Posted by bench87