10bet网址
MySQL NDB集群API开发指南
本手册下载

MySQL NDB集群API开发指南// 使用同步事务和多集群的NDB API示例

2.5.2使用同步事务和多集群的NDB API示例

这个例子演示了同步事务以及在一个NDB API应用程序中连接到多个集群。

这个程序的源代码可以在NDB集群源树中找到,在文件中存储/ ndb ndbapi-examples / ndbapi_simple_dual / main.cpp

请注意

示例文件以前被命名ndbapi_simple_dual.cpp

/ * * ndbapi_simple_dual:使用NDB API中的同步事务* *从该程序的正确输出是:* * attr1 attr2 * 0 10 * 1 1 * 2 12 *检测到删除的元组不存在!* 4 14 * 5 5 * 6 16 * 7 7 * 8 18 * 9 * attr1 attr2 * 0 10 * 1 1 * 2 12 *检测到删除的元组不存在!* 4 14 * 5 5 * 6 16 * 7 7 * 8 18 * 9 9 * * / #ifdef _win32 #include  #endif #include  #include  #include  //用于cout #include  #include 静态void run_application(mysql&,ndb_cluster_connection&,const char *表,const char * db);#define print_error(代码,msg)\ std :: cout << << __File_ <<“错误,行:”<< __line__ \ <<“,代码:”<<代码\ <<“,msg:“<< msg <<”。“<< std :: endl #define mysqlerror(mysql){\ print_error(mysql_errno(&mysql),mysql_error(&mysql));\出口(-1);#define apierror(错误){\ print_error(Error.Code,Error.Message);\出口(-1); } int main(int argc, char** argv) { if (argc != 5) { std::cout << "Arguments are    .\n"; exit(-1); } // ndb_init must be called first ndb_init(); { char * mysqld1_sock = argv[1]; const char *connectstring1 = argv[2]; char * mysqld2_sock = argv[3]; const char *connectstring2 = argv[4]; // Object representing the cluster 1 Ndb_cluster_connection cluster1_connection(connectstring1); MYSQL mysql1; // Object representing the cluster 2 Ndb_cluster_connection cluster2_connection(connectstring2); MYSQL mysql2; // connect to mysql server and cluster 1 and run application // Connect to cluster 1 management server (ndb_mgmd) if (cluster1_connection.connect(4 /* retries */, 5 /* delay between retries */, 1 /* verbose */)) { std::cout << "Cluster 1 management server was not ready within 30 secs.\n"; exit(-1); } // Optionally connect and wait for the storage nodes (ndbd's) if (cluster1_connection.wait_until_ready(30,0) < 0) { std::cout << "Cluster 1 was not ready within 30 secs.\n"; exit(-1); } // connect to mysql server in cluster 1 if ( !mysql_init(&mysql1) ) { std::cout << "mysql_init failed\n"; exit(-1); } if ( !mysql_real_connect(&mysql1, "localhost", "root", "", "", 0, mysqld1_sock, 0) ) MYSQLERROR(mysql1); // connect to mysql server and cluster 2 and run application // Connect to cluster management server (ndb_mgmd) if (cluster2_connection.connect(4 /* retries */, 5 /* delay between retries */, 1 /* verbose */)) { std::cout << "Cluster 2 management server was not ready within 30 secs.\n"; exit(-1); } // Optionally connect and wait for the storage nodes (ndbd's) if (cluster2_connection.wait_until_ready(30,0) < 0) { std::cout << "Cluster 2 was not ready within 30 secs.\n"; exit(-1); } // connect to mysql server in cluster 2 if ( !mysql_init(&mysql2) ) { std::cout << "mysql_init failed\n"; exit(-1); } if ( !mysql_real_connect(&mysql2, "localhost", "root", "", "", 0, mysqld2_sock, 0) ) MYSQLERROR(mysql2); // run the application code run_application(mysql1, cluster1_connection, "api_simple_dual_1", "ndb_examples"); run_application(mysql2, cluster2_connection, "api_simple_dual_2", "ndb_examples"); } // Note: all connections must have been destroyed before calling ndb_end() ndb_end(0); return 0; } static void create_table(MYSQL &, const char* table); static void do_insert(Ndb &, const char* table); static void do_update(Ndb &, const char* table); static void do_delete(Ndb &, const char* table); static void do_read(Ndb &, const char* table); static void drop_table(MYSQL &,const char* table); static void run_application(MYSQL &mysql, Ndb_cluster_connection &cluster_connection, const char* table, const char* db) { /******************************************** * Connect to database via mysql-c * ********************************************/ char db_stmt[256]; sprintf(db_stmt, "CREATE DATABASE %s\n", db); mysql_query(&mysql, db_stmt); sprintf(db_stmt, "USE %s", db); if (mysql_query(&mysql, db_stmt) != 0) MYSQLERROR(mysql); create_table(mysql, table); /******************************************** * Connect to database via NdbApi * ********************************************/ // Object representing the database Ndb myNdb( &cluster_connection, db ); if (myNdb.init()) APIERROR(myNdb.getNdbError()); /* * Do different operations on database */ do_insert(myNdb, table); do_update(myNdb, table); do_delete(myNdb, table); do_read(myNdb, table); /* * Drop the table */ drop_table(mysql,table); } /********************************************************* * Create a table named by table if it does not exist * *********************************************************/ static void create_table(MYSQL &mysql, const char* table) { char create_stmt[256]; sprintf(create_stmt, "CREATE TABLE %s \ (ATTR1 INT UNSIGNED NOT NULL PRIMARY KEY,\ ATTR2 INT UNSIGNED NOT NULL)\ ENGINE=NDB", table); if (mysql_query(&mysql, create_stmt)) MYSQLERROR(mysql); } /************************************************************************** * Using 5 transactions, insert 10 tuples in table: (0,0),(1,1),...,(9,9) * **************************************************************************/ static void do_insert(Ndb &myNdb, const char* table) { const NdbDictionary::Dictionary* myDict= myNdb.getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable(table); if (myTable == NULL) APIERROR(myDict->getNdbError()); for (int i = 0; i < 5; i++) { NdbTransaction *myTransaction= myNdb.startTransaction(); if (myTransaction == NULL) APIERROR(myNdb.getNdbError()); NdbOperation *myOperation= myTransaction->getNdbOperation(myTable); if (myOperation == NULL) APIERROR(myTransaction->getNdbError()); myOperation->insertTuple(); myOperation->equal("ATTR1", i); myOperation->setValue("ATTR2", i); myOperation= myTransaction->getNdbOperation(myTable); if (myOperation == NULL) APIERROR(myTransaction->getNdbError()); myOperation->insertTuple(); myOperation->equal("ATTR1", i+5); myOperation->setValue("ATTR2", i+5); if (myTransaction->execute( NdbTransaction::Commit ) == -1) APIERROR(myTransaction->getNdbError()); myNdb.closeTransaction(myTransaction); } } /***************************************************************** * Update the second attribute in half of the tuples (adding 10) * *****************************************************************/ static void do_update(Ndb &myNdb, const char* table) { const NdbDictionary::Dictionary* myDict= myNdb.getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable(table); if (myTable == NULL) APIERROR(myDict->getNdbError()); for (int i = 0; i < 10; i+=2) { NdbTransaction *myTransaction= myNdb.startTransaction(); if (myTransaction == NULL) APIERROR(myNdb.getNdbError()); NdbOperation *myOperation= myTransaction->getNdbOperation(myTable); if (myOperation == NULL) APIERROR(myTransaction->getNdbError()); myOperation->updateTuple(); myOperation->equal( "ATTR1", i ); myOperation->setValue( "ATTR2", i+10); if( myTransaction->execute( NdbTransaction::Commit ) == -1 ) APIERROR(myTransaction->getNdbError()); myNdb.closeTransaction(myTransaction); } } /************************************************* * Delete one tuple (the one with primary key 3) * *************************************************/ static void do_delete(Ndb &myNdb, const char* table) { const NdbDictionary::Dictionary* myDict= myNdb.getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable(table); if (myTable == NULL) APIERROR(myDict->getNdbError()); NdbTransaction *myTransaction= myNdb.startTransaction(); if (myTransaction == NULL) APIERROR(myNdb.getNdbError()); NdbOperation *myOperation= myTransaction->getNdbOperation(myTable); if (myOperation == NULL) APIERROR(myTransaction->getNdbError()); myOperation->deleteTuple(); myOperation->equal( "ATTR1", 3 ); if (myTransaction->execute(NdbTransaction::Commit) == -1) APIERROR(myTransaction->getNdbError()); myNdb.closeTransaction(myTransaction); } /***************************** * Read and print all tuples * *****************************/ static void do_read(Ndb &myNdb, const char* table) { const NdbDictionary::Dictionary* myDict= myNdb.getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable(table); if (myTable == NULL) APIERROR(myDict->getNdbError()); std::cout << "ATTR1 ATTR2" << std::endl; for (int i = 0; i < 10; i++) { NdbTransaction *myTransaction= myNdb.startTransaction(); if (myTransaction == NULL) APIERROR(myNdb.getNdbError()); NdbOperation *myOperation= myTransaction->getNdbOperation(myTable); if (myOperation == NULL) APIERROR(myTransaction->getNdbError()); myOperation->readTuple(NdbOperation::LM_Read); myOperation->equal("ATTR1", i); NdbRecAttr *myRecAttr= myOperation->getValue("ATTR2", NULL); if (myRecAttr == NULL) APIERROR(myTransaction->getNdbError()); if(myTransaction->execute( NdbTransaction::Commit ) == -1) { if (i == 3) { std::cout << "Detected that deleted tuple doesn't exist!" << std::endl; } else { APIERROR(myTransaction->getNdbError()); } } if (i != 3) { printf(" %2d %2d\n", i, myRecAttr->u_32_value()); } myNdb.closeTransaction(myTransaction); } } /************************** * Drop table after usage * **************************/ static void drop_table(MYSQL &mysql, const char* table) { char drop_stmt[75]; sprintf(drop_stmt, "DROP TABLE %s", table); if (mysql_query(&mysql,drop_stmt)) MYSQLERROR(mysql); }

在NDB 8.0.1之前,这个程序不能在同一个会话中连续运行不止一次(Bug #27009386)。