cassandra cluster with multi-host docker

On server 192.168.112.101, run a Cassandra seed node; in its cassandra.yaml, listen_address, broadcast_address and rpc_address are all set to 192.168.112.101.
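A minimal excerpt of the relevant cassandra.yaml lines (an assumption: everything else stays at its defaults):

listen_address: 192.168.112.101
broadcast_address: 192.168.112.101
rpc_address: 192.168.112.101

Then start the container: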

docker run --net=host -d --name cass1 -p 9042:9042 -p 7001:7001 -p 7000:7000 -p 7199:7199 -p 9160:9160 _YOUR_DOCKER_IMAGE_

(with --net=host the container shares the host's network stack, so the -p mappings are effectively redundant; they are kept here to document the ports)

On server 192.168.112.102, run a regular Cassandra node; in its cassandra.yaml, listen_address, broadcast_address and rpc_address are 192.168.112.102, and the seeds parameter points at the seed node, 192.168.112.101:

seed_provider:
  - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
      - seeds: "192.168.112.101"

docker run --net=host -d --name cass2 _YOUR_DOCKER_IMAGE_

192.168.112.101 and 192.168.112.102 must be able to reach each other on ports 7000, 7001, 7199 and 9042.
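A quick way to check connectivity and cluster membership (a sketch; it assumes nc is available on the hosts and nodetool is present in the image):

# from 192.168.112.101, probe the other node's ports
for p in 7000 7001 7199 9042; do nc -zv 192.168.112.102 $p; done

# both nodes should show up as UN (Up/Normal)
docker exec -it cass1 nodetool status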

node.js + cassandra

// install cassandra from http://www.datastax.com/documentation/cassandra/2.1/cassandra/install/install_cassandraTOC.html

cqlsh

CREATE KEYSPACE testkeystore WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};

CREATE TYPE testkeystore.address (
number int,
street text,
code int,
city text
);

CREATE TABLE testkeystore.person(
person_id text PRIMARY KEY,
firstname text,
lastname text,
age int,
address set<frozen<address>>,
phones map<text, text>
);

exit

npm install cassandra-driver

(install it locally in the project directory rather than with -g, so that require('cassandra-driver') resolves)

subl cassandra.js

var cassandra = require('cassandra-driver');

var client = new cassandra.Client({contactPoints: ['host1', '127.0.0.1'], keyspace: 'testkeystore'});


var query = 'INSERT INTO testkeystore.person(person_id,firstname,lastname,age,address,phones) VALUES(?, ?, ?, ?, {{number:123,street:\'samplestreet\',code:4569,city:\'samplecity\'}}, {\'home\':\'9764\'});';
var params = ['1', 'bob', 'bob', 35];

// prepare: true executes the statement as a prepared statement with bound parameters
client.execute(query, params, {prepare: true}, function(err) {
  if (err)
    console.log('ERROR ' + err);
  else
    console.log('Row inserted on the cluster');
});

nodejs cassandra.js
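To verify the row landed, a minimal read-back sketch with the same driver (assuming the schema and local node from above):

var cassandra = require('cassandra-driver');
var client = new cassandra.Client({contactPoints: ['127.0.0.1'], keyspace: 'testkeystore'});

// prepare: true lets the driver bind person_id as a typed parameter
client.execute('SELECT * FROM person WHERE person_id = ?', ['1'], {prepare: true}, function(err, result) {
  if (err)
    console.log('ERROR ' + err);
  else
    console.log(result.rows[0]);
});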

errors while building a docker image of cassandra on openwrt

conf/cassandra-env.sh: line 91: /opt/jre/bin/java: not found

bash-4.3# which java
/opt/jre/bin/bundled/java

vi Dockerfile

RUN sed -i 's/\"$JAVA_HOME\"\/bin\/java/\"$JAVA_HOME\"\/bin\/bundled\/java/g' /opt/cassandra/bin/cassandra
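To confirm the substitution landed (a hypothetical sanity check, not part of the original Dockerfile):

grep -n 'bundled/java' /opt/cassandra/bin/cassandra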

/opt/cassandra/bin/cassandra

./cassandra: line 165: getopt: not found

BusyBox has no getopt, so comment out the argument parsing in the launcher script and hard-code the values instead:

vi /opt/cassandra/bin/cassandra
#args=`getopt vfhp:bD:H:E: "$@"`
#eval set -- "$args"
classname="org.apache.cassandra.service.CassandraDaemon"
#while true; do
...

foreground="yes"
properties="-XX:ErrorFile=/etc/cassandra -XX:HeapDumpPath=/etc/cassandra"
launch_service "$foreground" "$properties" "$classname"

/opt/cassandra/bin/cassandra

# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 8380219392 bytes for committing reserved memory.

To fix it, modify cassandra-env.sh: add

MAX_HEAP_SIZE=1000M

before the line

JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}"
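With that change the surrounding lines of cassandra-env.sh read roughly as follows (in the stock script the -Xmx line immediately follows -Xms):

MAX_HEAP_SIZE=1000M
JVM_OPTS="$JVM_OPTS -Xms${MAX_HEAP_SIZE}"
JVM_OPTS="$JVM_OPTS -Xmx${MAX_HEAP_SIZE}"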

The next error comes from the JMX agent, because the container's hostname cannot be resolved:

Error: Exception thrown by the agent : java.net.MalformedURLException: Local host name unknown: java.net.UnknownHostException: cassandra-host: cassandra-host

This small test program reproduces the lookup:

import java.net.Inet4Address;
import java.net.UnknownHostException;

public class Test {
    public static void main(String[] args) throws UnknownHostException {
        System.setProperty("java.net.preferIPv4Stack", "true");
        // resolves the local host name the same way the JMX agent does
        System.out.println(Inet4Address.getLocalHost());
    }
}

bash-4.3# java Test
Exception in thread "main" java.net.UnknownHostException: 30b91803e12d: 30b91803e12d
at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
at Test.main(Test.java:8)
Caused by: java.net.UnknownHostException: 30b91803e12d
at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901)
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293)
at java.net.InetAddress.getLocalHost(InetAddress.java:1469)
... 1 more

The cause: the following NSS libraries were missing from jre/lib and had to be copied in, after which hostname resolution works:

libnss_dns-2.19.so
libnss_files-2.19.so
libnss_dns.so.2

install cassandra in Windows

After installing Cassandra 2.1.2 on Windows 8 and starting it, I got the following error:

ERROR 20:20:43 Exception encountered during startup
java.lang.RuntimeException: Incompatible SSTable found. Current version ka is unable to read file: \var\lib\cassandra\data\system\schema_keyspaces\system-schema_keyspaces-ic-1. Please run upgradesstables.
at org.apache.cassandra.db.ColumnFamilyStore.createColumnFamilyStore(ColumnFamilyStore.java:427) ~[apache-cassandra-2.1.2.jar:2.1.2]
at org.apache.cassandra.db.ColumnFamilyStore.createColumnFamilyStore(ColumnFamilyStore.java:404) ~[apache-cassandra-2.1.2.jar:2.1.2]
at org.apache.cassandra.db.Keyspace.initCf(Keyspace.java:327) ~[apache-cassandra-2.1.2.jar:2.1.2]
at org.apache.cassandra.db.Keyspace.<init>(Keyspace.java:280) ~[apache-cassandra-2.1.2.jar:2.1.2]
at org.apache.cassandra.db.Keyspace.open(Keyspace.java:122) ~[apache-cassandra-2.1.2.jar:2.1.2]
at org.apache.cassandra.db.Keyspace.open(Keyspace.java:99) ~[apache-cassandra-2.1.2.jar:2.1.2]
at org.apache.cassandra.db.SystemKeyspace.checkHealth(SystemKeyspace.java:558) ~[apache-cassandra-2.1.2.jar:2.1.2]
at org.apache.cassandra.service.CassandraDaemon.setup(CassandraDaemon.java:214) [apache-cassandra-2.1.2.jar:2.1.2]
at org.apache.cassandra.service.CassandraDaemon.activate(CassandraDaemon.java:448) [apache-cassandra-2.1.2.jar:2.1.2]
at org.apache.cassandra.service.CassandraDaemon.main(CassandraDaemon.java:537) [apache-cassandra-2.1.2.jar:2.1.2]

It can be solved by editing

E:\DataStax Community\apache-cassandra\conf\cassandra.yaml

and pointing the data and commit log directories at explicit paths (by default they fall back to $CASSANDRA_HOME/data/data and $CASSANDRA_HOME/data/commitlog):

data_file_directories:
- E:\DataStax Community\apache-cassandra\data

commitlog_directory: E:\DataStax Community\apache-cassandra\log
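If those directories do not exist yet, create them first (an assumption; the installer may already have created them):

mkdir "E:\DataStax Community\apache-cassandra\data"
mkdir "E:\DataStax Community\apache-cassandra\log"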

I also got the following error when loading a trigger with nodetool -h localhost reloadtriggers:

Trigger directory doesn't exist, please create it and try again.

To solve it, edit

E:\DataStax Community\apache-cassandra\bin\cassandra.bat

and add the triggers directory to the JVM options, right after the logback line:

-Dlogback.configurationFile=logback.xml^
-Dcassandra.triggers_dir="E:\DataStax Community\apache-cassandra\conf\triggers"

create a cassandra trigger with a user-defined type and a set type

create schema


CREATE KEYSPACE testkeystore WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};

CREATE TYPE testkeystore.phone (
name text,
number int
);

CREATE TYPE testkeystore.addressdetail (
number int,
street text,
code int,
city text
);

CREATE TYPE testkeystore.address (
name text,
howlongyear int,
addressdetail frozen<addressdetail>
);

CREATE TYPE testkeystore.contact (
email set<text>
);

CREATE TABLE testkeystore.person(
person_id text PRIMARY KEY,
firstname text,
lastname text,
age int,
jobs set<text>,
address set<frozen<address>>,
contact frozen<contact>
);

trigger source

package com.cassandra.trigger;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.triggers.ITrigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticsearchTrigger implements ITrigger {
private static final Logger logger = LoggerFactory.getLogger(ElasticsearchTrigger.class);

public Collection augment(ByteBuffer key, ColumnFamily update) {
CFMetaData cfm = update.metadata();
String localKey = cfm.getKeyValidator().getString(key);
logger.info("key={}.", localKey);
Map setMap = new HashMap();
Map simpleMap = new HashMap();
StringBuffer json = new StringBuffer();
json.append("{" + wrappedDoubleQuotes("id") + ":" + localKey+",");

for (Cell cell : update) {

if (cell.name().isCollectionCell()
&& cfm.getValueValidator(cell.name()) instanceof SetType) {
String name = wrappedDoubleQuotes(cell.name()
.cql3ColumnName(cfm).toString());

SetType setType = ((SetType) cfm.getValueValidator(cell.name()));
String value = getSetTypeValue(cell.name().collectionElement(),
setType);
if (setType.elements instanceof UserType) {
StringBuffer utsb = getUserTypeValue(cell.name()
.collectionElement(), (UserType) setType.elements);
value = utsb.toString();
} else {
if (setType.elements instanceof Int32Type) {
value = setType.elements.getString(cell.name()
.collectionElement());
} else if (setType.elements instanceof UTF8Type) {
value = wrappedDoubleQuotes(setType.elements
.getString(cell.name().collectionElement()));
}
}

if (value != null && !value.isEmpty()) {
if (setMap.get(name) == null)
setMap.put(name, value);
else {
setMap.put(name, setMap.get(name) + "," + value);
}
}
} else {
String name = wrappedDoubleQuotes(cfm.comparator.getString(cell
.name()));
String value = cfm.getValueValidator(cell.name()).getString(
cell.value());
if (cfm.getValueValidator(cell.name()) instanceof UTF8Type)
simpleMap.put(name, wrappedDoubleQuotes(value));
else if (cfm.getValueValidator(cell.name()) instanceof Int32Type)
simpleMap.put(name, value);
else if (cfm.getValueValidator(cell.name()) instanceof UserType)
simpleMap.put(
name,
getUserTypeValue(
cell.value(),
(UserType) cfm.getValueValidator(cell
.name())).toString());
}
}
boolean flag = false;
for (String name : simpleMap.keySet()) {
if (!name.isEmpty()) {
if (flag)
json.append(",");
flag = true;
json.append(name);
json.append(":");
json.append(simpleMap.get(name));
}

}

for (String name : setMap.keySet()) {
if (flag)
json.append(",");
json.append(name);
json.append(":");
json.append("[");
json.append(setMap.get(name));
json.append("]");
}

json.append("}");
logger.info("json={}.", json.toString());
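// returning null means this trigger produces no additional mutations;
// it only builds and logs the JSON document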
return null;
}

private StringBuffer getUserTypeValue(ByteBuffer values, UserType userType) {
StringBuffer utsb = new StringBuffer();
utsb.append("{");
boolean flag = false;
for (int i = 0; i < userType.size(); i++) {
ByteBuffer[] ByteBuffers = userType.split(values);
String userTypeValue = userType.fieldType(i).getString(
ByteBuffers[i]);
if (flag)
utsb.append(",");
flag = true;
utsb.append(wrappedDoubleQuotes(UTF8Type.instance
.getString(userType.fieldName(i))));
utsb.append(":");
if (userType.fieldType(i) instanceof UTF8Type) {
utsb.append(wrappedDoubleQuotes(userTypeValue));
} else {
if (userType.fieldType(i) instanceof Int32Type) {
utsb.append(userTypeValue);
} else if (userType.fieldType(i) instanceof UserType) {
utsb.append(getUserTypeValue(ByteBuffers[i],
(UserType) userType.fieldType(i)));
} else if (userType.fieldType(i) instanceof SetType) {
utsb.append(getSetTypeValue2(ByteBuffers[i],
(SetType) userType.fieldType(i)));
}

}

}
utsb.append("}");
return utsb;
}

private String getSetTypeValue(ByteBuffer bytevalue, SetType setType) {
String value = "";
if (setType.elements instanceof UserType) {
value = getUserTypeValue(bytevalue, (UserType) setType.elements)
.toString();
} else {
if (setType.elements instanceof UTF8Type)
value = "'" + setType.elements.getString(bytevalue) + "'";
else if (setType.elements instanceof Int32Type)
value = setType.elements.getString(bytevalue);
}
return value;

}

private String getSetTypeValue2(ByteBuffer bytevalue, SetType setType) {
StringBuffer value = new StringBuffer();
value.append("[");
Set s = (Set) setType.getSerializer()
.deserializeForNativeProtocol(bytevalue, 3);
ByteBuffer input = bytevalue.duplicate();
int n = org.apache.cassandra.serializers.CollectionSerializer
.readCollectionSize(input, 3);
Set setbb = new LinkedHashSet(n);
for (int i = 0; i < n; i++) {
ByteBuffer databb = org.apache.cassandra.serializers.CollectionSerializer
.readValue(input, 3);
setbb.add(databb);
}
boolean flag = false;
for (Object o : s) {
if (flag)
value.append(",");
flag = true;
if (setType.elements instanceof UserType) {
value.append(getUserTypeValue(bytevalue,
(UserType) setType.elements).toString());
} else {
if (setType.elements instanceof UTF8Type)
value.append(wrappedDoubleQuotes((String) o));
else if (setType.elements instanceof Int32Type)
value.append(setType.elements.getString(bytevalue));
}
}
value.append("]");
return value.toString();

}

private String wrappedDoubleQuotes(String value) {
return "\"" + value + "\"";
}
}

deploy trigger


cd ELASTICSEARCH_CASSANDRA_TRIGGER_HOME
mvn clean install
sudo cp target/elasticsearch-cassandra-trigger-0.0.1-SNAPSHOT.jar /etc/cassandra/triggers
nodetool -h localhost reloadtriggers
cqlsh
CREATE TRIGGER IF NOT EXISTS elasticsearchtrigger ON testkeystore.person USING 'com.cassandra.trigger.ElasticsearchTrigger';
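To confirm the trigger is registered (assuming Cassandra 2.1, where trigger metadata lives in system.schema_triggers):

cqlsh> SELECT * FROM system.schema_triggers;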

execute update


INSERT INTO testkeystore.person(person_id,firstname,lastname,age,jobs,address,contact) VALUES('123456','samplefirstname','samplelastname',18,{'job1','job2'},{{name:'home',howlongyear:10,addressdetail:{number:123,street:'samplestreet',code:4569,city:'samplecity'}},{name:'work',howlongyear:2,addressdetail:{number:456,street:'samplestreet2',code:1234,city:'samplecity2'}}},{ email:{'email1@example.com','email2@example.com'} });

check log with


sudo tail -f /var/log/cassandra/system.log

INFO [SharedPool-Worker-1] 2014-11-11 10:26:35,838 ElasticsearchTrigger.java:111 - json={"id":123456,"firstname":"samplefirstname","age":18,"lastname":"samplelastname","contact":{"email":["email1@example.com","email2@example.com"]},"jobs":["job1","job2"],"address":[{"name":"home","howlongyear":10,"addressdetail":{"number":123,"street":"samplestreet","code":4569,"city":"samplecity"}},{"name":"work","howlongyear":2,"addressdetail":{"number":456,"street":"samplestreet2","code":1234,"city":"samplecity2"}}]}.

cassandra trigger

to run the cassandra 2.1.1 InvertedIndex trigger sample from
https://github.com/apache/cassandra/tree/trunk/examples/triggers

git clone https://github.com/apache/cassandra.git
cd cassandra
ant
cd examples/triggers/
ant jar
sudo cp build/trigger-example.jar /etc/cassandra/triggers
sudo cp conf/* /etc/cassandra/
nodetool -h localhost reloadtriggers
cqlsh

cqlsh> CREATE KEYSPACE "Keyspace1" WITH replication = {'class':'SimpleStrategy', 'replication_factor':3};

cqlsh> CREATE TABLE IF NOT EXISTS "Keyspace1"."InvertedIndex"(
user_id text PRIMARY KEY,
user_name text
);

cqlsh> CREATE TABLE IF NOT EXISTS "Keyspace1"."Standard1"(
user_id text PRIMARY KEY,
user_name text
);

cqlsh> CREATE TRIGGER test1 ON "Keyspace1"."Standard1" USING 'org.apache.cassandra.triggers.InvertedIndex';

cqlsh> INSERT INTO "Keyspace1"."Standard1"(user_id,user_name) VALUES('1','Bob');

cqlsh> select * from "Keyspace1"."Standard1";

 user_id | user_name
---------+-----------
       1 |       Bob

(1 rows)

cqlsh> select * from "Keyspace1"."InvertedIndex";

 user_id | user_name
---------+-----------
     Bob |         1

The trigger mirrored the row into InvertedIndex: the value Bob became the key and the original key 1 became the value.

elasticsearch cassandra river

srecon/elasticsearch-cassandra-river

Yesterday I tried to integrate elasticsearch 2.1 and cassandra 2.1 with srecon/elasticsearch-cassandra-river.

After the integration I found out that every time the elasticsearch-cassandra-river job starts loading data from Cassandra and importing it into Elasticsearch for indexing, it duplicates every record.

For example, I have a simple table:

CREATE TABLE IF NOT EXISTS nortpole.users(
user_id text PRIMARY KEY,
user_name text
);

INSERT INTO nortpole.users(user_id,user_name) VALUES('1','Bob');
INSERT INTO nortpole.users(user_id,user_name) VALUES('2','alice');

I create an index for the river:

curl -XPUT 'http://localhost:9200/_river/cassandra-river/_meta' -d '{
  "type" : "cassandra",
  "cassandra" : {
    "cluster_name" : "Test Cluster",
    "keyspace" : "nortpole",
    "column_family" : "users",
    "batch_size" : 20000,
    "hosts" : "localhost",
    "dcName" : "DC",
    "cron" : "0/60 * * * * ?"
  },
  "index" : {
    "index" : "prodinfo",
    "type" : "product"
  }
}'

In this init command the cron job is set to run every minute, so after two minutes the following query returns two results:

curl -XPOST "http://localhost:9200/prodinfo/product/_search" -d '
{
  "query": {
    "query_string": {
      "query": "Bob"
    }
  }
}'

But what causes this problem?

According to the source of CassandraRiver.java at

https://github.com/srecon/elasticsearch-cassandra-river/blob/master/src/main/java/com/blu/es/cassandra/CassandraRiver.java

every record read from Cassandra is sent to Elasticsearch under a freshly generated id:


public class CassandraRiver extends AbstractRiverComponent implements River {
    ....

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {

        String rowId = UUID.randomUUID().toString();

        values.put(rowId, rows);

        threadExecutor.execute(new Indexer(getBatchSize(), values, getTypeName(), getIndexName()));

    public Indexer(int batchSize, Map<String, Map<String, String>> keys, String typeName, String indexName) {
        this.batchSize = batchSize;
        this.keys = keys;

    public void run() {

        for (String key : this.keys.keySet()) {
            try {
                bulk.add(Requests.indexRequest(this.indexName).type(this.typeName)
                        .id(key).source(this.keys.get(key)));
            } catch (Exception e) {
                LOGGER.error("Exception in run {}", e);
            }
        }
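Since rowId is a fresh UUID on every run, re-importing the same Cassandra row creates a brand-new Elasticsearch document each time. A minimal sketch of an idempotent alternative, keying each document by the Cassandra primary key (a hypothetical rewrite, not code from the project; rowsByPrimaryKey is an assumed map from primary key to row data):

// use the Cassandra row key as the Elasticsearch document id, so a re-run
// overwrites the existing document instead of duplicating it
for (Map.Entry<String, Map<String, String>> row : rowsByPrimaryKey.entrySet()) {
    bulk.add(Requests.indexRequest(indexName).type(typeName)
            .id(row.getKey())        // stable id across runs
            .source(row.getValue()));
}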

The following code also shows that every indexing run reads the whole table; for a huge table this causes performance problems:

String SQL = "select * from " + getColumnFamily() + " ALLOW FILTERING;";
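A sketch of a gentler approach with the DataStax Java driver 2.x (an assumption; the river itself does not do this): let the driver page through the table instead of materializing everything at once.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

public class PagedExport {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("localhost").build();
        Session session = cluster.connect();
        Statement stmt = new SimpleStatement("SELECT * FROM nortpole.users");
        stmt.setFetchSize(1000); // rows per page; the driver fetches further pages lazily
        for (Row row : session.execute(stmt)) {
            System.out.println(row.getString("user_id")); // index the row here instead
        }
        cluster.close();
    }
}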