안녕하세요 coconut입니다.

이번에는 유명한 Pandas 외에 Dask라는 오픈소스를 소개합니다.

기존의 pandas는 읽어들이는 모든 데이터를 메모리에 적재하여, 연산하는 방식이어서 거대한 규모의 데이터를

pandas에 적재할 시에 메모리가 부족해 지는 이슈가 자주 발생하였습니다.


이러한 문제를 그나마 적은 메모리로 연산을 할 수 있는 오픈소스가 있으니 그것이 dask라는 오픈소스입니다.

dask는 거대한 데이터를 가상의 데이터프레임으로 형성합니다.

# 가상의 데이터프레임은 메모리에 모든 데이터를 적재하지 않습니다.

그렇다면 어떻게 데이터를 연산하느냐?

가상의 데이터프레임파티션(구역)으로 나누어 메모리에 순차적으로 올리고 내리어 연산을 하게 됩니다.

그렇게 때문에 대용량 데이터라도 그에 비해 적은 메모리로 처리가 가능합니다.

# csv file read

import dask.dataframe as dd

ddf = dd.read_csv(“data.csv”, dtype=str)

 


# S3 bucket direct access

import dask.dataframe as dd

ddf = dd.read_csv(“s3://s3_bucket_url”)  # aws s3 버킷의 csv형식의 데이터를 다이렉트로 가져옵니다.
ddf = dd.read_json(“s3://s3_bucket_url”)  # aws s3 버킷의 json형식의 데이터를 다이렉트로 가져옵니다.

위의 코드는 dask를 통해서 기본적으로 컴퓨터에 있는 csv데이터를 가져오는 방법과 아래에는 많이 사용하는 aws의 s3 버킷의 데이터를 dask를 통해서 가져올 수 있는

코드입니다.

(단 s3 버킷의 데이터는 pip install s3fs를 통해서 추가로 package를 설치해 줘야 합니다.  aws의 credentials 도 셋팅되어 있어야 함.)

 

dask의 데이터프레임과 pandas의 데이터프레임은 같지 않습니다.

때문에 dask과 pandas간의 데이터프레임 전환을 할 수 있는 함수가 있습니다.

dask의 데이터프레임을 ddf, pandas의 데이터프레임을 df라 하겠습니다.

dask => pandas

df = ddf.compute()


pandas => dask

ddf = dd.from_pandas(df)

이러한 식으로 변환이 가능합니다.

dask의 여러가지 기능을 정확히 알고 사용하려면 문서를 참고할 수 있습니다. => https://dask.org/

제가 자주 이용하는 애트리뷰트 몇가지를 소개합니다.

dask.dataframe.read_csv => csv 형식 데이터를 읽어올 때

dask.dataframe.read_json => json 형식 데이터를 읽어올 때

dask.dataframe.assign => 기존의 데이터프레임을 함수를 통해 변형 후에 기존 데이터는 변형되지 않고 새로운 변수에 리턴함.

dask.dataframe.apply => 기존의 데이터프레임을 함수를 통해 변형 후에 기존 데이터가 변형되므로 기존의 데이터를 보증할 수 없음.

dask.dataframe.count => 유효한 데이터를 컬럼별로 count하여 표시하여 줍니다. count 후에 compute를 해야 연산이 실행됩니다.

dask.dataframe.dropna => nan인 데이터를 로우 혹은 컬럼별로 제거할 수 있습니다.

 


다른 개발자의 dask 설명글도 출처를 첨부합니다.  => https://devtimes.com/python-dask/

 

위의 설명글을 보고 샘플 코드를 작성하여 보았습니다.

위의 개발자분이 설명한 글의 데이터를 다운받기 위해서

wget -O crime.csv https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD

와 같은 명령어로 crime.csv 데이터셋을 다운받습니다.

 

아래의 코드는 이해를 돕기 위해 첨부합니다.

import dask.dataframe as dd
from datetime import datetime
import time
start_time = datetime.now()
st = time.time()
print(“start_time : {}”.format(start_time))
ddf = dd.read_csv(“crime.csv”, dtype=str, error_bad_lines=False)
print(ddf.head())
ddf = ddf.dropna()
ddf[‘Location’] = ddf[‘Location’].str.replace(‘(‘, ”).str.replace(‘)’, ”).str.split(r’,’)
ddf = ddf.assign(**{k: ddf.Location.map(lambda x, i=i: x[i]) for i, k in enumerate([‘lat’, ‘lon’])})
ddf[‘lat’] = ddf[‘lat’].astype(‘float64’)
ddf[‘lon’] = ddf[‘lon’].astype(‘float64’)
print(ddf.head(30))
agg_ddf = ddf.groupby([‘Date’]).agg({“lat”: [‘mean’, ‘sum’, ‘count’]})
print(agg_ddf.head(30))
print(ddf.count().compute())
end_time = datetime.now()
et = time.time()
print(“running time : {} seconds”.format(et – st))
print(“end_time : {}”.format(end_time))
위의 코드에 대한 간단한 설명을 하겠습니다.
crime.csv를 읽어서 Location 컬럼의 위경도를 split하여 float형태의 데이터 타입으로 변경한 뒤
데이터 전체를 Date 기준으로 위도의 (평균 / 합계 / 개수) 형태로 aggregation을 하여주는 코드입니다.
코드와 글을 통해 이해하여 즐거운 코딩되시길 바랍니다.