Node.js FileStreamの読み込みを一時停止しつつ、一括登録を実行する

郵便番号データをTedious BulkLoadで一括登録してみました。
Node.js Tedious でBulkLoadを使用して郵便番号データを一括登録

このときは郵便番号約12万件をすべて読みこんで一括登録しましたが、
登録するデータ件数が増えた場合を考慮し、1万件毎にBulkLoadするよう修正してみます。


最初のサンプル



1万件読み込んだらBulkLoadを実行すればよいだろうと修正したソースがこちら。


  1. const fs = require('fs');
  2. const readline = require('readline')
  3. const { Connection, TYPES } = require('tedious')
  4. // 設定に従いデータベースへ接続
  5. function create_connection(config) {
  6.     
  7.     const connection = new Connection(config)
  8.     // Promiseをnewした時点で引数のfunctionが実行される
  9.     const p = new Promise(function(resolve, reject) {
  10.         connection.on('connect', err => {
  11.             if (err) {
  12.                 reject(err)
  13.             } else {
  14.                 resolve(connection)
  15.             }
  16.         });
  17.         connection.connect()
  18.     });
  19.     return p
  20.     
  21. }
  22. // BulkLoadの実行
  23. function execBulkLoad(connection, rows) {
  24.     const p = new Promise(function(resolve, reject) {
  25.         // BulkLoad用の設定
  26.         const options = {}
  27.         const bulkLoad = connection.newBulkLoad('postal_code', options, function (error, rowCount) {
  28.             resolve('inserted ' + rowCount + ' rows');
  29.         })
  30.         bulkLoad.addColumn('code', TYPES.Char, { nullable: false })
  31.         bulkLoad.addColumn('address', TYPES.NVarChar, { length: 100, nullable: false })
  32.         rows.forEach((row) => {
  33.             // { code: '郵便番号', address: '住所'} の形式のデータをaddRow
  34.             bulkLoad.addRow(row)
  35.         })
  36.         // バルクロード実行
  37.         connection.execBulkLoad(bulkLoad)
  38.     });
  39.     return p
  40. }
  41. async function main() {
  42.     // データベースに接続
  43.     const config = {
  44.         authentication: {
  45.             options: {
  46.                 userName: 'sa',
  47.                 password: 'P@ssw0rd'
  48.             },
  49.             type: 'default'
  50.         },
  51.         server: 'localhost',
  52.         options: {
  53.             database: 'sample',
  54.             encrypt: false
  55.         }
  56.     }
  57.     const connection = await create_connection(config);
  58.     const stream = fs.createReadStream('./KEN_ALL_UTF8.CSV', 'utf8')
  59.     const reader = readline.createInterface({ input: stream })
  60.     let rows = []
  61.     let rowCount = 0
  62.     reader.on('line', async (data) => {
  63.         // 郵便番号情報を取得
  64.         const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
  65.         const code = item[2]
  66.         const address = item[6] + item[7] + item[8]
  67.         rows.push({code: code, address: address})
  68.         if (rows.length == 10000) {
  69.             const msg = await execBulkLoad(connection, rows)
  70.             console.log(msg)
  71.             rows = []
  72.         }
  73.         rowCount++
  74.     })
  75.     reader.on('close', async () => {
  76.         // 登録実行
  77.         if (rows.length) {
  78.             const msg = await execBulkLoad(connection, rows)
  79.             console.log(msg)
  80.         }
  81.         console.log(rowCount)
  82.         connection.close()
  83.     })
  84. }
  85. main()




実行すると、半分程度しか登録されません。


$ node app.js
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 876 rows
124517



読み込んだ件数12万件に対し、インサートログは60,874件。
実際にデータベースへ登録されていたのは、60,876件でした。
なんでだ?



pause / resume



BulkLoadを実行中もファイル読み込みが実行されるのでデータ件数が合わないのでは?

ドキュメントを見てみると、streamにはpauseとresumeというメソッドがあります。
https://nodejs.org/api/stream.html#stream_readable_pause

BulkLoad前にpauseを呼び出し。
終わったらresumeを呼び出して処理再開としてみます。

修正箇所の抜粋です。


  1.     reader.on('line', async (data) => {
  2.         // 郵便番号情報を取得
  3.         const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
  4.         const code = item[2]
  5.         const address = item[6] + item[7] + item[8]
  6.         rows.push({code: code, address: address})
  7.         if (rows.length == 10000) {
  8.             reader.pause() // 読み込みを一旦停止
  9.             const msg = await execBulkLoad(connection, rows)
  10.             console.log(msg)
  11.             rows = []
  12.             reader.resume() // 読み込み再開
  13.         }
  14.         rowCount++
  15.     })





$ node app.js
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 10000 rows
inserted 2091 rows
124517



読み込んだレコード数と登録件数がかなり近づきましたが、まだ漏れがあります。



await pause



pauseが実行されるまでに読み込んでしまうデータがあるのでは?と思い、pauseにawaitをつけてみました。


  1.     reader.on('line', async (data) => {
  2.         // 郵便番号情報を取得
  3.         const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
  4.         const code = item[2]
  5.         const address = item[6] + item[7] + item[8]
  6.         rows.push({code: code, address: address})
  7.         if (rows.length == 10000) {
  8.             await reader.pause() // 読み込みを一旦停止
  9.             const msg = await execBulkLoad(connection, rows)
  10.             console.log(msg)
  11.             rows = []
  12.             reader.resume() // 読み込み再開
  13.         }
  14.         rowCount++
  15.     })




$ node app.js
inserted 10376 rows
inserted 10311 rows
inserted 10123 rows
inserted 10291 rows
inserted 10219 rows
inserted 10029 rows
inserted 10024 rows
inserted 10316 rows
inserted 10190 rows
inserted 10161 rows
inserted 10221 rows
inserted 10165 rows
inserted 2091 rows
124517




これで読み込んだ件数と登録件数が一致してくれました。
・・・しかし、指定した件数でのデータ登録とはならず、分割境界値付近での挙動が気になります。
エディタにもawaitの意味がないという警告が表示されますし。



pause event



pauseしたときに発生するイベント内でデータ登録を行うよう修正しました。


  1. const fs = require('fs');
  2. const readline = require('readline')
  3. const { Connection, TYPES } = require('tedious')
  4. // 設定に従いデータベースへ接続
  5. function create_connection(config) {
  6.     
  7.     const connection = new Connection(config)
  8.     // Promiseをnewした時点で引数のfunctionが実行される
  9.     const p = new Promise(function(resolve, reject) {
  10.         connection.on('connect', err => {
  11.             if (err) {
  12.                 reject(err)
  13.             } else {
  14.                 resolve(connection)
  15.             }
  16.         });
  17.         connection.connect()
  18.     });
  19.     return p
  20.     
  21. }
  22. // BulkLoadの実行
  23. function execBulkLoad(connection, rows) {
  24.     const p = new Promise(function(resolve, reject) {
  25.         // BulkLoad用の設定
  26.         const options = {}
  27.         const bulkLoad = connection.newBulkLoad('postal_code', options, function (error, rowCount) {
  28.             resolve('inserted ' + rowCount + ' rows (input:' + rows.length+')');
  29.         })
  30.         bulkLoad.addColumn('code', TYPES.Char, { nullable: false })
  31.         bulkLoad.addColumn('address', TYPES.NVarChar, { length: 100, nullable: false })
  32.         rows.forEach((row) => {
  33.             // { code: '郵便番号', address: '住所'} の形式のデータをaddRow
  34.             bulkLoad.addRow(row)
  35.         })
  36.         // バルクロード実行
  37.         connection.execBulkLoad(bulkLoad)
  38.     });
  39.     return p
  40. }
  41. async function main() {
  42.     // データベースに接続
  43.     const config = {
  44.         authentication: {
  45.             options: {
  46.                 userName: 'sa',
  47.                 password: 'P@ssw0rd'
  48.             },
  49.             type: 'default'
  50.         },
  51.         server: 'localhost',
  52.         options: {
  53.             database: 'sample',
  54.             encrypt: false
  55.         }
  56.     }
  57.     const connection = await create_connection(config);
  58.     const stream = fs.createReadStream('./KEN_ALL_UTF8.CSV', 'utf8')
  59.     const reader = readline.createInterface({ input: stream })
  60.     let rows = []
  61.     let bulkRows = []
  62.     let rowCount = 0
  63.     reader.on('line', (data) => {
  64.         // 郵便番号情報を取得
  65.         const item = data.split(',').map((value) => { return value.replace(/^"+|"+$/g,'') })
  66.         const code = item[2]
  67.         const address = item[6] + item[7] + item[8]
  68.         rows.push({code: code, address: address})
  69.         
  70.         if (rows.length == 10000) {
  71.             bulkRows.push(rows)
  72.             rows = []
  73.             reader.pause() // 読み込みを一旦停止
  74.         }
  75.         rowCount++
  76.     })
  77.     reader.on('pause', async () => {
  78.         // reader.pauseに加え、closeイベントの前にも呼び出される
  79.         // 登録内容が存在しない場合はスキップ
  80.         if (bulkRows.length == 0) {
  81.             return
  82.         }
  83.         const msg = await execBulkLoad(connection, bulkRows.pop())
  84.         console.log(msg)
  85.         reader.resume() // 読み込み再開
  86.     })
  87.     reader.on('close', async () => {
  88.         // 登録実行
  89.         if (rows.length > 0) {
  90.             const msg = await execBulkLoad(connection, rows)
  91.             console.log(msg)
  92.         }
  93.         console.log(rowCount)
  94.         connection.close()
  95.     })
  96. }
  97. main()




狙い通りの実行結果です。


$ node app.js
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 10000 rows (input:10000)
inserted 4517 rows (input:4517)
124517


関連記事

fastify データベースを検索した結果をhtmlで表示する

fastifyでデータベースの検索やhtmlの表示を試してみました。
fastifyで郵便番号検索APIのサンプル
fastifyでhtmlビューを表示する(point-of-view, ejs)

2つを組み合わせて、データベースを検索した結果をhtmlで表示してみます。


サンプル



サーバー部分は以下のようになりました。

・server.js


  1. const fastify = require('fastify')({ logger: true })
  2. const postal = require('./module/postal')()
  3. fastify.register(require('point-of-view'), {
  4.     engine: {
  5.         ejs: require('ejs')
  6.     }
  7. })
  8. fastify.get('/', async (req, reply) => {
  9.     // 検索結果をviewに渡す
  10.     const rows = await postal.search('銀座')
  11.     reply.view('/views/index.ejs', { rows: rows })
  12. })
  13. fastify.listen(3000, '0.0.0.0', err => {
  14.     if (err) throw err
  15.     console.log(`server listening on ${fastify.server.address().port}`)
  16. })




データベースを検索する処理です。

・module/postal.js


  1. const { Connection, Request } = require('tedious')
  2. module.exports = () => {
  3.     // 接続情報
  4.     const config = {
  5.         authentication: {
  6.             options: {
  7.                 userName: 'sa',
  8.                 password: 'P@ssw0rd'
  9.             },
  10.             type: 'default'
  11.         },
  12.         server: 'localhost',
  13.         options: {
  14.             database: 'sample',
  15.             encrypt: false,
  16.             rowCollectionOnRequestCompletion : true
  17.         }
  18.     }
  19.     // 設定に従いデータベースへ接続
  20.     this.create_connection = (config) => {
  21.     
  22.         const connection = new Connection(config)
  23.         // Promiseをnewした時点で引数のfunctionが実行される
  24.         const p = new Promise(function(resolve, reject) {
  25.             connection.on('connect', err => {
  26.                 if (err) {
  27.                     reject(err)
  28.                 } else {
  29.                     resolve(connection)
  30.                 }
  31.             })
  32.             connection.connect()
  33.         })
  34.         return p
  35.         
  36.     }
  37.     // SQLの実行
  38.     this.execute = (connection, sql) => {
  39.         const p = new Promise(function(resolve, reject) {
  40.             const request = new Request(sql, (err, rowCount, columns) => {
  41.                 if (err) {
  42.                     reject(err)
  43.                     return
  44.                 }
  45.                 let rows = []
  46.                 columns.forEach(column => {
  47.                     let row = {}
  48.                     column.forEach(field => {
  49.                         row[field.metadata.colName] = field.value
  50.                     });
  51.                     rows.push(row)
  52.                 });
  53.                 
  54.                 resolve(rows)
  55.             })
  56.             connection.execSql(request)
  57.         });
  58.         return p
  59.     }
  60.     // 住所の一部でデータを検索
  61.     this.search = async (address) => {
  62.         const connection = await this.create_connection(config)
  63.         const rows = await this.execute(connection, "SELECT * FROM postal_code WHERE address LIKE '%" + address + "%' ORDER BY code")
  64.         connection.close()
  65.         return rows
  66.     }
  67.     return this
  68. }




表示部分はこのようになりました。

・views/index.ejs


  1. <html lang="ja">
  2. <head>
  3. <meta charset="UTF-8">
  4. <title>fastifyサンプル</title>
  5. <style>
  6. table {
  7.     border: solid 2px;
  8.     border-collapse: collapse;
  9. }
  10. th, td {
  11.     border: solid 2px;
  12.     padding: 4px;
  13. }
  14. </style>
  15. </head>
  16. <body>
  17.     <table>
  18.         <thead>
  19.             <tr>
  20.                 <th>郵便番号</th>
  21.                 <th>住所</th>
  22.             </tr>
  23.         </thead>
  24.         <tbody>
  25.             <% rows.forEach(row => { %>
  26.                 <tr>
  27.                     <td><%= row.code %></td>
  28.                     <td><%= row.address %></td>
  29.                 </tr>
  30.             <% }) %>
  31.         </tbody>
  32.     </table>
  33. </body>
  34. </html>




サーバーを起動してブラウザで表示してみると、ちゃんと住所に「銀座」を含む結果が表示されました。

a44_01.png

関連記事

fastify 入力データの検証(validation)

fastifyで入力データの検証を試してみます。

id:数値
address:文字列 必須

というパターンだとこんな感じになりました。
受け取ったデータをそのまま送り返しています。


  1. const fastify = require('fastify')({ logger: true })
  2. const opts = {
  3.     schema: {
  4.         body: {
  5.             type: 'object',
  6.             required: [ // addressを必須項目に
  7.                 'address'
  8.             ],
  9.             properties: {
  10.                 id: { type: 'number'},
  11.                 address: { type: 'string' }
  12.             }
  13.         }
  14.     }
  15. }
  16. fastify.post('/', opts, async (req, reply) => {
  17.     // jsonデータはreq.bodyに設定される
  18.     return req.body
  19. })
  20. fastify.listen(3000, '0.0.0.0', err => {
  21.     if (err) throw err
  22.     console.log(`server listening on ${fastify.server.address().port}`)
  23. })




正常系

$ curl 'http://192.168.11.104:3000' --data '{"id": 123, "address":"住所"}' -X POST -H 'Content-Type:application/json'
{"id":123,"address":"住所"}



idが文字列(数値へ変換可能)

$ curl 'http://192.168.11.104:3000' --data '{"id": "123", "address":"住所"}' -X POST -H 'Content-Type:application/json'
{"id":123,"address":"住所"}


自動的に数値へ変換されました。

idが文字列(数値へ変換不可)

$ curl 'http://192.168.11.104:3000' --data '{"id": "abc", "address":"住所"}' -X POST -H 'Content-Type:application/json'
{"statusCode":400,"error":"Bad Request","message":"body.id should be number"}


ちゃんとエラーになりました。

idがなし

$ curl 'http://192.168.11.104:3000' --data '{"address":"住所"}' -X POST -H 'Content-Type:application/json'
{"address":"住所"}



addressが空白文字列

$ curl 'http://192.168.11.104:3000' --data '{"id": 123, "address":""}' -X POST -H 'Content-Type:application/json'
{"id":123,"address":""}



addressが数値

$ curl 'http://192.168.11.104:3000' --data '{"id": 123, "address":456}' -X POST -H 'Content-Type:application/json'
{"id":123,"address":"456"}



addressがなし

$ curl 'http://192.168.11.104:3000' --data '{"id": 123}' -X POST -H 'Content-Type:application/json'
{"statusCode":400,"error":"Bad Request","message":"body should have required property 'address'"}




このvalidationは、「Ajv」を使用しているとのことで、詳しい使い方はこちらのドキュメントを見たほうが良さそうです。
https://ajv.js.org/json-schema.html

関連記事

fastify json形式のPOSTデータ受け取り

fastifyはPOSTデータはjson形式を想定しており、x-www-form-urlencoded形式のデータ受信にはひと工夫必要でした。
fastifyでPOSTデータ受信時、FST_ERR_CTP_INVALID_MEDIA_TYPE

POSTデータがjson形式の場合について見てみます。

プログラム修正




  1. {"address":"銀座"}



という形式で検索パラメーターを送信することにします。
json形式のPOSTデータは自動的にパースされ、req.bodyに設定されました。


  1. const fastify = require('fastify')({ logger: true })
  2. const postal = require('./module/postal')()
  3. fastify.post('/', async (req, reply) => {
  4.     // jsonデータはreq.bodyに設定される
  5.     const address = req.body.address
  6.     const rows = await postal.search(address)
  7.     return rows
  8. })
  9. fastify.listen(3000, '0.0.0.0', err => {
  10.     if (err) throw err
  11.     console.log(`server listening on ${fastify.server.address().port}`)
  12. })




動作確認


$ curl 'http://192.168.11.104:3000' --data '{"address":"銀座"}' -X POST -H 'Content-Type:application/json'
[
    {"code":"0691331","address":"北海道夕張郡長沼町銀座"},
    {"code":"1040061","address":"東京都中央区銀座"}
...
]

fastifyでPOSTデータ受信時、FST_ERR_CTP_INVALID_MEDIA_TYPE

fastifyで郵便番号検索APIを作ってみました。
fastifyで郵便番号検索APIのサンプル

検索する住所をGETのクエリーパラメーターではなく、POSTで送信するよう変更してみます。


最初の実装



「fastify.getをfastify.postに変えれば動くだろう」と思い変更してみました。

・server.js


  1. const fastify = require('fastify')({ logger: true })
  2. const postal = require('./module/postal')()
  3. /*
  4. fastify.get('/', async (req, reply) => {
  5.     // クエリーパラメーター取得
  6.     const address = req.query.address
  7.     const rows = await postal.search(address)
  8.     return rows
  9. })
  10. */
  11. // postに変更
  12. fastify.post('/', async (req, reply) => {
  13.     const address = req.query.address
  14.     const rows = await postal.search(address)
  15.     return rows
  16. })
  17. fastify.listen(3000, '0.0.0.0', err => {
  18.     if (err) throw err
  19.     console.log(`server listening on ${fastify.server.address().port}`)
  20. })




curlで動作を確認すると、FST_ERR_CTP_INVALID_MEDIA_TYPE
Unsupported Media Type: application/x-www-form-urlencoded
というエラーになります。


$ curl 'http://192.168.11.104:3000' --data 'address=%E9%8A%80%E5%BA%A7' -X POST
{"statusCode":415,"code":"FST_ERR_CTP_INVALID_MEDIA_TYPE","error":"Unsupported Media Type","message":"Unsupported Media Type: application/x-www-form-urlencoded"





POST対応



調べてみると、デフォルトでPOSTはjson形式のみを受け付け、x-www-form-urlencodedなデータはエラーになるようです。
multipartも同様の模様。
Giving error with node/fastify Unsupported Media Type: application/x-www-form-urlencoded

fastify-formbodyを使うといいよとのこと。
fastify-formbody

インストールします。


$ npm install fastify-formbody




インストールしたfastify-formbodyを登録し、POSTデータをreq.bodyから取得するよう修正します。


  1. const fastify = require('fastify')({ logger: true })
  2. const postal = require('./module/postal')()
  3. // POST対応
  4. fastify.register(require('fastify-formbody'))
  5. // postに変更
  6. fastify.post('/', async (req, reply) => {
  7.     // req.bodyからPOSTデータを取得するよう変更
  8.     //const address = req.query.address
  9.     const address = req.body.address
  10.     const rows = await postal.search(address)
  11.     return rows
  12. })
  13. fastify.listen(3000, '0.0.0.0', err => {
  14.     if (err) throw err
  15.     console.log(`server listening on ${fastify.server.address().port}`)
  16. })




これでPOSTデータを受け取り、検索が実行できるようになりました。


$ curl 'http://192.168.11.104:3000' --data 'address=%E9%8A%80%E5%BA%A7' -X POST
[
     {"code":"0691331","address":"北海道夕張郡長沼町銀座"},
     {"code":"1040061","address":"東京都中央区銀座"}
...
]


関連記事

プロフィール

Author:symfo
blog形式だと探しにくいので、まとめサイト作成中です。
Symfoware まとめ

PR




検索フォーム

月別アーカイブ