Zhihuは、登録ユーザー数が2億2,000万人を超える中国最大のナレッジ共有プラットフォームであり、Webサイト上での質問に対する回答数は3億1,000万件を突破しています。2021年3月にはニューヨーク証券取引所での上場を果たし、時価総額は53億ドル規模にのぼります。

 

利用者の拡大に伴い、Zhihuはインフラの改善を求めていました。そこで、MySQLから分散SQLデータベースのTiDBに移行することで、Apache Hive Metastoreを水平的にスケーリングし、TiDBを他のビッグデータアプリケーションにも適用することにしました。移行後は、大量のクエリの実行時間が30~40秒から6~7秒にまで短縮されていますTiDBにはそれだけの威力があるのです。

 

この記事では、TiDBによってZhihuのビッグデータアーキテクチャのパフォーマンスがどのように向上したかを、2つの例で示します。TiDBを使用して、Hiveでの大量のクエリに対応するリアルタイムのアラートシステムを構築し、HDFSでのNameNode RPCリクエストを加速する方法を紹介します。

 

Hiveでの大量のクエリに対応するアラート

Apache Hiveは、データのクエリと分析ができるデータウェアハウスです。Zhihuでは、ETL(抽出・変換・ロード)タスクやアドホッククエリにHiveを使用しています。

問題点

ETLタスクではSQLステートメントがおおむね安定していますが、アドホッククエリには多くの場合、不安定なSQLステートメントが含まれています。そのためアドホッククエリの最適化が重要になります。この最適化を行わないと、MapReduceジョブで過大なデータがスキャンされてタスクの実行が低速になり、HDFS(Hadoop分散ファイルシステム)に多大な負荷がかかる可能性があります。クラスタ全体が不安定になるのです。

この問題は、ユーザーが四半期または年間のデータに対するクエリを行う場合に頻繁に発生します。そうしたクエリがHiveで処理されると、クラスタでリソースの不足が発生することが多くなります。十分なリソースがないと、ETLタスクが停滞し、レポートが遅延します。

大量のクエリに対応するアラートシステム

この問題を解決するために、Zhihuは大量のクエリの存在をユーザーにリアルタイムに通知するシステムを開発しました。ユーザーがSQLステートメントをHiveに送信すると、このシステムでは次のように処理されます。

  1. 実行プランを解析し、そのプランをテーブルパスやパーティションパスに変換して、Hiveがスキャンできるようにします。
  2. すべてのパーティションパスを集約して、スキャン対象のデータのサイズを計算します。
  3. データのサイズがしきい値を超えていないかどうかを判定します。超えている場合は、ユーザーにアラートが送信されます。

 

HDFSパスの取得

それぞれのSQLステートメントを解析すると、Hiveサーバーのフックメカニズムを使用して、監査ログをKafkaに出力します。ログの形式は次のとおりです。

{
 "operation": "QUERY",
 "user": "hdfs",
 "time": "2021-07-12 15:43:16.022",
 "ip": "127.0.0.1",
 "hiveServerIp": "127.0.0.1",
 "inputPartitionSize": 2,
 "sql": "select count(*) from test_table where pdate in ('2021-07-01','2021-07-02')",
 "hookType": "PRE_EXEC_HOOK",
 "currentDatabase": "default",
 "sessionId": "5e18ff6e-421d-4868-a522-fc3d342c3551",
 "queryId": "hive_20210712154316_fb366800-2cc9-4ba3-83a7-815c97431063",
 "inputTableList": [
   "test_table"
 ],
 "outputTableList": [],
 "inputPaths": [
   "/user/hdfs/tables/default.db/test_table/2021-07-01",
   "/user/hdfs/tables/default.db/test_table/2021-07-02"
 ],
 "app.owner": "humengyu"
}

次に示すフィールドに注目してください。

フィールド 説明
operation SQLの種類。例: QUERY、DROP
user SQLステートメントを送信するユーザーグループ
sql SQLステートメント
inputPaths スキャン対象のHDFSパス
app.owner SQLステートメントの送信者

パーティションサイズの計算

パーティションのサイズを計算するには、inputPaths内の各ディレクトリのサイズを知る必要があります。そこで、HDFS fsimageを毎日解析し、それぞれのHiveディレクトリのサイズを計算し、その結果をTiDBに保存することにしました。

他のアプリケーションでもfsimageを使用しているため、Hiveディレクトリだけでなく、合計で何百億ものレコードを含むHDFSディレクトリ全体を保存しました。TiDBは、大量のデータの保存とインデックス登録に最適なデータベースです。

 

リアルタイムアラート

このシステムでは、データサイズを計算した後で、ユーザーにリアルタイムアラートを送信してリスクを知らせるかどうかが判定されます。アラートは次のようなプロセスで行われます。

  1. Hiveサーバーが監査ログをKafkaにリアルタイムに送信します。
  2. FlinkがKafkaからのデータを処理し、Kafka Table Sourceを介してKafkaをストリーミングテーブルとして扱います。
  3. FlinkがJDBC Lookup Table Sourceを使用し、TiDBをディメンショナルテーブルとして扱います。
  4. Flinkが各SQLクエリによってスキャンされたデータサイズを計算し、アラートを送信すべきか判定します。

アラートプロセス

最終的に、過大なデータをスキャンするSQLステートメントを書き込んだユーザーには、次のようなアラートが送信されます。

大量のSQLクエリに関するアラート

NameNode RPCリクエストの加速化

大量のクエリに関するアラートの送信に加えて、ZhihuはTiDBを使用してNameNodeの加速化を図りました。

NAMENODEのパフォーマンス上の問題

従来は多くのユーザーが、Hiveでのクエリには何分も、あるいは何時間もかかるという不満を口にしていました。Zhihuがそれについて調査したところ、RPC(リモートプロシージャコール)に問題があることがわかりました。HiveがgetInputSummaryメソッドを呼び出すと、グローバルロックが取得され、ロックが解放されるまでは他のクエリがデータにアクセスできない状態になります。Hiveが大量のクエリを実行した場合には、このメソッドの呼び出しに長時間を要する可能性があります。その間、他のクエリスレッドは待機していなければならないのです。

ZhihuがHiveのソースコードを確認したところ、Hiveが複数のgetInputSummaryメソッドを同時に実行できることがわかりました(内部では、このメソッドはHDFSのgetContentSummaryメソッドを呼び出していました)。そこで、グローバルロックを削除し、スレッドプールに類似した手法に置き換えました。その結果、getContentSummaryでの高度な同時実行が可能になりました。

しかしそこで別の問題が生じました。HDFSのgetContentSummaryは、ファイルシステムのディスク使用量確認(du)に類似したメソッドです。getContentSummaryでの同時実行が過大になると、NameNodeのパフォーマンスが大幅に低下する可能性があります。他のコンピューティングエンジンでもこのメソッドが使用されているため、最適化を行わなければなりません。

キャッシュコンテンツサマリー

2019年の初めに、ZhihuはRouterベースのフェデレーションを使用して、ファイルシステムをHDFSフェデレーションに分割しました。このアーキテクチャでは、リクエストをNameNodeに転送する、Routerというコンポーネントが新たに導入されました。

NameNodeのパフォーマンスに関する問題を解決するために、Routerレイヤ内にHDFSコンテンツサマリーのキャッシュを追加しました。具体的には、fsimageを使用してすべてのディレクトリのコンテンツサマリーを毎日生成し、TiDBにキャッシュを保存するという仕組みです。

クライアントがリクエストを送信すると、HDFSはキャッシュ内でデータの検索を試みます。リクエストされたデータが見つからない場合は、NameNodeからデータを取得し、キャッシュを更新します。この方法の問題点は、親ディレクトリが子ディレクトリ内の変更を検出できず、リーフディレクトリだけがキャッシュされることです。この実装では、NameNodeに対しては非常に少数のリクエストしか行われないため、NameNodeのパフォーマンスへの影響は生じません。

この解決策の利点は、Hiveでの大量クエリに関するアラートシステムを構築した際に、すでにコンテンツサマリーを生成していたことです。キャッシュをTiDBに保存してから、リクエストされたパスのインデックスを作成したところ、通常のgetContentSummaryリクエストのレイテンシが数分から10ミリ秒未満にまで短縮されました。

 

将来的な展望

膨大なデータを保存できるTiDBの水平的スケーラビリティと、クエリを加速するインデックス機能により、Zhihuのビジネス要件に適合するレベルでHDFSメタデータを保存できるようになりました。

HTAP(Hybrid Transactional Analytical Processing)データベースであるTiDBには、非常に大きな可能性があります。たとえば、HDFSファイルメタデータをリアルタイムにキャッシュして、EditLogからのファイル変更にサブスクライブすることができます。ファイル変更とfsimageをTiDBで統合することで、NameNodeのスナップショットを低レイテンシで生成し、オンライン分析に使用することが可能です。

TiDBは将来にわたってZhihuのインフラにとって重要な要素になり、その成長を加速する力になることでしょう。今後はビッグデータアーキテクチャの中で、TiDBをさらに広範に適用していく予定です。