무던히 하다보면 느는

[[위키독스]] Spark & 로그 정의 및 파싱 (Python 데이터 분석 실무) 본문

🌵Python 데이터 분석 실무🌵

[[위키독스]] Spark & 로그 정의 및 파싱 (Python 데이터 분석 실무)

무던히 하다보면 느는 2022. 7. 29. 16:30

https://wikidocs.net/16564

 

01-4. 소프트 역량

## 문제 정의 역량 분석 프로세스의 첫 단계는 **문제 정의**이다. 이 단계에서 첫단추를 어떻게 맞추는가에 따라 프로젝트의 성패가 갈리는 경우가 많다. 문제의 정의를 잘 ...

wikidocs.net

 

 

 

https://github.com/7rohj/Spark-

 

GitHub - 7rohj/Spark-: https://wikidocs.net/16565

https://wikidocs.net/16565. Contribute to 7rohj/Spark- development by creating an account on GitHub.

github.com

 

Spark 데이터 추출 및 전처리

SparkContext 생성 </br> DataFrame 생성 및 추출 </br> 전처리 및 분석

In [ ]:
! pip install pyspark
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 40 kB/s 
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 41.8 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=3a69408ecaff5386b6611be01e306b6851feffc27e29ee75c9d1f182cc3ddf2d
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0
In [ ]:
# 모듈 불러오기
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
In [14]:
# 라이브러리 통해 csv 불러오기
df = sqlContext.read.format('com.databricks.spark.csv')\
                    .options(header='true',inferSchema='true')\
                    .load('./doc_use_log.csv')
In [15]:
# 데이터베이스인 경우 데이터프레임에서 tmp table로 변환
df.registerTempTable("df_tmp")
/usr/local/lib/python3.7/dist-packages/pyspark/sql/dataframe.py:229: FutureWarning: Deprecated in 2.0, use createOrReplaceTempView instead.
  warnings.warn("Deprecated in 2.0, use createOrReplaceTempView instead.", FutureWarning)
In [20]:
df.head(3)
Out[20]:
[Row(actiontype='OPEN', ismydoc=False, ext='PDF', sessionid='9400fd2e43d7dc2d054ca78806236ee1', documentposition='LOCALSTORAGE', datetime='2016.7.18'),
 Row(actiontype='CLOSE', ismydoc=False, ext='PDF', sessionid='9400fd2e43d7dc2d054ca78806236ee1', documentposition='LOCALSTORAGE', datetime='2016.7.18'),
 Row(actiontype='OPEN', ismydoc=True, ext='PDF', sessionid='9400fd2e43d7dc2d054ca78806236ee1', documentposition='MYPOLARISDRIVE', datetime='2016.7.18')]
In [16]:
# sql을 통해 테이블로부터 데이터 추출
df1 = sqlContext.sql("select ismydoc, actiontype, sessionid, datetime from df_tmp where ismydoc = true")
In [17]:
df1
Out[17]:
DataFrame[ismydoc: boolean, actiontype: string, sessionid: string, datetime: string]
In [18]:
## Laze Execution
df2 = sqlContext.sql("select * from df_tmp")
In [19]:
df2_pdf = df2.select("sessionid", "ext").filter("ext=='PDF' or ext='DOC'").dropDuplicates().cache()
df2.distinct().count() #
Out[19]:
301833
In [22]:
df2_min_date=df2.groupby("sessionid").agg(min("datetime").alias("min_date")) # groupby().agg(())
df2_min_date.show()
+--------------------+---------+
|           sessionid| min_date|
+--------------------+---------+
|0001625bdb4fb9136...|2016.7.19|
|00037c1d86c69902b...|2016.7.27|
|00042bfc107cef995...| 2016.7.9|
|00050ec6afac496d0...|2016.7.14|
|00057c8dd7571757f...| 2016.7.5|
|00072e9f8dc9f3fdd...|2016.7.25|
|0007fab4b524ec1cb...|2016.7.21|
|0008a6f44a79ce8d5...|2016.7.20|
|00092416a5d734e1f...|2016.7.15|
|00095209f71059966...|2016.7.24|
|000aaad0732dcc29a...|2016.7.12|
|000acf8c21536985e...|2016.7.18|
|000ad8bfdff1ac4ab...|2016.7.17|
|000cb1674586adf43...|2016.7.26|
|000d2213fadedf76d...| 2016.7.5|
|000e2c9feea14df21...| 2016.7.1|
|0010529888ad09c03...|2016.7.11|
|0012b5034e55e1760...| 2016.7.6|
|0013d2118e4ad6f4f...|2016.7.24|
|00157f9e3dcf17ce0...|2016.7.20|
+--------------------+---------+
only showing top 20 rows

In [24]:
df2_join = df2_pdf.join(df2_min_date,"sessionid","left")
df2_join.show()
+--------------------+---+---------+
|           sessionid|ext| min_date|
+--------------------+---+---------+
|551de498388693734...|PDF| 2016.7.9|
|ffef6402dac05483f...|PDF|2016.7.12|
|635a5c8d3df7b0a40...|PDF|2016.7.15|
|c389b7b211b044b56...|PDF|2016.7.22|
|7fb01e6cc98ece873...|DOC| 2016.7.1|
|d5b91aaa2093e421a...|PDF| 2016.7.1|
|83be4b26072cc132d...|DOC|2016.7.14|
|204f6839bbe3e5504...|PDF|2016.7.15|
|8c8fed61f21992f00...|PDF|2016.7.17|
|0dd214b151ccbd20d...|DOC|2016.7.26|
|10ad1c7d1d4f7f4ad...|PDF|2016.7.25|
|33c6ef601e915c1a0...|PDF|2016.7.14|
|a7d01eac986e2f8f1...|PDF| 2016.7.1|
|e57b225c29eb34e4a...|PDF|2016.7.26|
|7ef129729b2fedbd3...|PDF| 2016.7.7|
|f86a2b77e67c7ca72...|PDF|2016.7.20|
|258b4a4457e738216...|PDF|2016.7.13|
|85b20d33749c548b7...|PDF|2016.7.30|
|4555db28c4d95abb1...|DOC|2016.7.27|
|9ad56677f726a1718...|PDF|2016.7.30|
+--------------------+---+---------+
only showing top 20 rows

In [25]:
df2_join1=df2_join.groupby("min_date","ext").agg(count("sessionid").alias("cnt"))

df2_join1.describe().show()
+-------+--------+----+-----------------+
|summary|min_date| ext|              cnt|
+-------+--------+----+-----------------+
|  count|      60|  60|               60|
|   mean|    null|null|809.6333333333333|
| stddev|    null|null|473.5906108303528|
|    min|2016.7.1| DOC|              231|
|    max|2016.7.9| PDF|             1503|
+-------+--------+----+-----------------+

In [27]:
# 🐼 판다스
df2_pd = df2.toPandas()
df2_pd.head()
df2_pd.describe()
Out[27]:
actiontypeismydocextsessioniddocumentpositiondatetimecountuniquetopfreq
301861 301861 301861 301861 301861 301861
8 2 16 114994 7 30
OPEN False PDF 7067a43577238ba049257fbde912bb04 OTHERAPP 2016.7.12
151802 183129 82004 31 213779 12340

 

로그 정의/ 설계

로그 데이터는 최근 사용자의 사용성 및 행동 패턴을 확인하거나 유저 클러스터링, 모델링 등 다양한 목적으로 사용되는 행동 기반 데이터이다. 로그는 설문과 같은 사용자 응답 및 기억에 의존하는 데이터 수집 방법 대비, 행동을 정확하게 파악/예측할 수 있는 장점이 있다. 또 RDB의 결과론적인 데이터와 달리 특정 결과에 이르는 과정과 흐름을 상세히 파악할 수 있어, 서비스를 개선하는 데 매우 유용한 자료이다. 대신 데이터 용량이 크기 때문에 스토리지 관련 비용/리소스가 발생하고, JSON, CSV, TSV와 같은 비정형 텍스트 형태이므로 기존 RDB와는 다른 수집/처리 시스템과 전문 인력이 요구된다는 단점을 가지고 있다.

In [ ]:
# 로그 스키마 예시
{
    "memid":" ", #int
    "sessionid":" ", #string
    "ver":" ", #string
    "screen":"Main",#string
    "event":"View",#string
    "area":"Seoul",#string
    "group":"A",#string,A or B...
    "params": {
                "isGuest":'T',#boolean
                "UserType":" "#string
    }
}

JSON Parsing

로그 정의후 수집이 이루어지면, JSON과 Pandas 라이브러리를 통해 판다스 데이터프레임 형태로 아래와 같이 파싱할 수 있다.

In [ ]:
import json
import pandas as pd

data = []
mydf = pd.DataFrame()

with open('파일명.json') as f:
  for line in f:
    data.append(json.loads(line))

  for i in range(0,len(data)):
    df=pd.DataFrame.from_dict([data[i]]) ##
    mydf=mydf.append(df)

mydf.reset_index(drop=True,inplace=True)

mydf['date'] = pd.to_datetime(mydf['date'],unit='s').df.date
mtdf['date'] = mydf['date'].astype('datetime64[ns]')