본문으로 바로가기
반응형

보통 파이썬으로 MongoDB에 접근하여 작업할 때 MongoEngine을 자주 사용했었다. 형태가 Django의 모델과 상당히 유사하고 사용법또한 간단했다. 또한 가장 눈여겨봤던 점은 커뮤니티 활성도였다. 타 ODM보다 깃허브 스타가 많았으며 스택오버플로우 등에서 관련 자료를 찾기 쉬웠다.


MongoEngine은 PyMongo를 좀 더 사용하기 쉽게 Wrapping해놓은 라이브러리이다. PyMongo를 사용해보면 알겠지만 문법이 간결하지않고, 조금 더 Raw한 느낌이다. 실제로도 가장 네이티브에 근접하게 만들어진 라이브러리이기에 속도 또한 빠르다. 그렇기에 MongoEngine이란 ODM을 사용하고 있었는데, 아무래도 PyMongo의 모든 기능을 제공하진 않는다. 


일반적인 작업은 그냥 몽고엔진만을 통해서도 가능하다. 하지만 작업을 하며 문제점을 발견했는데, 바로 대량 배치 작업 시 Upsert옵션이 MongoEngine에 없다는 점이었다. 주기적으로 데이터를 긁어와서 Bulk로 Update 또는 Insert를 진행해야하는데, MongoEngine에서는 불가능했다. 그렇다고 MongoEngine, PyMongo 둘 다 각각의 커넥션 객체를 들고 다니기엔 뭔가 비효율적이고 위험해보였다. 따라서 본 포스팅에서는 Bulk Insert/Update(with upsert)하는 방법과 MongoEngine에서 pymongo connection을 꺼내오는 방법에 대해 기술한다.


일반적으로 MongoEngine을 통해 Bulk Insert를 하려면 아래와 같은 소스를 사용한다.


from mongoengine import *


def create():
connect(db='test', host='localhost:27017')

class User(Document):
user_id = StringField()
user_pw = StringField()

meta = {
'index_background': True,
'indexes': [
{
'fields': ['+user_id'],
'unique': True
},
]
}

if __name__ == '__main__':
create()
users = [
User(user_id='hide', user_pw='123'),
User(user_id='hide', user_pw='123')
]
User.objects.insert(users)


간단하게 리스트 안에 여러개의 User 객체를 담아주고 insert() 메소드를 통해 한번에 삽입한다. 여기서 문제점은 위에서 말했듯이, insert() 메소드에서 upsert옵션이 없다는 점이었다. 예를 들어 아래와 같은 소스가 있다고 가정해보자.


from mongoengine import *


def create():
connect(db='test', host='localhost:27017')

class User(Document):
user_id = StringField()
user_pw = StringField()

meta = {
'index_background': True,
'indexes': [
{
'fields': ['+user_id'],
'unique': True
},
]
}

if __name__ == '__main__':
create()
User(user_id='hide', user_pw='123').save()
User(user_id='hide', user_pw='123').save()


위 소스를 실행시키면 pymongo.errors.OperationFailure: Index with name: user_id_1 already exists with different options 오류가 발생한다. 모델을 보면 알겠지만, user_id에 unique를 걸어놔서 중복된 값을 삽입할 수 없다. 관련 자료를 찾아보니 아직까지는 MongoEngine을 통해 해결할 수 있는 방법이 없는 것 같다. 따라서 PyMongo를 사용하여 작업해줘야 한다.


PyMongo의 Bulk Write관련 자료를 찾아보면 UpdateOne을 통해 upsert옵션을 사용할 수 있다고 나와있다. (http://api.mongodb.com/python/current/examples/bulk.html) 따라서 UpdateOne을 통해 존재한다면 업데이트하고, 없다면 삽입하는 기능을 구현할 수 있다. 대략적인 문법은 아래와 같다.


UpdateOne({'user_id': user_id}, {'$set': {field: value}}, upsert=True)


위처럼 가장 마지막에 upsert=True 옵션을 주면 된다. 간단하다.


이제 본 포스팅의 주제였던 PyMongo에서 Connection을 가져오는 방법에 대해 알아본다.


from pymongo import MongoClient


if __name__ == '__main__':
connection = MongoClient(host='localhost:27017')


이제 위 conn변수를 찍어보면,


MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True)


와 같은 결과를 얻을 수 있다. 다음으로 MongoEngine도 연결 객체를 찍어본다.


from mongoengine import *


def create():
return connect(
db='test',
host='localhost:27017',
)


if __name__ == '__main__':
connection = create()


MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True, read_preference=Primary())


PyMongo, MongoEngine 둘다 MongoClient라는 동일한 객체를 반환한다. 따라서 위 객체를 사용한다면 MongoEngine을 사용하기 위해 맺은 커넥션 객체 하나만을 통해 PyMongo의 문법까지도 사용할 수 있다. 위 내용을 종합하여 MongoEngine의 커넥션을 통해 PyMongo의 Bulk Write 명령을 사용하는 방법은 다음과 같다. 


from collections import namedtuple
from mongoengine import *
from pymongo import MongoClient, UpdateOne


def create():
return connect(
db='test',
host='localhost:27017',
)

class User(Document):
user_id = StringField()
user_pw = StringField()

meta = {
'index_background': True,
'indexes': [
{
'fields': ['+user_id'],
'unique': True
},
]
}

def convert_to_document(user_id: str, field: str, value: str) -> UpdateOne:
return UpdateOne({'user_id': user_id}, {'$set': {field: value}}, upsert=True)

def create_user(user_id: str, field: str, value: str) -> namedtuple:
users = namedtuple('Users', ['user_id', 'field', 'value'])
return users(user_id=user_id, field=field, value=value)

if __name__ == '__main__':
connection = create()

user1 = create_user(user_id='hide', field='user_pw', value='first change')
user2 = create_user(user_id='hide', field='user_pw', value='second change')
users = [
convert_to_document(user_id=user1.user_id, field=user1.field, value=user1.value),
convert_to_document(user_id=user2.user_id, field=user2.field, value=user2.value)
]
collections = connection.test.get_collection('user')
collections.bulk_write(users)


글을 다 작성하고 찾아보니 그냥 애초에 몽고엔진쪽에서 커넥션을 반환해주는 함수를 제공해주고 있었다ㅋㅋㅡㅡ


from mongoengine import *

return connection.get_connection()


반응형