MySQL Optimization of insert sequences
I have a realtime application that processes information and logs it to a MySQL database (actually MariaDB, a fork of MySQL). It does around 1.5 million inserts a day plus 150,000 deletes.
I am having great problems with performance and don't know how to make it function any better.
The basic structure of the application is that I have a producer class that pushes a struct to a thread-safe deque. The following code is the consumer, which pops items from that queue and writes them to the database:
#include "dbUserQueue.h"

#include <exception>
#include <iostream>
// Destructor: the body is intentionally empty; no explicit cleanup is
// performed here.
dbUserQueue::~dbUserQueue() {}
void dbUserQueue::createConnection()
{
sql::Driver * driver = sql::mysql::get_driver_instance();
std::auto_ptr< sql::Connection > newCon(driver->connect(dbURL, dbUser, dbPass));
con = newCon;
std::auto_ptr< sql::Statement > stmt(con->createStatement());
stmt->execute("USE twitter");
}
/*
 * Binds a string parameter at position `index` of `stmt`.
 *
 * The literal "\0" is an empty C string, so the comparison below is an
 * empty-string test: an empty `value` is bound as SQL NULL (VARCHAR),
 * anything else is bound as the string itself.
 */
inline void dbUserQueue::updateStatement(const std::string & value,
        std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if (value == "\0") {
        stmt->setNull(index, sql::DataType::VARCHAR);
        return;
    }
    stmt->setString(index, value);
}
/*
 * Binds a 64-bit integer parameter at position `index` of `stmt`.
 * The value -1 is treated as "no value" and bound as SQL NULL (BIGINT).
 */
inline void dbUserQueue::updateStatement(const boost::int64_t & value,
        std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if (value == -1) {
        stmt->setNull(index, sql::DataType::BIGINT);
        return;
    }
    stmt->setInt64(index, value);
}
/*
 * Binds a boolean parameter at position `index` of `stmt`.
 * Unlike the other overloads there is no sentinel: the value is always bound.
 */
inline void dbUserQueue::updateStatement(const bool value,
        std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    stmt->setBoolean(index, value);
}
/*
 * Binds a 32-bit integer parameter at position `index` of `stmt`.
 * The value -1 is treated as "no value" and bound as SQL NULL (INTEGER).
 */
inline void dbUserQueue::updateStatement(const int value,
        std::auto_ptr< sql::PreparedStatement> & stmt, const int index)
{
    if (value == -1) {
        stmt->setNull(index, sql::DataType::INTEGER);
        return;
    }
    stmt->setInt(index, value);
}
/*
 * Binds a date/time parameter, passed as a string, at position `index` of
 * `stmt`.
 *
 * The literal "\0" is an empty C string, so an empty `value` is bound as
 * SQL NULL (DATE); any other value is passed to setDateTime unchanged.
 */
inline void dbUserQueue::updateStatementDateTime(const std::string & value,
        std::auto_ptr< sql::PreparedStatement> & stmt, const int & index)
{
    if (value == "\0") {
        stmt->setNull(index, sql::DataType::DATE);
        return;
    }
    stmt->setDateTime(index, value);
}
/*
 * Creates the database connection, prepares the INSERT statement used by the
 * worker, and then starts a thread running processLoop().
 *
 * If the connection reports itself closed right after createConnection(),
 * no statement can be prepared. In that case the worker thread is NOT
 * started, because processLoop() would otherwise run against an unset
 * insertStmt and a closed connection.
 */
void dbUserQueue::start() {
    createConnection();

    if (con->isClosed()) {
        // Nothing to work with: leave `thread` untouched.
        return;
    }

    // Column order here must match the parameter indices used in processLoop().
    insertStmt.reset(con->prepareStatement(
            "insert ignore into users(contributors_enabled, created_at, "
            "description, favourites_count, followers_count, "
            "following, friends_count, geo_enabled, id, lang, listed_count, location, "
            "name, notifications, screen_name, show_all_inline_media, statuses_count, "
            "url, utc_offset, verified) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));

    thread = boost::thread(&dbUserQueue::processLoop, this);
}
/*
 * Requests interruption of the worker thread and then blocks until that
 * thread has finished.
 */
void dbUserQueue::join()
{
    thread.interrupt();   // ask the worker to stop
    thread.join();        // wait for it to exit
}
/*
* The worker function of the thread.
* Pops items from the queue and updates the database accordingly.
*/
void dbUserQueue::processLoop() {
user input;
int recordCount = 0;
con->setAutoCommit(false);
while (true) {
try {
if(recordCount >= 1000)
{
recordCount = 0;
con->commit();
}
// Insert all the data into the prepared statement
if (userQ.wait_and_pop(input)) {
updateStatement(input.contributors_enabled, insertStmt, 1);
updateStatementDateTime(input.created_at, insertStmt, 2);
updateStatement(input.description, insertStmt, 3);
updateStatement(input.favourites_count, insertStmt, 4);
updateStatement(input.followers_count, insertStmt, 5);
updateStatement(input.following, insertStmt, 6);
updateStatement(input.friends_count, insertStmt, 7);
updateStatement(input.geo_enabled, insertStmt, 8);
updateStatement(input.id, insertStmt, 9);
updateStatement(input.lang, insertStmt, 10);
updateStatement(input.listed_count, insertStmt, 11);
updateStatement(input.location, insertStmt, 12);
updateStatement(input.name, insertStmt, 13);
updateStatement(input.notifications, insertStmt, 14);
updateStatement(input.screenName, insertStmt, 15);
updateStatement(input.show_all_inline_media, insertStmt, 16);
updateStatement(input.statuses_count, insertStmt, 17);
updateStatement(input.url, insertStmt, 18);
updateStatement(input.utc_offset, insertStmt, 19);
updateStatement(input.verified, insertStmt, 20);
insertStmt->executeUpdate();
insertStmt->clearParameters();
recordCount++;
continue;
}
} catch (std::exception & e) {
}
}// end of while
// Close the statements and the connection before exiting
insertStmt->close();
con->commit();
if(con->isClosed() == false)
con->close();
}
My question is how to improve the performance. Things I have tried:
- Having multiple consumers connecting to one MySQL/MariaDB instance
- Committing after a large number of records
Single Producer, Single consumer, commit after 1000 records = ~275 Seconds
Dual Producer, Triple consumers, commit after 1000 records = ~100 Seconds
Dual Producer, Triple consumers, commit after 2000 records = ~100 Seconds
Dual Producer, Triple consumers, commit every 1 record = ~100 Seconds
Dual Producer, 6 Consumers, commit every 1 record = ~95 Seconds
Dual Producer, 6 Consumers, commit every 2000 records = ~100 Seconds
Triple Producer, 6 Consumers, commit every 2000 records = ~100 Seconds
A couple of notes on the problem domain. The messages to insert and/or delete arrive randomly throughout the day, averaging ~20 inserts/deletes per second with much higher bursts, but there is no reason the updates can't be queued for a short period, as long as the queue doesn't grow too large.
The table that the data is currently being inserted into has approximately 52 million records in it. Here is the MySQL table information
-- Target table of the `insert ignore into users(...)` statement above.
-- One row per user, keyed by `id`.
CREATE TABLE `users` (
`id` bigint(20) unsigned NOT NULL,
`contributors_enabled` tinyint(4) DEFAULT '0',
`created_at` datetime NOT NULL,
`description` varchar(255) DEFAULT NULL,
`favourites_count` int(11) NOT NULL,
`followers_count` int(11) DEFAULT NULL,
`following` varchar(255) DEFAULT NULL,
`friends_count` int(11) NOT NULL,
`geo_enabled` tinyint(4) DEFAULT '0',
`lang` varchar(255) DEFAULT NULL,
`listed_count` int(11) DEFAULT NULL,
`location` varchar(255) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
`notifications` varchar(45) DEFAULT NULL,
`screen_name` varchar(45) NOT NULL,
`show_all_inline_media` tinyint(4) DEFAULT NULL,
`statuses_count` int(11) NOT NULL,
`url` varchar(255) DEFAULT NULL,
`utc_offset` int(11) DEFAULT NULL,
`verified` tinyint(4) DEFAULT NULL,
PRIMARY KEY (`id`),
-- NOTE(review): this unique key covers the same single column as the primary
-- key above, so it is a second index on `id` that must also be maintained on
-- every insert/delete. It looks redundant -- confirm and consider dropping it.
UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=MARIA DEFAULT CHARSET=latin1 CHECKSUM=1 PAGE_CHECKSUM=1 TRANSACTIONAL=1
You could change the code to do bulk inserts, rather than insert one row at a time.
精彩评论