// NOTE(review): this copy of the storage manager source has lost its original
// line breaks. As written, every `//` comment swallows the rest of its physical
// line and the #include/#define/#ifdef/#else/#endif directives no longer start
// a line, so the text cannot be compiled until the line structure is restored.
// NOTE(review): text also appears to be missing wherever a `<` directly
// followed by a letter was later followed by a `>` (looks like markup
// stripping -- confirm against the upstream file). Visible damage:
//   - the first `#include` has no header name;
//   - min() is cut after `return x` and fused with the head of the
//     rpc_stream operator>> for raw_string;
//   - object_cache::clear() is cut after `for(int i=0; i` and fused with the
//     tail of a lookup routine (presumably object_cache::find -- confirm);
//   - pinned_object::print() is cut after `for(int i=0; i` and fused with the
//     tail of a routine ending in `current_object->set(offset,size,data)`;
//   - sm_string::strcmp() has lost its loop bound and first comparison;
//   - sm_sequence::member() is cut after `for(int i=0; i` and fused with the
//     tail of an element-update routine (presumably sm_sequence::set).
// The code lines below are reproduced unchanged; only these comment lines
// were added.
//
// Physical line 1: license header, includes, STUBS_DEBUG, the generated RPC
// stubs include, LSM_DEBUG, the globals current_object and ldb_sm, the damaged
// min(), and the rpc_stream/ostream operators for raw_string, raw_sequence and
// oid_t. The raw_string reader stores `(int) x.content()` in s.offset, i.e. a
// pointer cast to int (NOTE(review): loses bits where pointers are wider than
// int -- confirm intended targets). With LDB_SERVER defined the oid_t
// operators reinterpret an int as a serial_t and back; otherwise they
// read/write the oid member. The line ends with the damaged
// object_cache::clear() and the first half of object_cache::remove().
/********************************************************************************* * * The storage manager code. * Copyright (c) 2002, 2003 by Leonidas Fegaras, the University of Texas at * Arlington. All rights reserved. * Programmer: Leonidas Fegaras * Date: 1/10/02 * ********************************************************************************/ #include #include "gc_cpp.h" #include "ldb_sm.h" #include "server.h" // set it to 1 to debug the RPC stubs #define STUBS_DEBUG 0 // the RPC stubs (generated from stubs.x using stubs.awk) #include "stubs_out.cc" // for tracing pinned objects const bool LSM_DEBUG = false; pinned_object* current_object; // set when dereferencing an object storage_manager* ldb_sm; // LDB storage manager inline int min ( int x, int y ) { return x > ( rpc_stream& o, raw_string &s ) { sm_string x; o >> x; s.length = x.size(); s.offset = (int) x.content(); return o; }; rpc_stream& operator<< ( rpc_stream& o, const raw_string &s ) { return o << sm_string(s); }; rpc_stream& operator>> ( rpc_stream& o, raw_sequence &s ) { o >> s.element_size; o >> s.cardinality; return o >> s.offset; }; rpc_stream& operator<< ( rpc_stream& o, const raw_sequence &s ) { return o << s.element_size << s.cardinality << s.offset; }; #ifndef LDB_SERVER ostream& operator<< ( ostream& o, const oid_t &x ) { return o << x.oid; }; rpc_stream& operator>> ( rpc_stream& o, oid_t &s ) { return o >> s.oid; }; rpc_stream& operator<< ( rpc_stream& o, const oid_t &s ) { return o << s.oid; }; #else ostream& operator<< ( ostream& o, const oid_t &x ) { return o << (serial_t) x; }; rpc_stream& operator>> ( rpc_stream& o, oid_t &s ) { int n; o >> n; s = *(serial_t*) &n; return o; }; rpc_stream& operator<< ( rpc_stream& o, const oid_t &s ) { return o << (*(int*)(serial_t*) &s); }; #endif void object_cache::clear () { for(int i=0; inext) if (r->key == key) { info = *r->info; return true; }; return false; }; void object_cache::remove ( int key ) { element* s = NULL; for(element* r = 
// Physical line 2: rest of object_cache::remove() (unlinks the first element
// with a matching key from bucket memo[key % memo_size]); object_cache::store()
// (pushes a new element holding a `new (GC)` copy of info onto the front of the
// bucket); the two pinned_object constructors (the second keeps the caller's
// pointer without copying); the file-static object_cache_table used when
// LDB_SERVER is not defined; pinned_object::get(); first half of
// pinned_object::set().
// get(): for TRANSIENT objects it reports SM_ERROR when offset+size exceeds the
// stored length and otherwise returns content+offset; for other tags it returns
// NULL when size is 0. Without LDB_SERVER it consults object_cache_table under
// the key `ref`: a hit returns cached content()+offset, a miss stores the
// result of pinned_get(offset,size) and returns its content() with no offset
// added. NOTE(review): the hit and miss paths treat `offset` differently for
// the same cached buffer -- verify this is intended.
memo[key % memo_size]; r!=NULL; s=r, r=r->next) if (r->key == key) { if (s) s->next = r->next; else memo[key % memo_size] = r->next; return; }; }; void object_cache::store ( int key, const sm_string info ) { int loc = key % memo_size; element* e = new (GC) element; e->key = key; e->info = new (GC) sm_string(info); e->next = memo[loc]; memo[loc] = e; }; pinned_object::pinned_object ( size_t size ) { tag = TRANSIENT; data.value.length = size; data.value.content = new (GC) char[size]; }; pinned_object::pinned_object ( size_t size, const char* content ) { tag = TRANSIENT; data.value.length = size; data.value.content = (char*) content; }; #ifndef LDB_SERVER static object_cache object_cache_table; #endif const char* pinned_object::get ( int offset, size_t size ) { sm_string sdata; if (LSM_DEBUG) cout << "Pinned: get " << size << " bytes, starting from " << offset << ", from the record " << oid() << " of size " << body_size() << " " << (void*)&data << endl; if (tag==TRANSIENT) if (offset+size > data.value.length) SM_ERROR(fform("insufficient number of bytes in transient object: get(%d,%d) from %d bytes", offset,size,data.value.length)); else return data.value.content+offset; else if (size <= 0) return NULL; #ifndef LDB_SERVER else if (object_cache_table.find(ref,sdata)) return sdata.content()+offset; else { sdata = pinned_get(offset,size); object_cache_table.store(ref,sdata); return sdata.content(); }; #else else return pinned_get(offset,size).content(); #endif }; void pinned_object::set ( int offset, size_t size, const char* ddata ) { if (LSM_DEBUG) cout << "Pinned: set " << size << " bytes, starting from " << offset << ", from the record " << oid() << " of size " << body_size() << " " << (void*)&data << endl; if (tag==TRANSIENT) { if (offset > data.value.length) SM_ERROR(fform("insufficient number of bytes in transient object: set(%d,%d) from %d bytes", offset,size,data.value.length)); else if (offset == data.value.length) append(ddata,size); else if (offset+size > 
// Physical line 3: rest of pinned_object::set() (a TRANSIENT write that runs
// past the current length overwrites the existing tail and appends the
// remainder; non-TRANSIENT writes drop the cache entry for `ref` when
// LDB_SERVER is not defined and call pinned_set()); pinned_object::append(ddata,
// data_size) (TRANSIENT: allocates a buffer of old length + data_size, copies
// both parts and returns the old length); pinned_object::append(offset, size,
// ddata, data_size); first half of pinned_object::replace().
data.value.length) { copy_n_bytes(data.value.content+offset,ddata,data.value.length-offset); append(ddata+data.value.length-offset,size+offset-data.value.length); } else copy_n_bytes(data.value.content+offset,ddata,size); } else { #ifndef LDB_SERVER object_cache_table.remove(ref); #endif return pinned_set(offset,size,sm_string(ddata,size)); }; }; int pinned_object::append ( const char* ddata, size_t data_size ) { if (LSM_DEBUG) cout << "Pinned: append " << data_size << " bytes to the record " << oid() << " of size " << body_size() << " " << (void*)&data << endl; if (tag==TRANSIENT) { int n = data.value.length; char* c = new (GC) char[n+data_size]; copy_n_bytes(c,data.value.content,n); copy_n_bytes(c+n,ddata,data_size); data.value.length += data_size; data.value.content = c; return n; } else { #ifndef LDB_SERVER object_cache_table.remove(ref); #endif return pinned_append(sm_string(ddata,data_size),data_size); }; }; /* insert data (of size data_size) after offset+size */ int pinned_object::append ( int offset, size_t size, const char* ddata, size_t data_size ) { if (LSM_DEBUG) cout << "Pinned: append " << data_size << " bytes, after " << offset << "+" << size << ", to the record " << oid() << " of size " << body_size() << " " << (void*)&data << endl; if (tag==TRANSIENT) { if (offset+size >= data.value.length) return append(ddata,data_size); else { int n = data.value.length; append(get(offset,size),size); append(ddata,data_size); return n; }; } else { #ifndef LDB_SERVER object_cache_table.remove(ref); #endif return pinned_append(offset,size,sm_string(ddata,data_size),data_size); }; }; int pinned_object::replace ( int offset, size_t size, const char* ddata, size_t data_size ) { if (LSM_DEBUG) cout << "Pinned: replace " << size << " bytes, in " << offset << "+" << size << ", in the record " << oid() << " of size " << body_size() << " " << (void*)&data << endl; if (tag==TRANSIENT) { if (size >= data_size) { set(offset,data_size,ddata); return offset; } else if 
// Physical line 4: rest of pinned_object::replace(); pinned_object::remove()
// (implemented as set(offset, remaining_size, get(offset+data_size,
// remaining_size))); the damaged pinned_object::print(); the rpc_stream
// operators for pinned_object (LDB_SERVER: via get_object()/store_object();
// otherwise via the `ref` member, marking the object PINNED on input); the
// raw_string constructor and assignment from sm_string; ostream output for
// raw_string and raw_sequence; first half of raw_ref::operator=.
// raw_string::operator= performs the assignment on a temporary created by the
// `(sm_string) *this` cast. NOTE(review): any effect must therefore come from
// sm_string::operator= writing through the pinned object -- confirm.
(offset+size >= data.value.length) { set(offset,data.value.length-offset,ddata); append(ddata+data.value.length-offset,data_size-data.value.length+offset); return offset; } else return append(ddata,data_size); } else { #ifndef LDB_SERVER object_cache_table.remove(ref); #endif return pinned_replace(offset,size,sm_string(ddata,data_size),data_size); }; }; void pinned_object::remove ( int offset, size_t data_size, int remaining_size ) { if (LSM_DEBUG) cout << "Pinned: remove " << data_size << " bytes, starting at " << offset << " with remaining size " << remaining_size << ", from the record " << oid() << " of size " << body_size() << " " << (void*)&data << endl; set(offset,remaining_size,get(offset+data_size,remaining_size)); }; void pinned_object::print () { int n = body_size(); for(int i=0; icurrent_object->set(offset,size,data); #endif }; #ifdef LDB_SERVER rpc_stream& operator>> ( rpc_stream& o, pinned_object &s ) { int n; o >> n; s = *(pinned_object*) get_object(n); return o; }; rpc_stream& operator<< ( rpc_stream& o, const pinned_object &s ) { return o << store_object((void*) &s); }; #else rpc_stream& operator>> ( rpc_stream& o, pinned_object &s ) { s.tag = pinned_object::PINNED; return o >> s.ref; }; rpc_stream& operator<< ( rpc_stream& o, const pinned_object &s ) { return o << s.ref; }; #endif raw_string::raw_string ( const sm_string &s ) { size_t slen = s.size(); length = slen; offset = sizeof(raw_string); pinned_object* p = new (GC) pinned_object(sizeof(raw_string),(char*)this); p->append(s.content(),align_length(slen)); #ifdef LDB_SERVER transaction_thread_t::me()->current_object = p; #endif }; raw_string& raw_string::operator= ( const sm_string &s ) { ((sm_string) *this).operator=(s); return *this; }; ostream& operator<< ( ostream& o, const raw_string &x ) { return o << sm_string(x); }; ostream& operator<< ( ostream& o, const raw_sequence &x ) { return o << x.cardinality; }; raw_ref& raw_ref::operator= ( const sm_ref &s ) { #ifdef LDB_SERVER if 
// Physical line 5: rest of raw_ref::operator=; raw_ref(sm_ref&); ostream
// output for raw_ref; raw_sequence::operator= (same cast-to-temporary form as
// raw_string::operator=); the five sm_string constructors; sm_string::size(),
// substring(), body_offset(), content(), scontent(); first half of
// sm_string::operator[].
// In sm_string the `length` member holds the byte count for TRANSIENT strings
// and the header offset inside the pinned object for PINNED ones; size() and
// body_offset() read the real length/offset through data.pinned->get().
// The (const char*) and (const char*, int) constructors keep the caller's
// pointer without copying. scontent() returns a `new (GC)` copy with a
// terminating '\0' appended.
(s.oid().serial_t::is_on_disk()) #else if (current_object->pinnedp()) #endif ref = s.oid(); else ref = *(oid_t*) &s.data.pinned; return *this; }; raw_ref::raw_ref ( sm_ref &x ) { ref = x.oid(); }; ostream& operator<< ( ostream& o, const raw_ref &x ) { return o << x.oid(); }; raw_sequence& raw_sequence::operator= ( const sm_sequence &s ) { ((sm_sequence)(*this)).operator=(s); return *this; }; sm_string::sm_string () { tag = TRANSIENT; length = 0; data.content = NULL; }; sm_string::sm_string ( const char* c ) { tag = TRANSIENT; if (c == NULL) { length = 0; data.content = NULL; } else { length = strlen(c); data.content = (char*) c; }; }; sm_string::sm_string ( const char* c, int n ) { tag = TRANSIENT; length = n; data.content = (char*) c; }; sm_string::sm_string ( pinned_object* po, size_t header_offset ) { tag = PINNED; data.pinned = po; length = header_offset; }; sm_string::sm_string ( const raw_string &x ) { #ifdef LDB_SERVER tag = PINNED; data.pinned = transaction_thread_t::me()->current_object; length = ((char*) &x) - (data.pinned->get(0,1)); #else tag = TRANSIENT; length = x.length; data.content = (char*) get_pinned_data(x.offset,x.length).content(); #endif }; size_t sm_string::size () const { if (tag == TRANSIENT) return length; else return *(size_t*) data.pinned->get(length,sizeof(size_t)); }; const sm_string sm_string::substring ( int start, size_t dsize ) const { if (start > size()) return sm_string(); else return sm_string(content()+start,min(dsize,size()-start)); }; size_t sm_string::body_offset () const { return ((raw_string*) data.pinned->get(length,sizeof(raw_string)))->offset; }; const char* sm_string::content () const { if (tag == TRANSIENT) return data.content; else return data.pinned->get(body_offset(),size()); }; const char* sm_string::scontent () const { char* os = new (GC) char[size()+1]; copy_n_bytes(os,content(),size()); os[size()] = '\0'; return os; }; const char sm_string::operator[] ( int n ) const { if (n >= size()) return '\0'; else if 
// Physical line 6: rest of sm_string::operator[] (returns '\0' when n >=
// size()); sm_string::operator=(const sm_string&) and operator=(const
// raw_string&); the two operator+ overloads (result buffers come from
// `new (GC)` and carry no terminating '\0'); the damaged sm_string::strcmp();
// ostream and rpc_stream operators for sm_string; first half of
// sm_sequence::sm_sequence(size_t).
// operator=(const sm_string&): an empty TRANSIENT target adopts the source's
// tag; a TRANSIENT result aliases source.content(); a PINNED target with a
// non-empty source rewrites the body via data.pinned->replace() and then
// stores a fresh raw_string header at offset `length`.
// The rpc_stream writer sends `len` bytes of scontent(); the reader allocates
// `len` bytes with `new (GC)` and fills them with rpc_receive().
(tag == TRANSIENT) return data.content[n]; else return *data.pinned->get(body_offset()+n,1); }; sm_string& sm_string::operator= ( const sm_string &source ) { sm_storage_type dtag = tag; if (dtag == TRANSIENT && size()==0) dtag = source.tag; if (dtag == TRANSIENT || source.size() == 0) { tag = TRANSIENT; length = source.length; data.content = (char*) source.content(); } else if (tag == TRANSIENT) { tag = PINNED; length = source.length; data.pinned = source.data.pinned; } else { tag = PINNED; size_t slen = source.size(); raw_string nh( slen, data.pinned->replace(body_offset(),align_length(size()), source.content(),align_length(slen)) ); data.pinned->set(length,sizeof(raw_string),(char*) &nh); }; return *this; }; sm_string& sm_string::operator= ( const raw_string &source ) { return operator=(sm_string(*(raw_string*) &source)); }; const sm_string sm_string::operator+ ( const sm_string source ) const { char* s = new (GC) char[size()+source.size()]; copy_n_bytes(s,content(),size()); copy_n_bytes(s+size(),source.content(),source.size()); return sm_string(s,size()+source.size()); }; const sm_string sm_string::operator+ ( const char* source ) const { size_t slen = strlen(source); char* s = new (GC) char[size()+slen]; copy_n_bytes(s,content(),size()); copy_n_bytes(s+size(),source,slen); return sm_string(s,size()+slen); }; short sm_string::strcmp ( const sm_string source ) const { for(int i = 0; i source[i]) return 1; return (source.size()-size()); }; ostream& operator<< ( ostream& o, const sm_string &s ) { return o << s.scontent(); }; rpc_stream& operator>> ( rpc_stream& o, sm_string &s ) { int len; o >> len; const char* v = NULL; if (len>0) { v = new (GC) char[len]; o.rpc_receive((void*) v,len); s = sm_string(v,len); } else s = sm_string(); return o; }; rpc_stream& operator<< ( rpc_stream& o, const sm_string &s ) { int len = s.size(); if (len==0) return o << (int) 0; o << len; o.rpc_send((void*) s.scontent(),len); return o; }; sm_sequence::sm_sequence ( size_t size ) { 
// Physical line 7: rest of sm_sequence(size_t); the default constructor
// (element_size = 1); sm_sequence(pinned_object*, size_t); esize();
// sm_sequence(const raw_sequence&); body_offset(); access(); sequence();
// first half of sm_sequence::operator=.
// As with sm_string, `element_size` doubles as the header offset when the
// sequence lives in a pinned object; esize() reads the real element size
// through data.pinned->get().
// NOTE(review): sm_sequence(pinned_object*, size_t) never assigns `tag`,
// whereas the matching sm_string and sm_ref constructors set tag = PINNED --
// looks like an omission; confirm.
// esize(): the `false &&` prefix disables the "Uninitialized Sequence" check.
// access(): reports SM_ERROR when n >= seq_cardinality and has no return
// statement on that path.
element_size = size; seq_cardinality = 0; tag = TRANSIENT; data.sequence = NULL; }; sm_sequence::sm_sequence () { element_size = 1; seq_cardinality = 0; tag = TRANSIENT; data.sequence = NULL; }; sm_sequence::sm_sequence ( pinned_object* po, size_t header_offset ) { data.pinned = po; element_size = header_offset; sm_sequence* sh = ((sm_sequence*) po->get(header_offset,sizeof(sm_sequence))); seq_cardinality = sh->seq_cardinality; }; size_t sm_sequence::esize () const { int size = (tag == TRANSIENT) ? element_size : *(size_t*) data.pinned->get(element_size,sizeof(size_t)); if (false && size <= 0) SM_ERROR("Uninitialized Sequence"); return size; }; sm_sequence::sm_sequence ( const raw_sequence &x ) { #ifdef LDB_SERVER seq_cardinality = x.cardinality; tag = PINNED; data.pinned = transaction_thread_t::me()->current_object; element_size = ((char*) &x) - (data.pinned->get(0,1)); #else tag = TRANSIENT; element_size = x.element_size; seq_cardinality = x.cardinality; int len = x.element_size*x.cardinality; data.sequence = new (GC) char[len]; copy_n_bytes(data.sequence,get_pinned_data(x.offset,len).content(),len); #endif }; size_t sm_sequence::body_offset () const { return ((raw_sequence*) data.pinned->get(element_size,sizeof(raw_sequence)))->offset; }; const char* sm_sequence::access ( int n ) const { if (n >= seq_cardinality) SM_ERROR("Insufficient number of elements in sequence"); else if (tag == TRANSIENT) return data.sequence+n*esize(); else return data.pinned->get(body_offset()+n*esize(),esize()); }; const char* sm_sequence::sequence () const { if (tag == TRANSIENT) return data.sequence; else return data.pinned->get(body_offset(),esize()*seq_cardinality); }; sm_sequence& sm_sequence::operator= ( const sm_sequence &source ) { if (tag == TRANSIENT || source.seq_cardinality == 0) { tag = TRANSIENT; element_size = source.element_size; seq_cardinality = source.seq_cardinality; data.sequence = (char*) source.sequence(); } else { size_t slen = 
// Physical line 8: rest of sm_sequence::operator= (PINNED target: replaces the
// body through data.pinned->replace() and rewrites the raw_sequence header at
// offset element_size); the damaged sm_sequence::member(); append(c, n)
// (TRANSIENT: reallocates with `new (GC)` and copies; PINNED: appends through
// the pinned object and rewrites the header); subsequence(); append(c);
// the two insert() overloads (append only when member() reports absence);
// first half of sm_sequence::remove().
// NOTE(review): subsequence() passes size*esize() as the new sequence's
// element size and uses sequence()+n as a byte offset rather than
// n*esize() -- verify against callers.
source.esize()*source.seq_cardinality; raw_sequence nh; nh.element_size = esize(); nh.cardinality = source.seq_cardinality; nh.offset = data.pinned->replace(body_offset(),esize()*seq_cardinality, source.sequence(),slen); seq_cardinality = source.seq_cardinality; data.pinned->set(element_size,sizeof(raw_sequence),(char*) &nh); }; return *this; }; bool sm_sequence::member ( const char* data ) const { for(int i=0; i= seq_cardinality) SM_ERROR("Insufficient number of elements in sequence"); else if (tag == TRANSIENT) copy_n_bytes(data.sequence+n*esize(),c,esize()); else data.pinned->set(body_offset()+n*esize(),esize(),c); }; void sm_sequence::append ( const char* c, int n ) { if (tag == TRANSIENT) { char* s = new (GC) char[(seq_cardinality+n)*esize()]; copy_n_bytes(s,data.sequence,seq_cardinality*esize()); copy_n_bytes(s+seq_cardinality*esize(),c,n*esize()); seq_cardinality += n; data.sequence = s; } else { // else PINNED raw_sequence nh; nh.element_size = esize(); nh.cardinality = seq_cardinality+n; nh.offset = data.pinned->append(body_offset(),seq_cardinality*esize(),c,n*esize()); seq_cardinality += n; data.pinned->set(element_size,sizeof(raw_sequence),(char*) &nh); }; }; sm_sequence sm_sequence::subsequence ( int n, size_t size ) const { sm_sequence s(size*esize()); s.append(sequence()+n,size); return s; }; void sm_sequence::append ( const char* c ) { append(c,1); }; void sm_sequence::insert ( const char* c ) { if (!member(c)) append(c); }; void sm_sequence::insert ( const char* c, bool* (*eq)(const char*,const char*) ) { if (!member(c,eq)) append(c); }; void sm_sequence::remove ( int n ) { if (n >= seq_cardinality) SM_ERROR("Insufficient number of elements in sequence"); else if (tag == TRANSIENT) { copy_n_bytes(data.sequence+n*esize(),data.sequence+(n+1)*esize(),(seq_cardinality-n-1)*esize()); seq_cardinality--; } else { // else PINNED data.pinned->remove(body_offset()+n*esize(),esize(),(seq_cardinality-n-1)*esize()); seq_cardinality--; raw_sequence nh; 
// Physical line 9: rest of sm_sequence::remove(); ostream output for
// sm_sequence (prints element_size); the six sm_ref constructors;
// sm_ref::oid(); first half of sm_ref::deref().
// NOTE(review): the PINNED branch of sm_sequence::remove() writes
// nh.element_size = element_size, which for a pinned sequence is the header
// offset, while append() and operator= write esize() -- confirm which is meant.
// sm_ref(const raw_ref&): with LDB_SERVER, an on-disk serial yields a RAW
// reference and anything else is reinterpreted as a pinned_object pointer;
// without LDB_SERVER the result is always RAW.
// oid(), deref() and size() switch on `tag` with no default case and no return
// after the switch.
nh.element_size = element_size; nh.cardinality = seq_cardinality; nh.offset = body_offset(); data.pinned->set(element_size,sizeof(raw_sequence),(char*) &nh); }; }; ostream& operator<< ( ostream& o, const sm_sequence &x ) { return o << x.element_size; }; sm_ref::sm_ref ( const char* content ) { tag = TRANSIENT; data.pointer = new (GC) char[strlen(content)+1]; strcpy(data.pointer,content); }; sm_ref::sm_ref ( size_t size ) { tag = TRANSIENT; data.pointer = new (GC) char[size]; }; sm_ref::sm_ref () { tag = TRANSIENT; data.pointer = NULL; }; sm_ref::sm_ref ( pinned_object* po, int header_offset ) { tag = PINNED; header = header_offset; data.pinned = po; }; sm_ref::sm_ref ( const oid_t x ) { tag = RAW; data.oid = new (GC) oid_t(x); }; sm_ref::sm_ref ( const raw_ref &s ) { #ifdef LDB_SERVER if (s.ref.serial_t::is_on_disk()) { tag = RAW; data.oid = new (GC) oid_t(s.ref); } else { tag = PINNED; data.pinned = *(pinned_object**) &s.ref; }; #else tag = RAW; data.oid = new (GC) oid_t(s.ref); #endif }; const oid_t sm_ref::oid () const { switch (tag) { case TRANSIENT: if (data.pointer==NULL) return oid_t(); else return *(oid_t*) data.pointer; case RAW: return *data.oid; case PINNED: if (header>=0) return ((raw_ref*) data.pinned->get(header,sizeof(raw_ref)))->ref; else return oid_t(); }; }; const char* sm_ref::deref ( size_t size ) const { switch (tag) { case TRANSIENT: return data.pointer; #ifdef LDB_SERVER case PINNED: { if (data.pinned==NULL) SM_ERROR("Empty current object"); transaction_thread_t::me()->current_object = data.pinned; return transaction_thread_t::me()->current_object->get(0,size); }; case RAW: { transaction_thread_t::me()->current_object = new (GC) pinned_object(oid()); return transaction_thread_t::me()->current_object->get(0,size); }; #else case PINNED: { if (data.pinned==NULL) SM_ERROR("Empty current object"); current_object = data.pinned; return current_object->get(0,size); }; case RAW: { sm_string sdata; int ref = oid().oid; if 
// Physical line 10: rest of sm_ref::deref() (non-server RAW branch: fetches via
// object_deref() and caches the result under the oid on a miss, calls
// set_current_object(ref) on a hit); sm_ref::size(); ostream and rpc_stream
// operators for sm_ref; sm_file::append(size_t) (appends a fresh `new (GC)`
// buffer of `size` bytes whose contents are not initialized here); no-op
// ostream operators for sm_file and sm_index; rpc_stream operators for sm_file
// and sm_index (LDB_SERVER: via get_object()/store_object(); otherwise via
// `ref`); and, without LDB_SERVER, current_storage_manager(), which returns a
// new storage_manager whose ref is ldb_sm_location.
// NOTE(review): sm_ref::size() calls strlen(data.pointer) for TRANSIENT
// references, but the default constructor sets data.pointer = NULL -- confirm
// callers never ask for the size of a default-constructed reference.
(!object_cache_table.find(ref,sdata)) { sdata = object_deref(oid(),size); object_cache_table.store(ref,sdata); } else set_current_object(ref); return sdata.content(); }; #endif }; }; size_t sm_ref::size () const { switch (tag) { case TRANSIENT: return strlen(data.pointer); case RAW: return pinned_object(oid()).body_size(); case PINNED: return data.pinned->body_size(); }; }; ostream& operator<< ( ostream& o, const sm_ref &x ) { return o << x.oid(); }; rpc_stream& operator>> ( rpc_stream& o, sm_ref &s ) { s.tag = RAW; oid_t x; o >> x; s.data.oid = new (GC) oid_t(x); return o; }; rpc_stream& operator<< ( rpc_stream& o, const sm_ref &s ) { return o << s.oid(); }; sm_ref sm_file::append ( size_t size ) { return append(new (GC) char[size],size); }; ostream& operator<< ( ostream& o, const sm_file &x ) { return o; }; ostream& operator<< ( ostream& o, const sm_index &x ) { return o; }; #ifdef LDB_SERVER rpc_stream& operator>> ( rpc_stream& o, sm_file &s ) { int n; o >> n; s = *(sm_file*) get_object(n); return o; }; rpc_stream& operator<< ( rpc_stream& o, const sm_file &s ) { return o << store_object((void*) &s); }; rpc_stream& operator>> ( rpc_stream& o, sm_index &s ) { int n; o >> n; s = *(sm_index*) get_object(n); return o; }; rpc_stream& operator<< ( rpc_stream& o, const sm_index &s ) { return o << store_object((void*) &s); }; #else rpc_stream& operator>> ( rpc_stream& o, sm_file &s ) { return o >> s.ref; }; rpc_stream& operator<< ( rpc_stream& o, const sm_file &s ) { return o << s.ref; }; rpc_stream& operator>> ( rpc_stream& o, sm_index &s ) { return o >> s.ref; }; rpc_stream& operator<< ( rpc_stream& o, const sm_index &s ) { return o << s.ref; }; #endif #ifndef LDB_SERVER storage_manager* current_storage_manager () { storage_manager* m = new (GC) storage_manager(); m->ref = ldb_sm_location; return m; }; #endif