10bet网址
MySQL NDB集群API开发指南
本手册下载

MySQL NDB集群API开发指南// NDB API示例:使用NdbRecord处理BLOB列和值

2.5.10 NDB API示例:使用NdbRecord处理BLOB列和值

本示例演示如何使用 NDB API 中的 NdbRecord 接口操作 BLOB 列。它演示了如何使用内联值缓冲区以及读写方法执行插入、读取和更新操作。该示例位于 NDB 集群源代码树中的 storage/ndb/ndbapi-examples/ndbapi_blob_ndbrecord/main.cpp 文件。

请注意

虽然本例中使用的 MySQL 数据类型实际上是 TEXT,但同样的原则也适用。

/ * ndbapi_blob_ndbrecord说明了blob的操纵(实际上在此示例中的文本)。此示例使用NDBrecord样式方法访问元组。使用内联值缓冲区和读/写方法显示插入,读取和更新。* / #ifdef _win32 #include  #encif #include  #include  #include  / *用于cout。* / #include  #include  #include  #include  #include  #include  / ** *帮助程序调试宏*/ #define print_error(代码,msg)\ std :: cout <<“在”<< __file__ <<“中的错误,行:”<< __line__ \ <<“,代码:”<<代码\ <<“,msg:“<< msg <<”。“<< std :: endl #define mysqlerror(mysql){\ print_error(mysql_errno(&mysql),mysql_error(&mysql));\出口(-1);#define apierror(错误){\ print_error(Error.Code,Error.Message);\出口(-1);} / *从项目古腾堡的报价。 */ const char *text_quote= "Just at this moment, somehow or other, they began to run.\n" "\n" " Alice never could quite make out, in thinking it over\n" "afterwards, how it was that they began: all she remembers is,\n" "that they were running hand in hand, and the Queen went so fast\n" "that it was all she could do to keep up with her: and still the\n" "Queen kept crying 'Faster! Faster!' but Alice felt she COULD NOT\n" "go faster, though she had not breath left to say so.\n" "\n" " The most curious part of the thing was, that the trees and the\n" "other things round them never changed their places at all:\n" "however fast they went, they never seemed to pass anything. 'I\n" "wonder if all the things move along with us?' thought poor\n" "puzzled Alice. And the Queen seemed to guess her thoughts, for\n" "she cried, 'Faster! Don't try to talk!'\n" "\n" " Not that Alice had any idea of doing THAT. She felt as if she\n" "would never be able to talk again, she was getting so much out of\n" "breath: and still the Queen cried 'Faster! Faster!' and dragged\n" "her along. 'Are we nearly there?' Alice managed to pant out at\n" "last.\n" "\n" " 'Nearly there!' the Queen repeated. 'Why, we passed it ten\n" "minutes ago! Faster!' And they ran on for a time in silence,\n" "with the wind whistling in Alice's ears, and almost blowing her\n" "hair off her head, she fancied.\n" "\n" " 'Now! Now!' cried the Queen. 'Faster! Faster!' 
And they\n" "went so fast that at last they seemed to skim through the air,\n" "hardly touching the ground with their feet, till suddenly, just\n" "as Alice was getting quite exhausted, they stopped, and she found\n" "herself sitting on the ground, breathless and giddy.\n" "\n" " The Queen propped her up against a tree, and said kindly, 'You\n" "may rest a little now.'\n" "\n" " Alice looked round her in great surprise. 'Why, I do believe\n" "we've been under this tree the whole time! Everything's just as\n" "it was!'\n" "\n" " 'Of course it is,' said the Queen, 'what would you have it?'\n" "\n" " 'Well, in OUR country,' said Alice, still panting a little,\n" "'you'd generally get to somewhere else--if you ran very fast\n" "for a long time, as we've been doing.'\n" "\n" " 'A slow sort of country!' said the Queen. 'Now, HERE, you see,\n" "it takes all the running YOU can do, to keep in the same place.\n" "If you want to get somewhere else, you must run at least twice as\n" "fast as that!'\n" "\n" " 'I'd rather not try, please!' said Alice. 'I'm quite content\n" "to stay here--only I AM so hot and thirsty!'\n" "\n" " -- Lewis Carroll, 'Through the Looking-Glass'."; /* NdbRecord objects. 
*/ const NdbRecord *key_record; // For specifying table key const NdbRecord *blob_record; // For accessing blob const NdbRecord *full_record; // All columns, for insert /* C struct representing the row layout */ struct MyRow { unsigned int myId; /* Pointer to Blob handle for operations on the blob column * Space must be left for it in the row, but a pointer to the * blob handle can also be obtained via calls to * NdbOperation::getBlobHandle() */ NdbBlob* myText; }; static void setup_records(Ndb *myNdb) { NdbDictionary::RecordSpecification spec[2]; NdbDictionary::Dictionary *myDict= myNdb->getDictionary(); const NdbDictionary::Table *myTable= myDict->getTable("api_blob_ndbrecord"); if (myTable == NULL) APIERROR(myDict->getNdbError()); const NdbDictionary::Column *col1= myTable->getColumn("my_id"); if (col1 == NULL) APIERROR(myDict->getNdbError()); const NdbDictionary::Column *col2= myTable->getColumn("my_text"); if (col2 == NULL) APIERROR(myDict->getNdbError()); spec[0].column= col1; spec[0].offset= offsetof(MyRow, myId); spec[0].nullbit_byte_offset= 0; spec[0].nullbit_bit_in_byte= 0; spec[1].column= col2; spec[1].offset= offsetof(MyRow, myText); spec[1].nullbit_byte_offset= 0; spec[1].nullbit_bit_in_byte= 0; key_record= myDict->createRecord(myTable, &spec[0], 1, sizeof(spec[0])); if (key_record == NULL) APIERROR(myDict->getNdbError()); blob_record= myDict->createRecord(myTable, &spec[1], 1, sizeof(spec[0])); if (blob_record == NULL) APIERROR(myDict->getNdbError()); full_record= myDict->createRecord(myTable, &spec[0], 2, sizeof(spec[0])); if (full_record == NULL) APIERROR(myDict->getNdbError()); } /* Function to drop table. */ void drop_table(MYSQL &mysql) { if (mysql_query(&mysql, "DROP TABLE api_blob_ndbrecord")) MYSQLERROR(mysql); } /* Functions to create table. 
*/ int try_create_table(MYSQL &mysql) { return mysql_query(&mysql, "CREATE TABLE" " api_blob_ndbrecord" " (my_id INT UNSIGNED NOT NULL," " my_text TEXT NOT NULL," " PRIMARY KEY USING HASH (my_id))" " ENGINE=NDB"); } void create_table(MYSQL &mysql) { if (try_create_table(mysql)) { if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR) MYSQLERROR(mysql); std::cout << "NDB Cluster already has example table: api_blob_ndbrecord. " << "Dropping it..." << std::endl; /****************** * Recreate table * ******************/ drop_table(mysql); if (try_create_table(mysql)) MYSQLERROR(mysql); } } int populate(Ndb *myNdb) { MyRow row; NdbTransaction *myTrans= myNdb->startTransaction(); if (myTrans == NULL) APIERROR(myNdb->getNdbError()); row.myId= 1; const NdbOperation *myNdbOperation= myTrans->insertTuple(full_record, (const char*) &row); if (myNdbOperation == NULL) APIERROR(myTrans->getNdbError()); NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text"); if (myBlobHandle == NULL) APIERROR(myNdbOperation->getNdbError()); myBlobHandle->setValue(text_quote, strlen(text_quote)); int check= myTrans->execute(NdbTransaction::Commit); myTrans->close(); return check != -1; } int update_key(Ndb *myNdb) { MyRow row; /* Uppercase all characters in TEXT field, using primary key operation. Use piece-wise read/write to avoid loading entire data into memory at once. 
*/ NdbTransaction *myTrans= myNdb->startTransaction(); if (myTrans == NULL) APIERROR(myNdb->getNdbError()); row.myId= 1; const NdbOperation *myNdbOperation= myTrans->updateTuple(key_record, (const char*) &row, blob_record, (const char*) &row); if (myNdbOperation == NULL) APIERROR(myTrans->getNdbError()); NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text"); if (myBlobHandle == NULL) APIERROR(myNdbOperation->getNdbError()); /* Execute NoCommit to make the blob handle active so * that we can determine the actual Blob length */ if (-1 == myTrans->execute(NdbTransaction::NoCommit)) APIERROR(myTrans->getNdbError()); Uint64 length= 0; if (-1 == myBlobHandle->getLength(length)) APIERROR(myBlobHandle->getNdbError()); /* A real application should use a much larger chunk size for efficiency, preferably much larger than the part size, which defaults to 2000. 64000 might be a good value. */ #define CHUNK_SIZE 100 int chunk; char buffer[CHUNK_SIZE]; for (chunk= (length-1)/CHUNK_SIZE; chunk >=0; chunk--) { Uint64 pos= chunk*CHUNK_SIZE; Uint32 chunk_length= CHUNK_SIZE; if (pos + chunk_length > length) chunk_length= length - pos; /* Read from the end back, to illustrate seeking. */ if (-1 == myBlobHandle->setPos(pos)) APIERROR(myBlobHandle->getNdbError()); if (-1 == myBlobHandle->readData(buffer, chunk_length)) APIERROR(myBlobHandle->getNdbError()); int res= myTrans->execute(NdbTransaction::NoCommit); if (-1 == res) APIERROR(myTrans->getNdbError()); /* Uppercase everything. */ for (Uint64 j= 0; j < chunk_length; j++) buffer[j]= toupper(buffer[j]); if (-1 == myBlobHandle->setPos(pos)) APIERROR(myBlobHandle->getNdbError()); if (-1 == myBlobHandle->writeData(buffer, chunk_length)) APIERROR(myBlobHandle->getNdbError()); /* Commit on the final update. */ if (-1 == myTrans->execute(chunk ? 
NdbTransaction::NoCommit : NdbTransaction::Commit)) APIERROR(myTrans->getNdbError()); } myNdb->closeTransaction(myTrans); return 1; } int update_scan(Ndb *myNdb) { /* Lowercase all characters in TEXT field, using a scan with updateCurrentTuple(). */ char buffer[10000]; NdbTransaction *myTrans= myNdb->startTransaction(); if (myTrans == NULL) APIERROR(myNdb->getNdbError()); NdbScanOperation *myScanOp= myTrans->scanTable(blob_record, NdbOperation::LM_Exclusive); if (myScanOp == NULL) APIERROR(myTrans->getNdbError()); NdbBlob *myBlobHandle= myScanOp->getBlobHandle("my_text"); if (myBlobHandle == NULL) APIERROR(myScanOp->getNdbError()); if (myBlobHandle->getValue(buffer, sizeof(buffer))) APIERROR(myBlobHandle->getNdbError()); /* Start the scan. */ if (-1 == myTrans->execute(NdbTransaction::NoCommit)) APIERROR(myTrans->getNdbError()); const MyRow *out_row; int res; for (;;) { res= myScanOp->nextResult((const char**)&out_row, true, false); if (res==1) break; // Scan done. else if (res) APIERROR(myScanOp->getNdbError()); Uint64 length= 0; if (myBlobHandle->getLength(length) == -1) APIERROR(myBlobHandle->getNdbError()); /* Lowercase everything. 
*/ for (Uint64 j= 0; j < length; j++) buffer[j]= tolower(buffer[j]); /* 'Take over' the row locks from the scan to a separate * operation for updating the tuple */ const NdbOperation *myUpdateOp= myScanOp->updateCurrentTuple(myTrans, blob_record, (const char*)out_row); if (myUpdateOp == NULL) APIERROR(myTrans->getNdbError()); NdbBlob *myBlobHandle2= myUpdateOp->getBlobHandle("my_text"); if (myBlobHandle2 == NULL) APIERROR(myUpdateOp->getNdbError()); if (myBlobHandle2->setValue(buffer, length)) APIERROR(myBlobHandle2->getNdbError()); if (-1 == myTrans->execute(NdbTransaction::NoCommit)) APIERROR(myTrans->getNdbError()); } if (-1 == myTrans->execute(NdbTransaction::Commit)) APIERROR(myTrans->getNdbError()); myNdb->closeTransaction(myTrans); return 1; } struct ActiveHookData { char buffer[10000]; Uint32 readLength; }; int myFetchHook(NdbBlob* myBlobHandle, void* arg) { ActiveHookData *ahd= (ActiveHookData *)arg; ahd->readLength= sizeof(ahd->buffer) - 1; return myBlobHandle->readData(ahd->buffer, ahd->readLength); } int fetch_key(Ndb *myNdb) { /* Fetch a blob without specifying how many bytes * to read up front, in one execution using * the 'ActiveHook' mechanism. * The supplied ActiveHook procedure is called when * the Blob handle becomes 'active'. At that point * the length of the Blob can be obtained, and buffering * arranged, and the data read requested. */ /* Separate rows used to specify key and hold result */ MyRow key_row; MyRow out_row; /* Fetch and show the blob field, using setActiveHook(). */ NdbTransaction *myTrans= myNdb->startTransaction(); if (myTrans == NULL) APIERROR(myNdb->getNdbError()); key_row.myId= 1; out_row.myText= NULL; const NdbOperation *myNdbOperation= myTrans->readTuple(key_record, (const char*) &key_row, blob_record, (char*) &out_row); if (myNdbOperation == NULL) APIERROR(myTrans->getNdbError()); /* This time, we'll get the blob handle from the row, because * we can. 
Alternatively, we could use the normal mechanism * of calling getBlobHandle(). */ NdbBlob *myBlobHandle= out_row.myText; if (myBlobHandle == NULL) APIERROR(myNdbOperation->getNdbError()); struct ActiveHookData ahd; if (myBlobHandle->setActiveHook(myFetchHook, &ahd) == -1) APIERROR(myBlobHandle->getNdbError()); /* Execute Commit, but calling our callback set up in setActiveHook() before actually committing. */ if (-1 == myTrans->execute(NdbTransaction::Commit)) APIERROR(myTrans->getNdbError()); myNdb->closeTransaction(myTrans); /* Our fetch callback will have been called during the execute(). */ ahd.buffer[ahd.readLength]= '\0'; std::cout << "Fetched data:" << std::endl << ahd.buffer << std::endl; return 1; } int update2_key(Ndb *myNdb) { char buffer[10000]; MyRow row; /* Simple setValue() update specified before the * Blob handle is made active */ NdbTransaction *myTrans= myNdb->startTransaction(); if (myTrans == NULL) APIERROR(myNdb->getNdbError()); row.myId= 1; const NdbOperation *myNdbOperation= myTrans->updateTuple(key_record, (const char*)&row, blob_record, (char*) &row); if (myNdbOperation == NULL) APIERROR(myTrans->getNdbError()); NdbBlob *myBlobHandle= myNdbOperation->getBlobHandle("my_text"); if (myBlobHandle == NULL) APIERROR(myNdbOperation->getNdbError()); memset(buffer, ' ', sizeof(buffer)); if (myBlobHandle->setValue(buffer, sizeof(buffer)) == -1) APIERROR(myBlobHandle->getNdbError()); if (-1 == myTrans->execute(NdbTransaction::Commit)) APIERROR(myTrans->getNdbError()); myNdb->closeTransaction(myTrans); return 1; } int delete_key(Ndb *myNdb) { MyRow row; /* Deletion of row containing blob via primary key. 
*/ NdbTransaction *myTrans= myNdb->startTransaction(); if (myTrans == NULL) APIERROR(myNdb->getNdbError()); row.myId= 1; const NdbOperation *myNdbOperation= myTrans->deleteTuple(key_record, (const char*)&row, full_record); if (myNdbOperation == NULL) APIERROR(myTrans->getNdbError()); if (-1 == myTrans->execute(NdbTransaction::Commit)) APIERROR(myTrans->getNdbError()); myNdb->closeTransaction(myTrans); return 1; } void mysql_connect_and_create(const char *socket) { MYSQL mysql; bool ok; mysql_init(&mysql); ok = mysql_real_connect(&mysql, "localhost", "root", "", "", 0, socket, 0); if(ok) { mysql_query(&mysql, "CREATE DATABASE ndb_examples"); ok = ! mysql_select_db(&mysql, "ndb_examples"); } if(ok) { create_table(mysql); } mysql_close(&mysql); if(! ok) MYSQLERROR(mysql); } void ndb_run_ndbrecord_blob_operations(const char * connectstring) { /* Connect to ndb cluster. */ Ndb_cluster_connection cluster_connection(connectstring); if (cluster_connection.connect(4, 5, 1)) { std::cout << "Unable to connect to cluster within 30 secs." << std::endl; exit(-1); } /* Optionally connect and wait for the storage nodes (ndbd's). */ if (cluster_connection.wait_until_ready(30,0) < 0) { std::cout << "Cluster was not ready within 30 secs.\n"; exit(-1); } Ndb myNdb(&cluster_connection,"ndb_examples"); if (myNdb.init(1024) == -1) { // Set max 1024 parallel transactions APIERROR(myNdb.getNdbError()); exit(-1); } setup_records(&myNdb); if(populate(&myNdb) > 0) std::cout << "populate: Success!" << std::endl; if(update_key(&myNdb) > 0) std::cout << "update_key: Success!" << std::endl; if(update_scan(&myNdb) > 0) std::cout << "update_scan: Success!" << std::endl; if(fetch_key(&myNdb) > 0) std::cout << "fetch_key: Success!" << std::endl; if(update2_key(&myNdb) > 0) std::cout << "update2_key: Success!" << std::endl; if(delete_key(&myNdb) > 0) std::cout << "delete_key: Success!" 
<< std::endl; } int main(int argc, char**argv) { if (argc != 3) { std::cout << "Arguments are  .\n"; exit(-1); } char *mysqld_sock = argv[1]; const char *connectstring = argv[2]; mysql_connect_and_create(mysqld_sock); ndb_init(); ndb_run_ndbrecord_blob_operations(connectstring); ndb_end(0); return 0; }