Node.js FileStreamの読み込みを一時停止しつつ、一括登録を実行する

郵便番号データをTedious BulkLoadで一括登録してみました。
Node.js Tedious でBulkLoadを使用して郵便番号データを一括登録

このときは郵便番号約12万件をすべて読みこんで一括登録しましたが、
登録するデータ件数が増えた場合を考慮し、1万件毎にBulkLoadするよう修正してみます。


最初のサンプル



1万件読み込んだらBulkLoadを実行すればよいだろうと修正したソースがこちら。


  1. const fs = require('fs');
  2. const readline = require('readline')
  3. const { Connection, TYPES } = require('tedious')
  4. // 設定に従いデータベースへ接続
  5. function create_connection(config) {
  6.     
  7.     const connection = new Connection(config)
  8.     // Promiseをnewした時点で引数のfunctionが実行される
  9.     const p = new Promise(function(resolve, reject) {
  10.         connection.on('connect', err => {
  11.             if (err) {
  12.                 reject(err)
  13.             } else {
  14.                 resolve(connection)
  15.             }
  16.         });
  17.         connection.connect()
  18.     });
  19.     return p
  20.     
  21. }
  22. // BulkLoadの実行
  23. function execBulkLoad(connection, rows) {
  24.     const p = new Promise(function(resolve, reject) {
  25.         // BulkLoad用の設定
  26.         const options = {}
  27.         const bulkLoad = connection.newBulkLoad('postal_code', options, function (error, rowCount) {
  28.             resolve('inserted ' + rowCount + ' rows');
  29.         })
  30.         bulkLoad.addColumn('code', TYPES.Char, { nullable: false })
  31.         bulkLoad.addColumn('address', TYPES.NVarChar, { length: 100, nullable: false })
  32.         rows.forEach((row) => {
  33.             // { code: '郵便番号', address: '住所'} の形式のデータをaddRow
  34.             bulkLoad.addRow(row)
  35.         })
  36.         // バルクロード実行
  37.         connection.execBulkLoad(bulkLoad)
  38.     });
  39.     return p
  40. }
  41. async function main() {
  42.     // データベースに接続
  43.     const config = {
  44.         authentication: {
  45.             options: {
  46.                 userName: 'sa',
  47.                 password: 'P@ssw0rd'
  48.             },
  49.             type: 'default'
  50.         },
  51.         server: 'localhost',
  52.         options: {
  53.             database: 'sample',
  54.             encrypt: false
  55.         }
  56.     }
  57.     const connection = await create_connection(config);
  58.     const stream = fs.createReadStream('./KEN_ALL_UTF8.CSV', 'utf8')
  59.     const reader = readline.createInterface({ input: stream })
  60.     let rows = []
  61.     let rowCount = 0
  62.     reader.on('line', async (data) => {
  63.         // 郵便番号情報を取得
  64.         const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
  65.         const code = item[2]
  66.         const address = item[6] + item[7] + item[8]
  67.         rows.push({code: code, address: address})
  68.         if (rows.length == 10000) {
  69.             const msg = await execBulkLoad(connection, rows)
  70.             console.log(msg)
  71.             rows = []
  72.         }
  73.         rowCount++
  74.     })
  75.     reader.on('close', async () => {
  76.         // 登録実行
  77.         if (rows.length) {
  78.             const msg = await execBulkLoad(connection, rows)
  79.             console.log(msg)
  80.         }
  81.         console.log(rowCount)
  82.         connection.close()
  83.     })
  84. }
  85. main()




実行すると、半分程度しか登録されません。


$ node app.js
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 876 rows
124517



読み込んだ件数12万件に対し、インサートログは60,874件。
実際にデータベースへ登録されていたのは、60,876件でした。
なんでだ?



pause / resume



BulkLoadを実行中もファイル読み込みが実行されるのでデータ件数が合わないのでは?

ドキュメントを見てみると、streamにはpauseとresumeというメソッドがあります。
https://nodejs.org/api/stream.html#stream_readable_pause

BulkLoad前にpauseを呼び出し。
終わったらresumeを呼び出して処理再開としてみます。

修正箇所の抜粋です。


  1.     reader.on('line', async (data) => {
  2.         // 郵便番号情報を取得
  3.         const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
  4.         const code = item[2]
  5.         const address = item[6] + item[7] + item[8]
  6.         rows.push({code: code, address: address})
  7.         if (rows.length == 10000) {
  8.             reader.pause() // 読み込みを一旦停止
  9.             const msg = await execBulkLoad(connection, rows)
  10.             console.log(msg)
  11.             rows = []
  12.             reader.resume() // 読み込み再開
  13.         }
  14.         rowCount++
  15.     })





$ node app.js
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 2091 rows
124517



読み込んだレコード数と登録件数がかなり近づきましたが、まだ漏れがあります。



await pause



pauseが実行されるまでに読み込んでしまうデータがあるのでは?と思い、pauseにawaitをつけてみました。


  1.     reader.on('line', async (data) => {
  2.         // 郵便番号情報を取得
  3.         const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
  4.         const code = item[2]
  5.         const address = item[6] + item[7] + item[8]
  6.         rows.push({code: code, address: address})
  7.         if (rows.length == 10000) {
  8.             await reader.pause() // 読み込みを一旦停止
  9.             const msg = await execBulkLoad(connection, rows)
  10.             console.log(msg)
  11.             rows = []
  12.             reader.resume() // 読み込み再開
  13.         }
  14.         rowCount++
  15.     })




$ node app.js
inserted 10376 rows
inserted 10311 rows
inserted 10123 rows
inserted 10291 rows
inserted 10219 rows
inserted 10029 rows
inserted 10024 rows
inserted 10316 rows
inserted 10190 rows
inserted 10161 rows
inserted 10221 rows
inserted 10165 rows
inserted 2091 rows
124517




これで読み込んだ件数と登録件数が一致してくれました。
・・・しかし、指定した件数でのデータ登録とはならず、分割境界値付近での挙動が気になります。
エディタにもawaitの意味がないという警告が表示されますし。



pause event



pauseしたときに発生するイベント内でデータ登録を行うよう修正しました。


  1. const fs = require('fs');
  2. const readline = require('readline')
  3. const { Connection, TYPES } = require('tedious')
  4. // 設定に従いデータベースへ接続
  5. function create_connection(config) {
  6.     
  7.     const connection = new Connection(config)
  8.     // Promiseをnewした時点で引数のfunctionが実行される
  9.     const p = new Promise(function(resolve, reject) {
  10.         connection.on('connect', err => {
  11.             if (err) {
  12.                 reject(err)
  13.             } else {
  14.                 resolve(connection)
  15.             }
  16.         });
  17.         connection.connect()
  18.     });
  19.     return p
  20.     
  21. }
  22. // BulkLoadの実行
  23. function execBulkLoad(connection, rows) {
  24.     const p = new Promise(function(resolve, reject) {
  25.         // BulkLoad用の設定
  26.         const options = {}
  27.         const bulkLoad = connection.newBulkLoad('postal_code', options, function (error, rowCount) {
  28.             resolve('inserted ' + rowCount + ' rows (input:' + rows.length+')');
  29.         })
  30.         bulkLoad.addColumn('code', TYPES.Char, { nullable: false })
  31.         bulkLoad.addColumn('address', TYPES.NVarChar, { length: 100, nullable: false })
  32.         rows.forEach((row) => {
  33.             // { code: '郵便番号', address: '住所'} の形式のデータをaddRow
  34.             bulkLoad.addRow(row)
  35.         })
  36.         // バルクロード実行
  37.         connection.execBulkLoad(bulkLoad)
  38.     });
  39.     return p
  40. }
  41. async function main() {
  42.     // データベースに接続
  43.     const config = {
  44.         authentication: {
  45.             options: {
  46.                 userName: 'sa',
  47.                 password: 'P@ssw0rd'
  48.             },
  49.             type: 'default'
  50.         },
  51.         server: 'localhost',
  52.         options: {
  53.             database: 'sample',
  54.             encrypt: false
  55.         }
  56.     }
  57.     const connection = await create_connection(config);
  58.     const stream = fs.createReadStream('./KEN_ALL_UTF8.CSV', 'utf8')
  59.     const reader = readline.createInterface({ input: stream })
  60.     let rows = []
  61.     let bulkRows = []
  62.     let rowCount = 0
  63.     reader.on('line', (data) => {
  64.         // 郵便番号情報を取得
  65.         const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
  66.         const code = item[2]
  67.         const address = item[6] + item[7] + item[8]
  68.         rows.push({code: code, address: address})
  69.         
  70.         if (rows.length == 10000) {
  71.             bulkRows.push(rows)
  72.             rows = []
  73.             reader.pause() // 読み込みを一旦停止
  74.         }
  75.         rowCount++
  76.     })
  77.     reader.on('pause', async () => {
  78.         // reader.pauseに加え、closeイベントの前にも呼び出される
  79.         // 登録内容が存在しない場合はスキップ
  80.         if (bulkRows.length == 0) {
  81.             return
  82.         }
  83.         const msg = await execBulkLoad(connection, bulkRows.pop())
  84.         console.log(msg)
  85.         reader.resume() // 読み込み再開
  86.     })
  87.     reader.on('close', async () => {
  88.         // 登録実行
  89.         if (rows.length > 0) {
  90.             const msg = await execBulkLoad(connection, rows)
  91.             console.log(msg)
  92.         }
  93.         console.log(rowCount)
  94.         connection.close()
  95.     })
  96. }
  97. main()




狙い通りの実行結果です。


$ node app.js
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 4517 rows (input:4517)
124517


関連記事

プロフィール

Author:symfo
blog形式だと探しにくいので、まとめサイト作成中です。
Symfoware まとめ

PR




検索フォーム

月別アーカイブ