21 #include "session_p.h"
23 #include "imapparser_p.h"
26 #include "servermanager.h"
27 #include "servermanager_p.h"
28 #include "xdgbasedirs_p.h"
33 #include <QCoreApplication>
34 #include <QtCore/QDir>
35 #include <QtCore/QQueue>
36 #include <QtCore/QThreadStorage>
37 #include <QtCore/QTimer>
38 #include <QtCore/QThread>
41 #include <QtNetwork/QLocalSocket>
42 #include <QtNetwork/QTcpSocket>
43 #include <QtNetwork/QHostAddress>
48 #define PIPELINE_LENGTH 0
51 using namespace Akonadi;
56 void SessionPrivate::startNext()
58 QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
63 QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
64 if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
65 || localSocket->state() == QLocalSocket::ConnectingState ) ) {
70 QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
71 if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
72 || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
78 QString serverAddress;
83 const QByteArray serverAddressEnvVar = qgetenv(
"AKONADI_SERVER_ADDRESS" );
84 if ( !serverAddressEnvVar.isEmpty() ) {
85 const int pos = serverAddressEnvVar.indexOf(
':' );
86 const QByteArray protocol = serverAddressEnvVar.left( pos );
87 QMap<QString, QString> options;
88 foreach (
const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(
',') ) ) {
89 const QStringList pair = entry.split( QLatin1Char(
'=') );
90 if ( pair.size() != 2 )
92 options.insert( pair.first(), pair.last() );
94 kDebug() << protocol << options;
96 if ( protocol ==
"tcp" ) {
97 serverAddress = options.value( QLatin1String(
"host" ) );
98 port = options.value( QLatin1String(
"port" ) ).toUInt();
100 }
else if ( protocol ==
"unix" ) {
101 serverAddress = options.value( QLatin1String(
"path" ) );
102 }
else if ( protocol ==
"pipe" ) {
103 serverAddress = options.value( QLatin1String(
"name" ) );
108 if ( serverAddress.isEmpty() ) {
110 const QFileInfo fileInfo( connectionConfigFile );
111 if ( !fileInfo.exists() ) {
112 kDebug() <<
"Akonadi Client Session: connection config file '"
113 "akonadi/akonadiconnectionrc' can not be found in"
114 << XdgBaseDirs::homePath(
"config" ) <<
"nor in any of"
115 << XdgBaseDirs::systemPathList(
"config" );
117 const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
119 #ifdef Q_OS_WIN //krazy:exclude=cpp
120 serverAddress = connectionSettings.value( QLatin1String(
"Data/NamedPipe" ), QLatin1String(
"Akonadi" ) ).toString();
122 const QString defaultSocketDir = Internal::xdgSaveDir(
"data" );
123 serverAddress = connectionSettings.value( QLatin1String(
"Data/UnixPath" ), QString(defaultSocketDir + QLatin1String(
"/akonadiserver.socket" )) ).toString();
134 socket = localSocket =
new QLocalSocket( mParent );
135 mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
137 socket = tcpSocket =
new QTcpSocket( mParent );
138 mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) );
140 mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
141 mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
145 kDebug() <<
"connectToServer" << serverAddress;
147 tcpSocket->connectToHost( QHostAddress::LocalHost, 31414 );
150 localSocket->connectToServer( serverAddress );
152 tcpSocket->connectToHost( serverAddress, port );
161 return Internal::xdgSaveDir(
"config" ) + QLatin1String(
"/akonadiconnectionrc");
164 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
166 Q_ASSERT( mParent->sender() == socket );
167 kWarning() <<
"Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
168 socketDisconnected();
171 void SessionPrivate::socketError( QAbstractSocket::SocketError )
173 Q_ASSERT( mParent->sender() == socket );
174 kWarning() <<
"Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
175 socketDisconnected();
178 void SessionPrivate::socketDisconnected()
181 currentJob->d_ptr->lostConnection();
185 void SessionPrivate::dataReceived()
187 while ( socket->bytesAvailable() > 0 ) {
188 if ( parser->continuationSize() > 1 ) {
189 const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
190 parser->parseBlock( data );
191 }
else if ( socket->canReadLine() ) {
192 if ( !parser->parseNextLine( socket->readLine() ) )
196 if ( parser->tag() == QByteArray(
"0" ) ) {
197 if ( parser->data().startsWith(
"OK" ) ) {
201 kWarning() <<
"Unable to login to Akonadi server:" << parser->data();
203 QTimer::singleShot( 1000, mParent, SLOT(
reconnect()) );
208 if ( parser->tag() ==
"*" && parser->data().startsWith(
"OK Akonadi" ) ) {
209 const int pos = parser->data().indexOf(
"[PROTOCOL" );
212 ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
213 protocolVersion = tmp;
214 Internal::setServerProtocolVersion( tmp );
216 kDebug() <<
"Server protocol version is:" << protocolVersion;
218 writeData(
"0 LOGIN " + ImapParser::quote( sessionId ) +
'\n' );
223 currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
234 bool SessionPrivate::canPipelineNext()
236 if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
238 if ( pipeline.isEmpty() && currentJob )
239 return currentJob->d_ptr->mWriteFinished;
240 if ( !pipeline.isEmpty() )
241 return pipeline.last()->d_ptr->mWriteFinished;
245 void SessionPrivate::doStartNext()
247 if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
249 if ( canPipelineNext() ) {
251 pipeline.enqueue( nextJob );
257 if ( !pipeline.isEmpty() ) {
258 currentJob = pipeline.dequeue();
260 currentJob = queue.dequeue();
261 startJob( currentJob );
265 void SessionPrivate::startJob(
Job *job )
267 if ( protocolVersion < minimumProtocolVersion() ) {
269 job->setErrorText( i18n(
"Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
272 job->d_ptr->startQueued();
281 void SessionPrivate::jobDone(KJob * job)
285 if ( job == currentJob ) {
286 if ( pipeline.isEmpty() ) {
290 currentJob = pipeline.dequeue();
295 queue.removeAll( static_cast<Akonadi::Job*>( job ) );
297 pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
301 void SessionPrivate::jobWriteFinished(
Akonadi::Job* job )
303 Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
309 void SessionPrivate::jobDestroyed(QObject * job)
312 jobDone( static_cast<KJob*>( job ) );
318 QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
320 QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
332 socket->write( data );
334 kWarning() <<
"Trying to write while session is disconnected!" << kBacktrace();
347 foreach (
Job *job, queue )
348 job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
354 SessionPrivate::SessionPrivate(
Session *parent )
355 : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
359 void SessionPrivate::init(
const QByteArray &
id )
362 parser =
new ImapParser();
364 if ( !
id.isEmpty() ) {
367 sessionId = QCoreApplication::instance()->applicationName().toUtf8()
368 +
'-' + QByteArray::number( qrand() );
388 socket->disconnect( mParent );
392 QMetaObject::invokeMethod( mParent,
"reconnect", Qt::QueuedConnection );
400 d->init( sessionId );
407 d->init( sessionId );
421 static QThreadStorage<Session*> instances;
425 Q_ASSERT_X( !sessionId.isEmpty(),
"SessionPrivate::createDefaultSession",
426 "You tried to create a default session with empty session id!" );
427 Q_ASSERT_X( !instances.hasLocalData(),
"SessionPrivate::createDefaultSession",
428 "You tried to create a default session twice!" );
430 instances.setLocalData(
new Session( sessionId ) );
435 instances.setLocalData( session );
440 if ( !instances.hasLocalData() )
441 instances.setLocalData(
new Session() );
442 return instances.localData();
447 foreach (
Job* job, d->queue )
448 job->kill( KJob::EmitResult );
450 foreach (
Job* job, d->pipeline ) {
451 job->d_ptr->mStarted =
false;
452 job->kill( KJob::EmitResult );
455 if ( d->currentJob ) {
456 d->currentJob->d_ptr->mStarted =
false;
457 d->currentJob->kill( KJob::EmitResult );
462 #include "moc_session.cpp"
The server protocol version is too old or too new.
void forceReconnect()
Disconnects a previously existing connection and tries to reconnect.
int nextTag()
Returns the next IMAP tag.
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
static void createDefaultSession(const QByteArray &sessionId)
Creates a new default session for this thread with the given sessionId.
Base class for all actions in the Akonadi storage.
static void setDefaultSession(Session *session)
Sets the default session.
static Session * defaultSession()
Returns the default session for this thread.
~Session()
Destroys the session.
QByteArray sessionId() const
Returns the session identifier.
void clear()
Stops all jobs queued for execution.
static State state()
Returns the state of the server.
A communication session with the Akonadi storage.
Server is not running, could be no one started it yet or it failed to start.
static bool start()
Starts the server.
virtual void addJob(Job *job)
Associates the given Job object with this session.
void reconnected()
This signal is emitted whenever the session has been reconnected to the server.
virtual void reconnect()
Attempts to establish a connection to the Akonadi server.
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Propagate item revision changes to following jobs.
Server is running and operational.
void writeData(const QByteArray &data)
Sends the given raw data.
static QString connectionFile()
Default location for akonadiconnectionrc.
State
Enum for the various states the server can be in.
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=0)
Creates a new session.