/********************************************************************************* * * The evaluator interface for ODMG OQL based on SHORE SDL * Copyright (c) 1999-2003 by Leonidas Fegaras, the University of Texas at * Arlington. All rights reserved. * Programmer: Leonidas Fegaras * Date: 9/27/98 * ********************************************************************************/ #ifndef __eval_h__ #define __eval_h__ /* Results between evaluation operations are passed lazily (in a stream-based manner). * A stream item is a Tuple object, which is a vector of Elements. An Element is a * sm_ref* (a reference to an object), or void* (a pointer to a value), or null. */ class Element: public gc { private: enum { Reference, Value, Null } tag; union { sm_ref* reference; // a reference to an object void* value; // a pointer to a value } info; public: // constructors Element () { tag = Null; }; Element ( sm_ref* e ) { tag = Reference; info.reference = e; }; Element ( void* e ) { tag = Value; info.value = e; }; // element equality bool operator== ( const Element e ) const; bool operator!= ( const Element e ) const { return !(*this == e); }; // testing the type of the element inline bool referencep () const { return tag == Reference; }; inline bool valuep () const { return tag == Value; }; inline bool nullp () const { return tag == Null; }; // retrieve the reference or the value sm_ref* reference() const; void* value() const; }; class Tuple: public gc { private: Sequence v; // a vector of Elements public: // construct a Tuple of n elements Tuple ( const int n ); // construct a Tuple with one null element Tuple (); // construct a Tuple with only one element x Tuple ( Element x ) { v.append(x); }; Tuple ( sm_ref* x ) { v.append(Element(x)); }; Tuple ( void* x ) { v.append(Element(x)); } // equality bool operator== ( Tuple x ); // setting and accessing Tuples Element operator[] ( const size_t i ) { return v[i]; }; friend void* tuple_value ( Tuple* x, const size_t i ) { return (*x)[i].value(); }; friend sm_ref* tuple_ref ( Tuple* x, const size_t i ) { return (*x)[i].reference(); }; void set ( const size_t i, Element x ) { v.set(i,x); }; size_t size () const { return v.cardinality(); }; const char* content () const { return v.sequence(); }; // when something wrong, return an invalid Tuple (vsize=0) bool is_valid () const { return v.valid() && size()>0; }; bool is_invalid () const { return !is_valid(); }; friend Tuple* invalid_data (); // concatenate two Tuples Tuple* concat ( Tuple* x ); friend bool* referencep ( Tuple* x, long* n ) { return new (GC) bool((*x)[*n].referencep()); }; friend bool* valuep ( Tuple* x, long* n ) { return new (GC) bool((*x)[*n].valuep()); }; }; typedef Tuple* tuple; extern tuple global_tuple; /* Lazy streams are captured as suspended functions; every time you * want to access a stream element (i.e. a Tuple), you call its suspended. * function. When you get invalid_data, you reached end-of-stream. * The interpreter needs a closure to capture a lambda abstraction * instead of a function */ typedef tuple (*suspended_function) (); typedef struct { void* body; void* X; void* Y; void* environment; } lambda_closure; /* A search key is a string (used in indexes, sorting, etc) */ typedef sm_string Key; /* A stream can be suspended, materialized, indexed, nested, or residual. * If suspended, it uses a suspended function; if materialized, * it scans a pool; if indexed, it scans an index (a B+tree); if * nested, it scans the inner collection of a nested collection; if * residual, it scans a vector of residual elements in memory. * The flag is_transient indicates whether the materialized stream is * transient (in which case it is garbage-collected at the end), or * permanent (such as a class extent). Some operations always materialize * streams implicitly; e.g. nested_loop materializes the inner * stream. Most evaluation operators return a Tuple, not a stream. For * example, when nested_loop finds a tuple that satisfies the join * predicate, it immediately returns this tuple; later, when you call * this operator again it retrieves the next qualified tuple. This * stream-based evaluation avoids materialization of intermediate * results when possible. */ class Stream: public gc { private: enum { materialized, indexed, suspended, nested, residual, sorted } tag; bool closed; // is the stream opened or closed? Tuple* last_data; // used in joins to remember the last Tuple of the outer stream Tuple* kept; // used in outer joins to remember the last tuple to be kept // (to be padded with nulls if necessary) bool found; // used in outer-joins and outer-unnests Tuple* prefix; // every stream tuple is prefixed by this tuple int location; // element location in a collection (used during unnesting) union { struct{ bool is_transient; // is the materialized stream transient or permanent? char* name; // name of permanent stream sm_file* pool; // the pool asociated with a materialized stream sm_file_scan* scan; // a scan to access the materialized pool } mat; // materialized stream struct{ bool is_temporary; // temporary or permanent? sm_index* index; // the index (a B+tree) sm_index_scan* indexscan; // a scan for the index Key* low; // lowest search key value Key* high; // highest search key value } ind; // indexed stream struct{ sm_sort_scan* sort_scan; } sort; // sorted stream (not used yet) struct{ tuple (*nth) (tuple,int*); lambda_closure* closure; // used by the iterpreter to suspend function nth int location; // element location in the inner collection tuple env; } nested; // scanning a nested collection struct{ tuple* data; // vector with the residual tuples bool* joinp; // vector of flags: true if the corresponding tuple // has already been joined (for outer join) int current; // location of the current tuple being read int size; // number of tuples in data int current_joinp; // location of the current flag being read } res; // a stream of residual tuples struct{ suspended_function suspended_source; lambda_closure* closure; // used by the iterpreter to suspend code } lazy; // lazy (suspended) stream } info; public: Stream () {}; // construct a materialized stream and open it Stream ( const char* extent_name ); friend Stream* materialized_stream ( sm_string* extent_name, tuple x ); // construct an indexed stream to access values of keys between low and high and open it Stream ( sm_index* index, const bool is_temporary, Key* low = NULL, Key* high = NULL ); friend Stream* indexed_stream ( sm_index* index, const bool* is_temporary, Key* low, Key* high, tuple x ); // construct a lazy (suspended) stream Stream ( suspended_function f ); friend Stream* suspended_stream ( suspended_function f ) { return new Stream(f); }; void set_closure ( lambda_closure* c ) { info.lazy.closure = c; }; // construct a residual stream of size n Stream ( const int n ); friend Stream* residual_buffer ( int* n, tuple x ); // construct a residual stream with one element e Stream ( Element e ); friend Stream* residual_value_stream ( void* e ) { return new Stream(Element(e)); }; friend Stream* residual_ref_stream ( sm_ref* e ) { return new Stream(Element(e)); }; // construct a stream to scan a nested collection friend Stream* nested_collection_stream ( tuple (*nth) (tuple x,int* i), tuple env ); friend Stream* nested_collection_stream_with_closure ( lambda_closure* closure, tuple env ); Stream ( sm_sort_scan* sort_scan) { tag = sorted; info.sort.sort_scan = sort_scan; }; ~Stream (); bool is_closed () const { return closed; } // open the stream void open (); friend void reset_stream ( Stream* s ) { s->open(); }; // close the stream void close (); void set_prefix ( Tuple* x ) { prefix = x; }; // get the next element of the stream tuple next (); // materialize a suspended stream into a transient pool Stream* materialize (); friend Stream* materialize_tuples ( tuple (*next_tuple)() ); friend Stream* materialize_stream ( Stream* s ); // residualize the stream s into memory void residualize ( Stream* s ); // sequential table scan over the stream s using pred to filter out elements friend tuple table_scan ( Stream* s, bool* (*pred) (tuple) ); // index scan over the indexed stream s friend tuple index_scan ( Stream* s, bool* (*pred) (tuple) ); // sort the stream s using a tuple comparison friend Stream* sort_stream ( Stream* s, long* (*tuple_compare) (tuple,tuple) ); friend Stream* sort_scan_stream ( Stream* s, Key* (*key) (tuple), bool* uniquep ); // nested-loop join between sx and sy using the join predicate pred; // if outer=true, it's a left-outer join; in that case the equality function, // keep, determines which data to keep from the outer stream in outerjoin // (and padded with nulls if no matched tuples are found in the outer) friend tuple nested_loop ( Stream* sx, Stream* sy, bool* (*pred) (tuple,tuple), bool* outer, bool* (*keep) (tuple,tuple) ); // block nested-loop join between sx and sy using the join predicate pred; // rx is a residual stream holding tuples from the outer stream sx friend tuple block_nested_loop ( Stream* sx, Stream* rx, Stream* sy, bool* (*pred) (tuple,tuple), bool* outer, bool* (*keep) (tuple,tuple) ); // index nested-loop join using an index key and the join predicate pred friend tuple indexed_nested_loop ( Stream* sx, Stream* sy, Key* (*key) (tuple), bool* (*pred) (tuple,tuple), bool* outer, bool* (keep) (tuple,tuple) ); // merge streams using geq (ie. >=); it assumes that streams are sorted and the sort key // is a key for at least one of the input streams (determined by left_key) friend tuple merge_join ( Stream* sx, Stream* sy, bool* (*pred) (tuple,tuple), bool* (*geq) (tuple,tuple), bool* left_key ); // map fnc to all elements of s friend tuple map ( Stream* s, tuple (*fnc) (tuple), bool* (*pred) (tuple), bool* outer, bool* (*keep) (tuple,tuple) ); // unnest s using path (if outer=true, it's an outer-unnest); // path(x,i) gives the ith element of the nested collection in x friend tuple unnest ( Stream* s, tuple (*path) (tuple,int*), bool* (*pred) (tuple), bool* outer, bool* (*nullp) (tuple), bool* (*keep) (tuple,tuple) ); // nest (group-by) the stream s using the key to get the group-by value; // it assumes that all tuples in s with the same key value are consecutive in s. // It merges each group (which contains all values with the same key) using // the merge function and zero as the initial value and appends the group to the key friend tuple nest ( Stream* s, tuple (*head) (tuple), tuple (*key) (tuple), bool* (*eq) (tuple,tuple), bool* (*before_pred) (tuple), bool* (*after_pred) (tuple), bool* (*valid_headp) (tuple), tuple (*merge) (tuple,tuple), tuple zero ); // reduce the input stream by applying head to each element. // The head (and the result of reduce) is a tuple of size 1 friend tuple reduce ( Stream* s, bool* (*pred) (tuple), tuple (*head) (tuple) ); // various collection-based operations (element, first, last, []) friend tuple single_element ( Stream* s ); friend tuple first_element ( Stream* s ); friend tuple last_element ( Stream* s ); friend tuple nth_element ( Stream* s, long* n ); // access the n'th collection element of x[0] (a pointer to a collection) friend tuple access_collection_ref ( tuple x, int* n ); // list, set, bag singleton friend tuple unit ( Stream* s ); // works like a set union, bag union, or list append friend tuple merge ( Stream* sx, Stream* sy ); }; /* A stack of streams used by the current query */ extern Stream*** returned_stream; void allocate_streams ( int sn ); int pop_streams (); void new_stream ( Stream* s ); Stream* nth_stream ( int sn ); // An evaluation error is a fatal error that aborts the current // transaction and exits the program void evaluation_error ( const char* s ); void cleanup_temp_files (); sm_file* find_extent ( const char* extent_name ); sm_ref* new_persistent_object ( sm_string* extent, long* size ); sm_ref* new_transient_object ( long* size ); // create a new permanent index with a given name sm_index* create_index ( const char* name, const bool unique ); // find a named index sm_index* find_index ( sm_string* index_name ); // create an index with name index_name for the relation void create_pool_index ( const char* relation, const char* extent, Key* (key) (tuple), const char* index_name, const bool unique ); void insert_into_index ( sm_string* index_name, Key* key, sm_ref* object ); void remove_from_index ( sm_string* index_name, Key* key, sm_ref* object ); // remove the transient pools of a query void remove_query_transient_pools (); // remove all transient pools of a transactions (should be called once per process) void odmg_cleanup (); // must be called once before the first query execution void odmg_initialize (); sm_sequence* collect_values ( Stream* s, long* element_size, bool* uniquep, bool* (*eq)(const char*,const char*) ); sm_sequence* collect_references ( Stream* s, bool* uniquep, bool* (*eq)(const char*,const char*) ); // Reduce the stream s into an aggregate value (usually int or bool) // using the accumulator to aggregate values and starting from zero. void* aggregate ( Stream* s, void* zero, void* (*accumulator) (void*,void*) ); sm_sequence* list_merge ( void* x, sm_sequence* y ); sm_sequence* bag_merge ( void* x, sm_sequence* y ); sm_sequence* set_merge ( void* x, sm_sequence* y, bool* (*eq)(const char*,const char*) ); bool* list_equality ( sm_sequence* x, sm_sequence* y, bool* (*eq)(const char*,const char*) ); bool* bag_equality ( sm_sequence* x, sm_sequence* y, bool* (*eq)(const char*,const char*) ); bool* set_equality ( sm_sequence* x, sm_sequence* y, bool* (*eq)(const char*,const char*) ); bool* enum_equality ( void* x, void* y ); tuple access_collection_value ( sm_sequence* s, const int* loc ); void* reduce_list ( void* (*merge)(void*,void*), void* zero, sm_sequence* s ); sm_string* import_string ( void* r ); sm_ref* import_ref ( void* r ); sm_sequence* import_sequence ( void* r, long* size, void* (*import_element)(void*) ); // store a function so it can be be used by the interpreter // and return its location in the memo table int store_function ( const char* function_name, const void* function_address, const short num_of_all_args, const short num_of_functional_args ); // return the location of the function in the memo table (-1 if not found) int find_function ( const char* function_name ); bool lambda_or ( bool x, bool y ); bool lambda_and ( bool x, bool y ); double abs ( double x ); long mod ( long x, long y ); // merges and zeros for various monoids long* max_merge ( long* x, long* y ); long* max_zero (); long* min_merge ( long* x, long* y ); long* min_zero (); long* sum_merge ( long* x, long* y ); long* sum_zero (); float* fmax_merge ( float* x, float* y ); float* fmax_zero (); float* fmin_merge ( float* x, float* y ); float* fmin_zero (); float* fsum_merge ( float* x, float* y ); float* fsum_zero (); bool* all_merge ( bool* x, bool* y ); bool* all_zero (); bool* some_merge ( bool* x, bool* y ); bool* some_zero (); sm_string* concat_merge ( sm_string* x, sm_string* y ); sm_string* concat_zero (); tuple empty_tuple (); tuple null_tuple (); bool* null_element ( sm_ref* e ); Key* concat_keys ( Key* x, Key* y ); tuple concat_tuples ( tuple x, tuple y ); sm_string string_concat ( sm_string x, sm_string y ); sm_string* concat ( sm_string* x, sm_string* y ); bool* string_match ( sm_string* pattern, sm_string* str ); sm_string* inverse_string ( sm_string* s ); /* compare various types */ long* int_cmp ( long* x, long* y ); long* float_cmp ( float* x, float* y ); long* string_cmp ( sm_string* x, sm_string* y ); long* bool_cmp ( bool* x, bool* y ); long* oid_cmp ( sm_ref* x, sm_ref* y ); bool* valid_oid ( raw_ref* r ); /* string comparisons */ long* merge_cmp ( long* x, long* y ); int string_compare ( sm_string x, sm_string y ); bool string_eq ( sm_string x, sm_string y ); bool string_neq ( sm_string x, sm_string y ); bool string_gt ( sm_string x, sm_string y ); bool string_lt ( sm_string x, sm_string y ); bool string_geq ( sm_string x, sm_string y ); bool string_leq ( sm_string x, sm_string y ); /* B-tree/sort key operations */ Key* string_to_key ( sm_string* s ); Key* int_to_key ( const long* s ); Key* real_to_key ( const float* s ); Key* bool_to_key ( const bool* s ); Key* oid_to_key ( const sm_ref* s ); bool* key_geq ( Key* x, Key* y ); Key* set_to_key ( Set* v, Key* (*ef) (void*) ); Key* bag_to_key ( Bag* v, Key* (*ef) (void*) ); Key* list_to_key ( Sequence* v, Key* (*ef) (void*) ); /* sequence updates */ void sequence_insert ( void* e, sm_sequence* s ); void sequence_remove ( void* e, sm_sequence* s ); // This is used when the result of a query is a collection of type T; // uses a lazy stream to access each T value (the result) of a query. // Improvement: use hashing to remove duplicates template const bool access ( Stream* s, T &result, const bool remove_duplicates ) { static Set< T > *elements = new Set< T >; tuple x = s->next(); while (x->is_valid()) { if (!(*x)[0].nullp()) { result = T(); result = *((T*) (*x)[0].value()); if (!remove_duplicates) return true; if (!elements->member(result)) { elements->insert(result); return true; }; }; x = s->next(); }; delete elements; elements = new Set< T >; return false; }; #endif