일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- neovim
- 여행의이유
- kafka-connect
- liveness
- 잠든사이월급버는미국배당주투자
- 플릿러너
- apollo-server-v3
- 달리기
- 코로나19
- 러닝크루
- 잘쉬어야지
- 스타벅스리저브콜드브루
- 마이더치콜드브루
- 재택커피
- 집커피
- parquet
- Zone2
- 오운완
- 콜드브루메이커
- 런데이
- schema-registry
- apollo-sandbox
- 플라스틱은 어떻게 브랜드의 무기가 되는가
- 이코노미스트한국구독센터
- 티지아이포럼
- 재택끝나간다
- sky빛의아이들
- 런데이애플워치
- 미국배당주투자
- sparksql
- Today
- Total
해뜨기전에자자
pyspark timezone, datetime handling function 본문
timezone 설정과 datetime handling은 필수적이고 기본적인 것들 중 하나다. datetime을 다룰때는 기본적으로 UTC를 잡아서 쓰고 timezone을 반드시 포함하여 저장하는 것이 좋다. 그 중 pyspark의 datetime handling 함수들을 정리할 겸, 샘플 코드를 작성했고, session.timezone을 UTC, Asia/Seoul 로 돌려 봤을 때의 결과를 정리했다. 실행 환경은 spark 2.3.2, docker on yarn 환경이다.
- pyspark를 다룰 때 참고할 document 문서 https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html
Spark session timezone setting
spark 2.3.2 기준으로 timezone 은 아래와 같이 설정할 수 있다.
SparkSession.config('spark.sql.session.timeZone', timezone)
spark 2.4 부터는 아래와 같은 코드를 추가해야 제대로 동작할 것으로 보인다. 실험을 따로 해보지는 않았다. 참고: https://stackoverflow.com/a/48767250
...
.config("spark.driver.extraJavaOptions", "-Duser.timezone=UTC") \
.config("spark.executor.extraJavaOptions", "-Duser.timezone=UTC")
spark 2.3.2 기준으로 spark.driver.extraJavaOptions, spark.executor.extraJavaOptions를 추가해서 user.timezone을 'KST'혹은 'Asia/Seoul'로 테스트 해본 결과 'UTC'와 동일한 결과가 나왔다. 즉, 현재 환경에서는 extraJavaOptions가 영향을 미치지 않는다.
Pyspark datetime handling function samples
아래는 pyspark Function 중 datetime과 관련있어 보이는 Function들에 대해 sample code를 작성한 것이다.
# -*- coding: utf-8 -*-
"""
datetime 을 다루는 함수들 예제
- date_format(dt, format)
* Date/Timestamp/String => String (depends on current timezone)
- to_date(col, format=None)
* String => Timestamp
- unix_timestamp(dt_string, format='yyyy-MM-dd HH:mm:ss')
* String => Long
- to_timestamp(col, format=None)
* String => Timestamp
- from_unixtime(timestamp, format='yyyy-MM-dd HH:mm:ss')
* Long => String
- from_utc_timestamp
- to_utc_timestamp
"""
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from dateutil.parser import parse
from pytz import utc
def get_utc(dt):
return parse(dt).astimezone(utc).isoformat()
"""
Spark Session Timezone Setting
"""
app_name = 'test_timezone'
timezone = 'Asia/Seoul' # or 'UTC'
spark = SparkSession \
.builder \
.appName(app_name) \
.config('spark.sql.session.timeZone', timezone) \
.getOrCreate()
print('Hello, World! This is %s [%s]' % (app_name, timezone))
udf_to_utc = F.udf(lambda x: get_utc(x))
"""
주어진 값이 string datetime 값일 경우 to_date를 구하는 예제
- pyspark에서 datetime format은 java의 것을 따른다. (e.g. yyyy-MM-dd'T'HH:mm:ss.SSSXXX, yyyy-MM-dd'T'HH:mm:ss.SSSZ)
* yyyy-MM-dd'T'HH:mm:ss.SSSXXX 가 python datetime.isoformat()와 포맷이 일치
- date_format 은 local timezone을 반영한 값을 리턴한다.
- to_date은 local timezone에 관계없이 주어진 시간 기준으로 date를 반환
* to_date("2020-05-20T15:21:31.539+0000") # 2020-05-20
* to_date("2020-05-21T00:21:31.539+0900") # 2020-05-21
"""
df = spark.createDataFrame([('2020-05-20T15:21:31.539+0000',)], ['dt']) \
.withColumn('dt_local', F.date_format('dt', "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")) \
.withColumn('dt_utc', udf_to_utc('dt_local')) \
.withColumn('to_date', F.to_date('dt')) \
.withColumn('to_date_local', F.to_date('dt_local')) \
.withColumn('to_date_format', F.date_format('dt', "yyyy-MM-dd"))
df.printSchema()
df.show(10, False)
"""
주어진 값이 string datetime 값인 경우 timestamp를 구하는 예제
- unix_timestamp 는 millisec 단위의 유실이 있다. 아래와 같이 사용하면 millis 를 포함할 수 있다.
* unix_timestamp('datetime_string', "yyyy-MM-dd'T'HH:mm:ss.SSSZ") + substring('datetime_string', 21, 3).cast('float')/1000)
- to_utc_timestamp, from_utc_timestamp는 기존 timezone을 무시하고 파라미터로 넘긴 timezone으로 세팅하는 함수들이다.
- 가급적 datetime에 timezone을 포함하도록 하고, 해당 함수는 쓰지 않는 것이 좋겠다.
"""
df = spark.createDataFrame([('2020-05-20T15:21:31.539+0000',)], ['dt']) \
.withColumn('unix_timestamp', F.unix_timestamp('dt', "yyyy-MM-dd'T'HH:mm:ss.SSSZ")) \
.withColumn('to_timestamp', F.to_timestamp('dt')) \
.withColumn('to_utc_timestamp', F.to_utc_timestamp('dt', 'Asia/Seoul')) \
.withColumn('from_utc_timestamp', F.from_utc_timestamp('to_timestamp', 'Asia/Seoul'))
df.printSchema()
df.show(10, False)
"""
주어진 값이 long 값인 경우 datetime string 등으로 표현하는 예제
- e.g. from_unixtime(1585307248) # 2020-03-27 20:07:28
"""
sample = spark.createDataFrame([(1590246000,)], ['timestamp']) \
.withColumn('from_unixtime', F.from_unixtime('timestamp')) \
.withColumn('date_format', F.date_format('from_unixtime', "yyyy-MM-dd'T'HH:mm:ss.SSSZ")) \
.withColumn('to_date', F.to_date('from_unixtime')) \
.withColumn('to_date_local', F.date_format('from_unixtime', "yyyy-MM-dd")) \
sample.printSchema()
sample.show(10, False)
Pyspark datetime handling function samples Result
위 코드를 Asia/Seoul과 UTC로 실행한 결과는 아래와 같다.
- spark.sql.session.timeZone=Asia/Seoul 결과
Hello, World! This is test_timezone [Asia/Seoul] root |-- dt: string (nullable = true) |-- dt_local: string (nullable = true) |-- dt_utc: string (nullable = true) |-- to_date: date (nullable = true) |-- to_date_local: date (nullable = true) |-- to_date_format: string (nullable = true)
+----------------------------+-----------------------------+--------------------------------+----------+-------------+--------------+
|dt |dt_local |dt_utc |to_date |to_date_local|to_date_format|
+----------------------------+-----------------------------+--------------------------------+----------+-------------+--------------+
|2020-05-20T15:21:31.539+0000|2020-05-21T00:21:31.539+09:00|2020-05-20T15:21:31.539000+00:00|2020-05-20|2020-05-21 |2020-05-21 |
+----------------------------+-----------------------------+--------------------------------+----------+-------------+--------------+
root
|-- dt: string (nullable = true)
|-- unix_timestamp: long (nullable = true)
|-- to_timestamp: timestamp (nullable = true)
|-- to_utc_timestamp: timestamp (nullable = true)
|-- from_utc_timestamp: timestamp (nullable = true)
+----------------------------+--------------+-----------------------+-----------------------+-----------------------+
|dt |unix_timestamp|to_timestamp |to_utc_timestamp |from_utc_timestamp |
+----------------------------+--------------+-----------------------+-----------------------+-----------------------+
|2020-05-20T15:21:31.539+0000|1589988091 |2020-05-21 00:21:31.539|2020-05-20 15:21:31.539|2020-05-21 09:21:31.539|
+----------------------------+--------------+-----------------------+-----------------------+-----------------------+
root
|-- timestamp: long (nullable = true)
|-- from_unixtime: string (nullable = true)
|-- date_format: string (nullable = true)
|-- to_date: date (nullable = true)
|-- to_date_local: string (nullable = true)
+----------+-------------------+----------------------------+----------+-------------+
|timestamp |from_unixtime |date_format |to_date |to_date_local|
+----------+-------------------+----------------------------+----------+-------------+
|1590246000|2020-05-24 00:00:00|2020-05-24T00:00:00.000+0900|2020-05-24|2020-05-24 |
+----------+-------------------+----------------------------+----------+-------------+
- spark.sql.session.timeZone=UTC 결과
Hello, World! This is test_timezone [UTC]
root
|-- dt: string (nullable = true)
|-- dt_local: string (nullable = true)
|-- dt_utc: string (nullable = true)
|-- to_date: date (nullable = true)
|-- to_date_local: date (nullable = true)
|-- to_date_format: string (nullable = true)
+----------------------------+------------------------+--------------------------------+----------+-------------+--------------+
|dt |dt_local |dt_utc |to_date |to_date_local|to_date_format|
+----------------------------+------------------------+--------------------------------+----------+-------------+--------------+
|2020-05-20T15:21:31.539+0000|2020-05-20T15:21:31.539Z|2020-05-20T15:21:31.539000+00:00|2020-05-20|2020-05-20 |2020-05-20 |
+----------------------------+------------------------+--------------------------------+----------+-------------+--------------+
root
|-- dt: string (nullable = true)
|-- unix_timestamp: long (nullable = true)
|-- to_timestamp: timestamp (nullable = true)
|-- to_utc_timestamp: timestamp (nullable = true)
|-- from_utc_timestamp: timestamp (nullable = true)
+----------------------------+--------------+-----------------------+-----------------------+-----------------------+
|dt |unix_timestamp|to_timestamp |to_utc_timestamp |from_utc_timestamp |
+----------------------------+--------------+-----------------------+-----------------------+-----------------------+
|2020-05-20T15:21:31.539+0000|1589988091 |2020-05-20 15:21:31.539|2020-05-20 06:21:31.539|2020-05-21 00:21:31.539|
+----------------------------+--------------+-----------------------+-----------------------+-----------------------+
root
|-- timestamp: long (nullable = true)
|-- from_unixtime: string (nullable = true)
|-- date_format: string (nullable = true)
|-- to_date: date (nullable = true)
|-- to_date_local: string (nullable = true)
+----------+-------------------+----------------------------+----------+-------------+
|timestamp |from_unixtime |date_format |to_date |to_date_local|
+----------+-------------------+----------------------------+----------+-------------+
|1590246000|2020-05-23 15:00:00|2020-05-23T15:00:00.000+0000|2020-05-23|2020-05-23 |
+----------+-------------------+----------------------------+----------+-------------+
```
'개발 > spark' 카테고리의 다른 글
Spark SQL로, Hive parquet 접근 시 필드명이 parquet schema와 다르게 잡힐 때.. (0) | 2020.11.19 |
---|---|
spark mongodb upsert, merge (0) | 2020.07.06 |
spark standalone cluster (0) | 2018.03.16 |