/********************************************************************************* * * The query evaluator code for ODMG OQL * Copyright (c) 1999-2003 by Leonidas Fegaras, the University of Texas at * Arlington. All rights reserved. * Programmer: Leonidas Fegaras * Date: 9/27/98, 1/12/02 * ********************************************************************************/ #include #include #include #include #include #include #include #include "gc_cpp.h" #include "basic.h" #include "odl.h" #include "eval.h" /* if you set it to true, it will meterialize all streams */ bool materialize_streams = false; static list* opened_streams = new list; tuple global_tuple; bool first_time = true; const int max_stream_environments = 1000; int stream_environment = 0; /* A vector of static streams used by the current query */ Stream** evaluation_streams[max_stream_environments]; int current_stream[max_stream_environments]; void allocate_streams ( int sn ) { if (stream_environment >= max_stream_environments) evaluation_error("Run out of streams. 
Probably caused by an infinite recursion"); evaluation_streams[stream_environment] = new (GC) Stream*[sn]; current_stream[stream_environment++] = 0; }; int pop_streams () { if (stream_environment <= 0) evaluation_error("Tried to pop an empty stream environment"); stream_environment--; return 0; }; void new_stream ( Stream* s ) { evaluation_streams[stream_environment-1][current_stream[stream_environment-1]++] = s; }; Stream* nth_stream ( int sn ) { return evaluation_streams[stream_environment-1][sn]; }; static list* temporary_files = new list; // hook to the interpreter (defined in interpreter.gen) void* eval_expr ( Expr e, binding* env, bool coercep ); long mod ( long x, long y ) { return x % y; }; bool lambda_or ( bool x, bool y ) { return x || y; }; bool lambda_and ( bool x, bool y ) { return x && y; }; // an evaluation error is a fatal error that aborts the current // transaction and exits the program void evaluation_error ( const char* s) { eout << "*** Evaluation error: " << s << "\n"; throw odmg_error_class(eout.str()); }; // create an index with name index_name for a given relation void create_pool_index ( const char* relation, const char* extent, Key* (*key) (tuple), const char* index_name, const bool unique ) { sm_index* idx = new sm_index(new_index(new string(index_name),new string(relation),unique)->oid,unique); Stream s = Stream(extent); tuple e = s.next(); while (e->is_valid()) // index all elements of the pool { sm_ref* data = (*e)[0].reference(); idx->add(*(*key)(e),data->oid()); e = s.next(); }; s.close(); }; sm_ref* new_persistent_object ( sm_string* extent, long* size ) { return new sm_ref(find_extent(extent->scontent())->append(*size)); }; sm_ref* new_transient_object ( long* size ) { return new sm_ref(new (GC) pinned_object((size_t) *size),-1); }; sm_index* find_idx ( const char* index_name ) { Ref d = get_data_entry(new string(index_name)); if (!d.valid() || d->tag != index_entry) evaluation_error(fform("Index %s doesn't exist",index_name)); 
return new sm_index(d->oid,d->uniquep); }; sm_index* find_index ( sm_string* index_name ) { return (sm_index*) find_idx(index_name->scontent()); }; void insert_into_index ( sm_string* index_name, Key* key, sm_ref* object ) { sm_index* index = find_idx(index_name->scontent()); index->add(*key,object->oid()); }; void remove_from_index ( sm_string* index_name, Key* key, sm_ref* object ) { sm_index* index = find_idx(index_name->scontent()); index->remove(*key,object->oid()); }; // must be called once before the first query execution void odmg_initialize () { }; // remove all transient pools of a transactions (should be called once per process) void odmg_cleanup () { cleanup_temp_files(); }; bool Element::operator== ( const Element e ) const { if (tag != e.tag) return false; if (referencep()) return info.reference == e.info.reference; else if (valuep()) return info.value == e.info.value; }; sm_ref* Element::reference () const { if (nullp()) return new (GC) sm_ref; else return info.reference; }; void* Element::value () const { if (nullp()) return NULL; else return info.value; }; // construct a Tuple value with n elements Tuple::Tuple ( const int n ) { if (n>0) v.append(new (GC) Element[n],n); }; // construct a Tuple value with one null element Tuple::Tuple () { v.append(Element()); }; bool Tuple::operator== ( Tuple x ) { if (x.size() != size()) return false; for(int i=0; isize()); for(int i=0; iset(i,(*this)[i]); for(int i=0; isize(); i++) z->set(size()+i,(*x)[i]); return z; }; sm_file* find_extent ( const char* extent_name ) { Ref d = get_data_entry(new string(extent_name)); if (!d.valid() || d->tag != extent_entry) evaluation_error(fform("Extent %s doesn't exist",extent_name)); return new sm_file(d->oid); }; // construct a materialized stream and open it Stream::Stream ( const char* extent_name ) { tag = materialized; closed = true; last_data = NULL; kept = NULL; found = false; prefix = NULL; info.mat.is_transient = false; info.mat.name = new (GC) 
char[strlen(extent_name)+1]; strcpy(info.mat.name,extent_name); info.mat.pool = find_extent(extent_name); info.mat.scan = NULL; open(); }; // construct an indexed stream to access values of keys between low and high and open it Stream::Stream ( sm_index* index, const bool is_temporary, Key* low, Key* high ) { tag = indexed; closed = true; last_data = NULL; kept = NULL; found = false; prefix = NULL; info.ind.is_temporary = is_temporary; info.ind.index = new sm_index; info.ind.index = index; info.ind.low = low; info.ind.high = high; open(); }; Stream* materialized_stream ( sm_string* extent_name, tuple x ) { Stream* s = new Stream(extent_name->scontent()); s->prefix = x; return s; }; Stream* indexed_stream ( sm_index* index, const bool* is_temporary, Key* low, Key* high, tuple x ) { Stream* s = new Stream(index,*is_temporary,low,high); s->prefix = x; return s; }; Stream* residual_buffer ( int* n, tuple x ) { Stream* s = new Stream(*n); return s; }; // construct a lazy (suspended) stream Stream::Stream ( suspended_function f ) { tag = suspended; closed = false; last_data = NULL; kept = NULL; found = false; prefix = NULL; info.lazy.suspended_source = f; info.lazy.closure = NULL; if (materialize_streams) this->materialize(); }; // construct a residual stream of size n Stream::Stream ( const int n ) { tag = residual; closed = false; last_data = NULL; kept = NULL; found = false; prefix = NULL; info.res.size = n; info.res.current = 0; info.res.current_joinp = 0; info.res.data = new (GC) tuple[n]; info.res.joinp = new (GC) bool[n]; if (n>0) info.res.data[0] = NULL; }; // construct a residual stream with one element e Stream::Stream ( Element e ) { tag = residual; closed = false; last_data = NULL; kept = NULL; found = false; prefix = NULL; location = 0; info.res.size = 1; info.res.current = 0; info.res.current_joinp = 0; info.res.data = new (GC) tuple[1]; info.res.joinp = new (GC) bool[1]; info.res.data[0] = new Tuple(e); info.res.joinp[0] = false; }; Stream* 
nested_collection_stream ( tuple (*nth) (tuple x,int* i), tuple env ) { Stream* s = new Stream(); s->tag = Stream::nested; s->info.nested.nth = nth; s->info.nested.env = env; s->info.nested.location = 0; s->info.nested.closure = NULL; s->location = 0; s->closed = false; s->last_data = NULL; s->kept = NULL; s->found = false; s->prefix = env; return s; }; Stream* nested_collection_stream_with_closure ( lambda_closure* closure, tuple env ) { Stream* s = new Stream(); s->tag = Stream::nested; s->info.nested.nth = NULL; s->info.nested.env = env; s->info.nested.location = 0; s->info.nested.closure = closure; s->location = 0; s->closed = false; s->last_data = NULL; s->kept = NULL; s->found = false; s->prefix = env; return s; }; Stream::~Stream () { if (tag == materialized) delete info.mat.scan; }; // open a stream void Stream::open () { closed = false; last_data = NULL; if (tag == materialized) { kept = NULL; found = false; info.mat.scan = new sm_file_scan(*info.mat.pool); } else if (tag == indexed) { const sm_string clow = sm_string("\001"); const sm_string chigh = sm_string("\255"); sm_string low = (info.ind.low) ? *info.ind.low : clow; sm_string high = (info.ind.high) ? 
*info.ind.high : chigh; info.ind.indexscan = new sm_index_scan(*info.ind.index,low,high); kept = NULL; found = false; opened_streams = Cons(this,opened_streams); } else if (tag == residual) { info.res.current = 0; info.res.current_joinp = 0; } else if (tag == nested) location = 0; }; // close a stream void Stream::close () { closed = true; if (info.mat.scan && tag == materialized) { info.mat.scan->close(); delete info.mat.scan; info.mat.scan = NULL; } else if (info.ind.indexscan && tag == indexed) { info.ind.indexscan->close(); delete info.ind.indexscan; info.ind.indexscan = NULL; }; }; // get the next element of the stream tuple Stream::next () { if (is_closed()) if (first_time) // if this is the first next() on this stream, open it first { open(); return next(); } else return invalid_data(); else if (last_data) // use the current element return last_data; else if (tag == suspended) // read the next element from the lazy stream { tuple t; if (info.lazy.closure == NULL) t = (*info.lazy.suspended_source)(); else // evaluate the suspended function using the interpreter t = (tuple) eval_expr((Expr) info.lazy.closure->body, (binding*) info.lazy.closure->environment, true); if (t->is_valid()) return (prefix) ? prefix->concat(t) : t; } else if (tag == materialized) // scan the next element from the pool { sm_ref data; if (info.mat.is_transient) // for a transient pool, we have a stream of tuples { if (info.mat.scan->next(data)) { int len = *(int*) data.deref(sizeof(int)); Element* c = (Element*) (data.deref(sizeof(int)+len*sizeof(Element))+sizeof(int)); tuple t = new Tuple(len); for(int i=0; iset(i,c[i]); return t; }; } // for an extent, create a Tuple with one element else if (info.mat.scan->next(data)) { tuple t = new Tuple(new sm_ref(data)); tuple s = prefix ? 
prefix->concat(t) : t; return s; }; } else if (tag == indexed) // use the index { sm_string key; raw_ref data; if (info.ind.is_temporary) { if (info.ind.indexscan->next(key,data)) { Ref sd = (Ref) data; return new Tuple(*sd); }; } else if (info.ind.indexscan->next(key,data)) return ((prefix) ? (prefix->concat(new Tuple(new sm_ref(data)))) : (new Tuple(new sm_ref(data)))); } else if (tag == sorted) { // from sorted stream sm_string key, data; if (info.sort.sort_scan->next(key,data)) return (tuple) data.content(); } else if (tag == nested) { tuple x; if (info.nested.closure == NULL) x = info.nested.nth(info.nested.env,new (GC) int(info.nested.location++)); // evaluate the suspended nth function using the interpreter else { const static String X = new string("x"); const static String Y = new string("y"); x = (tuple) eval_expr((Expr) info.nested.closure->body, ((binding*) info.nested.closure->environment)->extend(X,info.nested.env) ->extend(Y,new (GC) int(info.nested.location++)), true); }; if (!x->is_valid()) return invalid_data(); if (prefix) return prefix->concat(x); return x; } else if (tag == residual) // use a residual element if (info.res.current < info.res.size) return info.res.data[info.res.current++]; close(); first_time=false; return invalid_data(); }; Stream* materialize_tuples ( tuple (*next_tuple)() ) { Stream* new_stream = new Stream(); new_stream->tag = Stream::materialized; new_stream->closed = true; new_stream->info.mat.is_transient = true; sm_file* f = new (GC) sm_file(true); new_stream->info.mat.name = new (GC) char; new_stream->info.mat.name[0] = '\0'; new_stream->info.mat.pool = f; temporary_files = Cons(f,temporary_files); sm_file_append p(*f); tuple d = next_tuple(); int len = (d->is_valid()) ? 
d->size() : 0; int size = sizeof(int)+len*sizeof(Element); char* dest = new (GC) char[size]; (*(int*) dest) = len; while (d->is_valid()) { copy_n_bytes(dest+sizeof(int),d->content(),size-sizeof(int)); p.append(dest,size); d = next_tuple(); }; p.close(); new_stream->open(); return new_stream; }; void cleanup_temp_files () { ldb_sm->begin_transaction(); for(list* r=opened_streams; r->consp(); r=r->tl) r->hd->close(); opened_streams = new list; for(list* r=temporary_files; r->consp(); r=r->tl) r->hd->delete_file(); temporary_files = new list; ldb_sm->commit_transaction(); }; static Stream* current_materialized_stream; tuple next_tuple () { return current_materialized_stream->next(); }; // materialize a suspended stream into a transient pool Stream* Stream::materialize () { current_materialized_stream = this; return materialize_tuples(next_tuple); }; Stream* materialize_stream ( Stream* s ) { return s->materialize(); }; // residualize the stream s into memory void Stream::residualize ( Stream* s ) { if (tag == residual) { closed = true; tuple x = s->next(); int i = 0; for(; x->is_valid() && inext(); }; info.res.size = i; info.res.current = 0; info.res.current_joinp = 0; open(); }; }; // sequential table scan over the stream s using pred to filter out elements tuple table_scan ( Stream* s, bool* (*pred) (tuple) ) { tuple x = s->next(); while (x->is_valid()) { if (*(*pred)(x)) return x; x = s->next(); }; return invalid_data(); }; // index scan over the indexed stream s tuple index_scan ( Stream* s, bool* (*pred) (tuple) ) { if (s->tag != Stream::indexed) evaluation_error("expected an indexed stream"); tuple x = s->next(); while (x->is_valid()) { if (*(*pred)(x)) return x; x = s->next(); }; return invalid_data(); }; /* Sorting is done by taking sorting_buffer_size number of tuples * from the input stream into memory, sort them using quick sort, * and dump them in a runfile. The resulting files are merged * in one step using a priority queue implemented as a heap. 
*/ // Number of tuples that can be sorted in memory const int sorting_buffer_size = 1000; const int MAXSTACKDEPTH = 30; const int LIMIT = 10; static long* (*current_cmp_function) (tuple,tuple); int tuple_compare ( const tuple x, const tuple y ) { return *(*current_cmp_function)(x,y); }; /* Taken from SHORE: sm/sort.cc */ void QuickSort ( tuple a[], int cnt, int (*compare)(const tuple, const tuple) ) { int stack[MAXSTACKDEPTH][2]; int sp = 0; int l, r; tuple tmp; register int i, j; tuple pivot; long randx = 1; for (l = 0, r = cnt - 1; ; ) { if (r - l < LIMIT) { if (sp-- <= 0) break; l = stack[sp][0], r = stack[sp][1]; continue; }; randx = (randx * 1103515245 + 12345) & 0x7fffffff; pivot = a[l + randx % (r - l)]; for (i = l, j = r; i <= j; ) { while (compare(a[i], pivot) < 0) i++; while (compare(pivot, a[j]) < 0) j--; if (i < j) { tmp=a[i]; a[i]=a[j]; a[j]=tmp; } if (i <= j) i++, j--; }; if (j - l < r - i) { if (i < r) stack[sp][0] = i, stack[sp++][1] = r; r = j; } else { if (l < j) stack[sp][0] = l, stack[sp++][1] = j; l = i; }; }; for (i = 1; i < cnt; a[j+1] = pivot, i++) for (j = i - 1, pivot = a[i]; j >= 0 && (compare(pivot, a[j]) < 0); a[j+1] = a[j], j--) ; }; // the heap element typedef struct { tuple data; Stream* stream; } opened_stream; void heapify ( opened_stream* s[], int size, int i ) { int l = 2*i+1; int r = l+1; int min = (l < size && tuple_compare(s[l]->data,s[i]->data) < 0) ? 
l : i; if (r < size && tuple_compare(s[r]->data,s[min]->data) < 0) min = r; if (min != i) { opened_stream* temp = s[i]; s[i] = s[min]; s[min] = temp; heapify(s,size,min); }; }; void build_heap ( opened_stream* s[], int size ) { for(int i = (int) floor(size/2.0)-1; i>=0; i--) heapify(s,size,i); }; tuple replace_min ( opened_stream* s[], int &size ) { if (size==0) return invalid_data(); tuple old = s[0]->data; tuple min = s[0]->stream->next(); if (min->is_valid()) s[0]->data = min; else s[0] = s[--size]; heapify(s,size,0); return old; }; static int current_file_num; static opened_stream** current_opened_streams; tuple current_next_tuple () { return replace_min(current_opened_streams,current_file_num); }; Stream* sort_stream ( Stream* s, long* (*cmp) (tuple,tuple) ) { Stream* ns; list* streams = new list; current_cmp_function = cmp; int files = 0; tuple x = s->next(); for(; x->is_valid(); files++) { ns = new Stream(sorting_buffer_size); tuple* nsv = ns->info.res.data; int i = 0; for(; x->is_valid() && inext()) nsv[i++] = x; ns->info.res.size = i; ns->info.res.current = 0; ns->info.res.current_joinp = 0; QuickSort(nsv,i,&tuple_compare); if (x->is_valid()) streams = streams->cons(ns->materialize()); }; if (files==1) return ns; streams = streams->cons(ns); opened_stream* os[files]; for(int i=0; itl, i++) { streams->hd->close(); streams->hd->open(); os[i] = new (GC) opened_stream; os[i]->data = streams->hd->next(); os[i]->stream = streams->hd; }; build_heap(os,files); current_opened_streams = os; current_file_num = files; return materialize_tuples(current_next_tuple); }; Stream* sort_scan_stream ( Stream* s, Key* (*key) (tuple), bool* uniquep ) { sm_sort_scan* sc = new (GC) sm_sort_scan(uniquep); tuple x = s->next(); while(x->is_valid()) { sc->put(*key(x),sm_string((char*)x,sizeof(Element)*x->size())); x = s->next(); }; return new Stream(sc); }; // nested-loop join using the join predicate pred (if outer=true, it's a left-outer join); // the equality function keep 
determines which data to keep from the outer stream in outerjoin tuple nested_loop ( Stream* sx, Stream* sy, bool* (*pred) (tuple,tuple), bool* outer, bool* (*keep) (tuple,tuple) ) { tuple x = sx->next(); while (x->is_valid()) { tuple y = sy->next(); while (y->is_valid()) { if (*(*pred)(x,y)) { sx->last_data = x; // remember x so that sx->next()==x next time sx->kept = NULL; sx->found = true; return x->concat(y); }; y = sy->next(); }; /* end of inner stream */ if (*outer && !sx->found) // if outer tuple wasn't joined with any inner tuple sx->kept = x; sx->found = false; sx->last_data = NULL; // forget x since we reached end of inner stream x = sx->next(); if (x->is_valid()) { sy->open(); if (*outer && sx->kept != NULL && !*(*keep)(x,sx->kept)) { /* case of outerjoin: if the new x is keep-different than the previous x and */ /* the previous x was not joined with any y, then join the previous x with null */ sx->last_data = x; tuple res = sx->kept->concat(new Tuple()); sx->kept = NULL; return res; }; } else if (*outer && sx->kept != NULL) // outerjoin case for end of outer stream return sx->kept->concat(new Tuple()); }; return invalid_data(); }; /* Block nested loop: read the outer stream sx in chunks of 1000 tuples into the residual * * buffer rx. For each outer chunk, read the inner stream sy only once. So sy is read * * rx/1000 times. Outer joins are done by marking each residual tuple in rx if it's * * joined (joinp flag) and by scanning rx at the end for unjoined tuples. 
*/
// Each call returns the next joined tuple, or invalid_data() when the join is
// exhausted. State carried between calls:
//   sy->last_data               the inner tuple currently being matched
//   rx->info.res.joinp          per-slot flag: has this buffer tuple been joined
//   rx->info.res.current_joinp  where the scan for unjoined tuples resumes
//   rx->kept, rx->found         the keep-group being examined by that scan
// pred is the join predicate; *outer selects the outer join; keep tells whether
// two buffer tuples belong to the same keep-group.
tuple block_nested_loop ( Stream* sx,      // outer
			  Stream* rx,      // residualized sx: buffer in memory holding tuples from sx
			  Stream* sy,      // inner
			  bool* (*pred) (tuple,tuple),
			  bool* outer,
			  bool* (*keep) (tuple,tuple) )
{ tuple x = rx->next();
  if (x == NULL)
     { // the buffer holds no data yet: fill it with the first chunk of sx
       rx->residualize(sx);
       x = rx->next();
     }
  else if (!x->is_valid())      // end of residual buffer
     { if (sy->last_data == NULL)    // work on the next residual chunk of sx
	  { rx->residualize(sx);
	    x = rx->next();
	    if (x->is_valid())
	       sy->open();
	  }
       else { // if sy->last_data isn't null, we still have work to do with rx
	      rx->open();
	      sy->last_data = NULL;
	      x = rx->next();
	    };
     };
  while (x->is_valid())
  { tuple y = sy->next();
    while (y->is_valid())       // for each inner tuple ...
    { while(x->is_valid())      // ... scan the entire residual buffer
      { if (*(*pred)(x,y))      // success!
	   { sy->last_data = y;      // remember y so that sy->next()==y next time
	     // raise the joinp flag of the current tuple
	     // (rx->next() has already advanced current past it, hence the -1)
	     rx->info.res.joinp[rx->info.res.current-1] = true;
	     return x->concat(y);
	   };
	x = rx->next();
      };
      // rewind the buffer for the next inner tuple
      rx->open();
      x = rx->next();
      sy->last_data = NULL;     // forget y since we reached end of inner stream
      y = sy->next();
    };
    /* end of inner stream */
    if (*outer)                 // check the entire residual buffer for unjoined tuples
       for(int i = rx->info.res.current_joinp; i < rx->info.res.size; i++)
       { if (rx->kept == NULL)
	    { rx->kept = rx->info.res.data[i];
	      rx->found = rx->info.res.joinp[i];
	    }
	 else if (*(*keep)(rx->kept,rx->info.res.data[i]))
	    rx->found = rx->found || rx->info.res.joinp[i];
	 else { // when we find the first keep-different tuple, we may have an outerjoin tuple
		tuple prev_tuple = rx->kept;
		bool found = rx->found;
		rx->kept = rx->info.res.data[i];
		rx->found = rx->info.res.joinp[i];
		if (!found)     // none of the keep-equal tuples was joined
		   { // resume the scan after slot i on the next call
		     rx->info.res.current_joinp = i+1;
		     return prev_tuple->concat(new Tuple());
		   };
	      };
       };
    // this chunk is done: load the next chunk of sx and rescan the inner stream
    rx->residualize(sx);
    x = rx->next();
    if (x->is_valid())
       sy->open();
  };
  return invalid_data();
};


// index nested-loop join using an index key and the join
prediacate pred tuple indexed_nested_loop ( Stream* sx, Stream* sy, Key* (*key) (tuple), bool* (*pred) (tuple,tuple), bool* outer, bool* (*keep) (tuple,tuple) ) { if (sy->tag != Stream::indexed) evaluation_error("indexed nested loop expects an indexed inner stream"); tuple x = sx->next(); while (x->is_valid()) { sy->info.ind.low = (*key)(x); sy->info.ind.high = (*key)(x); tuple y = sy->next(); while (y->is_valid()) { if (*(*pred)(x,y)) { sx->last_data = x; // remember x so that next sx->next()==x sx->kept = NULL; sx->found = true; return x->concat(y); }; y = sy->next(); }; /* end of inner stream */ if (*outer && !sx->found) // if outer tuple wasn't joined with any inner tuple sx->kept = x; sx->found = false; sx->last_data = NULL; // forget x since we reached end of inner stream x = sx->next(); if (x->is_valid()) { sy->info.ind.low = (*key)(x); sy->info.ind.high = (*key)(x); sy->open(); if (*outer && sx->kept != NULL && !*(*keep)(x,sx->kept)) { /* case of outerjoin: if the new x is keep-different than the previous x and */ /* the previous x was not joined with any y, then join the previous x with null */ sx->last_data = x; tuple res = sx->kept->concat(new Tuple()); sx->kept = NULL; return res; }; } else if (*outer && sx->kept != NULL) // outerjoin case for end of outer stream return sx->kept->concat(new Tuple()); }; return invalid_data(); }; // merge streams using geq (ie. 
>=); it assumes that streams are sorted and the sort key // is a key for at least one of the input streams (determined by left_key) tuple merge_join ( Stream* sx, Stream* sy, bool* (*pred) (tuple,tuple), bool* (*geq) (tuple,tuple), bool* left_key ) { tuple x = sx->next(); tuple y = sy->next(); while (x->is_valid() && y->is_valid()) { if (*(*pred)(x,y)) { if (*left_key) sx->last_data = x; else sy->last_data = y; return x->concat(y); }; sx->last_data = NULL; sy->last_data = NULL; if (*(*geq)(x,y)) y = sy->next(); else x = sx->next(); }; return invalid_data(); }; // unnest s using path (if outer=true, it's an outer-unnest); // path(x,i) gives the ith element of the nested collection in x tuple unnest ( Stream* s, tuple (*path) (tuple,int*), bool* (*pred) (tuple), bool* outer, bool* (*nullp) (tuple), bool* (*keep) (tuple,tuple) ) { tuple x = s->next(); while (x->is_valid()) { if (*(*nullp)(x)) if (*outer) return x->concat(new Tuple()); else { x = s->next(); continue; }; tuple y = (*path)(x,&s->location); s->location++; while (y->is_valid()) { tuple z = x->concat(y); if (*(*pred)(z)) { s->last_data = x; // remember x so that next s.next()==x s->kept = NULL; s->found = true; return z; }; y = (*path)(x,&s->location); s->location++; }; // reached end of inner collection if (*outer && !s->found) // if outer tuple wasn't joined with any inner tuple s->kept = x; s->found = false; s->last_data = NULL; // forget x s->location = 0; // reset inner collection scan x = s->next(); if (x->is_valid()) { if (*outer && s->kept != NULL && !*(*nullp)(s->kept) && !*(*nullp)(x) && !*(*keep)(x,s->kept)) { s->last_data = x; tuple res = s->kept->concat(new Tuple()); s->kept = NULL; return res; }; } else if (*outer && s->kept != NULL) // outerjoin case for end of outer stream return s->kept->concat(new Tuple()); }; return invalid_data(); }; bool test_for_nulls ( tuple x ) { for(int i = 0; isize(); i++) if ((*x)[i].nullp()) return true; return false; }; // nest (group-by) the stream s using the 
key to get the group-by value; // it assumes that all tuples in s with the same key value are consecutive in s. // It merges each group (which contains all values with the same key) using // the merge function and zero as the initial value and appends the group to the key tuple nest ( Stream* s, tuple (*head) (tuple), tuple (*key) (tuple), bool* (*eq) (tuple,tuple), bool* (*before_pred) (tuple), bool* (*after_pred) (tuple), bool* (*valid_headp) (tuple), tuple (*merge) (tuple,tuple), tuple zero ) { tuple x = s->next(); s->last_data = NULL; // forget last y while (x->is_valid()) { tuple res = zero; tuple y = x; tuple keyx = (*key)(x); if (keyx->is_valid()) { do { if (*(*before_pred)(y) && *(*valid_headp)(y)) res = (*merge)((*head)(y),res); y = s->next(); } while (y->is_valid() && *(*eq)(keyx,(*key)(y))); } else y = s->next(); tuple z = keyx->concat(res); if (*(*after_pred)(z)) { s->last_data = y; // remember last y so that next s.next()==y return z; }; x = y; }; return invalid_data(); }; // map fnc to all elements of s tuple map ( Stream* s, tuple (*fnc) (tuple), bool* (*pred) (tuple), bool* outer, bool* (*keep) (tuple,tuple) ) { tuple x = s->next(); while (x->is_valid()) { tuple res = x->concat((*fnc)(x)); if (*(*pred)(res)) return res; else if (*outer) return x; x = s->next(); }; return invalid_data(); }; // list, set, bag singleton tuple unit ( Stream* s ) { if (s->is_closed()) return invalid_data(); return s->next(); }; // works as a set union, bag union, and list append tuple merge ( Stream* sx, Stream* sy ) { if (sx->is_closed()) return sy->next(); tuple x = sx->next(); if (x->is_valid()) return x; sx->closed = true; return sy->next(); }; // reduce the input stream by applying head to each element. 
// The head (and the result of reduce) is a tuple of size 1 tuple reduce ( Stream* s, bool* (*pred) (tuple), tuple (*head) (tuple) ) { tuple x = s->next(); while (x->is_valid()) { if (*(*pred)(x)) return (*head)(x); x = s->next(); }; return invalid_data(); }; sm_sequence* collect_values ( Stream* s, long* element_size, bool* uniquep, bool* (*eq)(const char*,const char*) ) { sm_sequence* result = new sm_sequence(*element_size+sizeof(long)); tuple x = s->next(); while (x->is_valid()) { if ((*x)[0].valuep()) if (*uniquep) result->insert((char*) (*x)[0].value(),*eq); else result->append((char*) (*x)[0].value()); else evaluation_error("expected a value"); x = s->next(); }; return result; }; sm_sequence* collect_references ( Stream* s, bool* uniquep, bool* (*eq)(const char*,const char*) ) { sm_sequence* result = new sm_sequence(sizeof(sm_ref)); tuple x = s->next(); while (x->is_valid()) { if ((*x)[0].referencep()) if (*uniquep) result->insert((char*) (*x)[0].reference(),*eq); else result->append((char*) (*x)[0].reference()); else evaluation_error("expected a reference"); x = s->next(); }; return result; }; // Reduce the stream s into an aggregate value (usually int or bool) // using the accumulator to aggregate values and starting from zero. 
void* aggregate ( Stream* s, void* zero, void* (*accumulator) (void*,void*) ) { void* res = zero; tuple x = s->next(); while (x->is_valid()) { res = accumulator((*x)[0].value(),res); x = s->next(); }; return res; }; sm_string* import_string ( void* r ) { return new (GC) sm_string(*(raw_string*)r); }; sm_ref* import_ref ( void* r ) { return new (GC) sm_ref(*(raw_ref*)r); }; bool* valid_oid ( raw_ref* r ) { return new (GC) bool(r!=NULL && !(r->oid()==oid_t())); } sm_sequence* import_sequence ( void* r, long* size, void* (*import_element)(void*) ) { sm_sequence* s = new (GC) sm_sequence(*size); sm_sequence rs(*(raw_sequence*)r); int card = rs.cardinality(); s->append(new (GC) char[*size*card],card); for(int i=0; iset(i,(char*)import_element((void*)rs.access(i))); return s; }; sm_sequence* list_merge ( void* x, sm_sequence* y ) { y->append((char*) x); return y; }; sm_sequence* bag_merge ( void* x, sm_sequence* y ) { y->append((char*) x); return y; }; sm_sequence* set_merge ( void* x, sm_sequence* y, bool* (*eq)(const char*,const char*) ) { y->insert((char*) x,eq); return y; }; bool* list_equality ( sm_sequence* x, sm_sequence* y, bool* (*eq)(const char*,const char*) ) { if (x->cardinality() != y->cardinality()) return new (GC) bool(false); int len = x->cardinality(); for(int i = 0; iaccess(i),y->access(i))) return new (GC) bool(false); return new (GC) bool(true); }; bool* bag_equality ( sm_sequence* x, sm_sequence* y, bool* (*eq)(const char*,const char*) ) { if (x->cardinality() != y->cardinality()) return new (GC) bool(false); int len = x->cardinality(); bool flags[len]; for(int i = 0; iaccess(i),y->access(j))) { flags[j] = true; found = true; }; if (!found) return new (GC) bool(false); }; return new (GC) bool(true); }; bool* set_equality ( sm_sequence* x, sm_sequence* y, bool* (*eq)(const char*,const char*) ) { if (x->cardinality() != y->cardinality()) return new (GC) bool(false); int len = x->cardinality(); for(int i = 0; imember(x->access(i),eq)) return new (GC) 
bool(false); return new (GC) bool(true); }; bool* enum_equality ( void* x, void* y ) { return new (GC) bool(*(short*)x == *(short*)y); }; tuple access_collection_ref ( tuple x, int* loc ) { if ((*x)[0].valuep()) { Sequence< sm_ref >* s = (Sequence< sm_ref >*) (*x)[0].value(); if (*loc >= s->cardinality()) return invalid_data(); return new Tuple(new (GC) sm_ref((*s)[*loc])); } else return invalid_data(); }; tuple access_collection_value ( sm_sequence* s, const int* loc ) { if (*loc >= s->cardinality()) return invalid_data(); return new Tuple((void*) s->access(*loc)); }; void* reduce_list ( void* (*merge)(void*,void*), void* zero, sm_sequence* s ) { void* res = zero; for(int i=s->cardinality()-1; i>=0; i--) res = merge((void*) s->access(i),res); return res; }; tuple empty_tuple () { return new Tuple((int) 0); }; tuple null_tuple () { return new Tuple(); }; bool* null_element ( sm_ref* e ) { return new (GC) bool(e == NULL); }; long* max_merge ( long* x, long* y ) { return new (GC) long((*x>*y) ? *x : *y); }; long* max_zero () { return new (GC) long(0); }; long* min_merge ( long* x, long* y ) { return new (GC) long((*x>*y) ? *y : *x); }; long* min_zero () { return new (GC) long(0); }; long* sum_merge ( long* x, long* y ) { return new (GC) long(*x + *y); }; long* sum_zero () { return new (GC) long(0); }; float* fmax_merge ( float* x, float* y ) { return new (GC) float((*x>*y) ? *x : *y); }; float* fmax_zero () { return new (GC) float(0); }; float* fmin_merge ( float* x, float* y ) { return new (GC) float((*x>*y) ? 
*y : *x); }; float* fmin_zero () { return new (GC) float(0); }; float* fsum_merge ( float* x, float* y ) { return new (GC) float(*x + *y); }; float* fsum_zero () { return new (GC) float(0); }; bool* all_merge ( bool* x, bool* y ) { return new (GC) bool(*x && *y); }; bool* all_zero () { return new (GC) bool(true); }; bool* some_merge ( bool* x, bool* y ) { return new (GC) bool(*x || *y); }; bool* some_zero () { return new (GC) bool(false); }; sm_string* concat_zero () { return new (GC) sm_string(""); }; sm_string* concat_merge ( sm_string* x, sm_string* y ) { return new sm_string(*x+*y); }; Key* string_to_key ( sm_string* s ) { return new sm_string(fform("%-20.20s",s->scontent())); }; Key* int_to_key ( const long* s ) { return new sm_string(fform("%20d",*s)); }; Key* real_to_key ( const float* s ) { return new sm_string(fform("%20.5f",*s)); }; const char* loid ( sm_ref x ) { if (!x.valid()) return fform("%20d",0); oid_t* z = new oid_t(x.oid()); return fform("%20d",*(int*) z); }; Key* oid_to_key ( const sm_ref* s ) { return new sm_string(loid(*s)); }; Key* bool_to_key ( const bool* s ) { return new sm_string((*s) ? 
"1" : "0"); }; Key* set_to_key ( Set* v, Key* (*ef) (void*) ) { Key res = sm_string(fform("%5d",v->cardinality())); for(int i = 0; icardinality(); i++) res = res + *(*ef)((*v)[i]); return new sm_string(res); }; Key* bag_to_key ( Bag* v, Key* (*ef) (void*) ) { Key res = sm_string(fform("%5d",v->cardinality())); for(int i = 0; icardinality(); i++) res = res + *(*ef)((*v)[i]); return new sm_string(res); }; Key* list_to_key ( Sequence* v, Key* (*ef) (void*) ) { Key res = sm_string(fform("%5d",v->cardinality())); for(int i = 0; icardinality(); i++) res = res + *(*ef)((*v)[i]); return new sm_string(res); }; bool* key_geq ( Key* x, Key* y ) { return new (GC) bool( *x >= *y ); }; sm_string string_concat ( sm_string x, sm_string y ) { return x+y; }; sm_string* concat ( sm_string* x, sm_string* y ) { return new sm_string(*x+*y); }; sm_string* inverse_string ( sm_string* s ) { int len = s->size(); char* c = (char*) s->content(); for(int i = 0; iconcat(y); }; tuple single_element ( Stream* s ) { tuple x = s->next(); if (!x->is_valid()) evaluation_error("OQL element: collection is empty"); tuple y = s->next(); if (y->is_valid()) evaluation_error("OQL element: collection contains more than one element"); return x; }; tuple first_element ( Stream* s ) { if (s->kept) return invalid_data(); tuple x = s->next(); if (!x->is_valid()) evaluation_error("OQL first: sequence is empty"); s->kept = x; return x; }; tuple last_element ( Stream* s ) { tuple x = NULL; tuple y; do { y = x; x = s->next(); } while(x->is_valid()); if (y == NULL) evaluation_error("OQL last: sequence is empty"); return y; }; tuple nth_element ( Stream* s, long* n ) { tuple x = s->next(); for(long i = 0; i<*n && x->is_valid(); i++) x = s->next(); if (!x->is_valid()) evaluation_error("OQL vector access: sequence does not have enough elements"); s->close(); return x; }; bool* string_match ( sm_string* pattern, sm_string* str ) { return new (GC) bool(fnmatch(pattern->scontent(),str->scontent(),0) == 0); }; long* int_cmp 
( long* x, long* y ) { return new (GC) long((*x>*y) ? 1 : ((*x<*y) ? -1 : 0)); }; long* float_cmp ( float* x, float* y ) { return new (GC) long((*x>*y) ? 1 : ((*x<*y) ? -1 : 0)); }; long* string_cmp ( sm_string* x, sm_string* y ) { return new (GC) long(x->strcmp(*y)); }; long* bool_cmp ( bool* x, bool* y ) { return new (GC) long((*x>*y) ? 1 : ((*x<*y) ? -1 : 0)); }; long* oid_cmp ( sm_ref* x, sm_ref* y ) { return new (GC) long(x->oid().compare(y->oid())); }; long* merge_cmp ( long* x, long* y ) { if (*x != 0) return x; else return y; }; bool oid_eq ( sm_ref x, sm_ref y ) { return x.oid().compare(y.oid()) == 0; }; void sequence_insert ( void* e, sm_sequence* s ) { s->insert((char*)e); }; void sequence_remove ( void* e, sm_sequence* s ) { s->remove((char*)e); }; /* void relationship_insert ( sm_ref* e, Sequence* s ) { s->insert(*e,oid_eq); }; void relationship_remove ( sm_ref* e, Sequence* s ) { s->remove(*e,oid_eq); }; */ /* All evaluation functions are stored in a table so they can be used by the interpreter */ const int function_table_size = 1000; const int function_hash_table_size = 1000; int function_table_top = 0; typedef struct { char* name; void* address; short args; short fncs; } function_table_element; struct function_hash_table_element { char* name; int loc; function_hash_table_element* next; }; function_table_element function_table[function_table_size]; function_hash_table_element* function_hash_table[function_hash_table_size]; unsigned long hash_string ( const char* s ) { unsigned long res = 0; for(int i = strlen(s)-1; i>=0; i--) res = (res<<3) | (s[i]); return res; }; // return the location of the function in the memo table (-1 if not found) int find_function ( const char* function_name ) { int loc = hash_string(function_name) % function_hash_table_size; for(function_hash_table_element* r = function_hash_table[loc]; r!=NULL; r=r->next) if (strcmp(r->name,function_name) == 0) return r->loc; return -1; }; // store a function so it can be be used by the 
interpreter // and return its location in the memo table int store_function ( const char* function_name, const void* function_address, const short num_of_all_args, const short num_of_functional_args ) { int loc = find_function(function_name); if (loc>=0) return loc; function_table[function_table_top].name = new (GC) char[strlen(function_name)+1]; strcpy(function_table[function_table_top].name,function_name); function_table[function_table_top].address = (void*) function_address; function_table[function_table_top].args = num_of_all_args; function_table[function_table_top].fncs = num_of_functional_args; loc = hash_string(function_name) % function_hash_table_size; function_hash_table_element* e = new (GC) function_hash_table_element; e->name = new (GC) char[strlen(function_name)+1]; strcpy(e->name,function_name); e->loc = function_table_top; e->next = function_hash_table[loc]; function_hash_table[loc] = e; return function_table_top++; }; void initialize_stored_functions () { for(int i = 0; i