
Interfacing to kdb+ from Java

Posted 25 November 2014

http://www.sinv.de/slog/?p=23

Retrieving data from kdb+ into Java is extremely simple.

The Java driver for kdb+ resides in one file, c.java, and can be downloaded from www.kx.com/q/c. Don't be put off when reading what is inside c.java: it is actually very straightforward. Most of the code serializes Java objects into kdb+ objects, and vice versa. You can stick to the public methods, like the constructor, k, ks and close. In fact you need only be aware of 3 functions to query a server!

Essentially to query a server, one needs to open a connection, send a query, and then receive a response. We’ll go through that here:

public void test() {
    c connection = null;
    try {
        // open a connection using hostname, port, and "user:password"
        connection = new c("localhost", 5001, "cskelton:xyz");

        // send a blocking query
        Object result = connection.k("fn", Integer.valueOf(10), Integer.valueOf(20));

        if (result instanceof Integer) {
            System.out.println("Result is " + ((Integer) result).intValue());
        } else {
            System.err.println("Unexpected result type received");
        }
    } catch (c.KException e) {
        System.err.println("Server Reported Error: " + e.toString());
    } catch (java.io.IOException e) {
        System.err.println("IOException: " + e.getMessage());
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (java.io.IOException e) {
                // nothing useful can be done if close fails
            }
        }
    }
}

The above code opens a connection to the server and invokes a function called fn, passing it two parameters (2 integers, of values 10 and 20). The result of that call is stored in an Object, result, which we then interrogate to see what type it is. We expect an Integer object, and if that's what we receive, we print it. The connection is closed in the finally block either way. On the server side we should create a suitable function to call, something like

fn:{x+y}

c.java only throws 2 types of checked exceptions - c.KException and java.io.IOException. KException is to indicate that the server itself has reported an error, and IOException is used to indicate a problem with the underlying network.

In the event of no errors or exceptions we should receive the Integer 30 as a result. If we cannot connect to the server, we print a message as

IOException: Connection refused

and if no function called fn exists on the server we would see

Server Reported Error: c$KException: fn

There are a few other errors that can happen, e.g. if authentication fails.

The methods k and ks inside c.java are for blocking and non-blocking queries respectively. Typically you would use k(…) for your usual queries that you expect an immediate result from. If you are sending data to a kdb+ process and do not need confirmation that it has been processed, you could use ks(…) instead.

 

 

====

http://www.kx.com/q/d/kdb+.htm

Copyright © kx.com

Abridged Kdb+ Database Manual

Arthur Whitney

1 Summary

Kdb+ is an RDBMS and a general purpose programming language:
http://kx.com/q/d/q.htm

* OLTP from 100 thousand to 1 million records per second per cpu.

* OLAP from 1 million to 100 million records per second per cpu.

* Solaris, Linux and Windows. 32bit and 64bit.

trade:([]time:`time$();sym:`symbol$();price:`float$();size:`int$()) / create
`trade insert(12:34:56.789;`xx;93.5;300)                            / insert
select sum size by sym from trade                                   / select

The realtime OLTP+OLAP part of the database is generally in memory with an optional update log. There is no limit to the size of the OLAP historical databases on disk. Typical customers run realtime analysis on 100 million transactions per day and historical analysis on the accumulated billions of records.

2 Install

Unzip kdb+ (and k4.lic ) in QHOME (default: q ).

SOL64> unzip s64.zip -d q                (q/s64/q)
LIN64> unzip l64.zip -d q                (q/l64/q)
WIN32> w32.exe                           (WINDIR/q.exe)

Executable is q . Put QHOME/{s64|l64} on PATH.

3 Server

>q [f] [-p p] [-s s] [-t t] [-T T] [-o o] [-u u] [-w w] [-r r] [-l]
 f load script (*.q), file or directory
 -p port kdbc(/jdbc/odbc) http(html xml txt csv)
 -s slaves for parallel execution
 -t timer milliseconds
 -T timeout seconds
 -o offset hours(from GMT: affects .z.Z)
 -u usr:pwd file (-U allows file operations)
 -w workspace MB limit
 -r replicate from :host:port
 -l log updates to filesystem (-L sync)

 -z [0] "D"$ uses mm/dd/yyyy or dd/mm/yyyy
 -P [7] printdigits(0:all)
 -W [2] week offset(0:sat)

The simplest database is just a process which may be initialized with a script. Each process handles any number of concurrent http and kdbc (jdbc/odbc) clients. For example, start a trade.q database listening on port 5001.

>q trade.q -p 5001

View the database with a browser: http://localhost:5001

4 Client

Kdb+ returns text (html, txt, csv or xml) to http queries.

browser
 http://localhost:5001/?select sum size by sym from trade

browser/excel
 http://localhost:5001/q.csv?select sum size by sym from trade

excel/rtd
 http://www.skelton.de/rtdserver.htm

excel (data/getexternaldata/newwebquery)
 http://localhost:5001/q.txt?select sum size by sym from trade

console (q/l32/qcon localhost:5001)
 localhost:5001>select sum size by sym from trade

perl (use LWP::Simple;)
 get("http://localhost:5001/.txt?select sum size by sym from trade")
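
The http examples above put the q query, spaces included, straight after the ? in the URL. A browser escapes those spaces for you; a program usually has to do it itself. The following is a minimal Java sketch of building such a URL. The class and method names are hypothetical, and it assumes the server accepts percent-encoded spaces, as a browser would send them.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class KdbHttpUrl {
    // Builds http://host:port/path?query and percent-encodes characters
    // (such as spaces) that are not legal in a URL query.
    static String build(String host, int port, String path, String query) {
        try {
            // The seven-argument URI constructor quotes illegal characters per component.
            return new URI("http", null, host, port, path, query, null).toASCIIString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // prints http://localhost:5001/q.csv?select%20sum%20size%20by%20sym%20from%20trade
        System.out.println(build("localhost", 5001, "/q.csv", "select sum size by sym from trade"));
    }
}
```

The resulting string can be handed to any http client; fetching it is left out so that the sketch runs without a server.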

Kdb+ returns data to java, .net, c, jdbc/odbc or q clients. The results are atom, list, dict or table.

Result=k(string | {function,args})


java (q/c/c.java)
 import java.sql.*;
 try{c c=new c("localhost",5001);                    // connect
  Object[]x={new Time(System.currentTimeMillis()%86400000),"xx",new Double(93.5),new Integer(300)};
  c.k("insert","trade",x);                           // insert
  Object r=c.k("select sum size by sym from trade"); // select
 }catch(Exception e){}

c    (q/c/c.c)
 #include"k.h"
 int main(){K x;int c=khp("localhost",5001);if(c==-1)return c; // connect
  x=knk(4,kt(1000*(time(0)%86400)),ks("xx"),kf(93.5),ki(300));
  k(-c,"insert",ks("trade"),x,0);                              // insert
  x=k(c,"select sum size by sym from trade",0);r0(x);          // select
  return 0;}

q
 c:hopen`:localhost:5001                    / connect
 c("insert";`trade;("t"$.z.Z;`xx;93.5;300)) / insert
 c"select sum size by sym from trade"       / select

Data is faster and more flexible than text. You should get 100,000 single record selects, inserts, updates [and deletes] per second. You can insert or update a million records per second in bulk. Kdb+ is generally faster than the network.

5 Datatypes

t| size literal      q        sql       java      .net     xmlschema
---------------------------------------------------------------------
b  1    0b           boolean            Boolean   boolean  boolean
x  1    0x0          byte               Byte      byte     byte
h  2    0h           short    smallint  Short     int16    short
i  4    0            int      int       Integer   int32    int
j  8    0j           long     bigint    Long      int64    long
e  4    0e           real     real      Float     single   single
f  8    0.0          float    float     Double    double   double
c  1    " "          char               Character char
s  .    `            symbol   varchar   String    string   string
m  4    2000.01m     month
d  4    2000.01.01   date     date      Date               date
z  8    dateTtime    datetime timestamp Timestamp DateTime dateTime
u  4    00:00        minute
v  4    00:00:00     second
t  4    00:00:00.000 time     time      Time      TimeSpan time
*  4    `s$`         enum

All but boolean and byte have nulls. The int, float, char and symbol literal nulls are: 0N 0n " " ` . The rest use type extensions, e.g. 0Nd . Each type also has a list notation, e.g. in the following nested list:

(01b;0x00ff;0 1h;0 1;0 1j;0 1e;0 1.0;"ab";`a`b;2000.01 2000.02m;2000.01.01 2000.01.02;..)

There must be no spaces in symbol lists, e.g. `a`b`c . Symbols can have any non-zero characters (e.g. `$"a-b") but identifiers must be alphanumeric.
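
On the Java side, the c.java listing at the end of this page holds three null sentinels in its fields ni, nj and nf: Integer.MIN_VALUE, Long.MIN_VALUE and Double.NaN. The sketch below shows one way a client might test a returned atom against them. The class and method names are hypothetical, and only the int, long and float cases are covered.

```java
final class KdbNulls {
    // Sentinels mirroring the ni, nj and nf fields of c.java.
    static final int NULL_INT = Integer.MIN_VALUE;   // int null
    static final long NULL_LONG = Long.MIN_VALUE;    // long null
    static final double NULL_FLOAT = Double.NaN;     // float null

    // True when x is a boxed int, long or double carrying the null sentinel.
    static boolean isNull(Object x) {
        if (x instanceof Integer) return (Integer) x == NULL_INT;
        if (x instanceof Long) return (Long) x == NULL_LONG;
        if (x instanceof Double) return Double.isNaN((Double) x); // NaN never equals itself
        return false; // other types are not handled in this sketch
    }

    public static void main(String[] args) {
        System.out.println(isNull(Integer.MIN_VALUE)); // prints true
        System.out.println(isNull(30));                // prints false
        System.out.println(isNull(Double.NaN));        // prints true
    }
}
```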

6 Insert and Upsert

The q

`t insert ..

is similar to the sql

insert into t ..

It will check primary key violations. The q

`t upsert ..  (also t,:..)

is insert for tables and upsert (update existing and insert new) for keyed tables.
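
To make the difference concrete, here is a small Java analogy (not kdb+ code) that models a keyed table as a map. All names are hypothetical. It assumes that a primary key violation on insert surfaces as an error, while upsert overwrites the existing row.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class KeyedTableSketch {
    private final Map<String, Integer> rows = new LinkedHashMap<>();

    // insert: refuses a key that is already present (primary key violation).
    void insert(String key, int value) {
        if (rows.containsKey(key)) throw new IllegalStateException("duplicate key: " + key);
        rows.put(key, value);
    }

    // upsert: updates an existing key, inserts a new one.
    void upsert(String key, int value) {
        rows.put(key, value);
    }

    Integer get(String key) {
        return rows.get(key);
    }

    public static void main(String[] args) {
        KeyedTableSketch s = new KeyedTableSketch();
        s.insert("s1", 20);
        s.upsert("s1", 21);              // existing key: value replaced
        s.upsert("s2", 10);              // new key: row added
        System.out.println(s.get("s1")); // prints 21
    }
}
```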

7 Select and Update

q is an extension of sql capabilities. Expressions are short and simple.

select vwap:size wavg price by sym from trade     / volume weighted average price
select from trade where sym=`IBM, 0<deltas price  / price went up
select sum qty by order.customer.nation from item / rollup of 4-way join

These all run at several million records per second. In general:

select [a] [by b] from t [where c]
update [a] [by b] from t [where c]

These are similar to (but more powerful than) the sql

select [b,] [a] from t [where c] [group by b order by b]
update t set [a] [where c]

In sql the where and group clauses are atomic and the select and update clauses are atomic or aggregate if grouping. In q the where and by clauses are uniform and the select and update clauses are uniform or aggregate if grouping (by). All clauses execute on the columns and therefore q can take advantage of order. Sql can't tell the difference. Sql repeats the group by expressions in the select and the where clause is one boolean expression. The q where clause is a cascading list of constraints which nicely obviates some complex sql correlated subqueries and also gets rid of some parentheses. q relational queries are generally half the size of the corresponding sql. Ordered and functional queries do things that are difficult in sql. See http://kx.com/q/e for examples.

i is a special token that indicates record handle.
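
The first query in this section computes size wavg price, the volume weighted average price. For readers coming from Java, the arithmetic is the sum of size*price divided by the sum of size. A minimal sketch follows; the class name and the sample figures are made up for illustration.

```java
final class Wavg {
    // Weighted average of v using weights w: sum(w*v) / sum(w).
    static double wavg(double[] w, double[] v) {
        if (w.length != v.length) throw new IllegalArgumentException("length mismatch");
        double sumW = 0, sumWV = 0;
        for (int i = 0; i < w.length; i++) {
            sumW += w[i];
            sumWV += w[i] * v[i];
        }
        return sumWV / sumW;
    }

    public static void main(String[] args) {
        double[] size = {100, 300};    // hypothetical trade sizes
        double[] price = {10.0, 20.0}; // hypothetical trade prices
        System.out.println(wavg(size, price)); // prints 17.5
    }
}
```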

8 Default Column Names

Create, select and update will create names from expressions if there is no assignment. The name is the last token (first token if the operation is +, -, *, %, & or |) in the expression, or x if this token isn't a valid name, e.g.

select count i,sum qty by order.customer.nation ..

is short for

select x:count i,qty:sum qty by nation:order.customer.nation ..

9 Parameterized Queries

String[]x={"IBM","MSFT"};
r=c.k("{[s]select last price by sym from trade where sym in s}",x);

10 Table Arithmetic

t:([x:`a`b]y:2 3)
u:([x:`b`c]y:4 5)
t+u / ([x:`a`b`c]y:2 7 5)

It is common to want to do arithmetic with tables. Extending arithmetic to tables replaces complicated sql coalescing and outer joins.
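
The t+u example above can be mimicked in Java as an analogy (not kdb+ code) by treating each keyed table as a map from key to value: keys present in both are added, keys present in only one carry over. The class name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class KeyedTableAdd {
    // Adds two key -> value maps: shared keys are summed, others are kept as they are.
    static Map<String, Integer> add(Map<String, Integer> t, Map<String, Integer> u) {
        Map<String, Integer> r = new LinkedHashMap<>(t);
        for (Map.Entry<String, Integer> e : u.entrySet()) {
            r.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return r;
    }

    public static void main(String[] args) {
        Map<String, Integer> t = new LinkedHashMap<>();
        t.put("a", 2);
        t.put("b", 3);
        Map<String, Integer> u = new LinkedHashMap<>();
        u.put("b", 4);
        u.put("c", 5);
        System.out.println(add(t, u)); // prints {a=2, b=7, c=5}
    }
}
```

The explicit loop and merge call are roughly what the coalescing outer join does in sql; q expresses the same thing as t+u.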

11 Foreign Keys

Foreign keys are useful in q, e.g. given

s:([s]name;city;..)
p:([p]name;city;..)
sp:([]s:`s$();p:`p$();..) / foreign key definitions

then

select .. from sp where s.city=p.city / supplier city equal part city
update .. from sp where s.city=p.city / try that in sql

See http://kx.com/q/e/tpcd.q for more examples.

12 Joins

Joins generally run at 10 to 100 million records per second.

See http://kx.com/q/e/tpcd.txt for an 8-way join

13 Q from SQL

Following is a map of how to do sql things in q. There is no reverse map since q is a much richer language than sql. With respect to CJDate "The SQL Standard"

s)create table s(s varchar(*)primary key,name varchar(*),status int,city varchar(*))
q)s:([s:`symbol$()]name:`symbol$();status:`int$();city:`symbol$())

s)create table sp(s varchar(*)references s,p varchar(*)references p,qty int)
q)sp:([]s:`s$();p:`p$();qty:`int$())

s)insert into s values('s1','smith',20,'london')
q)`s insert(`s1;`smith;20;`london)

s)update s set status=status+1 where s='s1'
q)update status:status+1 from`s where s=`s1

s)delete from s where s='s1'
q)delete from`s where s=`s1

s)select * from s where s='s1'
q)select from s where s=`s1

s)select s,count(*)as x from sp group by s order by s
q)select count i by s from sp

s)select s,sum(qty)as qty from sp,s,p where sp.s=s.s and sp.p=p.p and p.city=s.city group s order s
q)select sum qty by s from sp where s.city=p.city

s)select distinct ..
q)select distinct ..

s)count sum min max avg
q)count sum min max avg prd first last wavg wsum ..

s)+ - * / < > = <= >= <> like between-and not and or
q)+ - * % < > = <= >= <> like within      not &   |  ..

14 Stored Procedures

Server functions and procedures are written in q. Why not java? q is faster, smaller and better than java. For example, a server function to subtract a client table p from a server table P is
f:{[p]P-:p}

15 Rename, Rearrange

xcol  [p]rename    `a`b xcol sp
xcols [p]rearrange `p`s xcols sp

16 Correlated Subquery

(aggr;exp)fby exp

obviates common correlated subqueries, e.g.

select from sp where qty>(avg;qty)fby p
select from sp where s=`s1,qty>(avg;qty)fby p

see also http://kx.com/q/e/tpcd.q
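
As we read it, the first query above keeps the rows whose qty exceeds the average qty of all rows sharing the same p. A Java sketch of that two-pass logic follows. The class name and the sample data are hypothetical, and the code illustrates the idea rather than how kdb+ evaluates fby.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FbySketch {
    // Returns the row indices i where qty[i] > avg(qty) over rows with the same p[i].
    static List<Integer> aboveGroupAvg(String[] p, int[] qty) {
        Map<String, long[]> acc = new HashMap<>(); // p -> {sum, count}
        for (int i = 0; i < p.length; i++) {
            long[] a = acc.computeIfAbsent(p[i], k -> new long[2]);
            a[0] += qty[i];
            a[1]++;
        }
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < p.length; i++) {
            long[] a = acc.get(p[i]);
            if (qty[i] > (double) a[0] / a[1]) out.add(i);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] p = {"p1", "p1", "p2", "p2", "p2"}; // made-up part ids
        int[] qty = {100, 300, 200, 200, 400};       // made-up quantities
        System.out.println(aboveGroupAvg(p, qty));   // prints [1, 4]
    }
}
```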

17 Performance

Choose layout and optimizations to fit the important queries:

`s sorted `u unique `p parted `g grouped


don't bother with any on fewer than one million rows.
don't bother with `p  on fewer than one billion rows.

think about the queries. think about disk. e.g. if you want fast access on terabytes by time and customer store it both ways.

18 Layout

Tables are:

size| small medium    large
--------------------------------
32bit 1GB   32M rows  unlimited
64bit 10GB  512M rows unlimited
store file  directory partitions
query .01ms .3ms      10ms(seek)
      tick            taq

large are usually sorted and parted so that important queries are one seek.

64bit can generally handle realtime tables around 10 times bigger than 32bit.

64bit has an advantage on medium (and large) because queries on 32bit are dominated by mapping.

19 Small

http://kx.com/q/tick

One day's worth of tick(trades, quotes, orders, ..) is a few GB and fits in RAM.

Queries are around 10 microseconds.

20 Medium

t `s#date,`g#mas, ..

dayend append (and `g#mas) is fast.

once data is cached in memory

\t select i from t where date=..
\t select i from t where mas=..

Queries are around 1 millisecond.

21 Large

http://kx.com/q/taq

trade `p#sym
quote `p#sym

Queries run from disk at 10ms per date/sym/field.

The partition field is date month year or int and is virtual.

Kdb+taq is more than 2500 partitions of daily trades and quotes. Each new day is more than 1GB. [Queries on partitioned databases can run in parallel.] To set a subview -- handy for test and development:

.Q.view 2#date / view two first partitions
.Q.view[]      / reset

22 Limits

Each database runs in memory and/or disk map-on-demand -- possibly partitioned. There is no limit on the size of a partitioned database but on 32-bit systems the main memory OLTP portion of a database is limited to about 1GB of raw data, i.e. 1/4 of the address space. The raw data of a main memory 64bit process should be limited to about 1/2 of available RAM.

23 Partition

A kdb+ historical database is a file system directory. Small tables are simply binary files inside this directory. Medium-sized tables (<100 million rows) are splayed (a file for each column) across a subdirectory. Big tables (billions of rows) are horizontally partitioned (often by date) and then splayed across a directory. An example layout for kdb+taq is:

/data/db/
 sym  /enumeration of symbols
 mas  /master table
 2005.03.15/
  trade/time ..
  quote/time ..
 2005.03.16/
  trade/time ..
  quote/time ..
 ..

Kdb+ databases run 24*7. After a new date is added a simple reset message is sent to the server. Small tables are in memory. The big table columns are mapped in and out on demand. To avoid swapping, RAM should be at least 10* size of the largest column, e.g. if one table in one partition can have 100 million rows there should be 8GB (10*8(byte float)*100,000,000) of RAM. [Address usage is 20* size of the largest column so on 32bit systems tables shouldn't have more than 16.7 million rows.] The number of partitions, tables and columns doesn't matter. Some customers have 1000's of partitions. Some have 1000's of tables. Some have 1000's of columns in the tables.
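
The sizing rule in the paragraph above is plain arithmetic: 10 times the byte size of the largest column for RAM, 20 times for address space. A small Java sketch that reproduces the 8GB figure follows; the class and method names are hypothetical.

```java
final class RamSizing {
    // RAM rule of thumb from the text: 10 * (bytes per value) * (rows in the largest column).
    static long ramBytes(long rows, int bytesPerValue) {
        return 10L * bytesPerValue * rows;
    }

    // Address-space rule of thumb from the text: 20 * size of the largest column.
    static long addressBytes(long rows, int bytesPerValue) {
        return 20L * bytesPerValue * rows;
    }

    public static void main(String[] args) {
        // 100 million rows of 8-byte floats
        System.out.println(ramBytes(100_000_000L, 8));    // prints 8000000000
        // 16.7 million rows of 8-byte floats
        System.out.println(addressBytes(16_700_000L, 8)); // prints 2672000000
    }
}
```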

Tables with many columns are good splayed table candidates because the bulk of these tables will always be out of memory (queries rarely require access to more than a few columns at a time). The performance characteristics of splayed tables are independent of the number of columns in the table. They depend only on the number of columns that must be in memory at any one time, which in turn depends on the specific queries being executed. Columns of Kdb+ splayed tables are stored contiguously on disk and laid out contiguously in real memory when accessed. This organization gives optimal disk access performance. The operating system caches accessed columns in real memory. Consequently, if the real memory hit-rate is high, the performance is real memory performance. (This is often the case because many database applications have a few heavily used queries that reference the same few columns).

There can be one or many instances of the database. All instances point to the same data. All processes will reside on the server including a gateway should one be required. The system can be implemented with or without a gateway process. In a system without a gateway, users connect directly to a database instance and send queries; with a gateway, the users connect and send queries to the gateway process, which allocates the query based on which instance is free. This can be handy for running more than one query at a time but overall throughput can of course suffer to the extent that the queries are competing for disk. But this is a way to let an important user get something quick while some bigger analysis is going on in another instance.

24 Parallel

Partitioned databases can run in parallel. But if we run in parallel it is best to have parallel i/o. Each kdb+ process can easily consume 100MB per second. Therefore the fastest multi-terabyte kdb+ databases use parallel i/o with guaranteed no contention. Customers do this with DAS (direct attached storage). U320 scsi controllers are fast and inexpensive. Customers sometimes use SAN (200MB/sec total) or NAS (80MB/sec total) because the IT departments insist on it. That is ok. But this is the limiting factor. If you use NAS be sure to have at least 1Gbit ethernet.

If you use NAS there is no point in going parallel -- i.e. one kdb+ process can easily consume everything the filesystem throws at it. 4 slaves will saturate SAN. With DAS (direct attached storage) we can have one slave per disk array and ideally at least 2 disk arrays per cpu core.

If all queries access just one partition/date then there is no point in going parallel. If you have DAS and have queries that span more than one partition then the best setup is to have one or more disk arrays per slave process (thus no contention) with approximately 2 slave processes per cpu (one is crunching while the other is reading). But be sure to have enough RAM per process so that there is no swapping, e.g. a machine with 4cpu/32GBram/4controllers(2channels each)/8diskarray(14*146GB drives each) can have 8 slave processes on 50 million row (per partition) tables and 4 slave processes on 100 million row (per partition) tables. The o/s independent way to allocate partitions to different disk-arrays is to use a file (par.txt) that lists the different directories, e.g.

/data/db/
 sym
 mas
 par.txt

where par.txt is

/0/db
/1/db
/2/db
/3/db

and these directories are:

/0/db
 2005.03.15/trade quote
 2005.03.19/trade quote
/1/db
 2005.03.16/trade quote
 2005.03.20/trade quote
..

Partitions are round-robin allocated mod N where N is the number of partitions. Round-robin is to get maximum parallelization on date ranges.
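
The round-robin rule can be sketched in Java as follows. Two assumptions are ours, not the manual's: that a date partition is numbered by its day count since 2000.01.01 (the same epoch offset appears in the date conversions in c.java), and that N is the number of directories listed in par.txt. Under those assumptions the result matches the directory listing above. The class and method names are hypothetical.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

final class ParRoundRobin {
    // Assumed numbering origin for date partitions (an assumption, see the lead-in).
    static final LocalDate EPOCH = LocalDate.of(2000, 1, 1);

    // Index of the par.txt directory a date partition would land in, given n directories.
    static int slot(LocalDate partition, int n) {
        long days = ChronoUnit.DAYS.between(EPOCH, partition);
        return (int) Math.floorMod(days, (long) n);
    }

    public static void main(String[] args) {
        int n = 4; // four directories in par.txt: /0/db .. /3/db
        for (int day = 15; day <= 20; day++) {
            LocalDate d = LocalDate.of(2005, 3, day);
            System.out.println(d + " -> /" + slot(d, n) + "/db");
        }
        // 2005-03-15 and 2005-03-19 map to /0/db, 2005-03-16 and 2005-03-20 to /1/db
    }
}
```

Consecutive dates land on different directories, which is why a query over a date range can read from all disk arrays at once.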

25 Logs

Log/commit strategies are none, file (-l) or synch (-L). None is good for test, readonly, readmostly, trusted, duplicated or cache db's. File (100,000 transactions per second) is useful if you trust (or duplicate) the machine. Synch is 100 times per second with SCSI disk and 10,000 times per second with SSD(solid state disk). In any case a single drive can commit more than 40MB of updates per second. A logging database is [script/][state/]log, e.g.

>q d -l / loads d.q(func), d.qdb(data), d.log(updates)
\l      / checkpoint d.qdb and empty d.log (d must not have "." in it)

i.e. put code in d.q in directory .u(utilities). All update messages are logged. To send a message from the console use 0, e.g. 0"v+:1" . An error after a partial update will cause rollback. d can start empty. d, d.q and d.log should all be in the same launch directory. In general only log in production. Keep the database as simple as possible.

26 Load

(types;widths)0:file / text load
(types;widths)1:file / data load

The windows version comes with an odbc loader, e.g.

>q w32/odbc.k
q).odbc.load`northwind.mdb / will load entire database

You should get 10MB/sec -- if the source rdbms and network are up to it. Also,

h:.odbc.open`
t:.odbc.tables h
r:.odbc.eval[h]"..."
.odbc.close h

$bcp t out t.txt -c

27 Archive

It is trivial to incrementally archive the large parallel databases: simply move the old directories away and send the reset message.

28 User

Servers can control access with a file of usr:pwd's, e.g.

>q f -p 1234 -u ../user.txt

Clients connect with:

     q> c:hopen`:host:1234:usr:pwd
c.java> c c=new k.c("host",1234,"usr:pwd");
  c.cs> c c=new k.c("host",1234,"usr:pwd");
   c.o> int c=khpu("host",1234,"usr:pwd");

Even when there is no server -u the q client still sends username. All incoming messages reflect:

.z.w / who-socket
.z.a / address
.z.u / user

Clients can only do file i/o in the server directory.

29 Grid

Kdb+ is a parallel/grid programming system.

>q [f] -s n / will start n slaves threads
{..}peach x / parallel execute on x, e.g.

The overhead is conditional threads - 1 microsecond - so use accordingly. Test overhead with:

\t {x}peach til 1000

Ideal parallel cpu candidate has high cpu%data, e.g.

{sum exp x?1.0}peach 2#1000000 / should be twice as fast with 2 cpu's

Peach is also good for going after many different drives at once even if you only have one cpu.

 

=============================
c.java

package kx; //jar cf c.jar kx/*.class
import java.net.*;import java.io.*;import java.sql.*;import java.lang.reflect.Array;import java.text.*;
//tick: c c=new c("",5010);Object[]x={"GE",new Double(2.5),new Integer(23)};c.k(".u.upd","trade",x);
//Object[]x={new Time(t()),"xx",new Double(93.5),new Integer(300)};for(int i=0;i<1000;++i)c.ks("upsert","trade",x);c.k("");
//Flip t=td(c.k("select sum size by sym from trade"));O(n(t.x));O(n(t.y[0]));O(at(t.y[0],0)); //cols rows data
public class c{
/*public static void main(String[]args){try{c c=new c("",5001);
// O(c.k("0N!",c.k("0N!1999.01.01D-1")));
//c.k("0N!",NULL('z'));
//c.setEncoding("UTF-8");O("Unicode "+c.k("{`$x}","Ranby Björklund AB".toCharArray()));O("Unicode "+c.k("{x}",(String)c.k("{`$x}",(char[])c.k("\"c\"$0x52616e627920426ac3b6726b6c756e64204142"))));  

 c.close();}catch(Exception e){e.printStackTrace();}}
*/
private static String e="ISO-8859-1";private static PrintStream out=System.out;
public static void setEncoding(String e)throws UnsupportedEncodingException{c.e=e;out=new PrintStream(System.out,true,e);}
public Socket s;DataInputStream i;OutputStream o;byte[]b,B;int j,J;boolean a,v6;
void io(Socket x)throws IOException{s=x;i=new DataInputStream(s.getInputStream());o=s.getOutputStream();}public void close()throws IOException{s.close();i.close();o.close();}
public c(ServerSocket s)throws IOException{io(s.accept());i.read(b=new byte[99]);o.write(b,0,1);} //c c=new c(new ServerSocket(5010));while(true)c.w(2,c.k());
public c(String h,int p,String u)throws KException,IOException{B=new byte[2+ns(u)];io(new Socket(h,p));J=0;w(u+"\1");o.write(B);if(1!=i.read(B,0,1)){close();B=new byte[1+ns(u)];io(new Socket(h,p));J=0;w(u);o.write(B);if(1!=i.read(B,0,1)){close();throw new KException("access");}}v6=B[0]==1;}
public c(String h,int p)throws KException,IOException{this(h,p,System.getProperty("user.name"));}
public static class Month{public int i;public Month(int x){i=x;}public String toString(){int m=i+24000,y=m/12;return i==ni?"":i2(y/100)+i2(y%100)+"-"+i2(1+m%12);}}
public static class Minute{public int i;public Minute(int x){i=x;}public String toString(){return i==ni?"":i2(i/60)+":"+i2(i%60);}}
public static class Second{public int i;public Second(int x){i=x;}public String toString(){return i==ni?"":new Minute(i/60).toString()+':'+i2(i%60);}}
public static class Timespan{public long j;public Timespan(long x){j=x;}public String toString(){return j==nj?"":j+"";}}
public static class Dict{public Object x;public Object y;public Dict(Object X,Object Y){x=X;y=Y;}}
public static class Flip{public String[]x;public Object[]y;public Flip(Dict X){x=(String[])X.x;y=(Object[])X.y;}public Object at(String s){return y[find(x,s)];}}
public static class KException extends Exception{KException(String s){super(s);}}

private void u(){int n=0,r=0,f=0,s=8,p=s;short i=0;j=0;byte[]dst=new byte[ri()];int d=j;int[]aa=new int[256];while(s<dst.length){if(i==0){f=0xff&(int)b[d++];i=1;}if((f&i)!=0){r=aa[0xff&(int)b[d++]];dst[s++]=dst[r++];dst[s++]=dst[r++];n=0xff&(int)b[d++];for(int
m=0;m<n;m++)dst[s+m]=dst[r+m];}else dst[s++]=b[d++];while(p<s-1)aa[(0xff&(int)dst[p])^(0xff&(int)dst[p+1])]=p++;if((f&i)!=0)p=s+=n;i*=2;if(i==256)i=0;}b=dst;j=8;}
void w(byte x){B[J++]=x;}static int ni=Integer.MIN_VALUE;static long nj=Long.MIN_VALUE;static double nf=Double.NaN;
boolean rb(){return 1==b[j++];}void w(boolean x){w((byte)(x?1:0));}  char rc(){return(char)(b[j++]&0xff);}void w(char c){w((byte)c);}
short rh(){int x=b[j++],y=b[j++];return(short)(a?x&0xff|y<<8:x<<8|y&0xff);}                               void w(short h){w((byte)(h>>8));w((byte)h);}
int ri(){int x=rh(),y=rh();return a?x&0xffff|y<<16:x<<16|y&0xffff;}                                       void w(int i){w((short)(i>>16));w((short)i);}
long rj(){int x=ri(),y=ri();return a?x&0xffffffffL|(long)y<<32:(long)x<<32|y&0xffffffffL;}                void w(long j){w((int)(j>>32));w((int)j);}
float re(){return Float.intBitsToFloat(ri());}                                                            void w(float e){w(Float.floatToIntBits(e));}
double rf(){return Double.longBitsToDouble(rj());}                                                        void w(double f){w(Double.doubleToLongBits(f));}
Month rm(){return new Month(ri());}   void w(Month m){w(m.i);} Minute ru(){return new Minute(ri());}      void w(Minute u){w(u.i);}
Second rv(){return new Second(ri());} void w(Second v){w(v.i);}Timespan rn(){return new Timespan(rj());}  void w(Timespan n){if(!v6)throw new RuntimeException("Timespan not valid pre kdb+2.6");w(n.j);}
public java.util.TimeZone tz=java.util.TimeZone.getDefault();
static long k=86400000L*10957,n=1000000000L;long o(long x){return tz.getOffset(x);}long lg(long x){return x+o(x);}long gl(long x){return x-o(x-o(x));}
Date rd(){int i=ri();return new Date(i==ni?nj:gl(k+86400000L*i));}                             void w(Date d){long j=d.getTime();w(j==nj?ni:(int)(lg(j)/86400000-10957));}
Time rt(){int i=ri();return new Time(i==ni?nj:gl(i));}                                         void w(Time t){long j=t.getTime();w(j==nj?ni:(int)(lg(j)%86400000));}
//Timestamp
java.util.Date rz(){double f=rf();return new java.util.Date(Double.isNaN(f)?nj:gl(k+Math.round(8.64e7*f)));} void w(java.util.Date z){long j=z.getTime();w(j==nj?nf:(lg(j)-k)/8.64e7);}
Timestamp rp(){long j=rj(),d=j<0?(j+1)/n-1:j/n;Timestamp p=new Timestamp(j==nj?j:gl(k+1000*d));if(j!=nj)p.setNanos((int)(j-n*d));return p;}
void w(Timestamp p){long j=p.getTime();if(!v6)throw new RuntimeException("Timestamp not valid pre kdb+2.6");w(j==nj?j:1000000*(lg(j)-k)+p.getNanos()%1000000);}

String rs()throws UnsupportedEncodingException{int i=j;for(;b[j++]!=0;);return (i==j-1)?"":new String(b,i,j-1-i,e);}void w(String s)throws UnsupportedEncodingException{int i=0,n=ns(s);byte[]b=s.getBytes(e);for(;i<n;)w(b[i++]);B[J++]=0;}
Object r()throws UnsupportedEncodingException{int i=0,n,t=b[j++];if(t<0)switch(t){case-1:return new Boolean(rb());case-4:return new Byte(b[j++]);case-5:return new Short(rh());
  case-6:return new Integer(ri());case-7:return new Long(rj());case-8:return new Float(re());case-9:return new Double(rf());case-10:return new Character(rc());case-11:return rs();
  case-12:return rp();case-13:return rm();case-14:return rd();case-15:return rz();case-16:return rn();case-17:return ru();case-18:return rv();case-19:return rt();}
 if(t>99){if(t==100){rs();return r();}if(t<104)return b[j++]==0&&t==101?null:"func";if(t>105)r();else for(n=ri();i<n;i++)r();return"func";}
 if(t==99)return new Dict(r(),r());j++;if(t==98)return new Flip((Dict)r());n=ri();switch(t){
  case 0:Object[]L=new Object[n];for(;i<n;i++)L[i]=r();return L;        case 1:boolean[]B=new boolean[n];for(;i<n;i++)B[i]=rb();return B;
  case 4:byte[]G=new byte[n];for(;i<n;i++)G[i]=b[j++];return G;         case 5:short[]H=new short[n];for(;i<n;i++)H[i]=rh();return H;
  case 6:int[]I=new int[n];for(;i<n;i++)I[i]=ri();return I;             case 7:long[]J=new long[n];for(;i<n;i++)J[i]=rj();return J;
  case 8:float[]E=new float[n];for(;i<n;i++)E[i]=re();return E;         case 9:double[]F=new double[n];for(;i<n;i++)F[i]=rf();return F;
 case 10:char[]C=new String(b,j,n,e).toCharArray();j+=n;return C;       case 11:String[]S=new String[n];for(;i<n;i++)S[i]=rs();return S;
 case 12:Timestamp[]P=new Timestamp[n];for(;i<n;i++)P[i]=rp();return P; case 13:Month[]M=new Month[n];for(;i<n;i++)M[i]=rm();return M;
 case 14:Date[]D=new Date[n];for(;i<n;i++)D[i]=rd();return D;           case 15:java.util.Date[]Z=new java.util.Date[n];for(;i<n;i++)Z[i]=rz();return Z;
 case 16:Timespan[]N=new Timespan[n];for(;i<n;i++)N[i]=rn();return N;   case 17:Minute[]U=new Minute[n];for(;i<n;i++)U[i]=ru();return U;
 case 18:Second[]V=new Second[n];for(;i<n;i++)V[i]=rv();return V;       case 19:Time[]T=new Time[n];for(;i<n;i++)T[i]=rt();return T;}return null;}

//object.getClass().isArray()   t(int[]) is .5 isarray is .1 lookup .05
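// t(x): kdb+ type code for a Java object: negative for atoms, positive for typed vectors, 98 for Flip, 99 for Dict, 0 for anything else (general list)
public static int t(Object x){return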
 x instanceof Boolean?-1:x instanceof Byte?-4:x instanceof Short?-5:x instanceof Integer?-6:x instanceof Long?-7:x instanceof Float?-8:x instanceof Double?-9:x instanceof Character?-10:x instanceof String?-11:
x instanceof Date?-14:x instanceof Time?-19:x instanceof Timestamp?-12:x instanceof java.util.Date?-15:x instanceof Timespan?-16: x instanceof Month?-13:x instanceof Minute?-17:x instanceof Second?-18:
 x instanceof boolean[]?1:x instanceof byte[]?4:x instanceof short[]?5:x instanceof int[]?6:x instanceof long[]?7:x instanceof float[]?8:x instanceof double[]?9:x instanceof char[]?10:x instanceof String[]?11:
x instanceof Date[]?14:x instanceof Time[]?19:x instanceof Timestamp[]?12:x instanceof java.util.Date[]?15:x instanceof Timespan[]?16:x instanceof Month[]?13:x instanceof Minute[]?17:x instanceof Second[]?18:
 x instanceof Flip?98:x instanceof Dict?99:0;}

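// nt[t]: size in bytes of one element of type t (0 for lists, symbols and unused codes)
static int[]nt={0,1,0,0,1,2,4,8,4,8,1,0,8,4,4,8,8,4,4,4};
// ns(s): encoded byte length of s, cut at the first NUL character; 0 for null
static int ns(String s)throws UnsupportedEncodingException{int i;if(s==null)return 0;if(-1<(i=s.indexOf('\000')))s=s.substring(0,i);return s.getBytes(e).length;}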
public static int n(Object x)throws UnsupportedEncodingException{return x instanceof Dict?n(((Dict)x).x):x instanceof Flip?n(((Flip)x).y[0]):x instanceof char[]?new String((char[])x).getBytes(e).length:Array.getLength(x);}
public int nx(Object x)throws UnsupportedEncodingException{int i=0,n,t=t(x),j;if(t==99)return 1+nx(((Dict)x).x)+nx(((Dict)x).y);if(t==98)return 3+nx(((Flip)x).x)+nx(((Flip)x).y);
 if(t<0)return t==-11?2+ns((String)x):1+nt[-t];j=6;n=n(x);if(t==0||t==11)for(;i<n;++i)j+=t==0?nx(((Object[])x)[i]):1+ns(((String[])x)[i]);else j+=n*nt[t];return j;}
void w(Object x)throws UnsupportedEncodingException{int i=0,n,t=t(x);w((byte)t);if(t<0)switch(t){case-1:w(((Boolean)x).booleanValue());return;
  case-4:w(((Byte)x).byteValue());return;       case-5:w(((Short)x).shortValue());return;
  case-6:w(((Integer)x).intValue());return;     case-7:w(((Long)x).longValue());return;
  case-8:w(((Float)x).floatValue());return;     case-9:w(((Double)x).doubleValue());return;
  case-10:w(((Character)x).charValue());return; case-11:w((String)x);return;
  case-12:w((Timestamp)x);return;               case-13:w((Month)x);return;case-14:w((Date)x);return;
  case-15:w((java.util.Date)x);return;          case-16:w((Timespan)x);return;case-17:w((Minute)x);return;
  case-18:w((Second)x);return;case-19:w((Time)x);return;}
 if(t==99){Dict r=(Dict)x;w(r.x);w(r.y);return;}B[J++]=0;if(t==98){Flip r=(Flip)x;B[J++]=99;w(r.x);w(r.y);return;}
 w(n=n(x));if(t==10){byte[]b=new String((char[])x).getBytes(e);for(;i<b.length;)w(b[i++]);}else for(;i<n;++i)if(t==0)w(((Object[])x)[i]);else if(t==1)w(((boolean[])x)[i]);else if(t==4)w(((byte[])x)[i]);
 else if(t==5)w(((short[])x)[i]);else if(t==6)w(((int[])x)[i]);else if(t==7)w(((long[])x)[i]);
 else if(t==8)w(((float[])x)[i]);else if(t==9)w(((double[])x)[i]);
 else if(t==11)w(((String[])x)[i]);else if(t==12)w(((Timestamp[])x)[i]);else if(t==13)w(((Month[])x)[i]);else if(t==14)w(((Date[])x)[i]);
 else if(t==15)w(((java.util.Date[])x)[i]);else if(t==16)w(((Timespan[])x)[i]);else if(t==17)w(((Minute[])x)[i]);else if(t==18)w(((Second[])x)[i]);
 else w(((Time[])x)[i]);}
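// w(i,x): build and send one message: 8-byte header (byte 1 = message type i, bytes 4-7 = total length) followed by the serialized object
void w(int i,Object x)throws IOException{int n=nx(x)+8;synchronized(o){B=new byte[n];B[0]=0;B[1]=(byte)i;J=4;w(n);w(x);o.write(B);}}
// ks(...): asynchronous send (message type 0); does not read a reply
public void ks(String s)throws IOException{w(0,cs(s));}
public void ks(Object x)throws IOException{w(0,x);}
char[]cs(String s){return s.toCharArray();}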
public void ks(String s,Object x)throws IOException{Object[]a={cs(s),x};w(0,a);}
public void ks(String s,Object x,Object y)throws IOException{Object[]a={cs(s),x,y};w(0,a);}
public void ks(String s,Object x,Object y,Object z)throws IOException{Object[]a={cs(s),x,y,z};w(0,a);}
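// k(): read one message: 8-byte header (byte 0 = little-endian flag, byte 2 = compressed flag, bytes 4-7 = total length), then the body; a leading type byte of -128 means the server reported an error
public Object k()throws KException,IOException,UnsupportedEncodingException{synchronized(i){i.readFully(b=new byte[8]);a=b[0]==1;boolean c=b[2]==1;j=4;i.readFully(b=new byte[ri()-8]);if(c)u();else j=0;if(b[0]==-128){j=1;throw new KException(rs());}return r();}}
// k(x): synchronous request (message type 1); sends x, then blocks until the reply has been read
public synchronized Object k(Object x)throws KException,IOException{w(1,x);return k();}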
public Object k(String s)throws KException,IOException{return k(cs(s));}
public Object k(String s,Object x)throws KException,IOException{Object[]a={cs(s),x};return k(a);}
public Object k(String s,Object x,Object y)throws KException,IOException{Object[]a={cs(s),x,y};return k(a);}
public Object k(String s,Object x,Object y,Object z)throws KException,IOException{Object[]a={cs(s),x,y,z};return k(a);}

public static Object[]NULL={null,new Boolean(false),null,null,new Byte((byte)0),new Short(Short.MIN_VALUE),new Integer(ni),new Long(nj),new Float(nf),new Double(nf),new Character(' '),"",
 new Timestamp(nj),new Month(ni),new Date(nj),new java.util.Date(nj),new Timespan(nj),new Minute(ni),new Second(ni),new Time(nj)};
public static Object NULL(char c){return NULL[" b  xhijefcspmdznuvt".indexOf(c)];}

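// qn(x): true if x is the null value of its atom type (types short and above only)
public static boolean qn(Object x){int t=-t(x);return t>4&&x.equals(NULL[t]);}
// at(x,i): element i of array x, with a kdb+ null mapped to Java null
public static Object at(Object x,int i){return qn(x=Array.get(x,i))?null:x;}
// set(x,i,y): store y at index i of array x, with Java null mapped to the kdb+ null for that array type
public static void set(Object x,int i,Object y){Array.set(x,i,null==y?NULL[t(x)]:y);}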

static int find(String[]x,String y){int i=0;for(;i<x.length&&!x[i].equals(y);)++i;return i;}
// td(X): turn a keyed table (a Dict whose key and value are both Flips) into one unkeyed Flip; a Flip is returned unchanged
public static Flip td(Object X)throws java.io.UnsupportedEncodingException{if(X instanceof Flip)return(Flip)X;Dict d=(Dict)X;Flip a=(Flip)d.x,b=(Flip)d.y;int m=n(a.x),n=n(b.x);
 String[]x=new String[m+n];System.arraycopy(a.x,0,x,0,m);System.arraycopy(b.x,0,x,m,n);
 Object[]y=new Object[m+n];System.arraycopy(a.y,0,y,0,m);System.arraycopy(b.y,0,y,m,n);return new Flip(new Dict(x,y));}
public static Object O(Object x){out.println(x);return x;}public static void O(int x){out.println(x);}public static void O(boolean x){out.println(x);}public static void O(long x){out.println(x);}public static void O(double x){out.println(x);}
public static long t(){return System.currentTimeMillis();}static long t;public static void tm(){long u=t;t=t();if(u>0)O(t-u);}static String i2(int i){return new DecimalFormat("00").format(i);}
}
//2012.02.09 close() if connect fails
//2012.01.06 read datetime, rz(), was truncating mS rather than rounding
//2010.10.06 block sending timestamp/timespan types to versions prior to kdb+2.6
//2010.05.06 optimized rs() for reading null symbols
//2010.03.20 changed datetime to java.util.Date as it was incompatible with timestamp
//2010.02.01 added unicode support for char vectors and symbol
//2010.01.06 fixed 0Np
//2009.12.07 removed v6 dependencies
//2009.12.02 uncommented at, set and qn
//2009.10.29 u - uncompress, connect retry for v<=2.5
//2009.09.23 Timestamp,Timespan,v6 connect
//2008.08.14 String(,,,"ISO-8859-1") to avoid mutex
//2007.10.18 tz
//2007.08.06 kx
//2007.04.20 sql.{Date|Time|Timestamp}
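
The driver code above is dense, so it may help to see what w(int i,Object x) puts on the wire for the simplest case, a query string sent as a char vector (type 10). The following is only a sketch for illustration, not part of c.java: the class KdbFrameSketch and its methods frame and writeInt are hypothetical names, and it assumes the driver's default ISO-8859-1 encoding and big-endian integers (header byte 0 set to 0, as in the listing). It does not cover the login handshake, compression, or any other type.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of c.java: rebuilds the bytes that
// w(int i,Object x) would produce for a char-vector query.
public class KdbFrameSketch {
    // Message types passed to w(int i,Object x): ks(...) uses 0, k(...) uses 1.
    static final int ASYNC = 0, SYNC = 1;

    // Append a 32-bit int, most significant byte first (big-endian).
    static void writeInt(ByteArrayOutputStream out, int v) {
        out.write(v >>> 24);
        out.write(v >>> 16);
        out.write(v >>> 8);
        out.write(v);
    }

    // Build one complete message for the given query text.
    static byte[] frame(int msgType, String query) {
        // Assumption: the driver's encoding is left at ISO-8859-1.
        byte[] text = query.getBytes(StandardCharsets.ISO_8859_1);
        // 8 header bytes + 6 bytes of vector preamble + 1 byte per char,
        // matching nx(x)+8 for a char[] in the listing.
        int total = 8 + 6 + text.length;
        ByteArrayOutputStream out = new ByteArrayOutputStream(total);
        out.write(0);            // byte 0: 0 = big-endian
        out.write(msgType);      // byte 1: message type
        out.write(0);            // byte 2: unused here (compression flag in replies)
        out.write(0);            // byte 3: unused
        writeInt(out, total);    // bytes 4-7: total message length
        out.write(10);           // type code 10 = char vector
        out.write(0);            // attribute byte
        writeInt(out, text.length); // element count
        out.write(text, 0, text.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Print the frame for a small query as hex, one byte per pair.
        StringBuilder hex = new StringBuilder();
        for (byte b : frame(SYNC, "1+1")) hex.append(String.format("%02x ", b));
        System.out.println(hex.toString().trim());
    }
}
```

Reading the result against the listing: the first eight bytes correspond to B[0], B[1] and w(n) in w(int i,Object x), and the remainder to the type byte, the B[J++]=0 attribute byte, w(n=n(x)) and the per-byte loop for t==10 in w(Object x).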

 
