Slick 3.0 bulk insert or update (upsert)
Original question: http://stackoverflow.com/questions/35001493/
This page reproduces a Stack Overflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors on Stack Overflow.
Asked by opus111
What is the correct way to do a bulk insertOrUpdate in Slick 3.0?
I am using MySQL, where the appropriate query would be:
INSERT INTO table (a,b,c) VALUES (1,2,3),(4,5,6)
ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);
Here is my current code, which is very slow :-(
// FIXME -- this is slow but will stop repeats; an insertOrUpdate
// function for a list would be much better
val rowsInserted = rows.map { row =>
  await(run(TableQuery[FooTable].insertOrUpdate(row)))
}.sum
What I am looking for is the equivalent of:
def insertOrUpdate(values: Iterable[U]): DriverAction[MultiInsertResult, NoStream, Effect.Write]
Answer by Sean Vieira
There are several ways that you can make this code faster (each one should be faster than the preceding one, but they get progressively less idiomatic Slick):
1. Run insertOrUpdateAll instead of insertOrUpdate if on slick-pg 0.16.1+:

await(run(TableQuery[FooTable].insertOrUpdateAll rows)).sum

2. Run your DBIO events all at once, rather than waiting for each one to commit before you run the next:

val toBeInserted = rows.map { row => TableQuery[FooTable].insertOrUpdate(row) }
val inOneGo = DBIO.sequence(toBeInserted)
val dbioFuture = run(inOneGo)
// Optionally, you can add a `.transactionally`
// and / or `.withPinnedSession` here to pin all of these upserts
// to the same transaction / connection
// which *may* get you a little more speed:
// val dbioFuture = run(inOneGo.transactionally)
val rowsInserted = await(dbioFuture).sum

3. Drop down to the JDBC level and run your upsert all in one go (idea via this answer):

val SQL = """INSERT INTO table (a,b,c) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);"""

// executeBatch() returns Array[Int], one update count per batched row
SimpleDBIO[Array[Int]] { session =>
  val statement = session.connection.prepareStatement(SQL)
  rows.foreach { row =>
    statement.setInt(1, row.a)
    statement.setInt(2, row.b)
    statement.setInt(3, row.c)
    statement.addBatch()
  }
  statement.executeBatch()
}
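The SQL in the question is a single statement carrying several value tuples. As a rough sketch of how such a statement could be assembled for a prepared statement, the helper below emits one "(?, ?, ?)" group per row and returns the bind values in placeholder order. The names Foo and MultiRowUpsert are hypothetical and not part of Slick or of the answers here, and the table name is assumed to be a trusted identifier because it is pasted into the SQL text.

```scala
// Hypothetical row type matching columns a, b, c from the question.
final case class Foo(a: Int, b: Int, c: Int)

object MultiRowUpsert {
  // Builds one INSERT ... ON DUPLICATE KEY UPDATE statement with a
  // "(?, ?, ?)" group per row, plus the bind values in placeholder order.
  // `table` must be a trusted identifier: it is not escaped.
  def build(table: String, rows: Seq[Foo]): (String, Seq[Int]) = {
    require(rows.nonEmpty, "at least one row is required")
    val placeholders = rows.map(_ => "(?, ?, ?)").mkString(", ")
    val sql =
      s"INSERT INTO $table (a, b, c) VALUES $placeholders " +
        "ON DUPLICATE KEY UPDATE c = VALUES(a) + VALUES(b)"
    val params = rows.flatMap(r => Seq(r.a, r.b, r.c))
    (sql, params)
  }
}
```

The returned values can then be bound in order with setInt on a PreparedStatement, for example inside a SimpleDBIO block like the one in the third option.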
Answer by marcospereira
As you can see in the Slick examples, you can use the ++= function to insert using the JDBC batch insert feature. For instance:
val foos = TableQuery[FooTable]
val rows: Seq[Foo] = ...
foos ++= rows // here slick will use batch insert
You can also "size" your batches by "grouping" the rows sequence:
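val batchSize = 1000
// `foos ++= group` only builds an action, so collect the actions
// and combine them into one DBIO that can be passed to db.run
val batched = DBIO.sequence(rows.grouped(batchSize).map(group => foos ++= group).toList)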
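To see how grouped splits the row sequence, here is a small sketch on plain integers. The object and method names are illustrative only and are not part of Slick.

```scala
object GroupedDemo {
  // Splits rows into consecutive batches of at most batchSize elements;
  // the last batch holds whatever is left over.
  def batches[A](rows: Seq[A], batchSize: Int): List[Seq[A]] =
    rows.grouped(batchSize).toList
}
```

With five rows and a batch size of two, this yields three batches, the last one containing a single row.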
Answer by cclient
Use sqlu. This demo works:
case ("insertOnDuplicateKey",answers:List[Answer])=>{
def buildInsert(r: Answer): DBIO[Int] =
sqlu"insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time) values (${r.aid},${r.bid},${r.sbid},${r.qid},${r.ups},${r.author},${r.uid},${r.nick},${r.pub_time},${r.content},${r.good},${r.hot},${r.id},${r.reply},${r.pic},${r.spider_time}) ON DUPLICATE KEY UPDATE `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)"
val inserts: Seq[DBIO[Int]] = answers.map(buildInsert)
val combined: DBIO[Seq[Int]] = DBIO.sequence(inserts)
DEST_DB.run(combined).onComplete(data=>{
if (data.isSuccess){
// read the result only after the success check; data.get throws on a failed Try
println("insertOnDuplicateKey data result",data.get.mkString)
println(data.get)
val lastid=answers.last.id
Sync.lastActor !("upsert",tablename,lastid)
}else{
//retry
self !("insertOnDuplicateKey",answers)
}
})
}
I also tried to use sqlu to send everything as a single SQL statement, but it fails with an error, maybe because sqlu does not support plain string interpolation of SQL fragments. This demo does not work:
case ("insertOnDuplicateKeyError",answers:List[Answer])=>{
def buildSql(execpre:String,values: String,execafter:String): DBIO[Int] = sqlu"$execpre $values $execafter"
val execpre="insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time) values "
val execafter=" ON DUPLICATE KEY UPDATE `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)"
val valuesstr=answers.map(row=>("("+List(row.aid,row.bid,row.sbid,row.qid,row.ups,"'"+row.author+"'","'"+row.uid+"'","'"+row.nick+"'","'"+row.pub_time+"'","'"+row.content+"'",row.good,row.hot,row.id,row.reply,row.pic,"'"+row.spider_time+"'").mkString(",")+")")).mkString(",\n")
val insertOrUpdateAction=DBIO.seq(
buildSql(execpre,valuesstr,execafter)
)
DEST_DB.run(insertOrUpdateAction).onComplete(data=>{
if (data.isSuccess){
println("insertOnDuplicateKey data result",data)
val lastid=answers.last.id
Sync.lastActor !("upsert",tablename,lastid)
}else{
//retry
self !("insertOnDuplicateKey2",answers)
}
})
}
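A likely reason for the failure above: Slick's sql and sqlu interpolators turn every $value into a bind parameter, so whole SQL fragments passed with plain $ reach the database as quoted values rather than as SQL text. Slick documents #$ for splicing a literal string into the statement. The sketch below applies that to buildSql; it assumes Slick 3.2 or later for the import, the object name LiteralSplice is hypothetical, and it has not been run against the author's schema. Text spliced with #$ is not escaped, so hand-quoted values like those in valuesstr break on an embedded quote and are open to SQL injection.

```scala
// Assumes Slick 3.2+; on Slick 3.0/3.1 the import is slick.driver.MySQLDriver.api._
import slick.jdbc.MySQLProfile.api._

object LiteralSplice {
  // `$x` is sent as a bind parameter (a `?` in the statement),
  // while `#$x` is pasted into the SQL text unchanged and unescaped.
  def buildSql(execpre: String, values: String, execafter: String) =
    sqlu"#$execpre #$values #$execafter"
}
```

Where the values come from untrusted input, per-row bind parameters (as in the working demo) or a prepared statement with placeholders remain the safer choice.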
A MySQL sync tool built with Scala and Slick: https://github.com/cclient/ScalaMysqlSync

