Create a Cassandra trigger that handles user-defined types (UDTs) and set types

create schema


-- Keyspace for the example schema: SimpleStrategy replication with a factor of 3.
CREATE KEYSPACE testkeystore
    WITH replication = {
        'class': 'SimpleStrategy',
        'replication_factor': 3
    };

-- User-defined type with a text name and an int number.
-- No other statement in this schema references it.
CREATE TYPE testkeystore.phone (
    name   text,
    number int
);

-- User-defined type for the detail part of an address; nested inside testkeystore.address.
CREATE TYPE testkeystore.addressdetail (
    number int,
    street text,
    code   int,
    city   text
);

-- User-defined type for one address entry of a person.
-- A nested UDT must be declared frozen and name its type explicitly.
CREATE TYPE testkeystore.address (
    name          text,
    howlongyear   int,
    addressdetail frozen<addressdetail>
);

-- User-defined type holding a person's contact data: a set of e-mail addresses.
-- The set needs an element type, and CQL does not accept a trailing comma before ")".
CREATE TYPE testkeystore.contact (
    email set<text>
);

-- Table the trigger is attached to. It mixes plain columns, a set of text,
-- a set of frozen UDTs and a single frozen UDT so every code path of the
-- trigger is exercised.
CREATE TABLE testkeystore.person (
    person_id text PRIMARY KEY,
    firstname text,
    lastname  text,
    age       int,
    jobs      set<text>,
    address   set<frozen<address>>,
    contact   frozen<contact>
);

trigger source

package com.cassandra.trigger;

public class ElasticsearchTrigger implements ITrigger {
private static final Logger logger = LoggerFactory
.getLogger(ElasticsearchTrigger.class);

public Collection augment(ByteBuffer key, ColumnFamily update) {
CFMetaData cfm = update.metadata();
String localKey = cfm.getKeyValidator().getString(key);
logger.info("key={}.", localKey);
Map setMap = new HashMap();
Map simpleMap = new HashMap();
StringBuffer json = new StringBuffer();
json.append("{" + wrappedDoubleQuotes("id") + ":" + localKey+",");

for (Cell cell : update) {

if (cell.name().isCollectionCell()
&& cfm.getValueValidator(cell.name()) instanceof SetType) {
String name = wrappedDoubleQuotes(cell.name()
.cql3ColumnName(cfm).toString());

SetType setType = ((SetType) cfm.getValueValidator(cell.name()));
String value = getSetTypeValue(cell.name().collectionElement(),
setType);
if (setType.elements instanceof UserType) {
StringBuffer utsb = getUserTypeValue(cell.name()
.collectionElement(), (UserType) setType.elements);
value = utsb.toString();
} else {
if (setType.elements instanceof Int32Type) {
value = setType.elements.getString(cell.name()
.collectionElement());
} else if (setType.elements instanceof UTF8Type) {
value = wrappedDoubleQuotes(setType.elements
.getString(cell.name().collectionElement()));
}
}

if (value != null && !value.isEmpty()) {
if (setMap.get(name) == null)
setMap.put(name, value);
else {
setMap.put(name, setMap.get(name) + "," + value);
}
}
} else {
String name = wrappedDoubleQuotes(cfm.comparator.getString(cell
.name()));
String value = cfm.getValueValidator(cell.name()).getString(
cell.value());
if (cfm.getValueValidator(cell.name()) instanceof UTF8Type)
simpleMap.put(name, wrappedDoubleQuotes(value));
else if (cfm.getValueValidator(cell.name()) instanceof Int32Type)
simpleMap.put(name, value);
else if (cfm.getValueValidator(cell.name()) instanceof UserType)
simpleMap.put(
name,
getUserTypeValue(
cell.value(),
(UserType) cfm.getValueValidator(cell
.name())).toString());
}
}
boolean flag = false;
for (String name : simpleMap.keySet()) {
if (!name.isEmpty()) {
if (flag)
json.append(",");
flag = true;
json.append(name);
json.append(":");
json.append(simpleMap.get(name));
}

}

for (String name : setMap.keySet()) {
if (flag)
json.append(",");
json.append(name);
json.append(":");
json.append("[");
json.append(setMap.get(name));
json.append("]");
}

json.append("}");
logger.info("json={}.", json.toString());
return null;
}

private StringBuffer getUserTypeValue(ByteBuffer values, UserType userType) {
StringBuffer utsb = new StringBuffer();
utsb.append("{");
boolean flag = false;
for (int i = 0; i < userType.size(); i++) {
ByteBuffer[] ByteBuffers = userType.split(values);
String userTypeValue = userType.fieldType(i).getString(
ByteBuffers[i]);
if (flag)
utsb.append(",");
flag = true;
utsb.append(wrappedDoubleQuotes(UTF8Type.instance
.getString(userType.fieldName(i))));
utsb.append(":");
if (userType.fieldType(i) instanceof UTF8Type) {
utsb.append(wrappedDoubleQuotes(userTypeValue));
} else {
if (userType.fieldType(i) instanceof Int32Type) {
utsb.append(userTypeValue);
} else if (userType.fieldType(i) instanceof UserType) {
utsb.append(getUserTypeValue(ByteBuffers[i],
(UserType) userType.fieldType(i)));
} else if (userType.fieldType(i) instanceof SetType) {
utsb.append(getSetTypeValue2(ByteBuffers[i],
(SetType) userType.fieldType(i)));
}

}

}
utsb.append("}");
return utsb;
}

private String getSetTypeValue(ByteBuffer bytevalue, SetType setType) {
String value = "";
if (setType.elements instanceof UserType) {
value = getUserTypeValue(bytevalue, (UserType) setType.elements)
.toString();
} else {
if (setType.elements instanceof UTF8Type)
value = "'" + setType.elements.getString(bytevalue) + "'";
else if (setType.elements instanceof Int32Type)
value = setType.elements.getString(bytevalue);
}
return value;

}

private String getSetTypeValue2(ByteBuffer bytevalue, SetType setType) {
StringBuffer value = new StringBuffer();
value.append("[");
Set s = (Set) setType.getSerializer()
.deserializeForNativeProtocol(bytevalue, 3);
ByteBuffer input = bytevalue.duplicate();
int n = org.apache.cassandra.serializers.CollectionSerializer
.readCollectionSize(input, 3);
Set setbb = new LinkedHashSet(n);
for (int i = 0; i < n; i++) {
ByteBuffer databb = org.apache.cassandra.serializers.CollectionSerializer
.readValue(input, 3);
setbb.add(databb);
}
boolean flag = false;
for (Object o : s) {
if (flag)
value.append(",");
flag = true;
if (setType.elements instanceof UserType) {
value.append(getUserTypeValue(bytevalue,
(UserType) setType.elements).toString());
} else {
if (setType.elements instanceof UTF8Type)
value.append(wrappedDoubleQuotes((String) o));
else if (setType.elements instanceof Int32Type)
value.append(setType.elements.getString(bytevalue));
}
}
value.append("]");
return value.toString();

}

private String wrappedDoubleQuotes(String value) {
return "\"" + value + "\"";
}
}

deploy trigger


# Build the trigger jar from the project directory.
# NOTE(review): ELASTICSEARCH_CASSANDRA_TRIGGER_HOME looks like a placeholder for the checkout path - confirm.
cd ELASTICSEARCH_CASSANDRA_TRIGGER_HOME
mvn clean install
# Copy the built jar into Cassandra's triggers directory.
sudo cp target/elasticsearch-cassandra-trigger-0.0.1-SNAPSHOT.jar /etc/cassandra/triggers
# Ask the local node to reload its triggers.
nodetool -h localhost reloadtriggers
# Open a CQL shell; the statement that follows is entered there.
cqlsh
-- Attach the trigger class to the person table; does nothing if the trigger already exists.
CREATE TRIGGER IF NOT EXISTS elasticsearchtrigger
    ON testkeystore.person
    USING 'com.cassandra.trigger.ElasticsearchTrigger';

execute update


-- Sample row that fills every column: plain values, a set of text,
-- a set of address UDTs and a contact UDT.
INSERT INTO testkeystore.person (
    person_id,
    firstname,
    lastname,
    age,
    jobs,
    address,
    contact
) VALUES (
    '123456',
    'samplefirstname',
    'samplelastname',
    18,
    {'job1', 'job2'},
    {
        {name: 'home', howlongyear: 10, addressdetail: {number: 123, street: 'samplestreet', code: 4569, city: 'samplecity'}},
        {name: 'work', howlongyear: 2, addressdetail: {number: 456, street: 'samplestreet2', code: 1234, city: 'samplecity2'}}
    },
    {email: {'email1@example.com', 'email2@example.com'}}
);

Check the log with:


# Follow the Cassandra system log; the trigger's "json=" line shows up here.
sudo tail -f /var/log/cassandra/system.log

INFO [SharedPool-Worker-1] 2014-11-11 10:26:35,838 ElasticsearchTrigger.java:111 - json={"id":123456,"firstname":"samplefirstname","age":18,"lastname":"samplelastname","contact":{"email":["email1@example.com","email2@example.com"]},"jobs":["job1","job2"],"address":[{"name":"home","howlongyear":10,"addressdetail":{"number":123,"street":"samplestreet","code":4569,"city":"samplecity"}},{"name":"work","howlongyear":2,"addressdetail":{"number":456,"street":"samplestreet2","code":1234,"city":"samplecity2"}}]}.

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s