Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- 집커피
- 코로나19
- sparksql
- 여행의이유
- 마이더치콜드브루
- 이코노미스트한국구독센터
- sky빛의아이들
- apollo-sandbox
- liveness
- neovim
- 플라스틱은 어떻게 브랜드의 무기가 되는가
- 러닝크루
- 재택끝나간다
- 잠든사이월급버는미국배당주투자
- parquet
- 달리기
- 티지아이포럼
- apollo-server-v3
- kafka-connect
- Zone2
- schema-registry
- 콜드브루메이커
- 미국배당주투자
- 잘쉬어야지
- 재택커피
- 런데이
- 스타벅스리저브콜드브루
- 런데이애플워치
- 오운완
- 플릿러너
Archives
- Today
- Total
해뜨기전에자자
spark mongodb upsert, merge 본문
unique column이 정해져 있을 때 spark mongodb upsert를 하려면 아래와 같이 하면 된다.
import json
df.write.format('com.mongodb.spark.sql') \
.option('collection', 'target_collection') \
.option('replaceDocument', 'true') \
.option('shardKey', json.dumps({'date': 1, 'name': 1, 'resource': 1})) \
.mode('append') \
.save()
새로운 document로 replace하려는 경우, (MongoSpark.scala 참조)
- replaceDocument=true
- false로 주면 기존 sharedKey에 해당하는 부분을 제외한 부분이 변경된다. merge를 할 때 쓸 수 있다.
- sharedKey를 unique key들에 대해 value를 1로 준다
- default는 {"_id": 1}로 되어있다.
- replaceDocument=true
SaveMode (DefaultSource.scala 참조)
- Append : write
- Overwrite : collection을 drop한 후 write
- ErrorIfExists: collection이 존재하면 UnsupportedOperationException("MongoCollection already exists") 을 던지고, 존재하지 않으면 write
- Ignore: collection이 존재하지 않으면 write
ref: https://docs.mongodb.com/spark-connector/master/python-api/
+++
이와 관련하여 stackoverflow에도 답변을 올려놨다. 그치만 이 질문을 맞닥들이는 사람은 잘 없는 모양이다(..)
'개발 > spark' 카테고리의 다른 글
Spark SQL로, Hive parquet 접근 시 필드명이 parquet schema와 다르게 잡힐 때.. (0) | 2020.11.19 |
---|---|
pyspark timezone, datetime handling function (0) | 2020.06.01 |
spark standalone cluster (0) | 2018.03.16 |