해뜨기전에자자

spark mongodb upsert, merge 본문

개발/spark

spark mongodb upsert, merge

조앙'ㅁ' 2020. 7. 6. 09:52

unique column이 정해져 있을 때 spark mongodb upsert를 하려면 아래와 같이 하면 된다.

import json

df.write.format('com.mongodb.spark.sql') \
  .option('collection', 'target_collection') \
  .option('replaceDocument', 'true') \
  .option('shardKey', json.dumps({'date': 1, 'name': 1, 'resource': 1})) \
  .mode('append') \
  .save()
  • 새로운 document로 replace하려는 경우, (MongoSpark.scala 참조)

    • replaceDocument=true
      • false로 주면 기존 sharedKey에 해당하는 부분을 제외한 부분이 변경된다. merge를 할 때 쓸 수 있다.
    • sharedKey를 unique key들에 대해 value를 1로 준다
      • default는 {"_id": 1}로 되어있다.
  • SaveMode (DefaultSource.scala 참조)

    • Append : write
    • Overwrite : collection을 drop한 후 write
    • ErrorIfExists: collection이 존재하면 UnsupportedOperationException("MongoCollection already exists") 을 던지고, 존재하지 않으면 write
    • Ignore: collection이 존재하지 않으면 write

ref: https://docs.mongodb.com/spark-connector/master/python-api/

+++
이와 관련하여 stackoverflow에도 답변을 올려놨다. 그치만 이 질문을 맞닥들이는 사람은 잘 없는 모양이다(..)