일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- vlookup
- 오류
- split
- Delete
- putty
- trim
- 북마크
- 이전날짜제거
- mysql
- D2E8DA72
- 함수
- random.uniform
- 전처리
- Join
- 아나콘다
- safe mode 해제
- 태블로
- SQL
- 외부접속허용
- TabPy
- 태블로퍼블릭
- concat
- 에러
- Tableau
- 정제
- 살려줘
- 데이터전처리
- 파이썬
- Def
- 맵차트
- Today
- Total
무던히 하다보면 느는
[[위키독스]] Spark & 로그 정의 및 파싱 (Python 데이터 분석 실무) 본문
01-4. 소프트 역량
## 문제 정의 역량 분석 프로세스의 첫 단계는 **문제 정의**이다. 이 단계에서 첫단추를 어떻게 맞추는가에 따라 프로젝트의 성패가 갈리는 경우가 많다. 문제의 정의를 잘 ...
wikidocs.net
https://github.com/7rohj/Spark-
GitHub - 7rohj/Spark-: https://wikidocs.net/16565
https://wikidocs.net/16565. Contribute to 7rohj/Spark- development by creating an account on GitHub.
github.com
Spark 데이터 추출 및 전처리
SparkContext 생성 </br> DataFrame 생성 및 추출 </br> 전처리 및 분석
! pip install pyspark
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
Downloading pyspark-3.3.0.tar.gz (281.3 MB)
|████████████████████████████████| 281.3 MB 40 kB/s
Collecting py4j==0.10.9.5
Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
|████████████████████████████████| 199 kB 41.8 MB/s
Building wheels for collected packages: pyspark
Building wheel for pyspark (setup.py) ... done
Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=3a69408ecaff5386b6611be01e306b6851feffc27e29ee75c9d1f182cc3ddf2d
Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0
# 모듈 불러오기
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
# 라이브러리 통해 csv 불러오기
df = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true',inferSchema='true')\
.load('./doc_use_log.csv')
# 데이터베이스인 경우 데이터프레임에서 tmp table로 변환
df.registerTempTable("df_tmp")
/usr/local/lib/python3.7/dist-packages/pyspark/sql/dataframe.py:229: FutureWarning: Deprecated in 2.0, use createOrReplaceTempView instead.
warnings.warn("Deprecated in 2.0, use createOrReplaceTempView instead.", FutureWarning)
df.head(3)
[Row(actiontype='OPEN', ismydoc=False, ext='PDF', sessionid='9400fd2e43d7dc2d054ca78806236ee1', documentposition='LOCALSTORAGE', datetime='2016.7.18'),
Row(actiontype='CLOSE', ismydoc=False, ext='PDF', sessionid='9400fd2e43d7dc2d054ca78806236ee1', documentposition='LOCALSTORAGE', datetime='2016.7.18'),
Row(actiontype='OPEN', ismydoc=True, ext='PDF', sessionid='9400fd2e43d7dc2d054ca78806236ee1', documentposition='MYPOLARISDRIVE', datetime='2016.7.18')]
# sql을 통해 테이블로부터 데이터 추출
df1 = sqlContext.sql("select ismydoc, actiontype, sessionid, datetime from df_tmp where ismydoc = true")
df1
DataFrame[ismydoc: boolean, actiontype: string, sessionid: string, datetime: string]
## Laze Execution
df2 = sqlContext.sql("select * from df_tmp")
df2_pdf = df2.select("sessionid", "ext").filter("ext=='PDF' or ext='DOC'").dropDuplicates().cache()
df2.distinct().count() #
301833
df2_min_date=df2.groupby("sessionid").agg(min("datetime").alias("min_date")) # groupby().agg(())
df2_min_date.show()
+--------------------+---------+
| sessionid| min_date|
+--------------------+---------+
|0001625bdb4fb9136...|2016.7.19|
|00037c1d86c69902b...|2016.7.27|
|00042bfc107cef995...| 2016.7.9|
|00050ec6afac496d0...|2016.7.14|
|00057c8dd7571757f...| 2016.7.5|
|00072e9f8dc9f3fdd...|2016.7.25|
|0007fab4b524ec1cb...|2016.7.21|
|0008a6f44a79ce8d5...|2016.7.20|
|00092416a5d734e1f...|2016.7.15|
|00095209f71059966...|2016.7.24|
|000aaad0732dcc29a...|2016.7.12|
|000acf8c21536985e...|2016.7.18|
|000ad8bfdff1ac4ab...|2016.7.17|
|000cb1674586adf43...|2016.7.26|
|000d2213fadedf76d...| 2016.7.5|
|000e2c9feea14df21...| 2016.7.1|
|0010529888ad09c03...|2016.7.11|
|0012b5034e55e1760...| 2016.7.6|
|0013d2118e4ad6f4f...|2016.7.24|
|00157f9e3dcf17ce0...|2016.7.20|
+--------------------+---------+
only showing top 20 rows
df2_join = df2_pdf.join(df2_min_date,"sessionid","left")
df2_join.show()
+--------------------+---+---------+
| sessionid|ext| min_date|
+--------------------+---+---------+
|551de498388693734...|PDF| 2016.7.9|
|ffef6402dac05483f...|PDF|2016.7.12|
|635a5c8d3df7b0a40...|PDF|2016.7.15|
|c389b7b211b044b56...|PDF|2016.7.22|
|7fb01e6cc98ece873...|DOC| 2016.7.1|
|d5b91aaa2093e421a...|PDF| 2016.7.1|
|83be4b26072cc132d...|DOC|2016.7.14|
|204f6839bbe3e5504...|PDF|2016.7.15|
|8c8fed61f21992f00...|PDF|2016.7.17|
|0dd214b151ccbd20d...|DOC|2016.7.26|
|10ad1c7d1d4f7f4ad...|PDF|2016.7.25|
|33c6ef601e915c1a0...|PDF|2016.7.14|
|a7d01eac986e2f8f1...|PDF| 2016.7.1|
|e57b225c29eb34e4a...|PDF|2016.7.26|
|7ef129729b2fedbd3...|PDF| 2016.7.7|
|f86a2b77e67c7ca72...|PDF|2016.7.20|
|258b4a4457e738216...|PDF|2016.7.13|
|85b20d33749c548b7...|PDF|2016.7.30|
|4555db28c4d95abb1...|DOC|2016.7.27|
|9ad56677f726a1718...|PDF|2016.7.30|
+--------------------+---+---------+
only showing top 20 rows
df2_join1=df2_join.groupby("min_date","ext").agg(count("sessionid").alias("cnt"))
df2_join1.describe().show()
+-------+--------+----+-----------------+
|summary|min_date| ext| cnt|
+-------+--------+----+-----------------+
| count| 60| 60| 60|
| mean| null|null|809.6333333333333|
| stddev| null|null|473.5906108303528|
| min|2016.7.1| DOC| 231|
| max|2016.7.9| PDF| 1503|
+-------+--------+----+-----------------+
# 🐼 판다스
df2_pd = df2.toPandas()
df2_pd.head()
df2_pd.describe()
301861 | 301861 | 301861 | 301861 | 301861 | 301861 |
8 | 2 | 16 | 114994 | 7 | 30 |
OPEN | False | 7067a43577238ba049257fbde912bb04 | OTHERAPP | 2016.7.12 | |
151802 | 183129 | 82004 | 31 | 213779 | 12340 |
로그 정의/ 설계
로그 데이터는 최근 사용자의 사용성 및 행동 패턴을 확인하거나 유저 클러스터링, 모델링 등 다양한 목적으로 사용되는 행동 기반 데이터이다. 로그는 설문과 같은 사용자 응답 및 기억에 의존하는 데이터 수집 방법 대비, 행동을 정확하게 파악/예측할 수 있는 장점이 있다. 또 RDB의 결과론적인 데이터와 달리 특정 결과에 이르는 과정과 흐름을 상세히 파악할 수 있어, 서비스를 개선하는 데 매우 유용한 자료이다. 대신 데이터 용량이 크기 때문에 스토리지 관련 비용/리소스가 발생하고, JSON, CSV, TSV와 같은 비정형 텍스트 형태이므로 기존 RDB와는 다른 수집/처리 시스템과 전문 인력이 요구된다는 단점을 가지고 있다.
# 로그 스키마 예시
{
"memid":" ", #int
"sessionid":" ", #string
"ver":" ", #string
"screen":"Main",#string
"event":"View",#string
"area":"Seoul",#string
"group":"A",#string,A or B...
"params": {
"isGuest":'T',#boolean
"UserType":" "#string
}
}
JSON Parsing
로그 정의후 수집이 이루어지면, JSON과 Pandas 라이브러리를 통해 판다스 데이터프레임 형태로 아래와 같이 파싱할 수 있다.
import json
import pandas as pd
data = []
mydf = pd.DataFrame()
with open('파일명.json') as f:
for line in f:
data.append(json.loads(line))
for i in range(0,len(data)):
df=pd.DataFrame.from_dict([data[i]]) ##
mydf=mydf.append(df)
mydf.reset_index(drop=True,inplace=True)
mydf['date'] = pd.to_datetime(mydf['date'],unit='s').df.date
mtdf['date'] = mydf['date'].astype('datetime64[ns]')
'🌵Python 데이터 분석 실무🌵' 카테고리의 다른 글
[[위키독스]] SQL 데이터 추출 (Python 데이터 분석 실무) (0) | 2022.07.29 |
---|