Alternative Architecture DOJO

オルターブースのクラウドネイティブ特化型ブログです。

Azure Cosmos DB for mongoDB vcore を用いたベクトル検索

こんにちは!オルターブースのいけだです! 季節はすっかり冬ですね⛄ 私は寒いのが苦手なのですが、冬はお鍋が美味しいので季節の中で一番好きです。

さて、今回は先日のMicrosoft Build 2023で発表され、一般公開となったAzure Cosmos DB MongoDB vCoreでのベクトル検索について触れていきたいと思います!

azure.microsoft.com

目次

Azureのベクターストア

Azureで使用できるベクターストアですが、現在以下が選択肢となります。

  • Azure AI Search (Cognitive Searchから名称変更)
  • Azure Cache for Redis Enterprise
  • Azure Data Explorer
  • Azure Database for PostgreSQL (pgvector拡張)
  • Azure Cosmos DB MongoDB vCore

Cosmos DB ( MongoDB vCore) を選択できるようになり、コストの観点でも非常に有り難いなと感じました。

また、CosmosDB (SQL Core) をAIチャットボットシステムのDBにする事は多いと思いますが、 CosmosDBのChangeFeed の機能を用いて新たな会話履歴がDBに追加されたら即座にベクトル化しCosmos DB ( MongoDB vCore) へ格納するといった用途でも効率的に処理できそうですね!

では以下のドキュメントを参考に早速やってみましょう!

learn.microsoft.com

Azure Cosmos DB MongoDB vCoreのデプロイ

まずはAzureのポータル画面からCosmos DB MongoDB vCoreをデプロイします。

CosmosDBの作成画面から「MongoDB向けのAzure Cosmos DB API」を選択します。

続いて「vCore cluster」を選択します。

次に設定画面ですが、今回はほぼ全てデフォルトで進めます。usernamePassword は後で使うのでメモしておきます。 なお、この図ではFree tierを選択したので、選択できるリージョンが限られた為、Southeast Asiaとなってます。

リソースの作成後は接続文字列をコピーしておきます。

MongoDB CompassでMongoDBへ接続

下記からMongoDB Compassをインストールします。

www.mongodb.com

起動して接続ボタンを選択すると以下のような画面になるので、先程コピーした接続文字列を貼り付け「Connect」ボタンを選択し接続できる事を確認しておきます。

ベクトル検索の元となるデータを作成する

今回はこちらの公開データセットを使用します。これは映画のレビューデータです。

github.com

元はjsonなのですが、今回分かりやすくする為に予めcsvに加工しました。

ここからは、Python (VSCode拡張 Jupyter Notebook)を使用してcsvのデータを読み込み、ベクトル列を追加する等の前処理をしていきます。 また今回「Azure OpenAI Service」を使用してます。使用するモデルはtext-embedding-ada-002gpt-35-turbo-16k です。

Azure OpenAI Serviceのデプロイ方法については省略させて頂きます。

learn.microsoft.com

import openai
import os
import pandas as pd
import numpy as np
import tiktoken
from pymongo import MongoClient

API_KEY = "<Azure OpenAI ServiceのAPIキー>"
RESOURCE_ENDPOINT = "<Azure OpenAI Serviceのエンドポイント>"

openai.api_type = "azure"
openai.api_key = API_KEY
openai.azure_endpoint = RESOURCE_ENDPOINT
openai.api_version = "2023-05-15"

元々のCSVデータは以下のように色々な映画レビューが記載されてます。

df = pd.read_csv("combined.csv")
df.head(3)

ここから、今回のベクトル検索の対象となる列同士を結合して新たな列を作成します。

#日本語になっている列名を変更
df = df.rename(columns={
   'タイトル': 'title',
   '製作年度': 'year',
   '監督名': 'director',
   '監督説明': 'director_description',
   'キャスト名': 'cast',
   'キャスト説明': 'cast_description',
   'ジャンル': 'genre',
   'レビュー': 'review',
   'あらすじ': 'synopsis'
})

#列を結合し、新たな列を作成 (ここは色々変えて見ると結果が変化するので面白い)
columns_to_combine = df.columns.difference(['movie_title', 'title', 'year', 'director', 'director_description', 'cast', 'cast_description','synopsis'])
df['combined'] = df[columns_to_combine].apply(lambda row: ','.join(row.values.astype(str)), axis=1)

結合して新たに作成した「combined」列に対してtext-embedding-ada-002 を使用してベクトル化の処理をしていきます。

def get_embedding(text, model="text-embedding-ada-002"):
   text = text.replace("\n", " ")
   res = openai.embeddings.create(input = [text], model=model).data[0].embedding
   return res

#ベクトル化
df['vectorContent'] = df['combined'].apply(lambda x: get_embedding(x,model="text-embedding-ada-002"))

df[["combined","vectorContent"]].head(2)

print(len(df['vectorContent'][0]))

1536

新たに「vectorContent」列が作成され、そのベクトルデータは1536次元である事が分かります。

一旦、ここまでで事前準備は完了です。

MongoDBにベクトル検索用のインデックスを作成する

先程、接続したMongoDB Compassを開きます。

データベースとコレクションを先に作成しておきます。 今回はデーベース名をdoc コレクション名をdocs としました。

以下の画面下部にあるMongo shellを起動します。

シェルが起動するので、コマンドを入力します。

use doc
# ベクトル検索用のインデックスを作成

db.runCommand({
  createIndexes: 'docs',
  indexes: [
    {
      name: 'vectorSearchIndex',
      key: {
        "vectorContent": "cosmosSearch"
      },
      cosmosSearchOptions: {
        kind: 'vector-ivf',
        numLists: 1, 
        similarity: 'COS',
        dimensions: 1536
      }
    }
  ]
});

以下はプロパティの説明です。

  • createIndexes
    • 作成したコレクション名
  • name
    • indexの名前
  • key
    • { <ベクトルを格納しているフィールド名> : <indexタイプ> "cosmosSearch"はベクトル検索用のindexType }
  • kind
    • vector-indexの種類。 主にvector-ivf がサポートされている
  • numLists
    • インデックスがベクトルデータのグループ化に使用するクラスタの数。ドキュメント数が100万まではnumListsをドキュメント数/1000に、100万以上はドキュメント数の平方根を取る事が推奨値。検索クエリの実行時間に影響。
  • similarity
    • COS[コサイン類似度] , L2[ユークリッド距離], IP[内積]
  • dimensions
    • 次元数

similarity の値は今回「コサイン類似度」と「ユークリッド距離」をそれぞれ試してみたところ変化がありました。 numLists の値も変化させてみましたが、今回はドキュメント数が少ないのもありそれほど変化は感じられませんでした。

無事にインデックスが作成されると以下の表示になります。

{
  raw: {
    defaultShard: {
      numIndexesBefore: 1,
      numIndexesAfter: 2,
      createdCollectionAutomatically: false,
      ok: 1
    }
  },
  ok: 1
}

確認すると、確かにインデックスが作成された事が分かります。

MongoDBへデータを格納し、ベクトル検索を実行する

先程作成したCSVデータを元にMongoDBへデータを格納していきます。

connection_string = <Cosmos DB for mongoDB vcoreの接続文字列>

client = MongoClient(connection_string)
collection = client["doc"]["docs"]

#MongoDBへ格納
for index, row in df.iterrows():
   doc = {
       'dialog_id': row['dialog_id'],
       'movie_title': row['movie_title'],
       'title': row['title'],
       'year': row['year'],
       'director': row['director'],
       'director_description': row['director_description'],
       'cast': row['cast'],
       'cast_description': row['cast_description'],
       'synopsis': row['synopsis'],
       'vectorContent': row['vectorContent']
   }
   collection.insert_one(doc)

確認すると、しっかり格納できてそうですね!

それでは自然言語で検索してみましょう。 まずは検索クエリになる自然言語をベクトル化します。

message = "ハラハラするアクション映画教えて"
query_vector = get_embedding(message, model="text-embedding-ada-002")

print(query_vector)
print(len(query_vector))
[-0.03269115090370178, -0.006066054571419954, -0.004153274465352297,...
1536

このベクトルで検索してみましょう。

search_query = {
    "$search": {
        "cosmosSearch": {
            "vector": query_vector, #検索ベクトル
            "path": "vectorContent", #ベクトルが格納されているフィールド
            "k": 2  #類似スコア上位2件
        },
        "returnStoredSource": True
    }
}
#類似スコアも返却対象に
project_stage = {
    "$project": {
        "similarityScore": {"$meta": "searchScore"},
        "document": "$$ROOT"
    }
}

results = collection.aggregate([search_query, project_stage])
for result in results:
   print(f"タイトル:{result['document']['movie_title']}")
   print(f"類似度:{result['similarityScore']}")

結果は以下のようになりました。確かにという感じですね。

チャットボット風にしてみる

ここまでのものを纏めてチャットボット風(いわゆるRAG)にしてみます。

def open_ai_chat(message) -> str:
   query_vector = get_embedding(message, model="text-embedding-ada-002")
   search_query = {
       "$search": {
           "cosmosSearch": {
               "vector": query_vector,
               "path": "vectorContent",
               "k": 2
           },
           "returnStoredSource": True
       }
   }
   project_stage = {
       "$project": {
           "similarityScore": {"$meta": "searchScore"},
           "document": "$$ROOT"
       }
   }
   results = collection.aggregate([search_query, project_stage])
   assistant_message = ",".join([result['document']['movie_title'] for result in results])
   print(f"検索結果: {assistant_message}")

   response = openai.chat.completions.create(
      model="gpt-35-turbo-16k", #deployment_name
      #検索結果をプロンプトに加える
      messages=[
        {"role": "system", "content": f"お勧め映画は{assistant_message}です。,で区切られた2件のお勧め映画情報のみを絶対に返答するようにしてください。"},
        {"role": "user", "content": message}
      ],
      temperature=0,
   )

   return response.choices[0].message.content


ans = open_ai_chat("泣ける映画教えて")
print(f"AIの回答:{ans}")
検索結果: HACHI約束の犬,映画「聲の形」

AIの回答:お勧め映画は「HACHI 約束の犬」と「聲の形」です。

temperature を0にする事で回答のランダム性を無くしています。

プロンプトはもっと作り込む必要がありそうですが、とりあえずは意図した通りの回答をしてくれました。

LangChain

LangChainを使用する事でより少ない記述でインデックスの作成から、検索までおこなえます。

python.langchain.com

今回、詳細は割愛しますが、例えば既に作成しているインデックスに対して検索をかける際は以下の記述だけで可能です。

from langchain.embeddings import AzureOpenAIEmbeddings
from langchain.vectorstores.azure_cosmos_db import AzureCosmosDBVectorSearch
from pymongo import MongoClient

client = MongoClient(<connection string>)
collection = client["doc"]["docs"]

azure_openai_embeddings = AzureOpenAIEmbeddings(
    openai_api_version="2023-05-15",
    azure_deployment=<deployment_name>,
    azure_endpoint=<azure openai service endpoint>,
    openai_api_key=<azure opena ai service key>,
)

vectorstore = AzureCosmosDBVectorSearch(
    collection, azure_openai_embeddings, index_name="vectorSearchIndex", text_key="title", embedding_key="vectorContent"
)

query = "SF映画教えて"
docs = vectorstore.similarity_search(query=query, k=2)

for doc in docs:
  print(doc.page_content)
インターステラー
SPACEBATTLESHIPヤマト

インデックスの作成は各種ドキュメントローダーを使用する事で簡単におこなえます。

Document loaders | 🦜️🔗 Langchain

CosmosDB SQL Core の ChangeFeedを利用する

CosmosDB SQL CoreのChangeFeedを利用して、CosmosDBへ新たにデータが追加された事を契機にAzureFunctionsを起動し、データをベクトル化しMongo DBへ保存されるようにしてみます。

CosmosDB SQL CoreとAzureFunctionsの作成方法については省略させて頂きます。

learn.microsoft.com

__init__.py (model v1)

import logging
from pymongo import MongoClient
import openai
import azure.functions as func
import os

API_KEY = os.getenv("OPENAI_API_KEY")
RESOURCE_ENDPOINT = os.getenv("OPENAI_RESOURCE_ENDPOINT")

openai.api_type = "azure"
openai.api_key = API_KEY
openai.azure_endpoint = RESOURCE_ENDPOINT
openai.api_version = "2023-05-15"

connection_string = os.getenv("MONGO_CONNECTION_STRING")

client = MongoClient(connection_string)
collection = client["doc"]["docs"]

def main(documents: func.DocumentList) -> str:
    if documents:
        logging.info('Document id: %s', documents[0]['dialog_id'])

        fields = ['movie_title', 'title', 'year', 'director', 'director_description', 'cast', 'cast_description', 'synopsis']
        combined = ", ".join([str(documents[0][field]) for field in fields])

        #ベクトル化
        vectorContent = get_embedding(combined,model="text-embedding-ada-002")
        logging.info("vectorContent: %s", len(vectorContent))

        doc = {
            'dialog_id': int(documents[0]["dialog_id"]),
            'movie_title': documents[0]["movie_title"],
            'title': documents[0]["title"],
            'year': documents[0]["year"],
            'director': documents[0]["director"],
            'director_description': documents[0]["director_description"],
            'cast': documents[0]["cast"],
            'cast_description': documents[0]["cast_description"],
            'synopsis': documents[0]["synopsis"],
            'vectorContent': vectorContent
        }

        collection.insert_one(doc)
        logging.info('Document inserted to MongoDB!!!')


def get_embedding(text, model="text-embedding-ada-002"):
   text = text.replace("\n", " ")
   res = openai.embeddings.create(input = [text], model=model).data[0].embedding
   return res

Cosmos DBのポータル画面から新しいデータを追加します。

Azure Functionsのログに実行履歴が残ってますね。どうやら無事に成功しているようです。

MongoDB Compass で確認すると作成されている事が確認できました。 体感ですが、ここまで1分かからず実行されたかと思います。

最後に

今回、Azure Cosmos DB MongoDB vCoreでベクトル検索ができるようになり、よりベクターストアの選択肢が広がった気がします。

コストの部分で気兼ねなく試せる感があるのも個人的には嬉しいポイントです😀

これで以上となります。拙い内容で恐縮ですが最後までご覧いただき有難うございました!


サービス一覧 www.alterbooth.com cloudpointer.tech www.alterbooth.com